Skip to content

Container

AutoWired

Collects the rules automatically generated by implicit wiring.

Used to report back to the [AssemblyNode][fmu_manipulation_toolbox.assembly.AssemblyNode] which inputs, outputs, and links were created by auto-wiring, so they can be recorded in the assembly topology.

Attributes:

Name Type Description
rule_input list[list[str]]

Auto-generated input rules [exposed_name, fmu_name, port_name].

rule_output list[list[str]]

Auto-generated output rules [fmu_name, port_name, exposed_name].

rule_link list[list[str]]

Auto-generated link rules [from_fmu, from_port, to_fmu, to_port].

nb_param int

Number of auto-exposed parameters (subset of inputs).

Source code in fmu_manipulation_toolbox/container.py
class AutoWired:
    """Collects the rules automatically generated by implicit wiring.

    Used to report back to the
    [AssemblyNode][fmu_manipulation_toolbox.assembly.AssemblyNode] which
    inputs, outputs, and links were created by auto-wiring, so they
    can be recorded in the assembly topology.

    Attributes:
        rule_input (list[list[str]]): Auto-generated input rules
            `[exposed_name, fmu_name, port_name]`.
        rule_output (list[list[str]]): Auto-generated output rules
            `[fmu_name, port_name, exposed_name]`.
        rule_link (list[list[str]]): Auto-generated link rules
            `[from_fmu, from_port, to_fmu, to_port]`.
        nb_param (int): Number of auto-exposed parameters (subset of inputs).
    """

    def __init__(self):
        self.rule_input = []
        self.rule_output = []
        self.rule_link = []
        self.nb_param = 0

    def __repr__(self):
        return (f"{self.nb_param} parameters, {len(self.rule_input) - self.nb_param} inputs,"
                f" {len(self.rule_output)} outputs, {len(self.rule_link)} links.")

    def add_input(self, from_port, to_fmu, to_port):
        self.rule_input.append([from_port, to_fmu, to_port])

    def add_parameter(self, from_port, to_fmu, to_port):
        self.rule_input.append([from_port, to_fmu, to_port])
        self.nb_param += 1

    def add_output(self, from_fmu, from_port, to_port):
        self.rule_output.append([from_fmu, from_port, to_port])

    def add_link(self, from_fmu, from_port, to_fmu, to_port):
        self.rule_link.append([from_fmu, from_port, to_fmu, to_port])

ClockList

Tracks clocks that need to be scheduled by the FMI importer.

Used for LS-BUS support where the container runtime needs to trigger countdown clocks on embedded FMUs.

Attributes:

Name Type Description
clocks_per_fmu dict[int, list[tuple[int, int]]]

Clock entries per FMU index: (fmu_vr, local_vr) pairs.

fmu_index dict[str, int]

Mapping from FMU name to its index in the container.

Source code in fmu_manipulation_toolbox/container.py
class ClockList:
    """Tracks clocks that need to be scheduled by the FMI importer.

    Used for LS-BUS support where the container runtime needs to trigger
    countdown clocks on embedded FMUs.

    Attributes:
        clocks_per_fmu (dict[int, list[tuple[int, int]]]): Clock entries
            per FMU index: `(fmu_vr, local_vr)` pairs.
        fmu_index (dict[str, int]): Mapping from FMU name to its index
            in the container.
    """

    def __init__(self, involved_fmu: OrderedDict[str, EmbeddedFMU]):
        self.clocks_per_fmu: DefaultDict[int, List[Tuple[int, int]]] = defaultdict(list)
        self.fmu_index: Dict[str, int] = {}
        for i, fmu_name in enumerate(involved_fmu):
            self.fmu_index[fmu_name] = i

    def append(self, cport: ContainerPort, vr: int):
        """Register a clock for importer scheduling.

        Args:
            cport (ContainerPort): The clocked port on the embedded FMU.
            vr (int): The local value reference of the clock.
        """
        self.clocks_per_fmu[self.fmu_index[cport.fmu.name]].append((cport.port.vr, vr))

    def write_txt(self, txt_file):
        """Write the clock scheduling table to the `container.txt` file.

        Args:
            txt_file: Writable text file handle.
        """
        print(f"# importer CLOCKS: <FMU_INDEX> <NB> <FMU_VR> <VR> [<FMU_VR> <VR>]", file=txt_file)
        nb_total_clocks = 0
        for clocks in self.clocks_per_fmu.values():
            nb_total_clocks += len(clocks)

        print(f"{len(self.clocks_per_fmu)} {nb_total_clocks}", file=txt_file)
        for index, clocks in self.clocks_per_fmu.items():
            clocks_str = " ".join([f"{clock[0]} {clock[1]}" for clock in clocks])
            print(f"{index} {len(clocks)} {clocks_str}", file=txt_file)

append(cport, vr)

Register a clock for importer scheduling.

Parameters:

Name Type Description Default
cport ContainerPort

The clocked port on the embedded FMU.

required
vr int

The local value reference of the clock.

required
Source code in fmu_manipulation_toolbox/container.py
def append(self, cport: ContainerPort, vr: int):
    """Register a clock for importer scheduling.

    Args:
        cport (ContainerPort): The clocked port on the embedded FMU.
        vr (int): The local value reference of the clock.
    """
    self.clocks_per_fmu[self.fmu_index[cport.fmu.name]].append((cport.port.vr, vr))

write_txt(txt_file)

Write the clock scheduling table to the container.txt file.

Parameters:

Name Type Description Default
txt_file

Writable text file handle.

required
Source code in fmu_manipulation_toolbox/container.py
def write_txt(self, txt_file):
    """Write the clock scheduling table to the `container.txt` file.

    Args:
        txt_file: Writable text file handle.
    """
    print(f"# importer CLOCKS: <FMU_INDEX> <NB> <FMU_VR> <VR> [<FMU_VR> <VR>]", file=txt_file)
    nb_total_clocks = 0
    for clocks in self.clocks_per_fmu.values():
        nb_total_clocks += len(clocks)

    print(f"{len(self.clocks_per_fmu)} {nb_total_clocks}", file=txt_file)
    for index, clocks in self.clocks_per_fmu.items():
        clocks_str = " ".join([f"{clock[0]} {clock[1]}" for clock in clocks])
        print(f"{index} {len(clocks)} {clocks_str}", file=txt_file)

ContainerInput

Represents an input port exposed by the container.

A single container input can fan out to multiple embedded FMU input ports, provided they share the same type and causality.

Attributes:

Name Type Description
name str

Exposed name of the container input.

type_name str

Container-internal type name (e.g. "real64").

causality str

Port causality ("input" or "parameter").

cport_list list[ContainerPort]

List of embedded FMU ports connected to this input.

vr int | None

Value reference assigned by the container.

Source code in fmu_manipulation_toolbox/container.py
class ContainerInput:
    """Represents an input port exposed by the container.

    A single container input can fan out to multiple embedded FMU input ports,
    provided they share the same type and causality.

    Attributes:
        name (str): Exposed name of the container input.
        type_name (str): Container-internal type name (e.g. `"real64"`).
        causality (str): Port causality (`"input"` or `"parameter"`).
        cport_list (list[ContainerPort]): List of embedded FMU ports connected
            to this input.
        vr (int | None): Value reference assigned by the container.
    """

    def __init__(self, name: str, cport_to: ContainerPort):
        self.name = name
        self.type_name = cport_to.port.type_name
        self.causality = cport_to.port.causality
        self.cport_list = [cport_to]
        self.vr = None

    def add_cport(self, cport_to: ContainerPort):
        """Connect an additional embedded FMU port to this container input.

        Args:
            cport_to (ContainerPort): The embedded FMU port to connect.

        Raises:
            FMUContainerError: If the port is already connected, or if types
                or causalities do not match.
        """
        if cport_to in self.cport_list: # Cannot be reached ! (Assembly prevent this to happen)
            raise FMUContainerError(f"Duplicate INPUT {cport_to} already connected to {self.name}")

        if cport_to.port.type_name != self.type_name:
            raise FMUContainerError(f"Cannot connect {self.name} of type {self.type_name} to "
                                    f"{cport_to} of type {cport_to.port.type_name}")

        if cport_to.port.causality != self.causality:
            raise FMUContainerError(f"Cannot connect {self.causality.upper()} {self.name} to "
                                    f"{cport_to.port.causality.upper()} {cport_to}")

        self.cport_list.append(cport_to)

add_cport(cport_to)

Connect an additional embedded FMU port to this container input.

Parameters:

Name Type Description Default
cport_to ContainerPort

The embedded FMU port to connect.

required

Raises:

Type Description
FMUContainerError

If the port is already connected, or if types or causalities do not match.

Source code in fmu_manipulation_toolbox/container.py
def add_cport(self, cport_to: ContainerPort):
    """Connect an additional embedded FMU port to this container input.

    Args:
        cport_to (ContainerPort): The embedded FMU port to connect.

    Raises:
        FMUContainerError: If the port is already connected, or if types
            or causalities do not match.
    """
    if cport_to in self.cport_list: # Cannot be reached ! (Assembly prevent this to happen)
        raise FMUContainerError(f"Duplicate INPUT {cport_to} already connected to {self.name}")

    if cport_to.port.type_name != self.type_name:
        raise FMUContainerError(f"Cannot connect {self.name} of type {self.type_name} to "
                                f"{cport_to} of type {cport_to.port.type_name}")

    if cport_to.port.causality != self.causality:
        raise FMUContainerError(f"Cannot connect {self.causality.upper()} {self.name} to "
                                f"{cport_to.port.causality.upper()} {cport_to}")

    self.cport_list.append(cport_to)

ContainerPort

References a specific port of an embedded FMU within a container.

Wraps an EmbeddedFMUPort together with its parent EmbeddedFMU, and tracks the value reference assigned by the container.

Attributes:

Name Type Description
fmu EmbeddedFMU

The embedded FMU owning this port.

port EmbeddedFMUPort

The port descriptor.

vr int | None

Value reference assigned by the container, or None if not yet assigned.

Raises:

Type Description
FMUContainerError

If the port name does not exist in the FMU.

Source code in fmu_manipulation_toolbox/container.py
class ContainerPort:
    """References a specific port of an embedded FMU within a container.

    Wraps an [EmbeddedFMUPort][fmu_manipulation_toolbox.container.EmbeddedFMUPort]
    together with its parent
    [EmbeddedFMU][fmu_manipulation_toolbox.container.EmbeddedFMU], and tracks
    the value reference assigned by the container.

    Attributes:
        fmu (EmbeddedFMU): The embedded FMU owning this port.
        port (EmbeddedFMUPort): The port descriptor.
        vr (int | None): Value reference assigned by the container, or `None`
            if not yet assigned.

    Raises:
        FMUContainerError: If the port name does not exist in the FMU.
    """

    def __init__(self, fmu: EmbeddedFMU, port_name: str):
        self.fmu = fmu
        try:
            self.port = fmu.ports[port_name]
        except KeyError:
            raise FMUContainerError(f"Port '{fmu.name}/{port_name}' does not exist")
        self.vr = None

    def __repr__(self):
        return f"Port {self.fmu.name}/{self.port.name}"

    def __hash__(self):
        return hash(str(self))

    def __eq__(self, other):
        return str(self) == str(other)

EmbeddedFMU

Bases: OperationAbstract

Represents an FMU loaded and analysed for embedding inside a container.

Parses the modelDescription.xml of an FMU to extract its ports, capabilities, step size, platform support, and co-simulation metadata. Implements OperationAbstract to process the FMU descriptor via the visitor pattern.

Attributes:

Name Type Description
capability_list tuple[str, ...]

FMI capability flags tracked by the container.

fmu FMU

The underlying FMU object.

name str

Filename of the FMU (e.g. "model.fmu").

id str

Lowercase stem of the filename, used as an identifier.

terminals Terminals

FMI Terminals defined by this FMU.

ls LayeredStandard

LS-BUS layered standard information.

step_size float | None

Preferred step size in seconds, or None.

start_time float | None

Default experiment start time.

stop_time float | None

Default experiment stop time.

model_identifier str | None

Co-simulation model identifier.

guid str | None

GUID (FMI 2.0) or instantiation token (FMI 3.0).

fmi_version int | None

FMI version (2 or 3).

platforms set[str]

Supported operating systems (e.g. {"Windows", "Linux"}).

ports dict[str, EmbeddedFMUPort]

Ports of the FMU, keyed by name.

has_event_mode bool

Whether the FMU supports event mode (FMI 3.0).

capabilities dict[str, str]

FMI capability flags and their values.

Raises:

Type Description
FMUContainerError

If the FMU does not implement Co-Simulation mode.

