Source code for modelx.core.model

# Copyright (c) 2017-2026 Fumito Hamamura <fumito.ham@gmail.com>

# This library is free software: you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation version 3.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.  If not, see <http://www.gnu.org/licenses/>.

import builtins
import itertools
import pathlib
import zipfile
import gc
from types import ModuleType

import networkx as nx

from modelx.core.base import (
    Interface,
    Impl,
    Derivable,
    get_mixin_slots,
    null_impl
)
from modelx.core.reference import ReferenceImpl, ReferenceProxy
from modelx.core.cells import CellsImpl, UserCellsImpl
from modelx.core.node import OBJ, KEY, get_node, ObjectNode
from modelx.core.parent import (
    EditableParentImpl,
    EditableParent,
)
from modelx.core.space import UserSpaceImpl
from modelx.core.binding.namespace import BaseNamespace, NamespaceServer
from modelx.core.formula import NULL_FORMULA
from modelx.core.util import is_valid_name
from modelx.core.execution.trace import TraceManager
from modelx.core.chainmap import CustomChainMap
from modelx.core.views import RefView, MacroView
from modelx.core.macro import MacroImpl


class IOSpecOperation:

    __slots__ = ()

    def update_pandas(self, old_data, new_data=None):
        """Update a pandas object assigned to References

        Replace with ``new_data`` the value of such a Reference whose value is
        ``old_data``. Both ``new_data`` and ``old_data`` need to be
        `DataFrame`_ or `Series`_.
        If ``old_data`` is assigned to multiple References in a model,
        the values of all the References are replaced with ``new_data``,
        even the References
        are defined in different locations within the model.
        The identity of pandas objects is determined by the `id()`_ function.
        If ``new_data`` is not given, :class:`~modelx.core.cells.Cells`
        that are dependent on the References are cleared.

        If ``old_data`` has an associated
        :class:`~modelx.io.pandasio.PandasData`,
        this method associates the :class:`~modelx.io.pandasio.PandasData`
        to ``new_data``.

        This method is available for :class:`~modelx.core.model.Model`
        and :class:`~modelx.core.space.UserSpace`. The method
        performs identically regardless of the types of calling objects.

        .. _id():
           https://docs.python.org/3/library/functions.html#id

        .. _Series:
           https://pandas.pydata.org/docs/reference/api/pandas.Series.html

        .. _DataFrame:
           https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html

        Args:
            new_data: A pandas `Series`_ or `DataFrame`_ object
            old_data(optional): A pandas `Series`_ or `DataFrame`_ object

        .. versionadded:: 0.18.0

        See Also:
            * :meth:`new_pandas`: Create a reference to pandas data
            * :class:`~modelx.io.pandasio.PandasData`: IOSpec for pandas objects

        """
        return self._impl.model.refmgr.update_value(old_data, new_data)

    def update_module(self, old_module, new_module=None):
        """Update an user-defined module assigned to References

        Update an user-defined Python module created by :meth:`new_module`.
        The ``new_module`` parameter is a path to the source file
        of a new user-defined module.
        If ``new_module`` is not given, the old module is reloaded
        from the same source file of the old module
        and a new module module is created.

        The values of References referring to the old module object
        are replaced with the new module object.

        If ``old_module`` is assigned to multiple References in a model,
        the value of all the References are updated, even the References
        are defined in different locations within the model.

        This method associates to the new module the
        :class:`~modelx.io.moduleio.ModuleData` object previously
        associated to the old module.

        This method is available for :class:`~modelx.core.model.Model`
        and :class:`~modelx.core.space.UserSpace`. The method
        performs identically regardless of the types of calling objects.

        Args:
            old_module: A user-defined Python module object.
            new_module: The path to the source file as a :obj:`str` or
                a path-like object.

        .. versionadded:: 0.18.0

        See Also:
            * :meth:`new_module`: Create a reference to a module
            * :class:`~modelx.io.moduleio.ModuleData`: IOSpec for module objects

        """
        if not isinstance(old_module, ModuleType):
            raise ValueError("not a module object")
        return self._impl.model.refmgr.update_value(old_module, new_module)

    def get_spec(self, data):
        """Get *IOSpec* associated with ``data``

        Returns the *IOSpec* object associated with ``data``.
        ``data`` should be an object referenced in the model.
        An *IOSpec* object is an instance of a subclass of
        :class:`~modelx.io.baseio.BaseIOSpec`.
        If no *IOSpec* is associated with `data`, an error is raised.

        See Also:
            * :meth:`~modelx.core.model.Model.del_spec`: Delete IOSpec for data
            * :class:`~modelx.io.baseio.BaseIOSpec`: Base class for IOSpec objects
            * :attr:`~modelx.core.model.Model.iospecs`: List all IOSpecs
        """
        spec = self._impl.refmgr.get_spec(data)
        if spec is None:
            raise ValueError("spec not found")
        else:
            return spec

    def del_spec(self, data):
        """Delete *IOSpec* associate with ``data``

        Deletes the *IOSpec* object associated with ``data``.
        ``data`` should be an object referenced in the model.
        An *IOSpec* object is an instance of a subclass of
        :class:`~modelx.io.baseio.BaseIOSpec`.

        See Also:
            * :meth:`~modelx.core.model.Model.get_spec`: Get IOSpec for data
            * :class:`~modelx.io.baseio.BaseIOSpec`: Base class for IOSpec objects
            * :attr:`~modelx.core.model.Model.iospecs`: List all IOSpecs
        """
        self._impl.refmgr._manager.del_spec(self.get_spec(data))

    @property
    def iospecs(self):
        """List of :class:`~modelx.io.baseio.BaseIOSpec` objects

        Returns a list of all objects of BaseIOSpec subclasses
        defined in this Model.

        :class:`~modelx.io.excelio.ExcelRange` and
        :class:`~modelx.io.pandasio.PandasData`
        are subclasses of :class:`~modelx.io.baseio.BaseIOSpec`.

        :class:`~modelx.io.excelio.ExcelRange`
        objects are created either by
        :meth:`Model.new_excel_range<modelx.core.model.Model.new_excel_range>`
        or
        :meth:`UserSpace.new_excel_range<modelx.core.space.UserSpace.new_excel_range>`
        method.
        :class:`~modelx.io.pandasio.PandasData` objects are
        created either by
        :meth:`Model.new_pandas<modelx.core.model.Model.new_pandas>`
        or
        :meth:`UserSpace.new_pandas<modelx.core.space.UserSpace.new_pandas>`
        method.

        See Also:
            * :meth:`~modelx.core.model.Model.get_spec`: Get IOSpec for data
            * :class:`~modelx.io.excelio.ExcelRange`: IOSpec for Excel ranges
            * :class:`~modelx.io.pandasio.PandasData`: IOSpec for pandas objects
            * :meth:`UserSpace.new_excel_range<modelx.core.space.UserSpace.new_excel_range>`: Create Excel range in space
            * :meth:`Model.new_excel_range<modelx.core.model.Model.new_excel_range>`: Create Excel range in model
            * :meth:`UserSpace.new_pandas<modelx.core.space.UserSpace.new_pandas>`: Create pandas reference in space
            * :meth:`Model.new_pandas<modelx.core.model.Model.new_pandas>`: Create pandas reference in model

        .. versionchanged:: 0.20.0 renamed to ``iospecs`` from ``dataspecs``
        .. versionchanged:: 0.18.0 the property name is changed
            from ``dataclients`` to ``dataspecs``
        .. versionadded:: 0.9.0

        """
        return list(self._impl.refmgr.specs)