Source code in fmu_manipulation_toolbox/container.py
class EmbeddedFMU(OperationAbstract):
    """Represents an FMU loaded and analysed for embedding inside a container.

    Parses the `modelDescription.xml` of an FMU to extract its ports,
    capabilities, step size, platform support, and co-simulation metadata.
    Implements
    [OperationAbstract][fmu_manipulation_toolbox.operations.OperationAbstract]
    to process the FMU descriptor via the visitor pattern.

    Attributes:
        capability_list (tuple[str, ...]): FMI capability flags tracked by the container.
        fmu (FMU): The underlying
            [FMU][fmu_manipulation_toolbox.operations.FMU] object.
        name (str): Filename of the FMU (e.g. `"model.fmu"`).
        id (str): Lowercase stem of the filename, used as an identifier.
        terminals (Terminals): FMI Terminals defined by this FMU.
        ls (LayeredStandard): LS-BUS layered standard information.
        step_size (float | None): Preferred step size in seconds, or `None`.
        start_time (float | None): Default experiment start time.
        stop_time (float | None): Default experiment stop time.
        model_identifier (str | None): Co-simulation model identifier.
        guid (str | None): GUID (FMI 2.0) or instantiation token (FMI 3.0).
        fmi_version (int | None): FMI version (`2` or `3`).
        platforms (set[str]): Supported operating systems (e.g. `{"Windows", "Linux"}`).
        ports (dict[str, EmbeddedFMUPort]): Ports of the FMU, keyed by name.
        has_event_mode (bool): Whether the FMU supports event mode (FMI 3.0).
        capabilities (dict[str, str]): FMI capability flags and their values.

    Raises:
        FMUContainerError: If the FMU does not implement Co-Simulation mode.
    """

    capability_list = ("needsExecutionTool",
                       "canBeInstantiatedOnlyOncePerProcess",
                       "canHandleVariableCommunicationStepSize")

    def __init__(self, filename):
        self.fmu = FMU(filename)
        self.name = Path(filename).name
        self.id = Path(filename).stem.lower()

        logger.debug(f"Analysing {self.name}")
        self.terminals = Terminals(self.fmu.tmp_directory)
        self.ls = LayeredStandard(self.fmu.tmp_directory)

        self.step_size = None
        self.start_time = None
        self.stop_time = None
        self.model_identifier = None
        self.guid = None
        self.fmi_version = None
        self.platforms = set()
        self.ports: Dict[str, EmbeddedFMUPort] = {}

        self.has_event_mode = False
        self.capabilities: Dict[str, str] = {}
        self.current_port = None  # used during apply_operation()

        self.fmu.apply_operation(self)  # Should be the last command in constructor!
        if self.model_identifier is None:
            raise FMUContainerError(f"FMU '{self.name}' does not implement Co-Simulation mode.")

    def fmi_attrs(self, attrs):
        fmi_version = attrs['fmiVersion']
        if fmi_version == "2.0":
            self.guid = attrs['guid']
            self.fmi_version = 2
        if fmi_version.startswith("3."):
            self.guid = attrs['instantiationToken']
            self.fmi_version = 3

    def cosimulation_attrs(self, attrs: Dict[str, str]):
        self.model_identifier = attrs['modelIdentifier']
        if attrs.get("hasEventMode", "false") == "true":
            self.has_event_mode = True
        for capability in self.capability_list:
            self.capabilities[capability] = attrs.get(capability, "false")

    def experiment_attrs(self, attrs: Dict[str, str]):
        try:
            self.step_size = float(attrs['stepSize'])
        except KeyError:
            logger.warning(f"FMU '{self.name}' does not specify preferred step size")
        self.start_time = float(attrs.get("startTime", 0.0))
        self.stop_time = float(attrs.get("stopTime", self.start_time + 1.0))

    def port_attrs(self, fmu_port: FMUPort):
        # Container will manage Enumeration as Integer
        if fmu_port.fmi_type == "Enumeration":
            if self.fmi_version == 2:
                fmu_port.fmi_type = "Integer"
            else:
                fmu_port.fmi_type = "Int32"
        port = EmbeddedFMUPort(fmu_port.fmi_type, fmu_port, fmi_version=self.fmi_version)
        self.ports[port.name] = port

    def closure(self):
        osname = {
            "win64": "Windows",
            "linux64": "Linux",
            "darwin64": "Darwin",
            "x86_64-windows": "Windows",
            "x86_64-linux": "Linux",
            "aarch64-darwin": "Darwin"
        }
        try:
            for directory in (Path(self.fmu.tmp_directory) / "binaries").iterdir():
                if directory.is_dir() and str(directory.stem) in osname:
                    self.platforms.add(osname[str(directory.stem)])
        except FileNotFoundError:
            pass  # no binaries

    def __repr__(self):
        properties = f"{len(self.ports)} variables, ts={self.step_size}s"
        if len(self.terminals) > 0:
            properties += f", {len(self.terminals)} terminals"
        if len(self.ls) > 0:
            properties += f", {self.ls}"
        return f"'{self.name}' ({properties})"

EmbeddedFMUPort

Represents a port of an FMU embedded inside a container.

Handles the mapping between FMI-standard type names (e.g. Real, Float64) and internal container type names (e.g. real64), and generates the corresponding XML fragments for modelDescription.xml.

Attributes:

Name Type Description
FMI_TO_CONTAINER dict[int, dict[str, str]]

Mapping from FMI type names to container type names, keyed by FMI version.

CONTAINER_TO_FMI dict[int, dict[str, str]]

Reverse mapping from container type names to FMI type names, keyed by FMI version.

ALL_TYPES tuple[str, ...]

All container type names.

causality str

Port causality ("input", "output", "local", "parameter").

variability str | None

Port variability ("continuous", "discrete", etc.).

name str

Port name.

vr int

Value reference in the original FMU.

type_name str

Container-internal type name (e.g. "real64").

start_value str | None

Start value, if defined.

initial str | None

Initial value attribute.

clock str | None

Clock reference for clocked ports.

description str | None

Human-readable description of the port.

Source code in fmu_manipulation_toolbox/container.py
class EmbeddedFMUPort:
    """Represents a port of an FMU embedded inside a container.

    Handles the mapping between FMI-standard type names (e.g. `Real`, `Float64`)
    and internal container type names (e.g. `real64`), and generates the
    corresponding XML fragments for `modelDescription.xml`.

    Attributes:
        FMI_TO_CONTAINER (dict[int, dict[str, str]]): Mapping from FMI type names
            to container type names, keyed by FMI version.
        CONTAINER_TO_FMI (dict[int, dict[str, str]]): Reverse mapping from container
            type names to FMI type names, keyed by FMI version.
        ALL_TYPES (tuple[str, ...]): All container type names.
        causality (str): Port causality (`"input"`, `"output"`, `"local"`,
            `"parameter"`).
        variability (str | None): Port variability (`"continuous"`, `"discrete"`, etc.).
        name (str): Port name.
        vr (int): Value reference in the original FMU.
        type_name (str): Container-internal type name (e.g. `"real64"`).
        start_value (str | None): Start value, if defined.
        initial (str | None): Initial value attribute.
        clock (str | None): Clock reference for clocked ports.
        description (str | None): Human-readable description of the port.
    """

    FMI_TO_CONTAINER = {
        2: {
            'Real': 'real64',
            'Integer': 'integer32',
            'String': 'string',
            'Boolean': 'boolean'
        },
        3: {
            'Float64': 'real64',
            'Float32': 'real32',
            'Int8': 'integer8',
            'UInt8': 'uinteger8',
            'Int16': 'integer16',
            'UInt16': 'uinteger16',
            'Int32': 'integer32',
            'UInt32': 'uinteger32',
            'Int64': 'integer64',
            'UInt64': 'uinteger64',
            'String': 'string',
            'Boolean': 'boolean1',
            'Binary': 'binary',
            'Clock': 'clock'
        }
    }

    CONTAINER_TO_FMI = {
        2: {
            'real64': 'Real',
            'integer32': 'Integer',
            'string': 'String',
            'boolean': 'Boolean'
        },
        3: {
            'real64': 'Float64' ,
            'real32': 'Float32' ,
            'integer8': 'Int8' ,
            'uinteger8': 'UInt8' ,
            'integer16': 'Int16' ,
            'uinteger16': 'UInt16' ,
            'integer32': 'Int32' ,
            'uinteger32': 'UInt32' ,
            'integer64': 'Int64' ,
            'uinteger64': 'UInt64' ,
            'string': 'String' ,
            'boolean1': 'Boolean',
            'binary': 'Binary',
            'clock': 'Clock'
        }
    }

    ALL_TYPES = (
        "real64", "real32",
        "integer8", "uinteger8", "integer16", "uinteger16", "integer32", "uinteger32", "integer64", "uinteger64",
        "boolean", "boolean1",
        "string",
        "binary", "clock"
    )

    def __init__(self, fmi_type, attrs: Union[FMUPort, Dict[str, str]], fmi_version=0):
        self.causality = attrs.get("causality", "local")
        self.variability = attrs.get("variability", None)
        self.interval_variability = attrs.get("intervalVariability", None)
        self.name = attrs["name"]
        self.vr = int(attrs["valueReference"])
        self.description = attrs.get("description", None)

        if fmi_version > 0:
            self.type_name = self.FMI_TO_CONTAINER[fmi_version][fmi_type]
        else:
            self.type_name = fmi_type

        self.start_value = attrs.get("start", None)
        self.initial = attrs.get("initial", None)
        self.clock = attrs.get("clocks", None)
        self.interval_variability = attrs.get("intervalVariability", None)


    def xml(self, vr: int, name=None, causality=None, start=None, fmi_version=2) -> str:
        """Generate the XML element for this port in `modelDescription.xml`.

        Produces a `<ScalarVariable>` element (FMI 2.0) or a typed element
        like `<Float64>` (FMI 3.0).

        Args:
            vr (int): Value reference to use in the generated XML.
            name (str | None): Override port name. Defaults to `self.name`.
            causality (str | None): Override causality. Defaults to `self.causality`.
            start (str | None): Override start value. Defaults to `self.start_value`.
            fmi_version (int): FMI version (`2` or `3`).

        Returns:
            str: XML fragment string, or an empty string if the type is not
                compatible with the requested FMI version.
        """
        if name is None:
            name = self.name
        if causality is None:
            causality = self.causality
        if start is None:
            start = self.start_value
            if start is None and self.type_name == "binary" and self.initial == "exact":
                start = ""
        if self.variability is None:
            self.variability = "continuous" if "real" in self.type_name else "discrete"

        try:
            fmi_type = self.CONTAINER_TO_FMI[fmi_version][self.type_name]
        except KeyError:
            logger.error(f"Cannot expose ({causality}) '{name}' because type '{self.type_name}' is not compatible "
                         f"with FMI-{fmi_version}.0")
            return ""

        if fmi_version == 2:
            child_attrs =  {
                "start": start,
            }

            filtered_child_attrs = {key: value for key, value in child_attrs.items() if value is not None}
            child_str = (f"<{fmi_type} " +
                         " ".join([f'{key}="{value}"' for (key, value) in filtered_child_attrs.items()]) +
                         "/>")

            scalar_attrs = {
                "name": name,
                "valueReference": vr,
                "causality": causality,
                "variability": self.variability,
                "initial": self.initial,
                "description": self.description,
            }
            filtered_attrs = {key: value for key, value in scalar_attrs.items() if value is not None}
            scalar_attrs_str = " ".join([f'{key}="{value}"' for (key, value) in filtered_attrs.items()])
            return f'<ScalarVariable {scalar_attrs_str}>{child_str}</ScalarVariable>'

        elif fmi_version == 3:
            if fmi_type in ('String', 'Binary'):
                if start is not None:
                    child_str = f'<Start value="{start}"/>'
                else:
                    child_str = ''
                scalar_attrs = {
                    "name": name,
                    "valueReference": vr,
                    "causality": causality,
                    "variability": self.variability,
                    "initial": self.initial,
                    "description": self.description,
                }
                filtered_attrs = {key: value for key, value in scalar_attrs.items() if value is not None}
                scalar_attrs_str = " ".join([f'{key}="{value}"' for (key, value) in filtered_attrs.items()])
                return f'<{fmi_type} {scalar_attrs_str}>{child_str}</{fmi_type}>'
            else:
                scalar_attrs = {
                    "name": name,
                    "valueReference": vr,
                    "causality": causality,
                    "variability": self.variability,
                    "initial": self.initial,
                    "description": self.description,
                    "start": start,
                    "intervalVariability": self.interval_variability
                }
                filtered_attrs = {key: value for key, value in scalar_attrs.items() if value is not None}
                scalar_attrs_str = " ".join([f'{key}="{value}"' for (key, value) in filtered_attrs.items()])

                return f'<{fmi_type} {scalar_attrs_str}/>'
        else:
            logger.critical(f"Unknown version {fmi_version}. BUG?")
            return ''

xml(vr, name=None, causality=None, start=None, fmi_version=2)

Generate the XML element for this port in modelDescription.xml.

Produces a <ScalarVariable> element (FMI 2.0) or a typed element like <Float64> (FMI 3.0).

Parameters:

Name Type Description Default
vr int

Value reference to use in the generated XML.

required
name str | None

Override port name. Defaults to self.name.

None
causality str | None

Override causality. Defaults to self.causality.

None
start str | None

Override start value. Defaults to self.start_value.

None
fmi_version int

FMI version (2 or 3).

2

Returns:

Name Type Description
str str

XML fragment string, or an empty string if the type is not compatible with the requested FMI version.

Source code in fmu_manipulation_toolbox/container.py
def xml(self, vr: int, name=None, causality=None, start=None, fmi_version=2) -> str:
    """Generate the XML element for this port in `modelDescription.xml`.

    Produces a `<ScalarVariable>` element (FMI 2.0) or a typed element
    like `<Float64>` (FMI 3.0).

    Args:
        vr (int): Value reference to use in the generated XML.
        name (str | None): Override port name. Defaults to `self.name`.
        causality (str | None): Override causality. Defaults to `self.causality`.
        start (str | None): Override start value. Defaults to `self.start_value`.
        fmi_version (int): FMI version (`2` or `3`).

    Returns:
        str: XML fragment string, or an empty string if the type is not
            compatible with the requested FMI version.
    """
    if name is None:
        name = self.name
    if causality is None:
        causality = self.causality
    if start is None:
        start = self.start_value
        if start is None and self.type_name == "binary" and self.initial == "exact":
            start = ""
    if self.variability is None:
        self.variability = "continuous" if "real" in self.type_name else "discrete"

    try:
        fmi_type = self.CONTAINER_TO_FMI[fmi_version][self.type_name]
    except KeyError:
        logger.error(f"Cannot expose ({causality}) '{name}' because type '{self.type_name}' is not compatible "
                     f"with FMI-{fmi_version}.0")
        return ""

    if fmi_version == 2:
        child_attrs =  {
            "start": start,
        }

        filtered_child_attrs = {key: value for key, value in child_attrs.items() if value is not None}
        child_str = (f"<{fmi_type} " +
                     " ".join([f'{key}="{value}"' for (key, value) in filtered_child_attrs.items()]) +
                     "/>")

        scalar_attrs = {
            "name": name,
            "valueReference": vr,
            "causality": causality,
            "variability": self.variability,
            "initial": self.initial,
            "description": self.description,
        }
        filtered_attrs = {key: value for key, value in scalar_attrs.items() if value is not None}
        scalar_attrs_str = " ".join([f'{key}="{value}"' for (key, value) in filtered_attrs.items()])
        return f'<ScalarVariable {scalar_attrs_str}>{child_str}</ScalarVariable>'

    elif fmi_version == 3:
        if fmi_type in ('String', 'Binary'):
            if start is not None:
                child_str = f'<Start value="{start}"/>'
            else:
                child_str = ''
            scalar_attrs = {
                "name": name,
                "valueReference": vr,
                "causality": causality,
                "variability": self.variability,
                "initial": self.initial,
                "description": self.description,
            }
            filtered_attrs = {key: value for key, value in scalar_attrs.items() if value is not None}
            scalar_attrs_str = " ".join([f'{key}="{value}"' for (key, value) in filtered_attrs.items()])
            return f'<{fmi_type} {scalar_attrs_str}>{child_str}</{fmi_type}>'
        else:
            scalar_attrs = {
                "name": name,
                "valueReference": vr,
                "causality": causality,
                "variability": self.variability,
                "initial": self.initial,
                "description": self.description,
                "start": start,
                "intervalVariability": self.interval_variability
            }
            filtered_attrs = {key: value for key, value in scalar_attrs.items() if value is not None}
            scalar_attrs_str = " ".join([f'{key}="{value}"' for (key, value) in filtered_attrs.items()])

            return f'<{fmi_type} {scalar_attrs_str}/>'
    else:
        logger.critical(f"Unknown version {fmi_version}. BUG?")
        return ''