[docs] class Model(IOSpecOperation, EditableParent): """Top-level container representing a complete modelx model. Model is the root object in the modelx object hierarchy and serves as the primary container for organizing spaces, cells, and references. Each Model represents an independent, self-contained computational model with its own namespace and execution environment. A Model contains: * :class:`~modelx.core.space.UserSpace` objects (top-level spaces) * Global references accessible throughout the model * Serialization and I/O specifications Key Characteristics: * **Top-level container**: Root of the object hierarchy * **Independent namespace**: Each model has isolated global references * **Serializable**: Can be saved to/loaded from files or zip archives * **Exportable**: Can be exported as a Python package Creation: Models are created using the :func:`~modelx.new_model` function:: >>> import modelx as mx >>> model = mx.new_model() >>> model <Model Model1> >>> # Create with specific name >>> model = mx.new_model('MyModel') >>> model <Model MyModel> Adding Spaces: Create child spaces to organize cells and nested structures:: >>> space = model.new_space('Calculations') >>> space <UserSpace Calculations in MyModel> Global References: Set global references accessible from all spaces in the model:: >>> import numpy as np >>> model.np = np >>> model.discount_rate = 0.05 Persistence: Models can be saved and loaded in multiple formats:: >>> # Save as directory structure with text files >>> model.write('path/to/model') >>> # Save as zip archive >>> model.zip('path/to/model.zip') >>> # Load a saved model >>> loaded = mx.read_model('path/to/model') Memory Management: Clear calculated values to free memory:: >>> model.clear_all() # Clear all cells and dynamic spaces in the model Multiple Models: Multiple models can coexist in the same session:: >>> model1 = mx.new_model('Model1') >>> model2 = mx.new_model('Model2') >>> mx.get_models() {'Model1': <Model Model1>, 'Model2': <Model Model2>} Accessing Current Model: Get the currently active model:: >>> mx.cur_model() <Model Model2> See Also: * :func:`~modelx.new_model`: Create a new model * :func:`~modelx.read_model`: Load a model from files * :func:`~modelx.get_models`: Get all models in the session * :func:`~modelx.cur_model`: Get the current model * :class:`~modelx.core.space.UserSpace`: Container for cells and nested spaces * :meth:`~modelx.core.model.Model.write`: Save model to files * :meth:`~modelx.core.model.Model.zip`: Save model to zip archive .. versionchanged:: 0.18.0 Added pandas and module update operations """ __slots__ = ()
[docs] def rename(self, name, rename_old=False): """Rename the model itself""" self._impl.system.rename_model( new_name=name, old_name=self.name, rename_old=rename_old)
[docs] def clear_all(self): """Clears :class:`~modelx.core.cells.Cells` and :class:`~modelx.core.space.ItemSpace`. Clears both the input values and the calculated values of all the :class:`~modelx.core.cells.Cells` in the model and delete all the :class:`~modelx.core.space.ItemSpace` objects in the model. .. seealso:: :meth:`UserSpace.clear_all<modelx.core.space.UserSpace.clear_all>` .. versionadded:: 0.16.0 """ for space in self._impl.spaces.values(): space.clear_all_cells( clear_input=True, recursive=True, del_items=True )
[docs] def close(self): """Close the model.""" self._impl.system.close_model(self._impl)
[docs] def new_macro(self, name=None, formula=None): """Create a new :class:`~modelx.core.macro.Macro` in this model. Creates a macro that acts as a callable Python function saved within the model. Macros share a dedicated global namespace that includes the model itself as both ``mx_model`` and by the model's name. Args: name (str, optional): Name for the macro. If omitted and a function is provided, the function's name is used. If the function name is not valid for a macro name, an error is raised. Must be a valid Python identifier, and must not start with an underscore. formula (callable, optional): The function definition. Can be: * A Python function (def or lambda) * None to create an empty macro (not recommended) Returns: :class:`~modelx.core.macro.Macro`: The newly created macro object Example: Creating a macro using new_macro:: >>> model = mx.new_model('MyModel') >>> def get_model_info(): ... return f"Model: {mx_model._name}" >>> model.new_macro(formula=get_model_info) <Macro MyModel.get_model_info> >>> model.get_model_info() 'Model: MyModel' Above is equivalent to creating a macro using the decorator:: >>> @mx.defmacro ... def get_model_info(): ... return f"Model: {mx_model._name}" <Macro MyModel.get_model_info> Creating a macro with a custom name from a lambda function:: >>> model.new_macro('double', lambda x: x * 2) <Macro MyModel.double> >>> model.double(5) 10 Macros can call other macros in the same model:: >>> @mx.defmacro ... def helper(): ... return 42 >>> @mx.defmacro ... def main(): ... return helper() * 2 >>> model.main() 84 See Also: * :func:`~modelx.defmacro`: Decorator to create macros * :attr:`~modelx.core.model.Model.macros`: Access all macros * :meth:`~modelx.core.model.Model.export`: Export model with macros .. versionadded:: 0.30.0 """ if formula is None: raise ValueError("formula must be provided") if name is None: if hasattr(formula, '__name__'): name = formula.__name__ else: raise ValueError("name must be provided when formula has no __name__") return self._impl.new_macro(name, formula).interface
@Interface.doc.setter def doc(self, value): self._impl.doc = value @property def path(self): r"""A Path object representing the model's path. When a previously saved model is loaded with :func:`~modelx.read_model`, this property is set to a `pathlib.Path`_ object representing the path given to :func:`~modelx.read_model`:: >>> import modelx as mx >>> model = mx.read_model(r"C:\Users\mxuser\Model") >>> model.path WindowsPath('C:/Users/mxuser/Model2') When a model is created with :py:func:`~modelx.new_model`, this property is set to ``None``:: >>> model = mx.new_model() >>> model.path # Returns None The user can set the path by assigning a string value to it:: >>> model.path = "." >>> model.path WindowsPath('.') When a model is saved with :meth:`~Model.write` or :func:`~modelx.write_model`, this property is updated to a `pathlib.Path`_ object representing the path given to the method or function:: >>> model.write(r"C:\Users\mxuser\Model2") >>> model.path WindowsPath('C:/Users/mxuser/Model2') The property is accessed within formulas as an attribute of a special Reference, ``model_``:: >>> @mx.defcells >>> def foo(): ... return _model.path >>> foo() WindowsPath('C:/Users/mxuser/Model') Returns: A `pathlib.Path`_ object or :py:obj:`None` .. versionadded:: 0.25.0 .. _pathlib.Path: https://docs.python.org/3/library/pathlib.html#pathlib.Path """ return self._impl.system.executor.add_reference( self._impl.property_refs["path"]).interface @path.setter def path(self, path): self._impl.clear_attr_referrers(self._impl.property_refs["path"]) self._impl.path = pathlib.Path(path) self._impl.property_refs['path'] = ReferenceImpl( self._impl, "path", self._impl.path, container=self._impl._property_refs, set_item=False)
[docs] def write(self, model_path, backup=True, log_input=False): """Write model to files. This method performs the :py:func:`~modelx.write_model` on self. See :py:func:`~modelx.write_model` section for the details. .. versionchanged:: 0.8.0 .. versionadded:: 0.0.22 Args: model_path(str): Folder(directory) path where the model is saved. backup(bool, optional): Whether to backup an existing file with the same name if it already exists. Defaults to ``True``. log_input(bool, optional): If ``True``, input values in Cells are output to *_input_log.txt* under ``model_path``. Defaults to ``False``. """ from modelx.serialize import write_model write_model(self._impl.system, self, model_path, is_zip=False, backup=backup, log_input=log_input)
[docs] def zip(self, model_path, backup=True, log_input=False, compression=zipfile.ZIP_DEFLATED, compresslevel=None): """Archive model to a zip file. This method performs the :py:func:`~modelx.zip_model` on self. See :py:func:`~modelx.zip_model` section for the details. .. versionchanged:: 0.9.0 ``compression`` and ``compresslevel`` parameters are added. .. versionadded:: 0.8.0 Args: model_path(str): Folder(directory) path where the model is saved. backup(bool, optional): Whether to backup an existing file with the same name if it already exists. Defaults to ``True``. log_input(bool, optional): If ``True``, input values in Cells are output to *_input_log.txt* under ``model_path``. Defaults to ``False``. compression(optional): Identifier of the ZIP compression method to use. This method uses `zipfile.ZipFile`_ class internally and ``compression`` and ``compresslevel`` arguments are passed to `zipfile.ZipFile`_ constructor. See `zipfile.ZipFile`_ manual page for available identifiers. Defaults to `zipfile.ZIP_DEFLATED`_. compresslevel(optional): Integer identifier to indicate the compression level to use. If not specified, the default compression level is used. See `zipfile.ZipFile`_ explanation on the Python Standard Library site for available integer identifiers for each compression method. For Python 3.6, this parameter is ignored. .. _zipfile.ZipFile: https://docs.python.org/3/library/zipfile.html#zipfile.ZipFile .. _zipfile.ZIP_DEFLATED: https://docs.python.org/3/library/zipfile.html#zipfile.ZIP_DEFLATED """ from modelx.serialize import write_model write_model(self._impl.system, self, model_path, is_zip=True, backup=backup, log_input=log_input, compression=compression, compresslevel=compresslevel)
[docs] def export(self, path): """Export the model as a Python package. .. warning:: This feature is experimental. See the limitaions section in :py:func:`~modelx.export_model`. This method performs the :py:func:`~modelx.export_model` on self. See :py:func:`~modelx.export_model` section for the details. .. versionadded:: 0.22.0 """ from ..export.exporter import Exporter Exporter(self, path).export()
# ---------------------------------------------------------------------- # Getting and setting attributes def __getattr__(self, name): return self._impl.get_attr(name) def __delattr__(self, name): self._impl.del_attr(name) def __dir__(self): result = list(self._impl._namespace) result.extend(self._impl._macros.keys()) return result @property def tracegraph(self): """A directed graph of cells.""" return self._impl.tracegraph @property def refs(self): """Return a mapping of global references.""" return RefView(self._impl.global_refs) @property def macros(self): """Return a mapping of macros. Returns a dictionary-like view of all macros defined in the model. Macros are Python functions that can be saved within the model and executed to manipulate or query the model. Example: >>> import modelx as mx >>> m = mx.new_model('MyModel') >>> @mx.defmacro ... def get_name(): ... return mx_model._name >>> m.macros {'get_name': <Macro MyModel.get_name>} >>> m.get_name() 'MyModel' See Also: * :func:`~modelx.defmacro`: Decorator to create macros * :class:`~modelx.core.macro.Macro`: Macro class documentation .. versionadded:: 0.30.0 """ return MacroView(self._impl._macros) def _get_from_name(self, name): """Get object by named id""" return self._impl.get_impl_from_name(name).interface def _get_object(self, name, as_proxy=False): parts = name.split(".") attr = parts.pop(0) if as_proxy and attr in self.refs: return ReferenceProxy(self._impl.global_refs[attr]) else: return super()._get_object(name, as_proxy) def _get_attrdict(self, extattrs=None, recursive=True): """Get attributes""" result = super(Model, self)._get_attrdict(extattrs, recursive) if recursive: result["refs"] = self.refs._get_attrdict(extattrs, recursive) else: result["refs"] = tuple(self.refs) if extattrs: self._get_attrdict_extra(result, extattrs, recursive) return result def _get_refs(self, value): """Get references referring to a value""" refs = self._impl.refmgr._valid_to_refs[id(value)] return [ReferenceProxy(impl) for impl in refs] def _get_assoc_values(self): """Get a list of values in the model with their associates""" result = [] for valid, refs in self._impl.refmgr._valid_to_refs.items(): info = {} info["value"] = refs[0].interface info["spec"] = self._impl.system.iomanager.get_spec_from_value( io_group=self, value=refs[0].interface ) info["refs"] = self._get_refs(info["value"]) result.append(info) return result # ----------------------------------------------------------------------
[docs] def generate_actions(self, targets, step_size=1000): """Generates actions for memory-optimized run Returns a list of *actions* for :meth:`execute_actions` to perform a memory-optimized calculation. See :meth:`execute_actions` for details. Args: targets: :obj:`list` of :class:`~modelx.core.node.ItemNode`. step_size(:obj:`int`, optional): Number of calculations in a step. Returns: :obj:`list` of *actions*. .. seealso:: * :meth:`execute_actions` """ calc_targets = [] calculated = [] try: for n in targets: obj, key = n._impl[OBJ], n._impl[KEY] if key not in obj.input_keys: with self._impl.system.trace_stack(maxlen=None): obj.get_value_from_key(key) tracestack = self._impl.system.callstack.tracestack for trace in tracestack: if trace[0] == "ENTER": calculated.append(trace[3]) calc_targets.append(n._impl) result = self._impl.get_calcsteps( calc_targets, calculated, step_size) finally: for n in calculated: n[OBJ].clear_value_at(n[KEY]) return result
[docs] def execute_actions(self, actions): """Performs memory-optimized run Performs a memory-optimized run. Memory-optimized runs are for calculating specified nodes (*targets*) by consuming less memory. Memory-optimized runs are useful when the intermediate results contain large data. A memory-optimized run actually involves two runs. The first run is invoked by calling :meth:`generate_actions` and the second run is performed by calling :meth:`execute_actions`. The :meth:`generate_actions` method runs the model to generate and return a list of *actions* from a list of *targets* passed to the ``targets`` parameter. The user should set a small data set in the model before calling :meth:`generate_actions`. The elements of ``targets`` should be :class:`~modelx.core.node.ItemNode` objects representing combinations of a :class:`~modelx.core.cells.Cells` object and its arguments. Node objects can be created by passing the arguments to :meth:`~modelx.core.cells.Cells.node` method. For example, the expression below creates a node object representing ``Model1.Space1.Cells3(x=2)``:: Model1.Space1.Cells3.node(x=2) :meth:`generate_actions` runs the model to analyze the dependency of the target nodes. :meth:`generate_actions` identifies all the calculated nodes that the target nodes depend on, and sort the nodes in a topological order. Then the ordered nodes are split into groups so that each group has at most the number of nodes specified by ``step_size`` (1000 by default). Then :meth:`generate_actions` generates actions to process each group. For each group, *calc*, *paste*, and *clear* actions are generated in this order. Each action is associted with nodes that the action applies to. A *calc* action indicates its associated nodes should be calculated. A *paste* action indicates its associted nodes should be value-pasted so that the values of the nodes persist after their precedents are cleared. A *clear* action indicates its associated nodes should be cleared to save memory. :meth:`generate_actions` returns a list of actions. Each action is also represented by a list, whose first element is a string, which is either ``'calc'``, ``'paste'``, or ``'clear'``. The string indicates the type of action to perform. The second element is a list of :class:`~modelx.core.node.ItemNode`, to which the action indicated by the first element apply. Below is an example of the action list. .. code-block:: [ ['calc', [Model1.Space1.Cells1(), Model1.Space1.Cells2(x=0)]], ['paste', [Model1.Space1.Cells2(x=0), Model1.Space1.Cells1()]], ['clear', []], ['calc', [Model1.Space1.Cells2(x=1), Model1.Space1.Cells2(x=2)]], ['paste', [Model1.Space1.Cells2(x=2)]], ['clear', [Model1.Space1.Cells2(x=1), Model1.Space1.Cells2(x=0)]], ['calc', [Model1.Space1.Cells3(x=2)]], ['paste', [Model1.Space1.Cells3(x=2)]], ['clear', [Model1.Space1.Cells1(), Model1.Space1.Cells2(x=2)]] ] :meth:`execute_actions` executes actions passed as ``actions``. Before calling :meth:`execute_actions`, the user should set the entire data set instead of the small data set used for generating the actions. After the execusion, the target nodes are value-pasted, and the values of precedent nodes of the target nodes are all cleared. To clear the values call :meth:`~modelx.core.cells.Cells.clear_at` for the targets or call :meth:`Cells.clear_all<modelx.core.cells.Cells.clear_all>` or :meth:`Space.clear_all<modelx.core.space.UserSpace.clear_all>` or :meth:`Model.clear_all<modelx.core.model.Model.clear_all>`. Args: actions(:obj:`list`): The *actions* list .. seealso:: * :meth:`generate_actions` * `Running a heavy model while saving memory <https://modelx.io/blog/2022/03/26/running-model-while-saving-memory/>`_, a blog post on https://modelx.io """ gc_status = gc.isenabled() gc.disable() try: for step in actions: action, nodes = step if action == "calc": for n in nodes: node = n._impl node[OBJ].get_value_from_key(node[KEY]) elif action == "paste": node_value_pairs = [] for n in nodes: node = n._impl node_value_pairs.append( [node, node[OBJ].get_value_from_key(node[KEY])] ) for node, value in node_value_pairs: node[OBJ].set_value_from_key(node[KEY], value) elif action == "clear": for n in nodes: node = n._impl node[OBJ].clear_value_at(node[KEY]) gc.collect() else: raise RuntimeError("must not happen") finally: if gc_status: gc.enable()
[docs] def compare_cells(self, func): """Tentative: Compare cells with the same name across different spaces in the model. .. warning:: This is a tentative feature based on a use request (See `#196 <https://github.com/fumitoh/modelx/discussions/196>`_), and will be replaced with a more robust solution in future releases. This method searches for cells with a given name across all spaces in the model, groups them by their normalized formula, and displays the results. Args: func: Either a function object or a string representing the cell name to compare. If a function object is provided, its __name__ attribute will be used. Returns: None. Results are printed to stdout. Output Format: Groups cells by their normalized formula and displays: - Group number - List of space names containing cells with identical formulas - The normalized formula (with empty lines removed) Notes: - Empty lines in formulas are removed during normalization - If a cell's formula is not accessible, it will be labeled as "<formula not accessible>" - If no spaces contain a cell with the given name, a message is printed - Formulas are considered identical after normalization (stripping and removing empty lines) Example: .. code-block:: python >>> import modelx as mx >>> m = mx.new_model() >>> s1 = m.new_space() >>> @mx.defcells ... def foo(x): ... return 1 >>> s2 = mx.new_space() >>> @mx.defcells ... def foo(x): ... return 2 >>> m.compare_cells('foo') ------------------------------------------------------------ [Group 1] Spaces : Space1 Formula (normalized): def foo(x): return 1 ------------------------------------------------------------ ------------------------------------------------------------ [Group 2] Spaces : Space2 Formula (normalized): def foo(x): return 2 ------------------------------------------------------------ """ func_name = getattr(func, '__name__', func) grouped = {} try: spaces_iter = self.spaces.values() except AttributeError: spaces_iter = self.spaces for space in spaces_iter: cell = getattr(space, func_name, None) if cell is None: continue try: formula = str(cell.formula).strip() lines = formula.split('\n') normalized_lines = [] for line in lines: if line.strip(): normalized_lines.append(line) normalized_formula = '\n'.join(normalized_lines) except Exception: formula = "<formula not accessible>" normalized_formula = formula grouped.setdefault(normalized_formula, []).append(space.name) if not grouped: print(f"No spaces contain a cell named '{func_name}'.") return for idx, (normalized_formula, names) in enumerate(grouped.items(), 1): print("\n" + "-"*60 + "\n") print(f"[Group {idx}]") print(f"Spaces : {', '.join(sorted(names))}") print("Formula (normalized):") print(normalized_formula) print("\n" + "-"*60 + "\n")
[docs] def new_space_from_model( self, source_model, name=None, *, refs_strategy="copy", refs_prefix=None, defined_only=False, ): """Create a top-level UserSpace from another Model. This method creates a new :class:`~modelx.core.space.UserSpace` in the model and copies all top-level spaces from ``source_model`` into the created space. The space acts as a namespace wrapper, allowing the source model's structure to be migrated within the target model. The method provides control over how references are handled during the migration process through the ``refs_strategy`` parameter, and allows filtering of the copied content through the ``defined_only`` parameter. Args: source_model (:class:`~modelx.core.model.Model`): The model whose top-level spaces will be copied into this model. Cannot be the same as the target model (``self``). name (:obj:`str`, optional): Name of the container space in the target model. If not specified, defaults to ``source_model.name``. refs_strategy (:obj:`str`, optional): Strategy for handling global references from ``source_model``. Must be one of: * ``"copy"`` (default) - Copy all global references from ``source_model`` (except ``__builtins__``) into the container space's references. References are propagated to all descendant spaces. * ``"ignore"`` - Do not copy any references from ``source_model``. Only the spaces and cells structure is copied. * ``"shadow"`` - Copy references from ``source_model`` only if they don't already exist in the target model's global references. Existing references in the target model take precedence. refs_prefix (:obj:`str`, optional): If specified, prefix to apply to reference names when copying from ``source_model``. For example, if ``refs_prefix="src_"``, a reference named ``data`` in ``source_model`` will be copied as ``src_data`` in the container space. The original name is also preserved as an alias. Only applicable when ``refs_strategy`` is ``"copy"`` or ``"shadow"``. defined_only (:obj:`bool`, optional): If :obj:`True`, only defined (non-derived) :class:`~modelx.core.cells.Cells` and :class:`~modelx.core.space.UserSpace` objects are copied. Derived cells and spaces resulting from inheritance are excluded. Defaults to :obj:`False`. Returns: :class:`~modelx.core.space.UserSpace`: The created container space containing all copied spaces from ``source_model``. Raises: ValueError: If ``source_model`` is the same as the target model (``self``). Example: Create two models and embed one into the other:: >>> import modelx as mx >>> # Create source model with spaces and references >>> source = mx.new_model("SourceModel") >>> source.param = 100 >>> space1 = source.new_space("Space1") >>> @mx.defcells ... def calc(x): ... return x * _model.param >>> # Create target model >>> target = mx.new_model("TargetModel") >>> # Embed source model as a space >>> container = target.new_space_from_model(source, name="Embedded") >>> # Access embedded content >>> container.Space1.calc(5) 500 Using ``refs_prefix`` to avoid name conflicts:: >>> target.rate = 0.05 # Global reference in target >>> source.rate = 0.03 # Global reference in source >>> # Embed with prefix to distinguish references >>> container = target.new_space_from_model( ... source, ... name="Embedded", ... refs_prefix="src_" ... ) >>> # References are available with prefix >>> container.src_rate 0.03 >>> target.rate # Target's original reference unchanged 0.05 Copy only defined cells and spaces:: >>> base_space = source.new_space("Base") >>> derived_space = source.new_space("Derived") >>> derived_space.add_bases(base_space) >>> # Copy only defined content (excludes derived) >>> container = target.new_space_from_model( ... source, ... defined_only=True ... ) See Also: * :meth:`~modelx.core.model.Model.new_space` * :meth:`~modelx.core.space.UserSpace.new_space` * :meth:`~modelx.core.space.UserSpace.add_bases` .. versionadded:: 0.29.2 """ if source_model is self: raise ValueError("Cannot embed a model into itself") if name is None: name = source_model.name refs = None if refs_strategy in ("copy", "shadow"): refs = {} for k, ref in source_model._impl.global_refs.items(): if k == "__builtins__": continue new_key = (f"{refs_prefix}{k}" if refs_prefix else k) if refs_strategy == "shadow" and new_key in self._impl.global_refs: continue # Always include the original name as well; prefixed name is an alias if refs_prefix: refs[k] = ref.interface refs[new_key] = ref.interface container_impl = self._impl.model.updater.new_space( parent=self._impl, name=name, refs=refs ) container = container_impl.interface for child in source_model._impl.named_spaces.values(): self._impl.model.updater.copy_space( parent=container_impl, source=child, name=child.name, defined_only=defined_only ) # Propagate container refs into each descendant space so formulas can resolve names if refs_strategy in ("copy", "shadow") and refs: def _propagate(space_impl): # Add missing refs without overriding existing ones for rk, rv in refs.items(): if rk not in space_impl.own_refs: self._impl.model.spmgr.new_ref(space_impl, rk, rv, refmode="auto") for child_impl in space_impl.named_spaces.values(): _propagate(child_impl) _propagate(container_impl) return container
class ModelNamespace(BaseNamespace): __slots__ = () _impl: 'ModelImpl' def __getattr__(self, name): # TODO: Refactor. # Check if name is a Reference in the current Space ref = self._impl.refs.get(name) if ref is not None: assert isinstance(ref, ReferenceImpl) return self._impl.system.executor.add_reference(ref).interface elif name in self._impl.named_spaces: return self._impl.named_spaces[name].interface else: raise AttributeError(f"{name!r} not found in {repr(self._impl.interface)}") def __contains__(self, item): return item in self._impl._namespace @property def _parent(self): return self parent = _parent # for backward compatibility @property def path(self): path = self._impl._property_refs['path'] assert isinstance(path, ReferenceImpl) return self._impl.system.executor.add_reference(path).interface _model_impl_base = ( TraceManager, EditableParentImpl, Impl ) class ModelImpl(*_model_impl_base): interface_cls = Model __slots__ = ( "namespace", "_namespace", "_global_refs", "_property_refs", "_macros", "_macro_namespace", "currentspace", "path", "refmgr" ) + get_mixin_slots(*_model_impl_base) def __init__(self, *, system, name): if not name: name = system._modelnamer.get_next(system.models) elif not is_valid_name(name): raise ValueError("Invalid name '%s'." % name) Impl.__init__(self, system=system, parent=None, name=name, spmgr=SpaceManager(self)) EditableParentImpl.__init__(self) TraceManager.__init__(self) self.currentspace = None self.path = None self._global_refs = {} self._global_refs['__builtins__'] = ReferenceImpl( self, '__builtins__', builtins, container=self._global_refs, set_item=False) self._property_refs = {} self._property_refs["path"] = ReferenceImpl( self, "path", self.path, container=self._property_refs, set_item=False) self._macros = {} self._macro_namespace = None self.named_spaces = {} self._namespace = CustomChainMap(self.named_spaces, self._global_refs) self.namespace = ModelNamespace(self) self.allow_none = False self.refmgr = ReferenceManager(self, system.iomanager) def rename(self, name): """Rename self. Must be called only by its system.""" if is_valid_name(name): if name not in self.system.models: self.name = name return True # Rename success else: # Model name already exists return False else: raise ValueError("Invalid name '%s'." % name) def repr_self(self, add_params=True): return self.name def repr_parent(self): return "" @Impl.doc.setter def doc(self, value): self._doc = value @property def global_refs(self): return self._global_refs refs = global_refs own_refs = global_refs @property def property_refs(self): return self._property_refs def get_impl_from_name(self, name): """Retrieve an object by a dotted name relative to the model.""" parts = name.split(".") space = self.spaces[parts.pop(0)] if parts: return space.get_impl_from_namelist(parts) else: return space def _check_sanity(self): for name, r in self.global_refs.items(): if name != "__builtins__": assert id(r.interface) in self.refmgr._valid_to_refs self.refmgr._check_sanity() self.spmgr._check_sanity() @property def updater(self): return SpaceUpdater(self.spmgr) def del_ref(self, name): ref = self.global_refs[name] self.model.clear_attr_referrers(ref) ref.on_delete() del self.global_refs[name] for space in self.yield_spaces(): space.on_notify(self.global_refs) def change_ref(self, name, value): self.del_ref(name) self.new_ref(name, value) def new_ref(self, name, value): ref = ReferenceImpl( self, name, value, container=self._global_refs, set_item=False) self._global_refs[name] = ref for space in self.yield_spaces(): space.on_notify(self.global_refs) return ref def get_attr(self, name): if name in self.spaces: return self.spaces[name].interface elif name in self.global_refs: return self.global_refs[name].interface elif name in self._macros: return self._macros[name].interface else: raise AttributeError( "Model '{0}' does not have '{1}'".format(self.name, name) ) def set_attr(self, name, value, refmode=None): if name in self.spaces: raise KeyError("Space named '%s' already exist" % self.name) elif name in self._macros: raise KeyError("Macro named '%s' already exists" % name) elif name in self.global_refs: self.refmgr.change_ref(self, name, value) else: self.refmgr.new_ref(self, name, value, refmode) def del_attr(self, name): if name in self.named_spaces: self.updater.del_defined_space(self.named_spaces[name]) elif name in self._macros: self.del_macro(name) elif name in self.global_refs: self.refmgr.del_ref(self, name) else: raise KeyError("Name '%s' not defined" % name) # Macro methods def new_macro(self, name, formula): """Create a new macro Args: name: Name of the macro formula: Formula object or callable Returns: MacroImpl instance """ if not is_valid_name(name): raise ValueError(f"Invalid macro name: {name}") if name in self._macros: raise ValueError(f"Macro '{name}' already exists") if name in self.spaces or name in self.global_refs: raise ValueError(f"Name '{name}' already used") macro = MacroImpl( system=self.system, parent=self, name=name, formula=formula ) self._macros[name] = macro return macro def del_macro(self, name): """Delete a macro Args: name: Name of the macro to delete """ if name not in self._macros: raise KeyError(f"Macro '{name}' not found") macro = self._macros[name] macro.on_delete() del self._macros[name] # Remove from macro namespace if it exists if self._macro_namespace is not None and name in self._macro_namespace: del self._macro_namespace[name] def get_macro_namespace(self): """Get the namespace for macro execution Returns a namespace dict with mx_model and the model's name pointing to the model interface. """ if self._macro_namespace is None: self._macro_namespace = {} # Always update to ensure it has the current state self._macro_namespace['mx_model'] = self.interface self._macro_namespace[self.name] = self.interface self._macro_namespace['__builtins__'] = builtins # Add all macros to the namespace so they can call each other for macro_name, macro_impl in self._macros.items(): self._macro_namespace[macro_name] = macro_impl.interface return self._macro_namespace @property def macros(self): """Return the macros dictionary""" return self._macros def to_node(self): return ObjectNode(get_node(self, None, None)) def split_node(node): parent = ".".join(node.split(".")[:-1]) name = node.split(".")[-1] return parent, name def len_node(node): return len(node.split(".")) def trim_left(node, trimed_len): return ".".join(node.split(".")[trimed_len:]) def trim_right(node, trimed_len): if trimed_len == 0: return node else: return ".".join(node.split(".")[:-trimed_len]) def _get_shared_part(a_node, b_node, from_left=True): a_node = a_node.split(".") b_node = b_node.split(".") length = min(len(a_node), len(b_node)) while length: if from_left: a_node, b_node = a_node[:length], b_node[:length] else: a_node, b_node = a_node[-length:], b_node[-length:] if a_node == b_node: return ".".join(a_node) length -= 1 def get_shared_asc(a_node, b_node): return _get_shared_part(a_node, b_node, from_left=True) def get_shared_desc(a_node, b_node): return _get_shared_part(a_node, b_node, from_left=False) def has_parent(node, parent): parent_len = len_node(parent) if len_node(node) <= parent_len: return False elif trim_right(node, len_node(node) - parent_len) == parent: return True else: return False class SpaceGraph(nx.DiGraph): """New implementation of inheritance graph""" def ordered_preds(self, node): edges = [(self.edges[e]["index"], e) for e in self.in_edges(node)] return [e[0] for i, e in sorted(edges, key=lambda elm: elm[0])] def ordered_subs(self, node): g = nx.descendants(self, node) g.add(node) return nx.topological_sort(self.subgraph(g)) def max_index(self, node): return max( [self.edges[e]["index"] for e in self.in_edges(node)], default=0 ) def get_mro(self, node): """Calculate the Method Resolution Order of bases using the C3 algorithm. Code modified from http://code.activestate.com/recipes/577748-calculate-the-mro-of-a-class/ Args: bases: sequence of direct base spaces. Returns: mro as a list of bases including node itself """ seqs = [self.get_mro(base) for base in self.ordered_preds(node) ] + [self.ordered_preds(node)] res = [] while True: non_empty = list(filter(None, seqs)) if not non_empty: # Nothing left to process, we're done. res.insert(0, node) return res for seq in non_empty: # Find merge candidates among seq heads. candidate = seq[0] not_head = [s for s in non_empty if candidate in s[1:]] if not_head: # Reject the candidate. candidate = None else: break if not candidate: # Better to return None instead of error? raise TypeError( "inconsistent hierarchy, no C3 MRO is possible" ) res.append(candidate) for seq in non_empty: # Remove candidate. if seq[0] == candidate: del seq[0] def _visit_tree_inner(self, node, include_self=True): que = [node] level = 0 while que: n = que.pop(0) if n != node or include_self: yield level, n childs = [ch for ch in self.nodes if ch[:len(n) + 1] == (n + ".") and len_node(n) + 1 == len_node(ch)] que += childs level += 1 def visit_tree(self, node, include_self=True): for _, n in self._visit_tree_inner( node,include_self=include_self): yield n def to_space(self, node): return self.nodes[node]["space"] def get_relative(self, subspace, basespace, basevalue): shared_parent = get_shared_asc(basespace, basevalue) if not shared_parent: return None shared_desc = get_shared_desc(subspace, basespace) if shared_desc: shared_desc = shared_desc.split(".") else: shared_desc = [] subroot = trim_right(subspace, len(shared_desc)) basroot = trim_right(basespace, len(shared_desc)) while True: if basroot in self.get_mro(subroot): break if shared_desc: n = shared_desc.pop(0) subroot = ".".join(subroot.split(".") + [n]) basroot = ".".join(basroot.split(".") + [n]) else: raise RuntimeError("must not happen") if basroot == shared_parent or has_parent(shared_parent, basroot): relative_part = trim_left(basevalue, len_node(basroot)) if relative_part: return subroot + "." + relative_part else: return subroot else: return None class Instruction: def __init__(self, func, args=(), arghook=None, kwargs=None): self.func = func self.args = args self.arghook = arghook self.kwargs = kwargs if kwargs else {} def execute(self): if self.arghook: args, kwargs = self.arghook(self) else: args, kwargs = self.args, self.kwargs return self.func(*args, **kwargs) @property def funcname(self): return self.func.__name__ def __repr__(self): return "<Instruction: %s>" % self.funcname class InstructionList(list): def execute(self, clear=True): result = None for inst in self: result = inst.execute() if clear: self.clear() return result class SharedSpaceOperations: def __init__(self, model): self.model = model self._graph = SpaceGraph() def _can_add(self, parent, name, klass): """Check name conflict for a given name. :obj:`False` if ``name`` is already defined not as an instance of ``klass`` in ``parent`` or in any of ``parent`` descendants. :obj:`False` if ``name`` is already defined as an instance of ``klass`` and ``overwirte`` is :obj:`True`, otherwise :obj:`True`. """ # TODO: Reflect the overwriting order of names if parent is self.model: return name not in parent.namespace else: # parent is a Space child = parent.get_attr(name) if child is not None: return not isinstance(child, Impl) sub = self._find_name_in_subs(parent, name, skip_self=True) # start from parent if sub is None: return True elif isinstance(sub, klass): return True else: return False def _find_name_in_subs(self, parent, name, skip_self=False): for subspace in self._get_subs(parent, skip_self=skip_self): child = subspace.get_attr(name) if child is not None: return child return None def _get_space_bases(self, space, skip_self=True): idx = 1 if skip_self else 0 nodes = self._graph.get_mro(space.idstr)[idx:] return [self._graph.to_space(n) for n in nodes] def get_deriv_bases(self, deriv: Derivable, defined_only=False, graph: SpaceGraph=None): if graph is None: graph = self._graph if isinstance(deriv, UserSpaceImpl): # Not Dynamic spaces return self._get_space_bases(deriv, graph) pnode = deriv.parent.idstr bases = [] for bspace in graph.get_mro(pnode)[1:]: base_members = deriv._get_members(graph.to_space(bspace)) if deriv.name in base_members: b = base_members[deriv.name] if not defined_only or b.is_defined(): bases.append(b) return bases def get_direct_bases(self, space): node = space.idstr preds = self._graph.ordered_preds(node) return [self._graph.to_space(n) for n in preds] def update_subs(self, space, skip_self=True): for attr in ("cells", "own_refs"): for s in self._get_subs(space, skip_self): b = self._get_space_bases(s, self._graph) s.on_inherit(self, b, attr) def _get_subs(self, space, skip_self=True): idx = 1 if skip_self else 0 return [ self._graph.to_space(desc) for desc in list( self._graph.ordered_subs(space.idstr))[idx:] ] def get_relative_interface(self, parent, base): basespace = base.parent.idstr basevalue = base.interface._impl.idstr subimpl = self._graph.get_relative( parent.idstr, basespace, basevalue) if subimpl: impl = self.model.get_impl_from_name(subimpl) if impl: return True, impl.interface else: return True, base.interface._impl.interface_cls(null_impl) else: return False, base.interface class SpaceManager(SharedSpaceOperations): def rename_space(self, space, name): # Check name does not exit already parent = space.parent if not self._can_add( parent, name, UserSpaceImpl): raise ValueError("Cannot rename '%s' to '%s'" % (space.name, name)) # Create name mapping mapping = {} old_id = tuple(space.idstr.split(".")) new_id = old_id[:-1] + (name,) for node in self._graph.visit_tree( space.idstr, include_self=True): old_child = tuple(node.split(".")) assert old_id == old_child[:len(old_id)] mapping[node] = ".".join(new_id + old_child[len(new_id):]) if not space.parent.is_model(): # Clear parent's dynsub, not s's space.parent.clear_subs_rootitems() # Call on_rename callbacks space.on_rename(name) # Rename nodes nx.relabel_nodes(self._graph, mapping, copy=False) def del_cells(self, space, name): cells = space.cells[name] if cells.is_derived(): raise ValueError("cannot delete derived") space.on_del_cells(name) self.update_subs(space, skip_self=False) def del_ref(self, space, name): space.on_del_ref(name) self.update_subs(space, skip_self=False) def new_cells(self, space, name=None, formula=None, data=None, is_derived=False, is_cached=True, edit_source=True): # FIX: Creating a Cells of the same name in ``space`` if not self._can_add(space, name, CellsImpl): raise ValueError("Cannot create cells '%s'" % name) cells = UserCellsImpl( space=space, name=name, formula=formula, data=data, is_derived=is_derived, is_cached=is_cached, edit_source=edit_source) space.clear_subs_rootitems() name = cells.name # If name is none, auto-named in __init__ for subspace in self._get_subs(space): if name in subspace.cells: continue else: subspace.clear_subs_rootitems() derived = UserCellsImpl( space=subspace, base=cells, is_derived=True, add_to_space=False, is_cached=is_cached ) base_cells = {} for b in reversed(subspace.bases): base_cells.update(b.cells) idx = list(base_cells).index(name) cells_after = list(subspace.cells)[idx:] subspace.cells[name] = derived subspace.on_notify(subspace.cells) for k in cells_after: subspace.cells[k] = subspace.cells.pop(k) return cells def copy_cells(self, space: UserSpaceImpl, source: UserCellsImpl, name=None): """``space`` can be of another Model""" if space.model is not self.model: return space.spmgr.copy_cells(space, source, name) if name is None: name = source.name data = {k: v for k, v in source.data.items() if k in source.input_keys} return self.new_cells(space, name=name, formula=source.formula, data=data, is_derived=False) def rename_cells(self, cells, name): """Renames the Cells name""" if not is_valid_name(name): raise ValueError("name '%s' is invalid" % name) if not self._can_add(cells.parent, name, CellsImpl): raise ValueError("cannot create cells '%s'" % name) if cells.bases: raise ValueError("'%s' is a sub Cells of '%s'" % ( cells.get_repr(fullname=True, add_params=False), cells.bases[0].get_repr(fullname=True, add_params=False))) old_name = cells.name for space in self._get_subs(cells.parent, skip_self=False): space.clear_subs_rootitems() space.cells[old_name].on_rename(name) def sort_cells(self, space): """Sort cells in a space - Applies only to defined UserSpaces - Only cells defined in the space (neither derived/overridden) are sorted and placed before the derived/overridden cells. - Derived/overridden cells in the sub spaces are also sorted. """ for subspace in self._get_subs(space, skip_self=False): subspace.on_sort_cells(space=space) def set_cells_property(self, cells, flags, func, enable_cache): """Set formula and/or is_enabled""" define = True for space in self._get_subs(cells.parent, skip_self=False): c = space.cells[cells.name] if (c is not cells and c.is_defined() and self.get_deriv_bases(c, defined_only=True)[0] is cells): continue # Skip when c's base is not cells space.clear_subs_rootitems() space.cells[cells.name].on_set_property( flags, define, func, enable_cache ) define = False # Do not define derived cells def set_cells_formula(self, cells, func): self.set_cells_property(cells, UserCellsImpl.PROP_FORMULA, func, True) def set_cache(self, cells, enable_cache): self.set_cells_property(cells, UserCellsImpl.PROP_CACHE, None, enable_cache) def del_cells_formula(self, cells): self.set_cells_formula(cells, NULL_FORMULA) def _check_subs_relrefs(self, space, name, value, refmode): # Check if relative ref is possible when refmode is 'relative' if isinstance(value, Interface) and refmode == "relative": basevalue = value._impl.idstr for subspace in self._get_subs(space): if name in subspace.own_refs: break else: subvalue = self._graph.get_relative( subspace.idstr, space.idstr, basevalue) if not subvalue: raise ValueError( "Cannot create relative reference for '%s' in '%s'" % (basevalue, subspace.idstr) ) def new_ref(self, space, name, value, refmode): other = self._find_name_in_subs(space, name) if other is not None: if not isinstance(other, ReferenceImpl): raise ValueError("Cannot create reference '%s'" % name) elif other not in self.model.global_refs.values(): raise ValueError("Cannot create reference '%s'" % name) self._check_subs_relrefs(space, name, value, refmode) result = space.on_create_ref(name, value, is_derived=False, refmode=refmode) for subspace in self._get_subs(space): is_relative = False if name in subspace.own_refs: break if isinstance(value, Interface) and value._is_valid(): if refmode == "auto" or refmode == "relative": is_relative, value = self.get_relative_interface( subspace, space.own_refs[name]) ref = subspace.on_create_ref(name, value, is_derived=True, refmode=refmode) ref.is_relative = is_relative return result def change_ref(self, space, name, value, refmode): """Assigns a new value to an existing name.""" self._check_subs_relrefs(space, name, value, refmode) # self._set_defined(space.idstr) # space.set_defined() is_relative = False if refmode == "absolute" else True space.on_change_ref(name, value, is_derived=False, refmode=refmode, is_relative=is_relative) for subspace in self._get_subs(space): is_relative = False subref = subspace.own_refs[name] if subref.is_defined(): break elif subref.defined_bases[0] is not space.own_refs[name]: break if isinstance(value, Interface) and value._is_valid(): if (refmode == "auto" or refmode == "relative"): is_relative, value = self.get_relative_interface( subspace, space.own_refs[name]) ref = subspace.on_change_ref(name, value, is_derived=True, refmode=refmode, is_relative=is_relative) ref.is_relative = is_relative def _check_sanity(self): nodes = set(self._graph.nodes) spaces = dict(self.model.named_spaces) # consistency between spaces and nodes while spaces: k, v = spaces.popitem() assert k == v.name assert v.idstr in nodes assert v is self._graph.nodes[v.idstr]["space"] nodes.remove(v.idstr) spaces.update(v.named_spaces) assert not nodes # Check all nodes are reached class SpaceUpdater(SharedSpaceOperations): def __init__(self, manager): self.manager = manager super().__init__(manager.model) self._instructions = InstructionList() self._graph = self.manager._graph.copy() def _update_manager(self): self.manager._graph = self._graph def _update_derived_space(self, node): space = self._graph.to_space(node) bases = self._get_space_bases(space, self._graph) space.on_inherit(self, bases, 'cells') self._instructions.append( Instruction(self._update_derived_refs, (node,)) ) def _update_derived_refs(self, node): space = self._graph.to_space(node) bases = self._get_space_bases(space, self._graph) space.on_inherit(self, bases, 'own_refs') def _remove_hook(self, graph, node): parent_node, name = split_node(node) if parent_node in self.manager._graph: parent = self.manager._graph.to_space(parent_node) elif parent_node: parent = graph.to_space(parent_node) else: parent = self.model method = parent.on_del_space self._instructions.append( Instruction(method, (name,)) ) def new_space( self, parent, name=None, bases=None, formula=None, refs=None, source=None, prefix="", doc=None ): """Create a new child space. Args: name (str): Name of the space. If omitted, the space is created automatically. bases: If specified, the new space becomes a derived space of the `base` space. formula: Function whose parameters used to set space parameters. refs: a mapping of refs to be added. source: A source module from which cell definitions are read. prefix: Prefix to the autogenerated name when name is None. """ if name is None: while True: name = parent.spacenamer.get_next(parent.namespace, prefix) if self.manager._can_add(parent, name, UserSpaceImpl): break elif not self.manager._can_add(parent, name, UserSpaceImpl): raise ValueError("Cannot create space '%s'" % name) if not prefix and not is_valid_name(name): raise ValueError("Invalid name '%s'." % name) if bases is None: bases = [] elif isinstance(bases, UserSpaceImpl): bases = [bases] node = name if parent.is_model() else parent.idstr + "." + name spaces = [s for s in bases] if not parent.is_model(): spaces.insert(0, parent) self._graph.add_node(node) for b in bases: base = b.idstr self._graph.add_edge( base, node, level=0, index=self._graph.max_index(node) + 1 ) if not nx.is_directed_acyclic_graph(self._graph): raise ValueError("cyclic inheritance") self._graph.get_mro(node) # Check if MRO is possible # Check if MRO is possible for each node in sub graph for n in nx.descendants(self._graph, node): self._graph.get_mro(n) space = UserSpaceImpl( parent, name, container=parent.named_spaces, formula=formula, refs=refs, source=source, doc=doc ) if isinstance(parent, NamespaceServer): parent.on_notify(parent.named_spaces) # Fix: bug GH203 self._graph.nodes[node]["space"] = space self._graph.nodes[node]["state"] = "created" self._instructions.append( Instruction(self._update_derived_space, (node,))) for _, v in nx.edge_dfs(self._graph, node): self._instructions.append( Instruction(self._update_derived_space, (v,))) try: self._instructions.execute() except BaseException: del parent.named_spaces[name] raise self._update_manager() return space def add_bases(self, space, bases): """Add bases to space in graph """ node = space.idstr basenodes = [base.idstr for base in bases] for base in [node] + basenodes: if base not in self.manager._graph: raise ValueError("Space '%s' not found" % base) for b in basenodes: self._graph.add_edge( b, node, level=0, index=self._graph.max_index(node) + 1 ) if not nx.is_directed_acyclic_graph(self._graph): raise ValueError("cyclic inheritance") for n in itertools.chain({node}, nx.descendants( self._graph, node)): self._graph.get_mro(n) for desc in itertools.chain( {node}, nx.descendants(self._graph, node)): mro = self._graph.get_mro(desc) # Check name conflict between spaces, cells, refs members = {} for attr in ["spaces", "cells", "refs"]: namechain = [] for sname in mro: space = self._graph.to_space(sname) namechain.append(set(getattr(space, attr).keys())) members[attr] = set().union(*namechain) conflict = set().intersection(*[n for n in members.values()]) if conflict: raise NameError("name conflict: %s" % conflict) self._instructions.append( Instruction(self._update_derived_space, (node,))) for _, v in nx.edge_dfs(self._graph, node): self._instructions.append( Instruction(self._update_derived_space, (v,))) self._instructions.execute() self._update_manager() def remove_bases(self, space, bases): node = space.idstr basenodes = [base.idstr for base in bases] for base in [node] + basenodes: if base not in self.manager._graph: raise ValueError("Space '%s' not found" % base) for b in basenodes: self._graph.remove_edge(b, node) self._instructions.append( Instruction(self._update_derived_space, (node,)) ) for _, v in nx.edge_bfs(self.manager._graph, node): self._instructions.append( Instruction(self._update_derived_space, (v,)) ) self._instructions.execute() self._update_manager() def del_defined_space(self, space): node = space.idstr if node not in self.manager._graph: raise ValueError("Space '%s' not found" % node) # Remove node and its child tree nodes_removed = list() for child in self._graph.visit_tree(node): nodes_removed.append(child) self._remove_hook(self._graph, child) for _, v in nx.edge_bfs(self.manager._graph, node): self._instructions.append( Instruction(self._update_derived_space, (v,)) ) self._graph.remove_nodes_from(nodes_removed) self._instructions.execute() self._update_manager() if space is self.model.currentspace: self.model.currentspace = None def copy_space( self, parent: EditableParentImpl, source: UserSpaceImpl, name=None, defined_only=False ): if parent.has_ascendant(source): raise ValueError("Cannot copy to child") if parent.model is not self.model: return parent.model.updater.copy_space( parent, source, name, defined_only) if name is None: name = source.name if self.manager._can_add( parent, name, EditableParentImpl): return self._copy_space_recursively( parent, source, name, defined_only ) else: raise ValueError("Cannot create space '%s'" % name) def _copy_space_recursively( self, parent, source, name, defined_only): space = self.new_space( parent, name=name, bases=None, formula=source.formula, refs={k: v.interface for k, v in source.own_refs.items()}, source=source.source, prefix="", doc=source.doc ) for cells in source.cells.values(): if cells.is_defined(): self.manager.copy_cells(space, cells) for child in source.named_spaces.values(): self._copy_space_recursively( space, child, child.name, defined_only) return space class ReferenceManager: def __init__(self, model, iomanager): self._model = model self._manager = iomanager self._valid_to_refs = {} # id(value) -> [refs] def _check_sanity(self): for refs in self._valid_to_refs.values(): for r in refs: spec = self._manager.get_spec_from_value( io_group=self._model.interface, value=r.interface) if spec is not None: assert r.interface is spec.value spec._check_sanity() def has_spec(self, value): spec = self._manager.get_spec_from_value(self._model.interface, value) return spec is not None def get_spec(self, value): return self._manager.get_spec_from_value(self._model.interface, value) @property def values(self): return list(ref[0].interface for ref in self._valid_to_refs.values()) @property def specs(self): result = [] for r in self._valid_to_refs.values(): spec = self.get_spec(r[0].interface) if spec is not None: result.append(spec) return result def new_ref(self, impl, name, value, refmode): if isinstance(impl, ModelImpl): ref = impl.new_ref(name, value) elif isinstance(impl, UserSpaceImpl): ref = impl.model.spmgr.new_ref(impl, name, value, refmode) else: raise RuntimeError("must not happen") if not isinstance(value, Interface): refs = self._valid_to_refs.setdefault(id(value), []) assert all(ref is not r for r in refs) refs.append(ref) def del_ref(self, impl, name): refdict = impl.own_refs ref = refdict[name] valid = id(ref.interface) val = ref.interface if isinstance(impl, ModelImpl): impl.del_ref(name) elif isinstance(impl, UserSpaceImpl): impl.model.spmgr.del_ref(impl, name) else: raise RuntimeError("must not happen") if not isinstance(val, Interface): refs = self._valid_to_refs.get(valid) assert refs refs.remove(ref) if not refs: del self._valid_to_refs[valid] spec = self._manager.get_spec_from_value( io_group=self._model.interface, value=val ) if spec: self._manager.del_spec(spec) def change_ref(self, impl, name, value, refmode=None): refdict = impl.own_refs prev_ref = refdict[name] prev_valid = id(prev_ref.interface) prev_val = prev_ref.interface if isinstance(impl, ModelImpl): impl.model.change_ref(name, value) elif isinstance(impl, UserSpaceImpl): impl.model.spmgr.change_ref(impl, name, value, refmode) else: raise RuntimeError("must not happen") refs = self._valid_to_refs.get(prev_valid, None) if refs is not None: # None in case prev_ref is derived if prev_ref in refs: refs.remove(prev_ref) if not refs: # ref is empty del self._valid_to_refs[prev_valid] spec = self._manager.get_spec_from_value(self._model.interface, prev_val) if spec: self._manager.del_spec(spec) if not isinstance(value, Interface): self._valid_to_refs.setdefault(id(value), []).append(refdict[name]) def del_all_spec(self): specs = self.specs.copy() while specs: self._manager.del_spec(specs.pop()) def update_value(self, old_value, new_value=None, **kwargs): prev_id = id(old_value) refs = self._valid_to_refs.get(prev_id, None) spec = self._manager.get_spec_from_value(self._model.interface, old_value) if refs is None: raise ValueError("value not referenced") if new_value is None: new_value = old_value if spec is not None: self._manager.update_spec_value(spec, new_value, kwargs) new_value = spec.value newrefs = [] while refs: ref = refs.pop() impl = ref.parent name = ref.name refmode = ref.refmode value = new_value self._impl_change_ref(impl, name, value, refmode) newrefs.append(impl.own_refs[name]) self._valid_to_refs.pop(prev_id) self._valid_to_refs[id(new_value)] = newrefs @staticmethod def _impl_change_ref(impl, name, value, *refmode): if isinstance(impl, ModelImpl): impl.model.change_ref(name, value) elif isinstance(impl, UserSpaceImpl): impl.model.spmgr.change_ref(impl, name, value, refmode) else: raise RuntimeError("must not happen")