FMUContainer

Builds an FMU Container that embeds multiple FMUs into a single FMU.

An FMUContainer acts as both an FMI co-simulation FMU and an FMI importer. It loads embedded FMUs, wires their ports together, and generates the modelDescription.xml, the runtime configuration (container.txt), and the final .fmu archive.

Examples:

from pathlib import Path
from fmu_manipulation_toolbox.container import FMUContainer

container = FMUContainer("bouncing", Path("fmus"), fmi_version=2)
container.get_fmu("bb_position.fmu")
container.get_fmu("bb_velocity.fmu")
container.add_link("bb_position.fmu", "is_ground",
                   "bb_velocity.fmu", "reset")
container.add_implicit_rule(auto_input=True, auto_output=True)
container.make_fmu("bouncing.fmu", step_size=0.1)

Attributes:

Name Type Description
fmu_directory Path

Directory containing the source FMUs.

identifier str

Model identifier for the container.

fmi_version int

FMI version of the container interface (2 or 3).

involved_fmu OrderedDict[str, EmbeddedFMU]

Embedded FMUs, keyed by filename, in insertion order.

inputs dict[str, ContainerInput]

Container input ports, keyed by exposed name.

outputs dict[str, ContainerPort]

Container output ports, keyed by exposed name.

links dict[ContainerPort, Link]

Internal links between embedded FMUs.

start_values dict[ContainerPort, str]

Start values for embedded FMU ports.

vr_table ValueReferenceTable

Value reference allocator.

Raises:

Type Description
FMUContainerError

If the FMU directory is invalid.

Source code in fmu_manipulation_toolbox/container.py
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
class FMUContainer:
    """Builds an FMU Container that embeds multiple FMUs into a single FMU.

    An `FMUContainer` acts as both an FMI co-simulation FMU and an FMI importer.
    It loads embedded FMUs, wires their ports together, and generates the
    `modelDescription.xml`, the runtime configuration (`container.txt`), and the
    final `.fmu` archive.

    Examples:
        ```python
        from pathlib import Path
        from fmu_manipulation_toolbox.container import FMUContainer

        container = FMUContainer("bouncing", Path("fmus"), fmi_version=2)
        container.get_fmu("bb_position.fmu")
        container.get_fmu("bb_velocity.fmu")
        container.add_link("bb_position.fmu", "is_ground",
                           "bb_velocity.fmu", "reset")
        container.add_implicit_rule(auto_input=True, auto_output=True)
        container.make_fmu("bouncing.fmu", step_size=0.1)
        ```

    Attributes:
        fmu_directory (Path): Directory containing the source FMUs.
        identifier (str): Model identifier for the container.
        fmi_version (int): FMI version of the container interface (`2` or `3`).
        involved_fmu (OrderedDict[str, EmbeddedFMU]): Embedded FMUs, keyed
            by filename, in insertion order.
        inputs (dict[str, ContainerInput]): Container input ports, keyed by
            exposed name.
        outputs (dict[str, ContainerPort]): Container output ports, keyed by
            exposed name.
        links (dict[ContainerPort, Link]): Internal links between embedded FMUs.
        start_values (dict[ContainerPort, str]): Start values for embedded FMU ports.
        vr_table (ValueReferenceTable): Value reference allocator.

    Raises:
        FMUContainerError: If the FMU directory is invalid.
    """

    HEADER_XML_2 = """<?xml version="1.0" encoding="ISO-8859-1"?>
<fmiModelDescription
  fmiVersion="2.0"
  modelName="{identifier}"
  generationTool="FMUContainer-{tool_version}"
  generationDateAndTime="{timestamp}"
  guid="{guid}"
  description="FMUContainer with {embedded_fmu}"
  author="{author}"
  license="Proprietary"
  copyright="See Embedded FMU's copyrights."
  variableNamingConvention="structured">

  <CoSimulation
    modelIdentifier="{identifier}"
    canHandleVariableCommunicationStepSize="true"
    canBeInstantiatedOnlyOncePerProcess="{only_once}"
    canNotUseMemoryManagementFunctions="true"
    canGetAndSetFMUstate="false"
    canSerializeFMUstate="false"
    providesDirectionalDerivative="false"
    needsExecutionTool="{execution_tool}">
  </CoSimulation>

  <LogCategories>
    <Category name="Info"
              description="Info log messages." />
    <Category name="Error"
              description="Error log messages." />
  </LogCategories>

  <DefaultExperiment stepSize="{step_size}"{default_experiment_times}/>

  <ModelVariables>
    <ScalarVariable valueReference="0" name="time" causality="independent"><Real /></ScalarVariable>
"""

    HEADER_XML_3 = """<?xml version="1.0" encoding="ISO-8859-1"?>
<fmiModelDescription
  fmiVersion="3.0"
  modelName="{identifier}"
  generationTool="FMUContainer-{tool_version}"
  generationDateAndTime="{timestamp}"
  instantiationToken="{guid}"
  description="FMUContainer with {embedded_fmu}"
  author="{author}"
  license="Proprietary"
  copyright="See Embedded FMU's copyrights."
  variableNamingConvention="structured">

  <CoSimulation
    modelIdentifier="{identifier}"
    canHandleVariableCommunicationStepSize="true"
    canBeInstantiatedOnlyOncePerProcess="{only_once}"
    canNotUseMemoryManagementFunctions="true"
    canGetAndSetFMUState="false"
    canSerializeFMUState="false"
    providesDirectionalDerivatives="false"
    providesAdjointDerivatives="false"
    providesPerElementDependencies="false"
    providesEvaluateDiscreteStates="false"
    hasEventMode="true"
    needsExecutionTool="{execution_tool}">
  </CoSimulation>

  <LogCategories>
    <Category name="Info"
              description="Info log messages." />
    <Category name="Error"
              description="Error log messages." />
  </LogCategories>

  <DefaultExperiment stepSize="{step_size}"{default_experiment_times}/>

  <ModelVariables>
    <Float64 valueReference="0" name="time" causality="independent"/>
"""

    def __init__(self, identifier: str, fmu_directory: Union[str, Path], description_pathname=None, fmi_version=2):
        self.fmu_directory = Path(fmu_directory)
        self.identifier = identifier
        if not self.fmu_directory.is_dir():
            raise FMUContainerError(f"{self.fmu_directory} is not a valid directory")
        self.involved_fmu: OrderedDict[str, EmbeddedFMU] = OrderedDict()

        self.description_pathname = description_pathname
        self.fmi_version = fmi_version

        self.start_time = None
        self.stop_time = None

        # Rules
        self.inputs: Dict[str, ContainerInput] = {}
        self.outputs: Dict[str, ContainerPort] = {}
        self.links: Dict[ContainerPort, Link] = {}

        self.rules: Dict[ContainerPort, str] = {}
        self.start_values: Dict[ContainerPort, str] = {}

        self.vr_table = ValueReferenceTable()

    def get_fmu(self, fmu_filename: str) -> EmbeddedFMU:
        """Load an embedded FMU from the FMU directory.

        If the FMU has already been loaded, returns the cached instance.

        Args:
            fmu_filename (str): Filename of the FMU (e.g. `"model.fmu"`).

        Returns:
            EmbeddedFMU: The loaded and analysed FMU.

        Raises:
            FMUContainerError: If the FMU cannot be loaded.
        """
        if fmu_filename in self.involved_fmu:
            return self.involved_fmu[fmu_filename]

        try:
            fmu = EmbeddedFMU(self.fmu_directory / fmu_filename)
            if not fmu.fmi_version == self.fmi_version:
                logger.warning(f"Try to embed FMU-{fmu.fmi_version} into container FMI-{self.fmi_version}.")
            self.involved_fmu[fmu.name] = fmu

            logger.info(f"Involved FMU #{len(self.involved_fmu)}: {fmu}")
        except (FMUContainerError, FMUError) as e:
            raise FMUContainerError(f"Cannot load '{fmu_filename}': {e}")

        return fmu

    def mark_ruled(self, cport: ContainerPort, rule: str):
        if cport in self.rules:
            previous_rule = self.rules[cport]
            if rule not in ("OUTPUT", "LINK") and previous_rule not in ("OUTPUT", "LINK"):
                raise FMUContainerError(f"try to {rule} port {cport} which is already {previous_rule}")

        self.rules[cport] = rule

    def get_all_cports(self):
        return [ContainerPort(fmu, port_name) for fmu in self.involved_fmu.values() for port_name in fmu.ports]

    def add_input(self, container_port_name: str, to_fmu_filename: str, to_port_name: str):
        """Expose a port of an embedded FMU as a container input.

        Multiple embedded FMU ports can be connected to the same container
        input (fan-out), provided they share the same type and causality.

        Args:
            container_port_name (str): Exposed name on the container. If empty,
                defaults to `to_port_name`.
            to_fmu_filename (str): Filename of the embedded FMU.
            to_port_name (str): Name of the input port on the embedded FMU.

        Raises:
            FMUContainerError: If the port causality is not `"input"` or
                `"parameter"`, or if types do not match an existing input
                with the same name.
        """
        if not container_port_name:
            container_port_name = to_port_name
        cport_to = ContainerPort(self.get_fmu(to_fmu_filename), to_port_name)
        if cport_to.port.causality not in ("input", "parameter"):  # check causality
            raise FMUContainerError(f"Tried to use '{cport_to}' as INPUT of the container but FMU causality is "
                                    f"'{cport_to.port.causality}'.")

        try:
            input_port = self.inputs[container_port_name]
            input_port.add_cport(cport_to)
        except KeyError:
            self.inputs[container_port_name] = ContainerInput(container_port_name, cport_to)

        logger.debug(f"INPUT: {to_fmu_filename}:{to_port_name}")
        self.mark_ruled(cport_to, 'INPUT')

    def add_output(self, from_fmu_filename: str, from_port_name: str, container_port_name: str):
        """Expose a port of an embedded FMU as a container output.

        Args:
            from_fmu_filename (str): Filename of the embedded FMU.
            from_port_name (str): Name of the output port on the embedded FMU.
            container_port_name (str): Exposed name on the container. If empty,
                defaults to `from_port_name`.

        Raises:
            FMUContainerError: If the port causality is not `"output"` or
                `"local"`, or if the exposed name is already used.
        """
        if not container_port_name:  # empty is allowed
            container_port_name = from_port_name

        cport_from = ContainerPort(self.get_fmu(from_fmu_filename), from_port_name)
        if cport_from.port.causality not in ("output", "local"):  # check causality
            raise FMUContainerError(f"Tried to use '{cport_from}' as OUTPUT of the container but FMU causality is "
                                    f"'{cport_from.port.causality}'.")

        if container_port_name in self.outputs:
            raise FMUContainerError(f"Duplicate OUTPUT {container_port_name} already connected to {cport_from}")

        logger.debug(f"OUTPUT: {from_fmu_filename}:{from_port_name}")
        self.mark_ruled(cport_from, 'OUTPUT')
        self.outputs[container_port_name] = cport_from

    def drop_port(self, from_fmu_filename: str, from_port_name: str):
        """Explicitly ignore an output port of an embedded FMU.

        Prevents the port from being auto-exposed or flagged as unconnected.

        Args:
            from_fmu_filename (str): Filename of the embedded FMU.
            from_port_name (str): Name of the output port to drop.

        Raises:
            FMUContainerError: If the port causality is not `"output"`.
        """
        cport_from = ContainerPort(self.get_fmu(from_fmu_filename), from_port_name)
        if not cport_from.port.causality == "output":  # check causality
            raise FMUContainerError(f"{cport_from}: trying to DROP {cport_from.port.causality}")

        logger.debug(f"DROP: {from_fmu_filename}:{from_port_name}")
        self.mark_ruled(cport_from, 'DROP')

    def add_link(self, from_fmu_filename: str, from_port_name: str, to_fmu_filename: str, to_port_name: str):
        """Connect an output of one embedded FMU to an input of another.

        If both port names match FMI Terminal definitions, a terminal-level
        connection is made (connecting all member ports). Otherwise, a regular
        port-to-port link is created.

        Args:
            from_fmu_filename (str): Filename of the source FMU.
            from_port_name (str): Output port name (or terminal name).
            to_fmu_filename (str): Filename of the destination FMU.
            to_port_name (str): Input port name (or terminal name).

        Raises:
            FMUContainerError: If port causalities are invalid or types
                are incompatible.
        """
        fmu_from = self.get_fmu(from_fmu_filename)
        fmu_to = self.get_fmu(to_fmu_filename)

        if from_port_name in fmu_from.terminals and to_port_name in fmu_to.terminals:
            # TERMINAL Connection
            terminal1 = fmu_from.terminals[from_port_name]
            terminal2 = fmu_to.terminals[to_port_name]
            if terminal1 == terminal2:
                logger.debug(f"Plugging terminals: {terminal1} <-> {terminal2}")
                for terminal1_port_name, terminal2_port_name in terminal1.connect(terminal2):
                    self.add_link_regular(fmu_from, terminal1_port_name, fmu_to, terminal2_port_name)
            else:
                logger.error(f"Cannot plug incompatible terminals: {terminal1} <-> {terminal2}")
        else:
            # REGULAR port connection
            self.add_link_regular(fmu_from, from_port_name, fmu_to, to_port_name)

    def add_link_regular(self, fmu_from: EmbeddedFMU, from_port_name: str, fmu_to: EmbeddedFMU, to_port_name: str):
            cport_from = ContainerPort(fmu_from, from_port_name)
            cport_to = ContainerPort(fmu_to, to_port_name)

            if cport_to.port.causality == "output" and cport_from.port.causality == "input":
                logger.debug("Invert link orientation")
                tmp = cport_to
                cport_to = cport_from
                cport_from = tmp

            try:
                local = self.links[cport_from]
            except KeyError:
                local = Link(cport_from)
                self.links[cport_from] = local

            local.add_target(cport_to)  # Causality is check in the add() function

            logger.debug(f"LINK: {cport_from} -> {cport_to}")
            self.mark_ruled(cport_from, 'LINK')
            self.mark_ruled(cport_to, 'LINK')


    def add_start_value(self, fmu_filename: str, port_name: str, value: str):
        """Set a start value for a port of an embedded FMU.

        The value is automatically converted to the appropriate type
        (float, int, bool, or string).

        Args:
            fmu_filename (str): Filename of the embedded FMU.
            port_name (str): Name of the port.
            value (str): Start value as a string.

        Raises:
            FMUContainerError: If the value cannot be converted to the
                port's type.
        """
        cport = ContainerPort(self.get_fmu(fmu_filename), port_name)

        try:
            if cport.port.type_name.startswith('real'):
                value = float(value)
            elif cport.port.type_name.startswith('integer') or  cport.port.type_name.startswith('uinteger'):
                value = int(value)
            elif cport.port.type_name.startswith('boolean'):
                value = int(bool(value))
            elif cport.port.type_name == 'String':
                value = value
            else:
                logger.error(f"Start value cannot be set on '{cport.port.type_name}'")
                return
        except ValueError:
            raise FMUContainerError(f"Start value is not conforming to {cport.port.type_name} format.")

        self.start_values[cport] = value

    def find_inputs(self, port_to_connect: EmbeddedFMUPort) -> List[ContainerPort]:
        candidates = []
        for cport in self.get_all_cports():
            if (cport.port.causality == 'input' and cport not in self.rules and cport.port.name == port_to_connect.name
                    and cport.port.type_name == port_to_connect.type_name):
                candidates.append(cport)
        return candidates

    def add_implicit_rule(self, auto_input=True, auto_output=True, auto_link=True, auto_parameter=False,
                          auto_local=False) -> AutoWired:
        """Automatically wire unconnected ports of embedded FMUs.

        Processes all ports in the following order:

        1. **auto_link**: Connect outputs to inputs with matching names and types.
        2. **auto_output**: Expose remaining unconnected outputs.
        3. **auto_local**: Expose local variables.
        4. **auto_input**: Expose remaining unconnected inputs.
        5. **auto_parameter**: Expose parameters.

        Args:
            auto_input (bool): Expose unconnected input ports.
            auto_output (bool): Expose unconnected output ports.
            auto_link (bool): Link matching output/input ports automatically.
            auto_parameter (bool): Expose parameter ports.
            auto_local (bool): Expose local variables.

        Returns:
            AutoWired: Record of all automatically created rules.
        """
        auto_wired = AutoWired()
        # Auto Link outputs
        for cport in self.get_all_cports():
            if cport.port.causality == 'output':
                candidates_cport_list = self.find_inputs(cport.port)
                if auto_link and candidates_cport_list:
                    for candidate_cport in candidates_cport_list:
                        logger.info(f"AUTO LINK: {cport} -> {candidate_cport}")
                        self.add_link(cport.fmu.name, cport.port.name,
                                      candidate_cport.fmu.name, candidate_cport.port.name)
                        auto_wired.add_link(cport.fmu.name, cport.port.name,
                                            candidate_cport.fmu.name, candidate_cport.port.name)
                elif auto_output and cport not in self.rules:
                    logger.info(f"AUTO OUTPUT: Expose {cport}")
                    self.add_output(cport.fmu.name, cport.port.name, cport.port.name)
                    auto_wired.add_output(cport.fmu.name, cport.port.name, cport.port.name)
            elif cport.port.causality == 'local':
                local_portname = None
                if cport.port.name.startswith("container."):
                    local_portname = "container." + cport.fmu.id + "." + cport.port.name[10:]
                    logger.info(f"PROFILING: Expose {cport}")
                elif auto_local:
                    local_portname = cport.fmu.id + "." + cport.port.name
                    logger.info(f"AUTO LOCAL: Expose {cport}")
                if local_portname:
                    self.add_output(cport.fmu.name, cport.port.name, local_portname)
                    auto_wired.add_output(cport.fmu.name, cport.port.name, local_portname)

        if auto_input:
            # Auto link inputs
            for cport in self.get_all_cports():
                if cport not in self.rules:
                    if cport.port.causality == 'parameter' and auto_parameter:
                        parameter_name = cport.fmu.id + "." + cport.port.name
                        logger.info(f"AUTO PARAMETER: {cport} as {parameter_name}")
                        self.add_input(parameter_name, cport.fmu.name, cport.port.name)
                        auto_wired.add_parameter(parameter_name, cport.fmu.name, cport.port.name)
                    elif cport.port.causality == 'input':
                        logger.info(f"AUTO INPUT: Expose {cport}")
                        self.add_input(cport.port.name, cport.fmu.name, cport.port.name)
                        auto_wired.add_input(cport.port.name, cport.fmu.name, cport.port.name)

        logger.info(f"Auto-wiring: {auto_wired}")

        return auto_wired

    def default_step_size(self) -> float:
        """Compute the default step size from embedded FMUs.

        Uses the GCD of the frequencies of FMUs that cannot handle variable
        step sizes. If all FMUs support variable steps, returns the largest
        step size.

        Returns:
            float: Computed step size in seconds.
        """
        freq_set = set()
        for fmu in self.involved_fmu.values():
            if fmu.step_size and fmu.capabilities["canHandleVariableCommunicationStepSize"] == "false":
                freq_set.add(int(1.0/fmu.step_size))

        if not freq_set:
            # all involved FMUs can Handle Variable Communication StepSize
            step_size_max = 0
            for fmu in self.involved_fmu.values():
                if fmu.step_size > step_size_max:
                    step_size_max = fmu.step_size
            return step_size_max

        common_freq = math.gcd(*freq_set)
        try:
            step_size = 1.0 / float(common_freq)
        except ZeroDivisionError:
            step_size = 0.1
            logger.warning(f"Defaulting to step_size={step_size}")

        return step_size

    def sanity_check(self, step_size: Optional[float]):
        """Validate the container configuration before building.

        Warns about step size mismatches and unconnected ports.

        Args:
            step_size (float | None): The container's internal step size.
        """
        for fmu in self.involved_fmu.values():
            if fmu.step_size and fmu.capabilities["canHandleVariableCommunicationStepSize"] == "false":
                ts_ratio = step_size / fmu.step_size
                logger.debug(f"container step_size: {step_size} = {fmu.step_size} x {ts_ratio} for {fmu.name}")
                if ts_ratio < 1.0:
                    logger.warning(f"Container step_size={step_size}s is lower than FMU '{fmu.name}' "
                                   f"step_size={fmu.step_size}s.")
                if ts_ratio != int(ts_ratio):
                    logger.warning(f"Container step_size={step_size}s should divisible by FMU '{fmu.name}' "
                                   f"step_size={fmu.step_size}s.")
            for port_name in fmu.ports:
                cport = ContainerPort(fmu, port_name)
                if cport not in self.rules:
                    if cport.port.causality == 'input':
                        logger.error(f"{cport} is not connected")
                    if cport.port.causality == 'output':
                        logger.warning(f"{cport} is not connected")

    def make_fmu(self, fmu_filename: Union[str, Path], step_size: Optional[float] = None, debug=False, mt=False,
                 profiling=False, sequential=False, ts_multiplier=False, datalog=False):
        """Build the FMU Container archive.

        Generates the `modelDescription.xml`, the `container.txt` runtime
        configuration, and packages everything into a `.fmu` zip archive.

        Args:
            fmu_filename (str | Path): Output filename for the container.
            step_size (float | None): Internal time step in seconds. If `None`,
                deduced from the embedded FMUs.
            debug (bool): Keep intermediate build artifacts.
            mt (bool): Enable multi-threaded mode.
            profiling (bool): Enable profiling mode.
            sequential (bool): Use sequential scheduling.
            ts_multiplier (bool): Add a `TS_MULTIPLIER` input port.
            datalog (bool): Generate a datalog configuration.
        """
        if isinstance(fmu_filename, str):
            fmu_filename = Path(fmu_filename)

        if step_size is None:
            logger.info(f"step_size  will be deduced from the embedded FMU's")
            step_size = self.default_step_size()
        self.sanity_check(step_size)

        logger.info(f"Building FMU '{fmu_filename}', step_size={step_size}")

        base_directory = self.fmu_directory / fmu_filename.with_suffix('')
        resources_directory = self.make_fmu_skeleton(base_directory)

        with open(base_directory / "modelDescription.xml", "wt") as xml_file:
            self.make_fmu_xml(xml_file, step_size, profiling, ts_multiplier)
        with open(resources_directory / "container.txt", "wt") as txt_file:
            self.make_fmu_txt(txt_file, step_size, mt, profiling, sequential)

        if datalog:
            with open(resources_directory / "datalog.txt", "wt") as datalog_file:
                self.make_datalog(datalog_file)

        self.make_fmu_package(base_directory, fmu_filename)
        if not debug:
            self.make_fmu_cleanup(base_directory)

    def make_fmu_xml(self, xml_file, step_size: float, profiling: bool, ts_multiplier: bool):
        timestamp = datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ')
        guid = str(uuid.uuid4())
        embedded_fmu = ", ".join([fmu_name for fmu_name in self.involved_fmu])
        try:
            author = getpass.getuser()
        except OSError:
            author = "Unspecified"

        capabilities = {}
        for capability in EmbeddedFMU.capability_list:
            capabilities[capability] = "false"
            for fmu in self.involved_fmu.values():
                if fmu.capabilities[capability] == "true":
                    capabilities[capability] = "true"

        first_fmu = next(iter(self.involved_fmu.values()))
        if self.start_time is None:
            self.start_time = first_fmu.start_time
            logger.info(f"start_time={self.start_time} (deduced from '{first_fmu.name}')")
        else:
            logger.info(f"start_time={self.start_time}")

        if self.stop_time is None:
            self.stop_time = first_fmu.stop_time
            logger.info(f"stop_time={self.stop_time} (deduced from '{first_fmu.name}')")
        else:
            logger.info(f"stop_time={self.stop_time}")

        default_experiment_times = ""
        if self.start_time is not None:
            default_experiment_times += f' startTime="{self.start_time}"'
        if self.stop_time is not None:
            default_experiment_times += f' stopTime="{self.stop_time}"'

        if self.fmi_version == 2:
            xml_file.write(self.HEADER_XML_2.format(identifier=self.identifier, tool_version=tool_version,
                                                    timestamp=timestamp, guid=guid, embedded_fmu=embedded_fmu,
                                                    author=author,
                                                    only_once=capabilities['canBeInstantiatedOnlyOncePerProcess'],
                                                    execution_tool=capabilities['needsExecutionTool'],
                                                    default_experiment_times=default_experiment_times,
                                                    step_size=step_size))
        elif self.fmi_version == 3:
            xml_file.write(self.HEADER_XML_3.format(identifier=self.identifier, tool_version=tool_version,
                                                    timestamp=timestamp, guid=guid, embedded_fmu=embedded_fmu,
                                                    author=author,
                                                    only_once=capabilities['canBeInstantiatedOnlyOncePerProcess'],
                                                    execution_tool=capabilities['needsExecutionTool'],
                                                    default_experiment_times=default_experiment_times,
                                                    step_size=step_size))

        vr_time = self.vr_table.add_vr("real64", local=True)
        logger.debug(f"Time vr = {vr_time}")

        vr_ts_multiplier = self.vr_table.add_vr("integer32", local=True)
        if ts_multiplier:
            logger.debug(f"TS Multiplier vr = {vr_ts_multiplier}")
            port = EmbeddedFMUPort("integer32", {"valueReference": vr_ts_multiplier,
                                                 "name": f"container.ts_multiplier",
                                                 "causality": "input",
                                                 "description": f"Timestep multiplier",
                                                 "variability": "discrete",
                                                 "start": 1,
                                                 "initial": "exact"})
            print(f"    {port.xml(vr_ts_multiplier, fmi_version=self.fmi_version)}", file=xml_file)

        if profiling:
            for fmu in self.involved_fmu.values():
                vr = self.vr_table.add_vr("real64", local=True)
                port = EmbeddedFMUPort("real64", {"valueReference": vr,
                                        "name": f"container.{fmu.id}.rt_ratio",
                                        "description": f"RT ratio for embedded FMU '{fmu.name}'"})
                print(f"    {port.xml(vr, fmi_version=self.fmi_version)}", file=xml_file)

        index_offset = 2    # index of output ports. Start at 2 to skip "time" port

        # Local variable should be first to ensure to attribute them the lowest VR.
        nb_clocks = 0
        for link in self.links.values():
            self.vr_table.set_link_vr(link)
            if link.cport_from:
                port_local_def = link.cport_from.port.xml(link.vr, name=link.name, causality='local',
                                                          fmi_version=self.fmi_version)
            else:
                # LS-BUS allow Clock generated by fmi-importer
                port = EmbeddedFMUPort("Clock",
                                       {"name": "", "valueReference": -1, "intervalVariability": "triggered"},
                                       fmi_version=3)
                port_local_def = port.xml(link.vr, name=f"container.clock{nb_clocks}", causality='local', fmi_version=self.fmi_version)
                nb_clocks += 1

            if port_local_def:
                print(f"    {port_local_def}", file=xml_file)
                index_offset += 1

        for input_port_name, input_port in self.inputs.items():
            input_port.vr = self.vr_table.add_vr(input_port.type_name)
            # Get Start and XML from first connected input
            start = self.start_values.get(input_port.cport_list[0], None)
            port_input_def = input_port.cport_list[0].port.xml(input_port.vr, name=input_port_name,
                                                               start=start, fmi_version=self.fmi_version)
            if port_input_def:
                print(f"    {port_input_def}", file=xml_file)
                index_offset += 1

        for output_port_name, output_port in self.outputs.items():
            output_port.vr = self.vr_table.add_vr(output_port)
            port_output_def = output_port.port.xml(output_port.vr, name=output_port_name,
                                                   fmi_version=self.fmi_version)
            if port_output_def:
                print(f"    {port_output_def}", file=xml_file)

        if self.fmi_version == 2:
            self.make_fmu_xml_epilog_2(xml_file, index_offset)
        elif self.fmi_version == 3:
            self.make_fmu_xml_epilog_3(xml_file)

    def make_fmu_xml_epilog_2(self, xml_file, index_offset):
        xml_file.write("  </ModelVariables>\n"
                       "\n"
                       "  <ModelStructure>\n")


        if self.outputs:
            xml_file.write("    <Outputs>\n")
            index = index_offset
            for output in self.outputs.values():
                if output.port.type_name in EmbeddedFMUPort.CONTAINER_TO_FMI[2]:
                    print(f'      <Unknown index="{index}"/>', file=xml_file)
                    index += 1
            xml_file.write("    </Outputs>\n"
                           "    <InitialUnknowns>\n")
            index = index_offset
            for output in self.outputs.values():
                if output.port.type_name in EmbeddedFMUPort.CONTAINER_TO_FMI[2]:
                    print(f'      <Unknown index="{index}"/>', file=xml_file)
                    index += 1
            xml_file.write("    </InitialUnknowns>\n")

        xml_file.write("  </ModelStructure>\n"
                       "\n"
                       "</fmiModelDescription>")

    def make_fmu_xml_epilog_3(self, xml_file):
        xml_file.write("  </ModelVariables>\n"
                       "\n"
                       "  <ModelStructure>\n")
        for output in self.outputs.values():
            if output.port.type_name in EmbeddedFMUPort.CONTAINER_TO_FMI[3]:
                print(f'      <Output valueReference="{output.vr}"/>', file=xml_file)
        for output in self.outputs.values():
            if output.port.type_name in EmbeddedFMUPort.CONTAINER_TO_FMI[3]:
                print(f'      <InitialUnknown valueReference="{output.vr}"/>', file=xml_file)
        xml_file.write("  </ModelStructure>\n"
                       "\n"
                       "</fmiModelDescription>")

    def make_fmu_txt(self, txt_file, step_size: float, mt: bool, profiling: bool, sequential: bool):
        print("# Version 3", file=txt_file)
        print("# Container flags <MT> <Profiling> <Sequential>", file=txt_file)
        flags = [ str(int(flag == True)) for flag in (mt, profiling, sequential)]
        print(" ".join(flags), file=txt_file)

        print(f"# Internal time step in seconds", file=txt_file)
        print(f"{step_size}", file=txt_file)
        print(f"# NB of embedded FMU's", file=txt_file)
        print(f"{len(self.involved_fmu)}", file=txt_file)
        fmu_rank: Dict[str, int] = {}
        for i, fmu in enumerate(self.involved_fmu.values()):
            print(f"{fmu.name} {fmu.fmi_version} {int(fmu.has_event_mode)}", file=txt_file)
            print(f"{fmu.model_identifier}", file=txt_file)
            print(f"{fmu.guid}", file=txt_file)
            fmu_rank[fmu.name] = i

        # Prepare data structure
        inputs_per_type: Dict[str, List[ContainerInput]] = defaultdict(list) # Container's INPUT
        outputs_per_type: Dict[str, List[ContainerPort]] = defaultdict(list) # Container's OUTPUT

        fmu_io_list = FMUIOList(self.vr_table)
        clock_list = ClockList(self.involved_fmu)

        local_per_type: Dict[str, List[int]] = defaultdict(list)
        links_per_fmu: Dict[str, List[Link]] = defaultdict(list)

        # Fill data structure
        # Inputs
        for input_port_name, input_port in self.inputs.items():
            inputs_per_type[input_port.type_name].append(input_port)

        # Start values
        for input_port, value in self.start_values.items():
            fmu_io_list.add_start_value(input_port, value)

        # Outputs
        for output_port_name, output_port in self.outputs.items():
            outputs_per_type[output_port.port.type_name].append(output_port)

        # Links
        for link in self.links.values():
            # FMU Outputs
            if link.cport_from:
                local_per_type[link.cport_from.port.type_name].append(link.vr)
                fmu_io_list.add_output(link.cport_from, link.vr)
            else:
                local_per_type["clock"].append(link.vr)
                for cport_to in link.cport_to_list:
                    if cport_to.port.interval_variability == "countdown":
                        logger.info(f"LS-BUS: importer scheduling for '{cport_to.fmu.name}' '{cport_to.port.name}' (clock={cport_to.port.vr}, vr={link.vr})")
                        clock_list.append(cport_to, link.vr)
                        break

            # FMU Inputs
            for cport_to in link.cport_to_list:
                if link.cport_from is not None or not cport_to.fmu.ls.is_bus:
                    # LS-BUS allows, importer to feed clock signal. In this case, cport_from is None
                    # FMU will be fed directly by importer, no need to add inpunt link!
                    if link.cport_from is None or cport_to.port.type_name == link.cport_from.port.type_name:
                        local_vr = link.vr
                    else:
                        local_per_type[cport_to.port.type_name].append(link.vr_converted[cport_to.port.type_name])
                        links_per_fmu[link.cport_from.fmu.name].append(link)
                        local_vr = link.vr_converted[cport_to.port.type_name]

                    fmu_io_list.add_input(cport_to, local_vr)

        print(f"# NB local variables:", ", ".join(EmbeddedFMUPort.ALL_TYPES), file=txt_file)
        nb_local = [f"{self.vr_table.nb_local(type_name)}" for type_name in EmbeddedFMUPort.ALL_TYPES]
        print(" ".join(nb_local), file=txt_file, end='')
        print("", file=txt_file)

        print("# CONTAINER I/O: <VR> <NB> <FMU_INDEX> <FMU_VR> [<FMU_INDEX> <FMU_VR>]", file=txt_file)
        for type_name in EmbeddedFMUPort.ALL_TYPES:
            print(f"# {type_name}" , file=txt_file)
            nb_local = (len(inputs_per_type[type_name]) +
                        len(outputs_per_type[type_name]) +
                        self.vr_table.nb_local(type_name))
            nb_input_link = 0
            for input_port in inputs_per_type[type_name]:
                nb_input_link += len(input_port.cport_list) - 1
            print(f"{nb_local} {nb_local + nb_input_link}", file=txt_file)
            if type_name == "real64":
                print(f"0 1 -1 0", file=txt_file)  # Time slot
                if profiling:
                    for profiling_port, _ in enumerate(self.involved_fmu.values()):
                        print(f"{profiling_port + 1} 1 -2 {profiling_port + 1}", file=txt_file)
            elif type_name == "integer32":
                print(f"0 1 -1 0", file=txt_file)  # TS Multiplier

            for input_port in inputs_per_type[type_name]:
                cport_string = [f"{fmu_rank[cport.fmu.name]} {cport.port.vr}" for cport in input_port.cport_list]
                print(f"{input_port.vr} {len(input_port.cport_list)}", " ".join(cport_string), file=txt_file)
            for output_port in outputs_per_type[type_name]:
                print(f"{output_port.vr} 1 {fmu_rank[output_port.fmu.name]} {output_port.port.vr}", file=txt_file)
            for local_vr in local_per_type[type_name]:
                print(f"{local_vr} 1 -1 {local_vr & 0xFFFFFF}", file=txt_file)

        # LINKS
        for fmu in self.involved_fmu.values():
            fmu_io_list.write_txt(fmu.name, txt_file)

            print(f"# Conversion table of {fmu.name}: <VR_FROM> <VR_TO> <CONVERSION>", file=txt_file)
            try:
                nb = 0
                for link in links_per_fmu[fmu.name]:
                    nb += len(link.vr_converted)
                print(f"{nb}", file=txt_file)
                for link in links_per_fmu[fmu.name]:
                    for cport_to in link.cport_to_list:
                        conversion =  link.get_conversion(cport_to)
                        if conversion:
                            print(f"{link.vr} {link.vr_converted[cport_to.port.type_name]} {conversion}",
                                  file=txt_file)
            except KeyError:
                print("0", file=txt_file)

        # CLOCKS
        clock_list.write_txt(txt_file)

    def make_datalog(self, datalog_file):
        print(f"# Datalog filename", file=datalog_file)
        print(f"{self.identifier}-datalog.csv", file=datalog_file)

        ports = defaultdict(list)
        for input_port_name, input_port in self.inputs.items():
            ports[input_port.type_name].append((input_port.vr, input_port_name))
        for output_port_name, output_port in self.outputs.items():
            ports[output_port.port.type_name].append((output_port.vr, output_port_name))
        for link in self.links.values():
            if link.cport_from is None:
                # LS-BUS allows to connected to input clocks.
                ports[link.cport_to_list[0].port.type_name].append((link.vr, link.name))
            else:
                ports[link.cport_from.port.type_name].append((link.vr, link.name))

        for type_name in EmbeddedFMUPort.ALL_TYPES:
            print(f"# {type_name}: <VR> <NAME>" , file=datalog_file)
            print(f"{len(ports[type_name])}", file=datalog_file)
            for port in ports[type_name]:
                print(f"{port[0]} {port[1]}", file=datalog_file)

    @staticmethod
    def long_path(path: Union[str, Path]) -> str:
        # https://stackoverflow.com/questions/14075465/copy-a-file-with-a-too-long-path-to-another-directory-in-python
        if os.name == 'nt':
            return "\\\\?\\" + os.path.abspath(str(path))
        else:
            return path

    @staticmethod
    def copyfile(origin, destination):
        logger.debug(f"Copying {origin} in {destination}")
        shutil.copy(origin, destination)

    def get_bindir_and_suffixe(self) -> Generator[Tuple[str, str, str], Any, None]:
        fmu_iter = iter(self.involved_fmu.values())
        try:
            fmu = next(fmu_iter)
        except StopIteration:
            raise FMUContainerError("No fmu declared in this container.")

        os_list = fmu.platforms
        logger.debug(f"FMU '{fmu.name}' OS support: {', '.join(fmu.platforms)}.")

        for fmu in fmu_iter:
            logger.debug(f"FMU '{fmu.name}' OS support: {', '.join(fmu.platforms)}.")
            os_list &= fmu.platforms

        suffixes = {
            "Windows": "dll",
            "Linux": "so",
            "Darwin": "dylib"
        }

        origin_bindirs = {
            "Windows": "win64",
            "Linux": "linux64",
            "Darwin": "darwin64"
        }

        if self.fmi_version == 3:
            target_bindirs = {
                "Windows": "x86_64-windows",
                "Linux": "x86_64-linux",
                "Darwin": "aarch64-darwin"
            }
        else:
            target_bindirs = origin_bindirs

        if os_list:
            logger.info(f"Container will be built for {', '.join(os_list)}.")
        else:
            logger.critical("No common OS found for embedded FMU. Try to re-run with '-debug'. Container won't be runnable.")

        for os_name in os_list:
            try:
                yield origin_bindirs[os_name], suffixes[os_name], target_bindirs[os_name]
            except KeyError:
                raise FMUContainerError(f"OS '{os_name}' is not supported.")

    def make_fmu_skeleton(self, base_directory: Path) -> Path:
        logger.debug(f"Initialize directory '{base_directory}'")

        origin = Path(__file__).parent / "resources"
        resources_directory = base_directory / "resources"
        documentation_directory = base_directory / "documentation"
        binaries_directory = base_directory / "binaries"

        base_directory.mkdir(exist_ok=True)
        resources_directory.mkdir(exist_ok=True)
        binaries_directory.mkdir(exist_ok=True)
        documentation_directory.mkdir(exist_ok=True)

        if self.description_pathname:
            self.copyfile(self.description_pathname, documentation_directory)

        self.copyfile(origin / "model.png", base_directory)

        for origin_bindir, suffixe, target_bindir in self.get_bindir_and_suffixe():
            library_filename = origin / origin_bindir / f"container.{suffixe}"
            if library_filename.is_file():
                binary_directory = binaries_directory / target_bindir
                binary_directory.mkdir(exist_ok=True)
                self.copyfile(library_filename, binary_directory / f"{self.identifier}.{suffixe}")
            else:
                logger.critical(f"File {library_filename} not found.")

        for i, fmu in enumerate(self.involved_fmu.values()):
            with zipfile.ZipFile(fmu.fmu.fmu_filename) as zin:
                zin.extractall(self.long_path(resources_directory / f"{i:02x}"))

        return resources_directory

    def make_fmu_package(self, base_directory: Path, fmu_filename: Path):
        logger.debug(f"Zipping directory '{base_directory}' => '{fmu_filename}'")
        zip_directory = self.long_path(str(base_directory.absolute()))
        offset = len(zip_directory) + 1
        with zipfile.ZipFile(self.fmu_directory / fmu_filename, "w", zipfile.ZIP_DEFLATED) as zip_file:
            def add_file(directory: Path):
                for entry in directory.iterdir():
                    if entry.is_dir():
                        add_file(directory / entry)
                    elif entry.is_file:
                        zip_file.write(str(entry), str(entry)[offset:])

            add_file(Path(zip_directory))
        logger.info(f"'{fmu_filename}' is available.")

    def make_fmu_cleanup(self, base_directory: Path):
        logger.debug(f"Delete directory '{base_directory}'")
        shutil.rmtree(self.long_path(base_directory))

add_implicit_rule(auto_input=True, auto_output=True, auto_link=True, auto_parameter=False, auto_local=False)

Automatically wire unconnected ports of embedded FMUs.

Processes all ports in the following order:

  1. auto_link: Connect outputs to inputs with matching names and types.
  2. auto_output: Expose remaining unconnected outputs.
  3. auto_local: Expose local variables.
  4. auto_input: Expose remaining unconnected inputs.
  5. auto_parameter: Expose parameters.

Parameters:

Name Type Description Default
auto_input bool

Expose unconnected input ports.

True
auto_output bool

Expose unconnected output ports.

True
auto_link bool

Link matching output/input ports automatically.

True
auto_parameter bool

Expose parameter ports.

False
auto_local bool

Expose local variables.

False

Returns:

Name Type Description
AutoWired AutoWired

Record of all automatically created rules.

Source code in fmu_manipulation_toolbox/container.py
def add_implicit_rule(self, auto_input=True, auto_output=True, auto_link=True, auto_parameter=False,
                      auto_local=False) -> AutoWired:
    """Automatically wire unconnected ports of embedded FMUs.

    Processes all ports in the following order:

    1. **auto_link**: Connect outputs to inputs with matching names and types.
    2. **auto_output**: Expose remaining unconnected outputs.
    3. **auto_local**: Expose local variables.
    4. **auto_input**: Expose remaining unconnected inputs.
    5. **auto_parameter**: Expose parameters.

    Args:
        auto_input (bool): Expose unconnected input ports.
        auto_output (bool): Expose unconnected output ports.
        auto_link (bool): Link matching output/input ports automatically.
        auto_parameter (bool): Expose parameter ports.
        auto_local (bool): Expose local variables.

    Returns:
        AutoWired: Record of all automatically created rules.
    """
    auto_wired = AutoWired()
    # Auto Link outputs
    for cport in self.get_all_cports():
        if cport.port.causality == 'output':
            candidates_cport_list = self.find_inputs(cport.port)
            if auto_link and candidates_cport_list:
                for candidate_cport in candidates_cport_list:
                    logger.info(f"AUTO LINK: {cport} -> {candidate_cport}")
                    self.add_link(cport.fmu.name, cport.port.name,
                                  candidate_cport.fmu.name, candidate_cport.port.name)
                    auto_wired.add_link(cport.fmu.name, cport.port.name,
                                        candidate_cport.fmu.name, candidate_cport.port.name)
            elif auto_output and cport not in self.rules:
                logger.info(f"AUTO OUTPUT: Expose {cport}")
                self.add_output(cport.fmu.name, cport.port.name, cport.port.name)
                auto_wired.add_output(cport.fmu.name, cport.port.name, cport.port.name)
        elif cport.port.causality == 'local':
            local_portname = None
            if cport.port.name.startswith("container."):
                local_portname = "container." + cport.fmu.id + "." + cport.port.name[10:]
                logger.info(f"PROFILING: Expose {cport}")
            elif auto_local:
                local_portname = cport.fmu.id + "." + cport.port.name
                logger.info(f"AUTO LOCAL: Expose {cport}")
            if local_portname:
                self.add_output(cport.fmu.name, cport.port.name, local_portname)
                auto_wired.add_output(cport.fmu.name, cport.port.name, local_portname)

    if auto_input:
        # Auto link inputs
        for cport in self.get_all_cports():
            if cport not in self.rules:
                if cport.port.causality == 'parameter' and auto_parameter:
                    parameter_name = cport.fmu.id + "." + cport.port.name
                    logger.info(f"AUTO PARAMETER: {cport} as {parameter_name}")
                    self.add_input(parameter_name, cport.fmu.name, cport.port.name)
                    auto_wired.add_parameter(parameter_name, cport.fmu.name, cport.port.name)
                elif cport.port.causality == 'input':
                    logger.info(f"AUTO INPUT: Expose {cport}")
                    self.add_input(cport.port.name, cport.fmu.name, cport.port.name)
                    auto_wired.add_input(cport.port.name, cport.fmu.name, cport.port.name)

    logger.info(f"Auto-wiring: {auto_wired}")

    return auto_wired

add_input(container_port_name, to_fmu_filename, to_port_name)

Expose a port of an embedded FMU as a container input.

Multiple embedded FMU ports can be connected to the same container input (fan-out), provided they share the same type and causality.

Parameters:

Name Type Description Default
container_port_name str

Exposed name on the container. If empty, defaults to to_port_name.

required
to_fmu_filename str

Filename of the embedded FMU.

required
to_port_name str

Name of the input port on the embedded FMU.

required

Raises:

Type Description
FMUContainerError

If the port causality is not "input" or "parameter", or if types do not match an existing input with the same name.

Source code in fmu_manipulation_toolbox/container.py
def add_input(self, container_port_name: str, to_fmu_filename: str, to_port_name: str):
    """Expose a port of an embedded FMU as a container input.

    Multiple embedded FMU ports can be connected to the same container
    input (fan-out), provided they share the same type and causality.

    Args:
        container_port_name (str): Exposed name on the container. If empty,
            defaults to `to_port_name`.
        to_fmu_filename (str): Filename of the embedded FMU.
        to_port_name (str): Name of the input port on the embedded FMU.

    Raises:
        FMUContainerError: If the port causality is not `"input"` or
            `"parameter"`, or if types do not match an existing input
            with the same name.
    """
    if not container_port_name:
        container_port_name = to_port_name
    cport_to = ContainerPort(self.get_fmu(to_fmu_filename), to_port_name)
    if cport_to.port.causality not in ("input", "parameter"):  # check causality
        raise FMUContainerError(f"Tried to use '{cport_to}' as INPUT of the container but FMU causality is "
                                f"'{cport_to.port.causality}'.")

    try:
        input_port = self.inputs[container_port_name]
        input_port.add_cport(cport_to)
    except KeyError:
        self.inputs[container_port_name] = ContainerInput(container_port_name, cport_to)

    logger.debug(f"INPUT: {to_fmu_filename}:{to_port_name}")
    self.mark_ruled(cport_to, 'INPUT')

Connect an output of one embedded FMU to an input of another.

If both port names match FMI Terminal definitions, a terminal-level connection is made (connecting all member ports). Otherwise, a regular port-to-port link is created.

Parameters:

Name Type Description Default
from_fmu_filename str

Filename of the source FMU.

required
from_port_name str

Output port name (or terminal name).

required
to_fmu_filename str

Filename of the destination FMU.

required
to_port_name str

Input port name (or terminal name).

required

Raises:

Type Description
FMUContainerError

If port causalities are invalid or types are incompatible.

Source code in fmu_manipulation_toolbox/container.py
def add_link(self, from_fmu_filename: str, from_port_name: str, to_fmu_filename: str, to_port_name: str):
    """Connect an output of one embedded FMU to an input of another.

    If both port names match FMI Terminal definitions, a terminal-level
    connection is made (connecting all member ports). Otherwise, a regular
    port-to-port link is created.

    Args:
        from_fmu_filename (str): Filename of the source FMU.
        from_port_name (str): Output port name (or terminal name).
        to_fmu_filename (str): Filename of the destination FMU.
        to_port_name (str): Input port name (or terminal name).

    Raises:
        FMUContainerError: If port causalities are invalid or types
            are incompatible.
    """
    fmu_from = self.get_fmu(from_fmu_filename)
    fmu_to = self.get_fmu(to_fmu_filename)

    if from_port_name in fmu_from.terminals and to_port_name in fmu_to.terminals:
        # TERMINAL Connection
        terminal1 = fmu_from.terminals[from_port_name]
        terminal2 = fmu_to.terminals[to_port_name]
        if terminal1 == terminal2:
            logger.debug(f"Plugging terminals: {terminal1} <-> {terminal2}")
            for terminal1_port_name, terminal2_port_name in terminal1.connect(terminal2):
                self.add_link_regular(fmu_from, terminal1_port_name, fmu_to, terminal2_port_name)
        else:
            logger.error(f"Cannot plug incompatible terminals: {terminal1} <-> {terminal2}")
    else:
        # REGULAR port connection
        self.add_link_regular(fmu_from, from_port_name, fmu_to, to_port_name)

add_output(from_fmu_filename, from_port_name, container_port_name)

Expose a port of an embedded FMU as a container output.

Parameters:

Name Type Description Default
from_fmu_filename str

Filename of the embedded FMU.

required
from_port_name str

Name of the output port on the embedded FMU.

required
container_port_name str

Exposed name on the container. If empty, defaults to from_port_name.

required

Raises:

Type Description
FMUContainerError

If the port causality is not "output" or "local", or if the exposed name is already used.

Source code in fmu_manipulation_toolbox/container.py
def add_output(self, from_fmu_filename: str, from_port_name: str, container_port_name: str):
    """Expose a port of an embedded FMU as a container output.

    Args:
        from_fmu_filename (str): Filename of the embedded FMU.
        from_port_name (str): Name of the output port on the embedded FMU.
        container_port_name (str): Exposed name on the container. If empty,
            defaults to `from_port_name`.

    Raises:
        FMUContainerError: If the port causality is not `"output"` or
            `"local"`, or if the exposed name is already used.
    """
    if not container_port_name:  # empty is allowed
        container_port_name = from_port_name

    cport_from = ContainerPort(self.get_fmu(from_fmu_filename), from_port_name)
    if cport_from.port.causality not in ("output", "local"):  # check causality
        raise FMUContainerError(f"Tried to use '{cport_from}' as OUTPUT of the container but FMU causality is "
                                f"'{cport_from.port.causality}'.")

    if container_port_name in self.outputs:
        raise FMUContainerError(f"Duplicate OUTPUT {container_port_name} already connected to {cport_from}")

    logger.debug(f"OUTPUT: {from_fmu_filename}:{from_port_name}")
    self.mark_ruled(cport_from, 'OUTPUT')
    self.outputs[container_port_name] = cport_from

add_start_value(fmu_filename, port_name, value)

Set a start value for a port of an embedded FMU.

The value is automatically converted to the appropriate type (float, int, bool, or string).

Parameters:

Name Type Description Default
fmu_filename str

Filename of the embedded FMU.

required
port_name str

Name of the port.

required
value str

Start value as a string.

required

Raises:

Type Description
FMUContainerError

If the value cannot be converted to the port's type.

Source code in fmu_manipulation_toolbox/container.py
def add_start_value(self, fmu_filename: str, port_name: str, value: str):
    """Set a start value for a port of an embedded FMU.

    The value is automatically converted to the appropriate type
    (float, int, bool, or string).

    Args:
        fmu_filename (str): Filename of the embedded FMU.
        port_name (str): Name of the port.
        value (str): Start value as a string.

    Raises:
        FMUContainerError: If the value cannot be converted to the
            port's type.
    """
    cport = ContainerPort(self.get_fmu(fmu_filename), port_name)

    try:
        if cport.port.type_name.startswith('real'):
            value = float(value)
        elif cport.port.type_name.startswith('integer') or  cport.port.type_name.startswith('uinteger'):
            value = int(value)
        elif cport.port.type_name.startswith('boolean'):
            value = int(bool(value))
        elif cport.port.type_name == 'String':
            value = value
        else:
            logger.error(f"Start value cannot be set on '{cport.port.type_name}'")
            return
    except ValueError:
        raise FMUContainerError(f"Start value is not conforming to {cport.port.type_name} format.")

    self.start_values[cport] = value

default_step_size()

Compute the default step size from embedded FMUs.

Uses the GCD of the frequencies of FMUs that cannot handle variable step sizes. If all FMUs support variable steps, returns the largest step size.

Returns:

Name Type Description
float float

Computed step size in seconds.

Source code in fmu_manipulation_toolbox/container.py
def default_step_size(self) -> float:
    """Compute the default step size from embedded FMUs.

    Uses the GCD of the frequencies of FMUs that cannot handle variable
    step sizes. If all FMUs support variable steps, returns the largest
    step size.

    Returns:
        float: Computed step size in seconds.
    """
    freq_set = set()
    for fmu in self.involved_fmu.values():
        if fmu.step_size and fmu.capabilities["canHandleVariableCommunicationStepSize"] == "false":
            freq_set.add(int(1.0/fmu.step_size))

    if not freq_set:
        # all involved FMUs can Handle Variable Communication StepSize
        step_size_max = 0
        for fmu in self.involved_fmu.values():
            if fmu.step_size > step_size_max:
                step_size_max = fmu.step_size
        return step_size_max

    common_freq = math.gcd(*freq_set)
    try:
        step_size = 1.0 / float(common_freq)
    except ZeroDivisionError:
        step_size = 0.1
        logger.warning(f"Defaulting to step_size={step_size}")

    return step_size

drop_port(from_fmu_filename, from_port_name)

Explicitly ignore an output port of an embedded FMU.

Prevents the port from being auto-exposed or flagged as unconnected.

Parameters:

Name Type Description Default
from_fmu_filename str

Filename of the embedded FMU.

required
from_port_name str

Name of the output port to drop.

required

Raises:

Type Description
FMUContainerError

If the port causality is not "output".

Source code in fmu_manipulation_toolbox/container.py
def drop_port(self, from_fmu_filename: str, from_port_name: str):
    """Explicitly ignore an output port of an embedded FMU.

    Prevents the port from being auto-exposed or flagged as unconnected.

    Args:
        from_fmu_filename (str): Filename of the embedded FMU.
        from_port_name (str): Name of the output port to drop.

    Raises:
        FMUContainerError: If the port causality is not `"output"`.
    """
    cport_from = ContainerPort(self.get_fmu(from_fmu_filename), from_port_name)
    if not cport_from.port.causality == "output":  # check causality
        raise FMUContainerError(f"{cport_from}: trying to DROP {cport_from.port.causality}")

    logger.debug(f"DROP: {from_fmu_filename}:{from_port_name}")
    self.mark_ruled(cport_from, 'DROP')

get_fmu(fmu_filename)

Load an embedded FMU from the FMU directory.

If the FMU has already been loaded, returns the cached instance.

Parameters:

Name Type Description Default
fmu_filename str

Filename of the FMU (e.g. "model.fmu").

required

Returns:

Name Type Description
EmbeddedFMU EmbeddedFMU

The loaded and analysed FMU.

Raises:

Type Description
FMUContainerError

If the FMU cannot be loaded.

Source code in fmu_manipulation_toolbox/container.py
def get_fmu(self, fmu_filename: str) -> EmbeddedFMU:
    """Load an embedded FMU from the FMU directory.

    If the FMU has already been loaded, returns the cached instance.

    Args:
        fmu_filename (str): Filename of the FMU (e.g. `"model.fmu"`).

    Returns:
        EmbeddedFMU: The loaded and analysed FMU.

    Raises:
        FMUContainerError: If the FMU cannot be loaded.
    """
    if fmu_filename in self.involved_fmu:
        return self.involved_fmu[fmu_filename]

    try:
        fmu = EmbeddedFMU(self.fmu_directory / fmu_filename)
        if not fmu.fmi_version == self.fmi_version:
            logger.warning(f"Try to embed FMU-{fmu.fmi_version} into container FMI-{self.fmi_version}.")
        self.involved_fmu[fmu.name] = fmu

        logger.info(f"Involved FMU #{len(self.involved_fmu)}: {fmu}")
    except (FMUContainerError, FMUError) as e:
        raise FMUContainerError(f"Cannot load '{fmu_filename}': {e}")

    return fmu

make_fmu(fmu_filename, step_size=None, debug=False, mt=False, profiling=False, sequential=False, ts_multiplier=False, datalog=False)

Build the FMU Container archive.

Generates the modelDescription.xml, the container.txt runtime configuration, and packages everything into a .fmu zip archive.

Parameters:

Name Type Description Default
fmu_filename str | Path

Output filename for the container.

required
step_size float | None

Internal time step in seconds. If None, deduced from the embedded FMUs.

None
debug bool

Keep intermediate build artifacts.

False
mt bool

Enable multi-threaded mode.

False
profiling bool

Enable profiling mode.

False
sequential bool

Use sequential scheduling.

False
ts_multiplier bool

Add a TS_MULTIPLIER input port.

False
datalog bool

Generate a datalog configuration.

False
Source code in fmu_manipulation_toolbox/container.py
def make_fmu(self, fmu_filename: Union[str, Path], step_size: Optional[float] = None, debug=False, mt=False,
             profiling=False, sequential=False, ts_multiplier=False, datalog=False):
    """Build the FMU Container archive.

    Generates the `modelDescription.xml`, the `container.txt` runtime
    configuration, and packages everything into a `.fmu` zip archive.

    Args:
        fmu_filename (str | Path): Output filename for the container.
        step_size (float | None): Internal time step in seconds. If `None`,
            deduced from the embedded FMUs.
        debug (bool): Keep intermediate build artifacts.
        mt (bool): Enable multi-threaded mode.
        profiling (bool): Enable profiling mode.
        sequential (bool): Use sequential scheduling.
        ts_multiplier (bool): Add a `TS_MULTIPLIER` input port.
        datalog (bool): Generate a datalog configuration.
    """
    if isinstance(fmu_filename, str):
        fmu_filename = Path(fmu_filename)

    if step_size is None:
        logger.info(f"step_size  will be deduced from the embedded FMU's")
        step_size = self.default_step_size()
    self.sanity_check(step_size)

    logger.info(f"Building FMU '{fmu_filename}', step_size={step_size}")

    base_directory = self.fmu_directory / fmu_filename.with_suffix('')
    resources_directory = self.make_fmu_skeleton(base_directory)

    with open(base_directory / "modelDescription.xml", "wt") as xml_file:
        self.make_fmu_xml(xml_file, step_size, profiling, ts_multiplier)
    with open(resources_directory / "container.txt", "wt") as txt_file:
        self.make_fmu_txt(txt_file, step_size, mt, profiling, sequential)

    if datalog:
        with open(resources_directory / "datalog.txt", "wt") as datalog_file:
            self.make_datalog(datalog_file)

    self.make_fmu_package(base_directory, fmu_filename)
    if not debug:
        self.make_fmu_cleanup(base_directory)

sanity_check(step_size)

Validate the container configuration before building.

Warns about step size mismatches and unconnected ports.

Parameters:

Name Type Description Default
step_size float | None

The container's internal step size.

required
Source code in fmu_manipulation_toolbox/container.py
def sanity_check(self, step_size: Optional[float]):
    """Validate the container configuration before building.

    Warns about step size mismatches and unconnected ports.

    Args:
        step_size (float | None): The container's internal step size.
    """
    for fmu in self.involved_fmu.values():
        if fmu.step_size and fmu.capabilities["canHandleVariableCommunicationStepSize"] == "false":
            ts_ratio = step_size / fmu.step_size
            logger.debug(f"container step_size: {step_size} = {fmu.step_size} x {ts_ratio} for {fmu.name}")
            if ts_ratio < 1.0:
                logger.warning(f"Container step_size={step_size}s is lower than FMU '{fmu.name}' "
                               f"step_size={fmu.step_size}s.")
            if ts_ratio != int(ts_ratio):
                logger.warning(f"Container step_size={step_size}s should divisible by FMU '{fmu.name}' "
                               f"step_size={fmu.step_size}s.")
        for port_name in fmu.ports:
            cport = ContainerPort(fmu, port_name)
            if cport not in self.rules:
                if cport.port.causality == 'input':
                    logger.error(f"{cport} is not connected")
                if cport.port.causality == 'output':
                    logger.warning(f"{cport} is not connected")

FMUContainerError

Bases: Exception

Exception raised for errors during FMU Container operations.

Attributes:

Name Type Description
reason str

Human-readable description of the error.

Source code in fmu_manipulation_toolbox/container.py
class FMUContainerError(Exception):
    """Exception raised for errors during FMU Container operations.

    Attributes:
        reason (str): Human-readable description of the error.
    """

    def __init__(self, reason: str):
        self.reason = reason

    def __repr__(self):
        return f"{self.reason}"

FMUIOList

Tracks the I/O mapping between the container and its embedded FMUs.

Organizes inputs, outputs, and start values by type and FMU, supporting both regular and clocked variables. Used to generate the container.txt runtime configuration file.

Attributes:

Name Type Description
vr_table ValueReferenceTable

Reference table for VR lookups.

inputs

Nested mapping [type][fmu_name][clock_vr] → list of (fmu_vr, local_vr) tuples.

outputs

Nested mapping [type][fmu_name][clock_vr] → list of (fmu_vr, local_vr) tuples.

start_values

Mapping [type][fmu_name] → list of (fmu_vr, reset, value) tuples.

Source code in fmu_manipulation_toolbox/container.py
class FMUIOList:
    """Tracks the I/O mapping between the container and its embedded FMUs.

    Organizes inputs, outputs, and start values by type and FMU, supporting
    both regular and clocked variables. Used to generate the `container.txt`
    runtime configuration file.

    Attributes:
        vr_table (ValueReferenceTable): Reference table for VR lookups.
        inputs: Nested mapping `[type][fmu_name][clock_vr]` → list of
            `(fmu_vr, local_vr)` tuples.
        outputs: Nested mapping `[type][fmu_name][clock_vr]` → list of
            `(fmu_vr, local_vr)` tuples.
        start_values: Mapping `[type][fmu_name]` → list of
            `(fmu_vr, reset, value)` tuples.
    """

    def __init__(self, vr_table: ValueReferenceTable):
        self.vr_table = vr_table
        self.inputs = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))  # [type][fmu][clock_vr][(fmu_vr, vr)]
        self.nb_clocked_inputs = defaultdict(lambda: defaultdict(lambda: 0))
        self.outputs = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))  # [type][fmu][clock_vr][(fmu_vr, vr)]
        self.nb_clocked_outputs = defaultdict(lambda: defaultdict(lambda: 0))
        self.start_values = defaultdict(lambda: defaultdict(list)) # [type][fmu][(cport, value)]

    def add_input(self, cport: ContainerPort, local_vr: int):
        """Register an input mapping for an embedded FMU port.

        Args:
            cport (ContainerPort): The embedded FMU input port.
            local_vr (int): The local value reference in the container.
        """
        if cport.port.clock is None:
            clock = None
        else:
            try:
                clock = self.vr_table.get_local_clock(cport)
            except KeyError:
                logger.error(f"Cannot expose clocked input: {cport}")
                return
            self.nb_clocked_inputs[cport.port.type_name][cport.fmu.name] += 1
        self.inputs[cport.port.type_name][cport.fmu.name][clock].append((cport.port.vr, local_vr))

    def add_output(self, cport: ContainerPort, local_vr: int):
        """Register an output mapping for an embedded FMU port.

        Args:
            cport (ContainerPort): The embedded FMU output port.
            local_vr (int): The local value reference in the container.
        """
        if cport.port.clock is None:
            clock = None
        else:
            try:
                clock = self.vr_table.get_local_clock(cport)
            except KeyError:
                logger.error(f"Cannot expose clocked output: {cport}")
                return
            self.nb_clocked_outputs[cport.port.type_name][cport.fmu.name] += 1
        self.outputs[cport.port.type_name][cport.fmu.name][clock].append((cport.port.vr, local_vr))

    def add_start_value(self, cport: ContainerPort, value: str):
        """Register a start value for an embedded FMU port.

        Args:
            cport (ContainerPort): The embedded FMU port.
            value (str): The start value.
        """
        reset = 1 if cport.port.causality == "input" else 0
        self.start_values[cport.port.type_name][cport.fmu.name].append((cport.port.vr, reset, value))

    def write_txt(self, fmu_name, txt_file):
        """Write the I/O mapping for one FMU to the `container.txt` file.

        Args:
            fmu_name (str): Name of the embedded FMU.
            txt_file: Writable text file handle.
        """
        for type_name in EmbeddedFMUPort.ALL_TYPES:
            print(f"# Inputs of {fmu_name} - {type_name}: <VR> <FMU_VR>", file=txt_file)
            print(len(self.inputs[type_name][fmu_name][None]), file=txt_file)
            for fmu_vr, vr in self.inputs[type_name][fmu_name][None]:
                print(f"{vr} {fmu_vr}", file=txt_file)
            if not type_name == "clock":
                print(f"# Clocked Inputs of {fmu_name} - {type_name}: <FMU_VR_CLOCK> <n> <VR> <FMU_VR>", file=txt_file)
                print(f"{len(self.inputs[type_name][fmu_name])-1} {self.nb_clocked_inputs[type_name][fmu_name]}",
                      file=txt_file)
                for clock, table in self.inputs[type_name][fmu_name].items():
                    if not clock is None:
                        s = " ".join([f"{vr} {fmu_vr}" for fmu_vr, vr in table])
                        print(f"{clock} {len(table)} {s}", file=txt_file)

        for type_name in EmbeddedFMUPort.ALL_TYPES[:-2]:  # No start values for binary or clock
            print(f"# Start values of {fmu_name} - {type_name}: <FMU_VR> <RESET> <VALUE>", file=txt_file)
            print(len(self.start_values[type_name][fmu_name]), file=txt_file)
            for vr, reset, value in self.start_values[type_name][fmu_name]:
                print(f"{vr} {reset} {value}", file=txt_file)

        for type_name in EmbeddedFMUPort.ALL_TYPES:
            print(f"# Outputs of {fmu_name} - {type_name}: <VR> <FMU_VR>", file=txt_file)
            print(len(self.outputs[type_name][fmu_name][None]), file=txt_file)
            for fmu_vr, vr in self.outputs[type_name][fmu_name][None]:
                print(f"{vr} {fmu_vr}", file=txt_file)

            if not type_name == "clock":
                print(f"# Clocked Outputs of {fmu_name} - {type_name}: <FMU_VR_CLOCK> <n> <VR> <FMU_VR>", file=txt_file)
                print(f"{len(self.outputs[type_name][fmu_name])-1} {self.nb_clocked_outputs[type_name][fmu_name]}",
                      file=txt_file)
                for clock, translation in self.outputs[type_name][fmu_name].items():
                    if not clock is None:
                        s = " ".join([f"{vr} {fmu_vr}" for fmu_vr, vr in translation])
                        print(f"{clock} {len(translation)} {s}", file=txt_file)

add_input(cport, local_vr)

Register an input mapping for an embedded FMU port.

Parameters:

Name Type Description Default
cport ContainerPort

The embedded FMU input port.

required
local_vr int

The local value reference in the container.

required
Source code in fmu_manipulation_toolbox/container.py
def add_input(self, cport: ContainerPort, local_vr: int):
    """Register an input mapping for an embedded FMU port.

    Args:
        cport (ContainerPort): The embedded FMU input port.
        local_vr (int): The local value reference in the container.
    """
    if cport.port.clock is None:
        clock = None
    else:
        try:
            clock = self.vr_table.get_local_clock(cport)
        except KeyError:
            logger.error(f"Cannot expose clocked input: {cport}")
            return
        self.nb_clocked_inputs[cport.port.type_name][cport.fmu.name] += 1
    self.inputs[cport.port.type_name][cport.fmu.name][clock].append((cport.port.vr, local_vr))

add_output(cport, local_vr)

Register an output mapping for an embedded FMU port.

Parameters:

Name Type Description Default
cport ContainerPort

The embedded FMU output port.

required
local_vr int

The local value reference in the container.

required
Source code in fmu_manipulation_toolbox/container.py
def add_output(self, cport: ContainerPort, local_vr: int):
    """Register an output mapping for an embedded FMU port.

    Args:
        cport (ContainerPort): The embedded FMU output port.
        local_vr (int): The local value reference in the container.
    """
    if cport.port.clock is None:
        clock = None
    else:
        try:
            clock = self.vr_table.get_local_clock(cport)
        except KeyError:
            logger.error(f"Cannot expose clocked output: {cport}")
            return
        self.nb_clocked_outputs[cport.port.type_name][cport.fmu.name] += 1
    self.outputs[cport.port.type_name][cport.fmu.name][clock].append((cport.port.vr, local_vr))

add_start_value(cport, value)

Register a start value for an embedded FMU port.

Parameters:

Name Type Description Default
cport ContainerPort

The embedded FMU port.

required
value str

The start value.

required
Source code in fmu_manipulation_toolbox/container.py
def add_start_value(self, cport: ContainerPort, value: str):
    """Register a start value for an embedded FMU port.

    Args:
        cport (ContainerPort): The embedded FMU port.
        value (str): The start value.
    """
    reset = 1 if cport.port.causality == "input" else 0
    self.start_values[cport.port.type_name][cport.fmu.name].append((cport.port.vr, reset, value))

write_txt(fmu_name, txt_file)

Write the I/O mapping for one FMU to the container.txt file.

Parameters:

Name Type Description Default
fmu_name str

Name of the embedded FMU.

required
txt_file

Writable text file handle.

required
Source code in fmu_manipulation_toolbox/container.py
def write_txt(self, fmu_name, txt_file):
    """Write the I/O mapping for one FMU to the `container.txt` file.

    Args:
        fmu_name (str): Name of the embedded FMU.
        txt_file: Writable text file handle.
    """
    for type_name in EmbeddedFMUPort.ALL_TYPES:
        print(f"# Inputs of {fmu_name} - {type_name}: <VR> <FMU_VR>", file=txt_file)
        print(len(self.inputs[type_name][fmu_name][None]), file=txt_file)
        for fmu_vr, vr in self.inputs[type_name][fmu_name][None]:
            print(f"{vr} {fmu_vr}", file=txt_file)
        if not type_name == "clock":
            print(f"# Clocked Inputs of {fmu_name} - {type_name}: <FMU_VR_CLOCK> <n> <VR> <FMU_VR>", file=txt_file)
            print(f"{len(self.inputs[type_name][fmu_name])-1} {self.nb_clocked_inputs[type_name][fmu_name]}",
                  file=txt_file)
            for clock, table in self.inputs[type_name][fmu_name].items():
                if not clock is None:
                    s = " ".join([f"{vr} {fmu_vr}" for fmu_vr, vr in table])
                    print(f"{clock} {len(table)} {s}", file=txt_file)

    for type_name in EmbeddedFMUPort.ALL_TYPES[:-2]:  # No start values for binary or clock
        print(f"# Start values of {fmu_name} - {type_name}: <FMU_VR> <RESET> <VALUE>", file=txt_file)
        print(len(self.start_values[type_name][fmu_name]), file=txt_file)
        for vr, reset, value in self.start_values[type_name][fmu_name]:
            print(f"{vr} {reset} {value}", file=txt_file)

    for type_name in EmbeddedFMUPort.ALL_TYPES:
        print(f"# Outputs of {fmu_name} - {type_name}: <VR> <FMU_VR>", file=txt_file)
        print(len(self.outputs[type_name][fmu_name][None]), file=txt_file)
        for fmu_vr, vr in self.outputs[type_name][fmu_name][None]:
            print(f"{vr} {fmu_vr}", file=txt_file)

        if not type_name == "clock":
            print(f"# Clocked Outputs of {fmu_name} - {type_name}: <FMU_VR_CLOCK> <n> <VR> <FMU_VR>", file=txt_file)
            print(f"{len(self.outputs[type_name][fmu_name])-1} {self.nb_clocked_outputs[type_name][fmu_name]}",
                  file=txt_file)
            for clock, translation in self.outputs[type_name][fmu_name].items():
                if not clock is None:
                    s = " ".join([f"{vr} {fmu_vr}" for fmu_vr, vr in translation])
                    print(f"{clock} {len(translation)} {s}", file=txt_file)

Represents an internal connection between embedded FMUs inside a container.

A link routes one output port to one or more input ports. When the source and target types differ, automatic type conversion is applied if a conversion function exists.

Attributes:

Name Type Description
CONVERSION_FUNCTION dict[str, str]

Mapping from type pair strings (e.g. "real32/real64") to conversion function identifiers.

name str

Human-readable name derived from the source FMU and port.

cport_from ContainerPort | None

Source output port, or None for importer-generated clocks.

cport_to_list list[ContainerPort]

Destination input ports.

vr int | None

Value reference for the local variable holding the link value.

vr_converted dict[str, int | None]

Value references for type-converted copies, keyed by target type name.

Source code in fmu_manipulation_toolbox/container.py
class Link:
    """Represents an internal connection between embedded FMUs inside a container.

    A link routes one output port to one or more input ports. When the source
    and target types differ, automatic type conversion is applied if a
    conversion function exists.

    Attributes:
        CONVERSION_FUNCTION (dict[str, str]): Mapping from type pair strings
            (e.g. `"real32/real64"`) to conversion function identifiers.
        name (str): Human-readable name derived from the source FMU and port.
        cport_from (ContainerPort | None): Source output port, or `None` for
            importer-generated clocks.
        cport_to_list (list[ContainerPort]): Destination input ports.
        vr (int | None): Value reference for the local variable holding the link value.
        vr_converted (dict[str, int | None]): Value references for type-converted
            copies, keyed by target type name.
    """

    CONVERSION_FUNCTION = {
        "real32/real64": "F32_F64",

        "Int8/Int16": "D8_D16",
        "Int8/UInt16": "D8_U16",
        "Int8/Int32": "D8_D32",
        "Int8/UInt32": "D8_U32",
        "Int8/Int64": "D8_D64",
        "Int8/UInt64": "D8_U64",

        "UInt8/Int16": "U8_D16",
        "UInt8/UInt16": "U8_U16",
        "UInt8/Int32": "U8_D32",
        "UInt8/UInt32": "U8_U32",
        "UInt8/Int64": "U8_D64",
        "UInt8/UInt64": "U8_U64",

        "Int16/Int32": "D16_D32",
        "Int16/UInt32": "D16_U32",
        "Int16/Int64": "D16_D64",
        "Int16/UInt64": "D16_U64",

        "UInt16/Int32": "U16_D32",
        "UInt16/UInt32": "U16_U32",
        "UInt16/Int64": "U16_D64",
        "UInt16/UInt64": "U16_U64",

        "Int32/Int64": "D32_D64",
        "Int32/UInt64": "D32_U64",

        "UInt32/Int64": "U32_D64",
        "UInt32/UInt64": "U32_U64",

        "boolean/boolean1": "B_B1",
        "boolean1/boolean": "B1_B",
    }

    def __init__(self, cport_from: ContainerPort):
        self.name = cport_from.fmu.id + "." + cport_from.port.name  # strip .fmu suffix
        self.cport_from = cport_from
        self.cport_to_list: List[ContainerPort] = []

        self.vr: Optional[int] = None
        self.vr_converted: Dict[str, Optional[int]] = {}

        if not cport_from.port.causality == "output":
            if cport_from.port.type_name == "clock":
                # LS-BUS allows to connected to input clocks.
                self.cport_from = None
                self.add_target(cport_from)
            else:
                raise FMUContainerError(f"{cport_from} is {cport_from.port.causality} instead of OUTPUT")

    def add_target(self, cport_to: ContainerPort):
        """Add a destination input port to this link.

        Args:
            cport_to (ContainerPort): The input port to connect.

        Raises:
            FMUContainerError: If the port is not an input, or if types are
                incompatible and no conversion exists.
        """
        if not cport_to.port.causality == "input":
            raise FMUContainerError(f"{cport_to} is {cport_to.port.causality} instead of INPUT")

        if (cport_to.port.type_name == "clock" and self.cport_from is None or
                cport_to.port.type_name == self.cport_from.port.type_name):
            self.cport_to_list.append(cport_to)
        elif self.get_conversion(cport_to):
            self.cport_to_list.append(cport_to)
            self.vr_converted[cport_to.port.type_name] = None
        else:
            raise FMUContainerError(f"failed to connect {self.cport_from} to {cport_to} due to type.")

    def get_conversion(self, cport_to: ContainerPort) -> Optional[str]:
        """Look up the conversion function for connecting to a different type.

        Args:
            cport_to (ContainerPort): The target port with a potentially
                different type.

        Returns:
            str | None: Conversion function identifier, or `None` if no
                conversion is available.
        """
        try:
            conversion = f"{self.cport_from.port.type_name}/{cport_to.port.type_name}"
            return self.CONVERSION_FUNCTION[conversion]
        except KeyError:
            return None

    def nb_local(self) -> int:
        """Return the number of local variables needed for this link.

        Returns:
            int: `1` for the main value plus one per type-converted copy.
        """
        return 1+len(self.vr_converted)

add_target(cport_to)

Add a destination input port to this link.

Parameters:

Name Type Description Default
cport_to ContainerPort

The input port to connect.

required

Raises:

Type Description
FMUContainerError

If the port is not an input, or if types are incompatible and no conversion exists.

Source code in fmu_manipulation_toolbox/container.py
def add_target(self, cport_to: ContainerPort):
    """Add a destination input port to this link.

    Args:
        cport_to (ContainerPort): The input port to connect.

    Raises:
        FMUContainerError: If the port is not an input, or if types are
            incompatible and no conversion exists.
    """
    if not cport_to.port.causality == "input":
        raise FMUContainerError(f"{cport_to} is {cport_to.port.causality} instead of INPUT")

    if (cport_to.port.type_name == "clock" and self.cport_from is None or
            cport_to.port.type_name == self.cport_from.port.type_name):
        self.cport_to_list.append(cport_to)
    elif self.get_conversion(cport_to):
        self.cport_to_list.append(cport_to)
        self.vr_converted[cport_to.port.type_name] = None
    else:
        raise FMUContainerError(f"failed to connect {self.cport_from} to {cport_to} due to type.")

get_conversion(cport_to)

Look up the conversion function for connecting to a different type.

Parameters:

Name Type Description Default
cport_to ContainerPort

The target port with a potentially different type.

required

Returns:

Type Description
Optional[str]

str | None: Conversion function identifier, or None if no conversion is available.

Source code in fmu_manipulation_toolbox/container.py
def get_conversion(self, cport_to: ContainerPort) -> Optional[str]:
    """Look up the conversion function for connecting to a different type.

    Args:
        cport_to (ContainerPort): The target port with a potentially
            different type.

    Returns:
        str | None: Conversion function identifier, or `None` if no
            conversion is available.
    """
    try:
        conversion = f"{self.cport_from.port.type_name}/{cport_to.port.type_name}"
        return self.CONVERSION_FUNCTION[conversion]
    except KeyError:
        return None

nb_local()

Return the number of local variables needed for this link.

Returns:

Name Type Description
int int

1 for the main value plus one per type-converted copy.

Source code in fmu_manipulation_toolbox/container.py
def nb_local(self) -> int:
    """Return the number of local variables needed for this link.

    Returns:
        int: `1` for the main value plus one per type-converted copy.
    """
    return 1+len(self.vr_converted)

ValueReferenceTable

Allocates and tracks value references for the container's local variables.

Value references are encoded with a type mask in the upper bits, allowing the container runtime to identify the type from the VR alone.

Attributes:

Name Type Description
vr_table dict[str, int]

Next available VR index per type.

masks dict[str, int]

Bit mask per type, shifted to the upper byte.

nb_local_variable dict[str, int]

Count of local variables per type.

local_clock dict

Mapping from (EmbeddedFMU, fmu_vr) to local clock VR.

Source code in fmu_manipulation_toolbox/container.py
class ValueReferenceTable:
    """Allocates and tracks value references for the container's local variables.

    Value references are encoded with a type mask in the upper bits,
    allowing the container runtime to identify the type from the VR alone.

    Attributes:
        vr_table (dict[str, int]): Next available VR index per type.
        masks (dict[str, int]): Bit mask per type, shifted to the upper byte.
        nb_local_variable (dict[str, int]): Count of local variables per type.
        local_clock (dict): Mapping from `(EmbeddedFMU, fmu_vr)` to local
            clock VR.
    """

    def __init__(self):
        self.vr_table:Dict[str, int] = {}
        self.masks: Dict[str, int] = {}
        self.nb_local_variable:Dict[str, int] = {}
        self.local_clock = {}
        for i, type_name in enumerate(EmbeddedFMUPort.ALL_TYPES):
            self.vr_table[type_name] = 0
            self.masks[type_name] = i << 24
            self.nb_local_variable[type_name] = 0

    def add_vr(self, port_or_type_name: Union[ContainerPort, str], local: bool = False) -> int:
        """Allocate a new value reference.

        Args:
            port_or_type_name (ContainerPort | str): A port (type is inferred)
                or a type name string.
            local (bool): Whether this VR is for a local variable.

        Returns:
            int: The allocated value reference with type mask applied.
        """
        if isinstance(port_or_type_name, ContainerPort):
            type_name = port_or_type_name.port.type_name
        else:
            type_name = port_or_type_name

        if local:
            self.nb_local_variable[type_name] += 1

        vr = self.vr_table[type_name]
        self.vr_table[type_name] += 1

        return vr | self.masks[type_name]

    def set_link_vr(self, link: Link):
        """Allocate value references for a link and its type-converted copies.

        Args:
            link (Link): The link to assign VRs to.
        """
        if link.cport_from is None:
            link.vr = self.add_vr("clock", local=True)
        else:
            link.vr = self.add_vr(link.cport_from, local=True)
            if link.cport_from.port.type_name == "clock":
                self.local_clock[(link.cport_from.fmu, link.cport_from.port.vr)] = link.vr

        for cport_to in link.cport_to_list:
            if cport_to.port.type_name == "clock":
                self.local_clock[(cport_to.fmu, cport_to.port.vr)] = link.vr

        for type_name in link.vr_converted.keys():
            link.vr_converted[type_name] = self.add_vr(type_name, local=True)

    def get_local_clock(self, cport: ContainerPort) -> int:
        """Get the local VR for a clock associated with a clocked port.

        Args:
            cport (ContainerPort): The clocked port.

        Returns:
            int: The local value reference of the clock.
        """
        return self.local_clock[(cport.fmu, int(cport.port.clock))]


    def nb_local(self, type_name: str) -> int:
        """Return the number of local variables for a given type.

        Args:
            type_name (str): Container type name (e.g. `"real64"`).

        Returns:
            int: Number of local variables of this type.
        """
        return self.nb_local_variable[type_name]

add_vr(port_or_type_name, local=False)

Allocate a new value reference.

Parameters:

Name Type Description Default
port_or_type_name ContainerPort | str

A port (type is inferred) or a type name string.

required
local bool

Whether this VR is for a local variable.

False

Returns:

Name Type Description
int int

The allocated value reference with type mask applied.

Source code in fmu_manipulation_toolbox/container.py
def add_vr(self, port_or_type_name: Union[ContainerPort, str], local: bool = False) -> int:
    """Allocate a new value reference.

    Args:
        port_or_type_name (ContainerPort | str): A port (type is inferred)
            or a type name string.
        local (bool): Whether this VR is for a local variable.

    Returns:
        int: The allocated value reference with type mask applied.
    """
    if isinstance(port_or_type_name, ContainerPort):
        type_name = port_or_type_name.port.type_name
    else:
        type_name = port_or_type_name

    if local:
        self.nb_local_variable[type_name] += 1

    vr = self.vr_table[type_name]
    self.vr_table[type_name] += 1

    return vr | self.masks[type_name]

get_local_clock(cport)

Get the local VR for a clock associated with a clocked port.

Parameters:

Name Type Description Default
cport ContainerPort

The clocked port.

required

Returns:

Name Type Description
int int

The local value reference of the clock.

Source code in fmu_manipulation_toolbox/container.py
def get_local_clock(self, cport: ContainerPort) -> int:
    """Get the local VR for a clock associated with a clocked port.

    Args:
        cport (ContainerPort): The clocked port.

    Returns:
        int: The local value reference of the clock.
    """
    return self.local_clock[(cport.fmu, int(cport.port.clock))]

nb_local(type_name)

Return the number of local variables for a given type.

Parameters:

Name Type Description Default
type_name str

Container type name (e.g. "real64").

required

Returns:

Name Type Description
int int

Number of local variables of this type.

Source code in fmu_manipulation_toolbox/container.py
def nb_local(self, type_name: str) -> int:
    """Return the number of local variables for a given type.

    Args:
        type_name (str): Container type name (e.g. `"real64"`).

    Returns:
        int: Number of local variables of this type.
    """
    return self.nb_local_variable[type_name]

Allocate value references for a link and its type-converted copies.

Parameters:

Name Type Description Default
link Link

The link to assign VRs to.

required
Source code in fmu_manipulation_toolbox/container.py
def set_link_vr(self, link: Link):
    """Allocate value references for a link and its type-converted copies.

    Args:
        link (Link): The link to assign VRs to.
    """
    if link.cport_from is None:
        link.vr = self.add_vr("clock", local=True)
    else:
        link.vr = self.add_vr(link.cport_from, local=True)
        if link.cport_from.port.type_name == "clock":
            self.local_clock[(link.cport_from.fmu, link.cport_from.port.vr)] = link.vr

    for cport_to in link.cport_to_list:
        if cport_to.port.type_name == "clock":
            self.local_clock[(cport_to.fmu, cport_to.port.vr)] = link.vr

    for type_name in link.vr_converted.keys():
        link.vr_converted[type_name] = self.add_vr(type_name, local=True)