# Copyright (c) 2017-2022 Fumito Hamamura <fumito.ham@gmail.com>
# This library is free software: you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation version 3.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
import builtins
import itertools
import zipfile
import gc
from types import ModuleType
import networkx as nx
from modelx.core.base import (
add_stateattrs,
Interface,
Impl,
get_interfaces,
ImplChainMap,
BaseView,
Derivable,
get_mixin_slots
)
from modelx.core.reference import ReferenceImpl, ReferenceProxy
from modelx.core.cells import CellsImpl, UserCellsImpl
from modelx.core.node import OBJ, KEY, get_node, node_has_key, ItemNode
from modelx.core.parent import (
BaseParentImpl,
EditableParentImpl,
EditableParent,
)
from modelx.core.space import (
UserSpaceImpl,
SpaceDict,
SpaceView,
RefDict
)
from modelx.core.formula import NULL_FORMULA
from modelx.core.util import is_valid_name, AutoNamer
from modelx.core.chainmap import CustomChainMap
try:
_nxver = tuple(int(n) for n in nx.__version__.split(".")[:2])
except ValueError: # in such case as '2.6rc1'
_major, _minor = nx.__version__.split(".")[:2]
_nxver = (int(_major), int(_minor[0]))
class TraceGraph(nx.DiGraph):
"""Directed Graph of ObjectArgs"""
def remove_with_descs(self, source):
"""Remove all descendants of(reachable from) `source`.
Args:
source: Node descendants
Returns:
set: The removed nodes.
"""
if not self.has_node(source):
return set()
desc = nx.descendants(self, source)
desc.add(source)
self.remove_nodes_from(desc)
return desc
def clear_obj(self, obj):
"""Remove all nodes with `obj` and their descendants."""
obj_nodes = self.get_nodes_with(obj)
removed = set()
for node in obj_nodes:
if self.has_node(node):
removed.update(self.remove_with_descs(node))
return removed
def get_nodes_with(self, obj):
"""Return nodes with `obj`."""
result = set()
if nx.__version__[0] == "1":
nodes = self.nodes_iter()
else:
nodes = self.nodes
for node in nodes:
if node[OBJ] == obj:
result.add(node)
return result
def get_startnodes_from(self, node):
if node in self:
return [n for n in nx.descendants(self, node)
if self.out_degree(n) == 0]
else:
return []
def fresh_copy(self):
"""Overriding Graph.fresh_copy"""
return TraceGraph()
def add_path(self, nodes, **attr):
"""(Not used anymore) In replacement for Deprecated add_path method"""
if nx.__version__[0] == "1":
return super().add_path(nodes, **attr)
else:
return nx.add_path(self, nodes, **attr)
class ReferenceGraph(nx.DiGraph):
def remove_with_descs(self, ref):
if ref not in self:
return set()
desc = nx.descendants(self, ref)
self.remove_nodes_from((ref, *desc))
return desc # Not including ref
class IOSpecOperation:
__slots__ = ()
def update_pandas(self, old_data, new_data=None):
"""Update a pandas object assigned to References
Replace with ``new_data`` the value of such a Reference whose value is
``old_data``. Both ``new_data`` and ``old_data`` need to be
`DataFrame`_ or `Series`_.
If ``old_data`` is assigned to multiple References in a model,
the values of all the References are replaced with ``new_data``,
even the References
are defined in different locations within the model.
The identity of pandas objects is determined by the `id()`_ function.
If ``new_data`` is not given, :class:`~modelx.core.cells.Cells`
that are dependent on the References are cleared.
If ``old_data`` has an associated
:class:`~modelx.io.pandasio.PandasData`,
this method associates the :class:`~modelx.io.pandasio.PandasData`
to ``new_data``.
This method is available for :class:`~modelx.core.model.Model`
and :class:`~modelx.core.space.UserSpace`. The method
performs identically regardless of the types of calling objects.
.. _id():
https://docs.python.org/3/library/functions.html#id
.. _Series:
https://pandas.pydata.org/docs/reference/api/pandas.Series.html
.. _DataFrame:
https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html
Args:
new_data: A pandas `Series`_ or `DataFrame`_ object
old_data(optional): A pandas `Series`_ or `DataFrame`_ object
.. versionadded:: 0.18.0
See Also:
* :meth:`new_pandas`
* :class:`~modelx.io.pandasio.PandasData`
"""
return self._impl.model.refmgr.update_value(old_data, new_data)
def update_module(self, old_module, new_module=None):
"""Update an user-defined module assigned to References
Update an user-defined Python module created by :meth:`new_module`.
The ``new_module`` parameter is a path to the source file
of a new user-defined module.
If ``new_module`` is not given, the old module is reloaded
from the same source file of the old module
and a new module module is created.
The values of References referring to the old module object
are replaced with the new module object.
If ``old_module`` is assigned to multiple References in a model,
the value of all the References are updated, even the References
are defined in different locations within the model.
This method associates to the new module the
:class:`~modelx.io.moduleio.ModuleData` object previously
associated to the old module.
This method is available for :class:`~modelx.core.model.Model`
and :class:`~modelx.core.space.UserSpace`. The method
performs identically regardless of the types of calling objects.
Args:
old_module: A user-defined Python module object.
new_module: The path to the source file as a :obj:`str` or
a path-like object.
.. versionadded:: 0.18.0
See Also:
* :meth:`new_module`
* :class:`~modelx.io.moduleio.ModuleData`
"""
if not isinstance(old_module, ModuleType):
raise ValueError("not a module object")
return self._impl.model.refmgr.update_value(old_module, new_module)
def get_spec(self, data):
"""Get *IOSpec* associated with ``data``
Returns the *IOSpec* object associated with ``data``.
``data`` should be an object referenced in the model.
An *IOSpec* object is an instance of a subclass of
:class:`~modelx.io.baseio.BaseIOSpec`.
If no *IOSpec* is associated with `data`, an error is raised.
See Also:
* :meth:`~modelx.core.model.Model.del_spec`
* :class:`~modelx.io.baseio.BaseIOSpec`
* :attr:`~modelx.core.model.Model.iospecs`
"""
spec = self._impl.refmgr.get_spec(data)
if spec is None:
raise ValueError("spec not found")
else:
return spec
def del_spec(self, data):
"""Delete *IOSpec* associate with ``data``
Deletes the *IOSpec* object associated with ``data``.
``data`` should be an object referenced in the model.
An *IOSpec* object is an instance of a subclass of
:class:`~modelx.io.baseio.BaseIOSpec`.
See Also:
* :meth:`~modelx.core.model.Model.get_spec`
* :class:`~modelx.io.baseio.BaseIOSpec`
* :attr:`~modelx.core.model.Model.iospecs`
"""
self._impl.refmgr._manager.del_spec(self.get_spec(data))
@property
def iospecs(self):
"""List of :class:`~modelx.io.baseio.BaseIOSpec` objects
Returns a list of all objects of BaseIOSpec subclasses
defined in this Model.
:class:`~modelx.io.excelio.ExcelRange` and
:class:`~modelx.io.pandasio.PandasData`
are subclasses of :class:`~modelx.io.baseio.BaseIOSpec`.
:class:`~modelx.io.excelio.ExcelRange`
objects are created either by
:meth:`Model.new_excel_range<modelx.core.model.Model.new_excel_range>`
or
:meth:`UserSpace.new_excel_range<modelx.core.space.UserSpace.new_excel_range>`
method.
:class:`~modelx.io.pandasio.PandasData` objects are
created either by
:meth:`Model.new_pandas<modelx.core.model.Model.new_pandas>`
or
:meth:`UserSpace.new_pandas<modelx.core.space.UserSpace.new_pandas>`
method.
See Also:
* :meth:`~modelx.core.model.Model.get_spec`
* :class:`~modelx.io.excelio.ExcelRange`
* :class:`~modelx.io.pandasio.PandasData`
* :meth:`UserSpace.new_excel_range<modelx.core.space.UserSpace.new_excel_range>`
* :meth:`Model.new_excel_range<modelx.core.model.Model.new_excel_range>`
* :meth:`UserSpace.new_pandas<modelx.core.space.UserSpace.new_pandas>`
* :meth:`Model.new_pandas<modelx.core.model.Model.new_pandas>`
.. versionchanged:: 0.20.0 renamed to ``iospecs`` from ``dataspecs``
.. versionchanged:: 0.18.0 the property name is changed
from ``dataclients`` to ``dataspecs``
.. versionadded:: 0.9.0
"""
return list(self._impl.refmgr.specs)
[docs]class Model(IOSpecOperation, EditableParent):
"""Top-level container in modelx object hierarchy.
Model instances are the top-level objects and directly contain
:py:class:`~modelx.core.space.UserSpace` objects, which in turn
contain other spaces or
:py:class:`~modelx.core.cells.Cells` objects.
A model can be created by
:py:func:`~modelx.new_model` API function.
"""
__slots__ = ()
[docs] def rename(self, name, rename_old=False):
"""Rename the model itself"""
self._impl.system.rename_model(
new_name=name, old_name=self.name, rename_old=rename_old)
[docs] def clear_all(self):
"""Clears :class:`~modelx.core.cells.Cells` and :class:`~modelx.core.space.ItemSpace`.
Clears both the input values and the calculated values of
all the :class:`~modelx.core.cells.Cells` in the model and
delete all the :class:`~modelx.core.space.ItemSpace` objects
in the model.
.. seealso::
:meth:`UserSpace.clear_all<modelx.core.space.UserSpace.clear_all>`
.. versionadded:: 0.16.0
"""
for space in self._impl.spaces.values():
space.clear_all_cells(
clear_input=True,
recursive=True,
del_items=True
)
[docs] def save(self, filepath, datapath=None):
"""Back up the model to a file.
.. deprecated:: 0.9.0 Use :meth:`backup` instead.
Alias for :meth:`backup`. See :meth:`backup` for details.
"""
self._impl.system.backup_model(self, filepath, datapath)
[docs] def backup(self, filepath, datapath=None):
"""Back up the model to a file.
Backup the model to a single binary file. This method internally
utilizes Python's standard library,
`pickle <https://docs.python.org/3/library/pickle.html>`_.
This method should only be used for saving the model temporarily,
as the saved model may not be restored by different
versions of modelx, or when the Python environment changes,
for example, due to package upgrade.
Saving the model by :meth:`write` method is more robust.
.. deprecated:: 0.18.0 Use :meth:`write` or :meth:`zip` instead.
.. versionchanged:: 0.9.0 ``datapath`` parameter is added.
.. versionadded:: 0.7.0
Args:
filepath(str): file path
datapath(optional): Path to a folder to store internal files.
See Also:
:meth:`write`
:func:`~modelx.restore_model`
"""
self._impl.system.backup_model(self, filepath, datapath)
[docs] def close(self):
"""Close the model."""
self._impl.close()
@Interface.doc.setter
def doc(self, value):
self._impl.doc = value
[docs] def write(self, model_path, backup=True, log_input=False):
"""Write model to files.
This method performs the :py:func:`~modelx.write_model`
on self. See :py:func:`~modelx.write_model` section for the details.
.. versionchanged:: 0.8.0
.. versionadded:: 0.0.22
Args:
model_path(str): Folder(directory) path where the model is saved.
backup(bool, optional): Whether to backup an existing file with
the same name if it already exists. Defaults to ``True``.
log_input(bool, optional): If ``True``, input values in Cells are
output to *_input_log.txt* under ``model_path``. Defaults
to ``False``.
"""
from modelx.serialize import write_model
write_model(self._impl.system, self, model_path, is_zip=False,
backup=backup, log_input=log_input)
[docs] def zip(self, model_path, backup=True, log_input=False,
compression=zipfile.ZIP_DEFLATED, compresslevel=None):
"""Archive model to a zip file.
This method performs the :py:func:`~modelx.zip_model`
on self. See :py:func:`~modelx.zip_model` section for the details.
.. versionchanged:: 0.9.0
``compression`` and ``compresslevel`` parameters are added.
.. versionadded:: 0.8.0
Args:
model_path(str): Folder(directory) path where the model is saved.
backup(bool, optional): Whether to backup an existing file with
the same name if it already exists. Defaults to ``True``.
log_input(bool, optional): If ``True``, input values in Cells are
output to *_input_log.txt* under ``model_path``. Defaults
to ``False``.
compression(optional): Identifier of the ZIP compression method
to use. This method uses `zipfile.ZipFile`_ class internally
and ``compression`` and ``compresslevel`` arguments are
passed to `zipfile.ZipFile`_ constructor.
See `zipfile.ZipFile`_ manual page for available identifiers.
Defaults to `zipfile.ZIP_DEFLATED`_.
compresslevel(optional):
Integer identifier to indicate the compression level to use.
If not specified, the default compression level is used.
See `zipfile.ZipFile`_ explanation on the Python Standard
Library site for available integer identifiers for
each compression method.
For Python 3.6, this parameter is ignored.
.. _zipfile.ZipFile:
https://docs.python.org/3/library/zipfile.html#zipfile.ZipFile
.. _zipfile.ZIP_DEFLATED:
https://docs.python.org/3/library/zipfile.html#zipfile.ZIP_DEFLATED
"""
from modelx.serialize import write_model
write_model(self._impl.system, self, model_path, is_zip=True,
backup=backup, log_input=log_input,
compression=compression, compresslevel=compresslevel)
# ----------------------------------------------------------------------
# Getting and setting attributes
def __getattr__(self, name):
return self._impl.get_attr(name)
def __delattr__(self, name):
self._impl.del_attr(name)
def __dir__(self):
return self._impl.namespace.interfaces
@property
def tracegraph(self):
"""A directed graph of cells."""
return self._impl.tracegraph
@property
def refs(self):
"""Return a mapping of global references."""
return self._impl.global_refs.interfaces
def _get_from_name(self, name):
"""Get object by named id"""
return self._impl.get_impl_from_name(name).interface
def _get_object(self, name, as_proxy=False):
parts = name.split(".")
attr = parts.pop(0)
if as_proxy and attr in self.refs:
return ReferenceProxy(self._impl.global_refs[attr])
else:
return super()._get_object(name, as_proxy)
def _get_attrdict(self, extattrs=None, recursive=True):
"""Get attributes"""
result = super(Model, self)._get_attrdict(extattrs, recursive)
if recursive:
result["refs"] = self.refs._get_attrdict(extattrs, recursive)
else:
result["refs"] = tuple(self.refs)
if extattrs:
self._get_attrdict_extra(result, extattrs, recursive)
return result
def _get_refs(self, value):
"""Get references referring to a value"""
refs = self._impl.refmgr._valid_to_refs[id(value)]
return [ReferenceProxy(impl) for impl in refs]
def _get_assoc_values(self):
"""Get a list of values in the model with their associates"""
result = []
for valid, refs in self._impl.refmgr._valid_to_refs.items():
info = {}
info["value"] = refs[0].interface
info["spec"] = self._impl.system.iomanager.get_spec_from_value(
io_group=self,
value=refs[0].interface
)
info["refs"] = self._get_refs(info["value"])
result.append(info)
return result
# ----------------------------------------------------------------------
[docs] def generate_actions(self, targets, step_size=1000):
"""Generates actions for memory-optimized run
Returns a list of *actions* for :meth:`execute_actions`
to perform a memory-optimized calculation.
See :meth:`execute_actions` for details.
Args:
targets: :obj:`list` of :class:`~modelx.core.node.ItemNode`.
step_size(:obj:`int`, optional): Number of calculations in a step.
Returns:
:obj:`list` of *actions*.
.. seealso::
* :meth:`execute_actions`
"""
calc_targets = []
calculated = []
try:
for n in targets:
obj, key = n._impl[OBJ], n._impl[KEY]
if key not in obj.input_keys:
with self._impl.system.trace_stack(maxlen=None):
obj.get_value_from_key(key)
tracestack = self._impl.system.callstack.tracestack
for trace in tracestack:
if trace[0] == "ENTER":
calculated.append(trace[3])
calc_targets.append(n._impl)
result = self._impl.get_calcsteps(
calc_targets, calculated, step_size)
finally:
for n in calculated:
n[OBJ].clear_value_at(n[KEY])
return result
[docs] def execute_actions(self, actions):
"""Performs memory-optimized run
Performs a memory-optimized run.
Memory-optimized runs are for calculating specified nodes (*targets*)
by consuming less memory.
Memory-optimized runs are useful when
the intermediate results contain large data.
A memory-optimized run actually involves two runs.
The first run is invoked by calling
:meth:`generate_actions` and
the second run is performed by calling :meth:`execute_actions`.
The :meth:`generate_actions` method runs the model to generate
and return
a list of *actions* from a list of *targets* passed to the
``targets`` parameter.
The user should set a small data set in the model
before calling :meth:`generate_actions`.
The elements of ``targets`` should be
:class:`~modelx.core.node.ItemNode` objects representing
combinations of a :class:`~modelx.core.cells.Cells` object
and its arguments.
Node objects can be created by passing the arguments
to :meth:`~modelx.core.cells.Cells.node` method.
For example, the expression below creates a node object representing
``Model1.Space1.Cells3(x=2)``::
Model1.Space1.Cells3.node(x=2)
:meth:`generate_actions` runs the model to analyze
the dependency of the target nodes.
:meth:`generate_actions` identifies all the calculated nodes
that the target nodes depend on, and
sort the nodes in a topological order.
Then the ordered nodes are split into groups so that
each group has at most the number of nodes specified by
``step_size`` (1000 by default).
Then :meth:`generate_actions` generates actions
to process each group.
For each group, *calc*, *paste*, and *clear* actions are generated in this order.
Each action is associted with nodes that the action applies to.
A *calc* action indicates its associated nodes
should be calculated.
A *paste* action indicates its associted nodes should be value-pasted
so that the values of the nodes persist after their
precedents are cleared.
A *clear* action indicates its associated nodes should be cleared
to save memory.
:meth:`generate_actions` returns a list
of actions. Each action is also represented by a list,
whose first element is a string,
which is either ``'calc'``, ``'paste'``, or ``'clear'``.
The string indicates the type of action to perform.
The second element is a list of :class:`~modelx.core.node.ItemNode`,
to which the action indicated by the first element apply.
Below is an example of the action list.
.. code-block::
[
['calc', [Model1.Space1.Cells1(), Model1.Space1.Cells2(x=0)]],
['paste', [Model1.Space1.Cells2(x=0), Model1.Space1.Cells1()]],
['clear', []],
['calc', [Model1.Space1.Cells2(x=1), Model1.Space1.Cells2(x=2)]],
['paste', [Model1.Space1.Cells2(x=2)]],
['clear', [Model1.Space1.Cells2(x=1), Model1.Space1.Cells2(x=0)]],
['calc', [Model1.Space1.Cells3(x=2)]],
['paste', [Model1.Space1.Cells3(x=2)]],
['clear', [Model1.Space1.Cells1(), Model1.Space1.Cells2(x=2)]]
]
:meth:`execute_actions` executes actions passed as ``actions``.
Before calling :meth:`execute_actions`, the user should
set the entire data set instead of the small data set
used for generating the actions.
After the execusion, the target nodes are value-pasted, and
the values of precedent nodes of the target nodes are all cleared.
To clear the values call :meth:`~modelx.core.cells.Cells.clear_at`
for the targets
or call :meth:`Cells.clear_all<modelx.core.cells.Cells.clear_all>`
or :meth:`Space.clear_all<modelx.core.space.UserSpace.clear_all>`
or :meth:`Model.clear_all<modelx.core.model.Model.clear_all>`.
Args:
actions(:obj:`list`): The *actions* list
.. seealso::
* :meth:`generate_actions`
* `Running a heavy model while saving memory <https://modelx.io/blog/2022/03/26/running-model-while-saving-memory/>`_,
a blog post on https://modelx.io
"""
gc_status = gc.isenabled()
gc.disable()
try:
for step in actions:
action, nodes = step
if action == "calc":
for n in nodes:
node = n._impl
node[OBJ].get_value_from_key(node[KEY])
elif action == "paste":
node_value_pairs = []
for n in nodes:
node = n._impl
node_value_pairs.append(
[node, node[OBJ].get_value_from_key(node[KEY])]
)
for node, value in node_value_pairs:
node[OBJ].set_value_from_key(node[KEY], value)
elif action == "clear":
for n in nodes:
node = n._impl
node[OBJ].clear_value_at(node[KEY])
gc.collect()
else:
raise RuntimeError("must not happen")
finally:
if gc_status:
gc.enable()
class TraceManager:
__slots__ = ()
__mixin_slots = (
"tracegraph",
"refgraph"
)
def __init__(self):
self.tracegraph = TraceGraph()
self.refgraph = ReferenceGraph()
def clear_with_descs(self, node):
"""Clear values and nodes calculated from `source`."""
removed = self.tracegraph.remove_with_descs(node)
self.refgraph.remove_nodes_from(removed)
for node in removed:
node[OBJ].on_clear_trace(node[KEY])
def clear_obj(self, obj):
"""Clear values and nodes of `obj` and their dependants."""
removed = self.tracegraph.clear_obj(obj)
self.refgraph.remove_nodes_from(removed)
for node in removed:
node[OBJ].on_clear_trace(node[KEY])
def clear_attr_referrers(self, ref):
removed = self.refgraph.remove_with_descs(ref)
for node in removed:
descs = self.tracegraph.remove_with_descs(node)
for desc in descs:
desc[OBJ].on_clear_trace(desc[KEY])
def get_calcsteps(self, targets, nodes, step_size):
""" Get calculation steps
Calculate a new block
Find nodes to paste in the block
Find nodes to clear from the earlier blocks
Push the paste node in the earlier blocks
"""
subgraph = self.tracegraph.subgraph(nodes)
ordered = list(nx.topological_sort(subgraph))
node_len = len(ordered)
pasted = [] # in reverse order
step = 0
result = []
while step * step_size < node_len:
start = step * step_size
stop = min(node_len, (step + 1) * step_size)
cur_block = ordered[start:stop]
cur_paste = []
cur_clear = []
cur_targets = [] # also included in cur_paste
for n in cur_block:
paste = False
if n in targets:
cur_targets.append(n)
paste = True
else:
for suc in subgraph.successors(n):
if suc not in cur_block:
paste = True
break
if paste:
cur_paste.append(n)
else:
cur_clear.append(n)
accum_nodes = set(ordered[:stop])
for n in pasted.copy():
paste = False
for suc in subgraph.successors(n):
if suc not in accum_nodes:
paste = True
break
if not paste:
cur_clear.append(n)
pasted.remove(n)
for n in cur_paste:
if n not in cur_targets:
pasted.append(n)
result.append(['calc', [ItemNode(n) for n in cur_block]])
result.append(['paste', [ItemNode(n) for n in reversed(cur_paste)]])
result.append(['clear', [ItemNode(n) for n in cur_clear]])
step += 1
assert not pasted
return result
_model_impl_base = (
TraceManager,
EditableParentImpl,
Impl
)
@add_stateattrs
class ModelImpl(*_model_impl_base):
interface_cls = Model
__slots__ = (
"_namespace",
"_global_refs",
"_dynamic_bases",
"_dynamic_bases_inverse",
"_dynamic_base_namer",
"currentspace",
"refmgr"
) + get_mixin_slots(*_model_impl_base)
def __init__(self, *, system, name):
if not name:
name = system._modelnamer.get_next(system.models)
elif not is_valid_name(name):
raise ValueError("Invalid name '%s'." % name)
Impl.__init__(self, system=system, parent=None, name=name)
EditableParentImpl.__init__(self)
TraceManager.__init__(self)
self.spmgr = SpaceManager(self)
self.currentspace = None
self._global_refs = RefDict("global_refs", self)
self._global_refs.set_item("__builtins__", builtins)
self._named_spaces = SpaceDict("named_spaces", self)
self._dynamic_bases = SpaceDict("dynamic_bases", self)
self._all_spaces = ImplChainMap("all_spaces",
self, SpaceView, [self._named_spaces, self._dynamic_bases]
)
self._dynamic_bases_inverse = {}
self._dynamic_base_namer = AutoNamer("__Space")
self._namespace = ImplChainMap("namespace",
self, BaseView, [self._named_spaces, self._global_refs]
)
self.allow_none = False
self.lazy_evals = self._namespace
self.refmgr = ReferenceManager(self, system.iomanager)
def rename(self, name):
"""Rename self. Must be called only by its system."""
if is_valid_name(name):
if name not in self.system.models:
self.name = name
return True # Rename success
else: # Model name already exists
return False
else:
raise ValueError("Invalid name '%s'." % name)
def repr_self(self, add_params=True):
return self.name
def repr_parent(self):
return ""
@Impl.doc.setter
def doc(self, value):
self._doc = value
@property
def global_refs(self):
return self._global_refs.fresh
refs = global_refs
own_refs = global_refs
@property
def namespace(self):
return self._namespace.fresh
def close(self):
self.system.close_model(self)
def get_impl_from_name(self, name):
"""Retrieve an object by a dotted name relative to the model."""
parts = name.split(".")
space = self.spaces[parts.pop(0)]
if parts:
return space.get_impl_from_namelist(parts)
else:
return space
# ----------------------------------------------------------------------
# Serialization by pickle
def __getstate__(self):
state = {key: getattr(self, key) for key in self.stateattrs}
graphs = {
name: graph
for name, graph in state.items()
if isinstance(graph, TraceGraph)
}
for gname, graph in graphs.items():
mapping = {}
for node in graph:
name = node[OBJ].idstr
if node_has_key(node):
mapping[node] = (name, node[KEY])
else:
mapping[node] = name
state[gname] = nx.relabel_nodes(graph, mapping)
state["ios"] = list(spec.io for spec in self.refmgr.specs)
return state
def __setstate__(self, state):
ios = state.pop("ios")
for attr in state:
setattr(self, attr, state[attr])
for io_ in ios:
self.system.iomanager.restore_io(self.interface, io_)
def restore_state(self, datapath=None):
"""Called after unpickling to restore some attributes manually."""
BaseParentImpl.restore_state(self)
mapping = {}
for node in self.tracegraph:
if isinstance(node, tuple):
name, key = node
else:
name, key = node, None
cells = self.get_impl_from_name(name)
mapping[node] = get_node(cells, key, None)
self.tracegraph = nx.relabel_nodes(self.tracegraph, mapping)
self._global_refs.restore_state()
def _check_sanity(self):
for name, r in self.global_refs.items():
if name != "__builtins__":
assert id(r.interface) in self.refmgr._valid_to_refs
self.refmgr._check_sanity()
self.spmgr._check_sanity()
@property
def updater(self):
return SpaceUpdater(self.spmgr)
def del_ref(self, name):
ref = self.global_refs[name]
self.model.clear_attr_referrers(ref)
ref.on_delete()
self.global_refs.delete_item(name)
def change_ref(self, name, value):
self.del_ref(name)
self.new_ref(name, value)
def new_ref(self, name, value):
ref = ReferenceImpl(
self, name, value, container=self._global_refs,
set_item=False)
self._global_refs.add_item(name, ref)
return ref
def get_attr(self, name):
if name in self.spaces:
return self.spaces[name].interface
elif name in self.global_refs:
return self.global_refs[name].interface
else:
raise AttributeError(
"Model '{0}' does not have '{1}'".format(self.name, name)
)
def set_attr(self, name, value, refmode=None):
if name in self.spaces:
raise KeyError("Space named '%s' already exist" % self.name)
elif name in self.global_refs:
self.refmgr.change_ref(self, name, value)
else:
self.refmgr.new_ref(self, name, value, refmode)
def del_attr(self, name):
if name in self.named_spaces:
self.updater.del_defined_space(self.named_spaces[name])
elif name in self.global_refs:
self.refmgr.del_ref(self, name)
else:
raise KeyError("Name '%s' not defined" % name)
# ----------------------------------------------------------------------
# Dynamic base manager
def get_dynamic_base(self, bases: tuple):
"""Create of get a base space for a tuple of bases"""
try:
return self._dynamic_bases_inverse[bases]
except KeyError:
name = self._dynamic_base_namer.get_next(self._dynamic_bases)
base = self.updater.new_space(
self,
name=name,
bases=bases,
prefix="__",
container=self._dynamic_bases)
self._dynamic_bases_inverse[bases] = base
return base
def split_node(node):
parent = ".".join(node.split(".")[:-1])
name = node.split(".")[-1]
return parent, name
def len_node(node):
return len(node.split("."))
def trim_left(node, trimed_len):
return ".".join(node.split(".")[trimed_len:])
def trim_right(node, trimed_len):
if trimed_len == 0:
return node
else:
return ".".join(node.split(".")[:-trimed_len])
def _get_shared_part(a_node, b_node, from_left=True):
a_node = a_node.split(".")
b_node = b_node.split(".")
length = min(len(a_node), len(b_node))
while length:
if from_left:
a_node, b_node = a_node[:length], b_node[:length]
else:
a_node, b_node = a_node[-length:], b_node[-length:]
if a_node == b_node:
return ".".join(a_node)
length -= 1
def get_shared_asc(a_node, b_node):
return _get_shared_part(a_node, b_node, from_left=True)
def get_shared_desc(a_node, b_node):
return _get_shared_part(a_node, b_node, from_left=False)
def has_parent(node, parent):
parent_len = len_node(parent)
if len_node(node) <= parent_len:
return False
elif trim_right(node, len_node(node) - parent_len) == parent:
return True
else:
return False
class SpaceGraph(nx.DiGraph):
"""New implementation of inheritance graph
Node state:
copied: Copied into sub graph
defined: Node created but space yet to create
created: Space created
updated: Existing space updated -- Not Used
unchanged: Existing space confirmed unchanged -- Not Used
"""
def fresh_copy(self): # Only for networkx -2.1
"""Overriding Graph.fresh_copy"""
return SpaceGraph()
def ordered_preds(self, node):
edges = [(self.edges[e]["index"], e) for e in self.in_edges(node)]
return [e[0] for i, e in sorted(edges, key=lambda elm: elm[0])]
def ordered_subs(self, node):
g = nx.descendants(self, node)
g.add(node)
return nx.topological_sort(self.subgraph(g))
def get_derived_subs(self, node):
"""Get node and all subs that can be reached only by derived edges"""
que = [node]
accum = [node]
while que:
n = que.pop(0)
for e in self.out_edges(n):
if self.edges[e]["mode"] == "derived":
t, h = e
que.append(h)
accum.append(h)
return accum
def max_index(self, node):
return max(
[self.edges[e]["index"] for e in self.in_edges(node)],
default=0
)
def get_mro(self, node):
"""Calculate the Method Resolution Order of bases using the C3 algorithm.
Code modified from
http://code.activestate.com/recipes/577748-calculate-the-mro-of-a-class/
Args:
bases: sequence of direct base spaces.
Returns:
mro as a list of bases including node itself
"""
seqs = [self.get_mro(base)
for base in self.ordered_preds(node)
] + [self.ordered_preds(node)]
res = []
while True:
non_empty = list(filter(None, seqs))
if not non_empty:
# Nothing left to process, we're done.
res.insert(0, node)
return res
for seq in non_empty: # Find merge candidates among seq heads.
candidate = seq[0]
not_head = [s for s in non_empty if candidate in s[1:]]
if not_head:
# Reject the candidate.
candidate = None
else:
break
if not candidate: # Better to return None instead of error?
raise TypeError(
"inconsistent hierarchy, no C3 MRO is possible"
)
res.append(candidate)
for seq in non_empty:
# Remove candidate.
if seq[0] == candidate:
del seq[0]
def get_derived_graph(self, on_edge=None, on_remove=None, start=()):
g = self.copy_as_spacegraph(self)
for e in self._visit_edges(*start):
g._derive_tree(e, on_edge, on_remove)
return g
def get_absbases(self):
"""Get edges from absolute base nodes"""
result = list(self.edges)
for e in self.edges:
tail, head = e
if self.get_endpoints(
self.visit_treenodes(
self._get_topnode(tail)), edge="in"):
result.remove(e)
return result
def _visit_edges(self, *start):
"""Generator yielding edges in breadth-first order"""
if not start:
start = self.get_absbases()
que = list(start)
visited = set()
while que:
e = que.pop(0)
if e not in visited:
yield e
visited.add(e)
_, head = e
edges = []
for n in self.visit_treenodes(self._get_topnode(head, edge="out")):
if self._is_endpoint(n, edge="out"):
edges.extend(oe for oe in self.out_edges(n)
if oe not in visited)
que += edges
def check_cyclic(self, start, node):
"""True if no cyclic"""
succs = self._get_otherends(
self.visit_treenodes(self._get_topnode(node, edge="out")),
edge="out")
for n in succs:
if self._is_linealrel(start, n):
return False
else:
if not self.check_cyclic(start, n):
return False
return True
def _derive_tree(self, edge, on_edge=None, on_remove=None):
"""Create derived node under the head of edge from the tail of edge"""
tail, head = edge
tlen, hlen = len_node(tail), len_node(head)
if tail:
bases = list(trim_left(n, tlen)
for n in self.visit_treenodes(tail, include_self=False))
else:
bases = []
subs = list(trim_left(n, hlen)
for n in self.visit_treenodes(head, include_self=False))
# missing = bases - subs
derived = list((tail + "." + n, head + "." + n) for n in bases)
derived.insert(0, (tail, head))
for e in derived:
if e not in self.edges:
t, h = e
if h not in self.nodes:
self.add_node(h, mode="derived", state="defined")
if t: # t can be ""
level = len_node(t) - tlen
self.add_edge(
t, h,
mode="derived",
level=level,
index=self.max_index(t) + 1
)
if on_edge:
on_edge(self, e)
for n in reversed(subs):
if n not in bases:
n = head + "." + n
if self.nodes[n]["mode"] == "derived":
if not list(self.predecessors(n)):
if on_remove:
on_remove(self, n)
self.remove_node(n)
def subgraph_from_nodes(self, nodes):
"""Get sub graph with nodes reachable form ``node``"""
result = set()
for node in nodes:
if node in self.nodes:
nodeset, _ = self._get_nodeset(node, set())
result.update(nodeset)
subg = self.copy_as_spacegraph(self.subgraph(result))
for n in subg.nodes:
subg.nodes[n]["state"] = "copied"
return subg
def subgraph_from_state(self, state):
"""Get sub graph with nodes with ``state``"""
nodes = set(n for n in self if self.nodes[n]["state"] == state)
return self.copy_as_spacegraph(self.subgraph(nodes))
def get_updated(self, subgraph, nodeset=None, keep_self=True,
on_restore=None):
"""Return a new space graph with nodeset removed and subgraph added
subgraph's state attribute is removed.
"""
if nodeset is None:
nodeset = subgraph.nodes
if keep_self:
src = self.copy_as_spacegraph(self)
else:
src = self
for n in subgraph.nodes:
del subgraph.nodes[n]["state"]
src.remove_nodes_from(nodeset)
if on_restore:
for n in self.nodes:
on_restore(subgraph, n)
return nx.compose(src, subgraph)
def _get_nodeset(self, node, processed):
"""Get a subset of self.
Get a subset of self such that the subset contains
nodes connected to ``node`` either through inheritance or composition.
0. Prepare an emptly node set
1. Get the top endopoint in the tree that ``node`` is in, or ``node``
if none.
2. Add to the node set all the child nodes of the top endpoint.
3. Find node sets.
4. For each endpoint in the child nodes, repeat from 1.
"""
top = self._get_topnode(node)
tree = set(self.visit_treenodes(top))
ends = self.get_endpoints(tree)
neighbors = self._get_otherends(ends) - processed
processed.update(ends)
result = tree.copy()
for n in neighbors:
ret_res, _ = self._get_nodeset(n, processed)
result.update(ret_res)
return result, processed
def get_parent_nodes(self, node: str, include_self=True):
"""Get ancestors of ``node`` in order"""
maxlen = len_node(node) if include_self else len_node(node) - 1
result = []
for i in range(maxlen, 0, -1):
n = trim_right(node, len_node(node)-i)
if n in self.nodes:
result.insert(0, n)
else:
break
return result
def _get_topnode(self, node, edge="any"):
"""Get the highest node that is an ancestor of the ``node``.
If none exits, return ``node``.
"""
parents = self.get_parent_nodes(node)
return next((n for n in parents if self._is_endpoint(n, edge)), node)
def _visit_treenodes_levels(self, node, include_self=True):
que = [node]
level = 0
while que:
n = que.pop(0)
if n != node or include_self:
yield level, n
childs = [ch for ch in self.nodes
if ch[:len(n) + 1] == (n + ".")
and len_node(n) + 1 == len_node(ch)]
que += childs
level += 1
def visit_treenodes(self, node, include_self=True):
for _, n in self._visit_treenodes_levels(
node,include_self=include_self):
yield n
def get_endpoints(self, nodes, edge="any"):
return set(n for n in nodes if self._is_endpoint(n, edge))
def _get_otherends(self, nodes, edge="any"):
otherends = [set(self._get_neighbors(n, edge)) for n in nodes]
return set().union(*otherends)
def _get_neighbors(self, node, edge):
if edge == "in":
return self.predecessors(node)
elif edge == "out":
return self.successors(node)
else:
return itertools.chain(
self.predecessors(node), self.successors(node))
def _is_endpoint(self, node, edge="any"):
if edge == "out":
return bool(self.out_edges(node))
elif edge == "in":
return bool(self.in_edges(node))
elif edge == "any":
return bool(self.out_edges(node) or
self.in_edges(node))
else:
raise ValueError
def _has_child(self, node, child):
node_len = len_node(node)
if node_len >= len_node(child):
return False
elif node == trim_right(child, len_node(child) - node_len):
return True
else:
return False
def _is_linealrel(self, node, other):
return (
node == other
or self._has_child(node, other)
or has_parent(node, other)
)
def to_space(self, node):
return self.nodes[node]["space"]
def get_mode(self, node):
return self.nodes[node]["mode"]
def copy_as_spacegraph(self, g):
"""Copy g as SpaceGraph.
This method is only for compatibility with networkx 2.1 or older.
Overriding fresh_copy method is also needed.
G can be a sub graph view.
"""
if _nxver < (2, 2):
# modified from https://github.com/networkx/networkx/blob/networkx-2.1/networkx/classes/digraph.py#L1080-L1167
# See LICENSES/NETWORKX_LICENSE.txt
def copy(klass, graph, as_view=False):
if as_view is True:
return nx.graphviews.DiGraphView(graph)
G = klass()
G.graph.update(graph.graph)
G.add_nodes_from((n, d.copy()) for n, d in graph._node.items())
G.add_edges_from((u, v, datadict.copy())
for u, nbrs in graph._adj.items()
for v, datadict in nbrs.items())
return G
return copy(type(self), g)
else:
return type(self).copy(g)
def get_relative(self, subspace, basespace, basevalue):
shared_parent = get_shared_asc(basespace, basevalue)
if not shared_parent:
return None
shared_desc = get_shared_desc(subspace, basespace)
if shared_desc:
shared_desc = shared_desc.split(".")
else:
shared_desc = []
subroot = trim_right(subspace, len(shared_desc))
basroot = trim_right(basespace, len(shared_desc))
while True:
if basroot in self.get_mro(subroot):
break
if shared_desc:
n = shared_desc.pop(0)
subroot = ".".join(subroot.split(".") + [n])
basroot = ".".join(basroot.split(".") + [n])
else:
raise RuntimeError("must not happen")
if basroot == shared_parent or has_parent(shared_parent, basroot):
relative_part = trim_left(basevalue, len_node(basroot))
if relative_part:
return subroot + "." + relative_part
else:
return subroot
else:
return None
class Instruction:
def __init__(self, func, args=(), arghook=None, kwargs=None):
self.func = func
self.args = args
self.arghook = arghook
self.kwargs = kwargs if kwargs else {}
def execute(self):
if self.arghook:
args, kwargs = self.arghook(self)
else:
args, kwargs = self.args, self.kwargs
return self.func(*args, **kwargs)
@property
def funcname(self):
return self.func.__name__
def __repr__(self):
return "<Instruction: %s>" % self.funcname
class InstructionList(list):
def execute(self, clear=True):
result = None
for inst in self:
result = inst.execute()
if clear:
self.clear()
return result
class SharedSpaceOperations:
def __init__(self, model):
self.model = model
self._inheritance = SpaceGraph()
self._graph = SpaceGraph()
def _can_add(self, parent, name, klass, overwrite=True):
"""Check name conflict for a given name.
:obj:`False` if ``name`` is already defined not
as an instance of ``klass``
in ``parent`` or in any of ``parent`` descendants.
:obj:`False` if ``name`` is already defined
as an instance of ``klass`` and ``overwirte`` is :obj:`True`,
otherwise :obj:`True`.
"""
# TODO: Reflect the overwriting order of names
if parent is self.model:
return name not in parent.namespace
sub = self._find_name_in_subs(parent, name) # start from parent
if sub is None:
return True
elif isinstance(sub, klass) and overwrite:
return True
else:
return False
def _find_name_in_subs(self, parent, name):
for subspace in self._get_subs(parent, skip_self=False):
if name in subspace.namespace:
return subspace._namespace.fresh[name]
return None
def _set_defined(self, node):
for graph in (self._inheritance, self._graph):
for parent in graph.get_parent_nodes(node):
graph.nodes[parent]["mode"] = "defined"
def _get_space_bases(self, space, skip_self=True):
idx = 1 if skip_self else 0
nodes = self._graph.get_mro(space.idstr)[idx:]
return [self._graph.to_space(n) for n in nodes]
def get_deriv_bases(self, deriv: Derivable, defined_only=False,
graph: SpaceGraph=None):
if graph is None:
graph = self._graph
if isinstance(deriv, UserSpaceImpl): # Not Dynamic spaces
return self._get_space_bases(deriv, graph)
pnode = deriv.parent.idstr
bases = []
for bspace in graph.get_mro(pnode)[1:]:
base_members = deriv._get_members(graph.to_space(bspace))
if deriv.name in base_members:
b = base_members[deriv.name]
if not defined_only or b.is_defined():
bases.append(b)
return bases
def get_direct_bases(self, space):
node = space.idstr
preds = self._inheritance.ordered_preds(node)
return [self._inheritance.to_space(n) for n in preds]
def update_subs(self, space, skip_self=True):
for attr in ("cells", "own_refs"):
for s in self._get_subs(space, skip_self):
b = self._get_space_bases(s, self._graph)
s.on_inherit(self, b, attr)
def _get_subs(self, space, skip_self=True):
idx = 1 if skip_self else 0
return [
self._graph.to_space(desc) for desc in list(
self._graph.ordered_subs(space.idstr))[idx:]
]
def get_relative_interface(self, parent, base):
basespace = base.parent.idstr
basevalue = base.interface._impl.idstr
subimpl = self._graph.get_relative(
parent.idstr, basespace, basevalue)
if subimpl:
return True, self.model.get_impl_from_name(subimpl).interface
else:
return False, base.interface
class SpaceManager(SharedSpaceOperations):
def rename_space(self, space, name):
# Check name does not exit already
parent = space.parent
if not self._can_add(
parent, name, UserSpaceImpl, overwrite=False):
raise ValueError("Cannot rename '%s' to '%s'" % (space.name, name))
# Check space is not derived or overwritten
for e in self._graph.in_edges(space.idstr):
if self._graph.edges[e]["mode"] == "derived":
t, h = e
raise ValueError(
"'%s' has derived base '%s'" % (h, t))
# Derived/Overwritten spaces are renamed
subspaces = list(
self._graph.to_space(n) for n in self._graph.get_derived_subs(
space.idstr)
)
# Create name mapping
mapping = {}
for s in subspaces:
old_id = tuple(s.idstr.split("."))
new_id = old_id[:-1] + (name,)
for node in self._graph.visit_treenodes(
s.idstr, include_self=True):
old_child = tuple(node.split("."))
assert old_id == old_child[:len(old_id)]
mapping[node] = ".".join(new_id + old_child[len(new_id):])
for s in subspaces:
if not s.parent.is_model():
# Clear parent's dynsub, not s's
s.parent.clear_subs_rootitems()
# Call on_rename callbacks
s.on_rename(name)
# Rename nodes
nx.relabel_nodes(self._inheritance, mapping, copy=False)
nx.relabel_nodes(self._graph, mapping, copy=False)
def del_cells(self, space, name):
cells = space.cells[name]
if cells.is_derived():
raise ValueError("cannot delete derived")
space.on_del_cells(name)
self.update_subs(space, skip_self=False)
def del_ref(self, space, name):
space.on_del_ref(name)
self.update_subs(space, skip_self=False)
def new_cells(self, space, name=None, formula=None, data=None,
is_derived=False, source=None, overwrite=True):
# FIX: Creating a Cells of the same name in ``space``
if not self._can_add(space, name, CellsImpl, overwrite=overwrite):
raise ValueError("Cannot create cells '%s'" % name)
self._set_defined(space.idstr)
space.set_defined()
cells = UserCellsImpl(
space=space, name=name, formula=formula,
data=data,
source=source, is_derived=is_derived)
space.clear_subs_rootitems()
name = cells.name # If name is none, auto-named in __init__
for subspace in self._get_subs(space):
if name in subspace.cells:
break
else:
subspace.clear_subs_rootitems()
derived = UserCellsImpl(
space=subspace,
base=cells, is_derived=True, add_to_space=False
)
base_cells = {}
for b in reversed(subspace.bases):
base_cells.update(b.cells)
idx = list(base_cells).index(name)
cells_after = list(subspace.cells)[idx:]
subspace._cells.set_item(name, derived)
for k in cells_after:
subspace._cells[k] = subspace._cells.pop(k)
return cells
def copy_cells(self, space: UserSpaceImpl,
source: UserCellsImpl, name=None):
"""``space`` can be of another Model"""
if space.model is not self.model:
return space.spmgr.copy_cells(space, source, name)
if name is None:
name = source.name
data = {k: v for k, v in source.data.items() if k in source.input_keys}
return self.new_cells(space, name=name, formula=source.formula,
data=data, is_derived=False, overwrite=False)
def rename_cells(self, cells, name):
"""Renames the Cells name"""
if not is_valid_name(name):
raise ValueError("name '%s' is invalid" % name)
if not self._can_add(cells.parent, name, CellsImpl, overwrite=True):
raise ValueError("cannot create cells '%s'" % name)
if cells.bases:
raise ValueError("'%s' is a sub Cells of '%s'" % (
cells.get_repr(fullname=True, add_params=False),
cells.bases[0].get_repr(fullname=True, add_params=False)))
old_name = cells.name
for space in self._get_subs(cells.parent, skip_self=False):
space.clear_subs_rootitems()
space.cells[old_name].on_rename(name)
def sort_cells(self, space):
"""Sort cells in a space
- Applies only to defined UserSpaces
- Only cells defined in the space (neither derived/overridden)
are sorted and placed before the derived/overridden cells.
- Derived/overridden cells in the sub spaces are also sorted.
"""
for subspace in self._get_subs(space, skip_self=False):
subspace.on_sort_cells(space=space)
def change_cells_formula(self, cells, func):
define = True
for space in self._get_subs(cells.parent, skip_self=False):
c = space.cells[cells.name]
if c is not cells and c.is_defined():
break # Stop when sub cells is defined
space.clear_subs_rootitems()
space.cells[cells.name].on_change_formula(func, define)
define = False # Do not define derived cells
def del_cells_formula(self, cells):
self.change_cells_formula(cells, NULL_FORMULA)
def _check_subs_relrefs(self, space, name, value, refmode):
# Check if relative ref is possible when refmode is 'relative'
if isinstance(value, Interface) and refmode == "relative":
basevalue = value._impl.idstr
for subspace in self._get_subs(space):
if name in subspace.own_refs:
break
else:
subvalue = self._graph.get_relative(
subspace.idstr, space.idstr,
basevalue)
if not subvalue:
raise ValueError(
"Cannot create relative reference for '%s' in '%s'"
% (basevalue, subspace.idstr)
)
def new_ref(self, space, name, value, refmode):
other = self._find_name_in_subs(space, name)
if other is not None:
if not isinstance(other, ReferenceImpl):
raise ValueError("Cannot create reference '%s'" % name)
elif other not in self.model.global_refs.values():
raise ValueError("Cannot create reference '%s'" % name)
self._check_subs_relrefs(space, name, value, refmode)
self._set_defined(space.idstr)
space.set_defined()
result = space.on_create_ref(name, value, is_derived=False,
refmode=refmode)
for subspace in self._get_subs(space):
is_relative = False
if name in subspace.own_refs:
break
if isinstance(value, Interface) and value._is_valid():
if refmode == "auto" or refmode == "relative":
is_relative, value = self.get_relative_interface(
subspace, space.own_refs[name])
ref = subspace.on_create_ref(name, value, is_derived=True,
refmode=refmode)
ref.is_relative = is_relative
return result
def change_ref(self, space, name, value, refmode):
"""Assigns a new value to an existing name."""
self._check_subs_relrefs(space, name, value, refmode)
self._set_defined(space.idstr)
space.set_defined()
is_relative = False if refmode == "absolute" else True
space.on_change_ref(name, value, is_derived=False, refmode=refmode,
is_relative=is_relative)
for subspace in self._get_subs(space):
is_relative = False
subref = subspace.own_refs[name]
if subref.is_defined():
break
elif subref.defined_bases[0] is not space.own_refs[name]:
break
if isinstance(value, Interface) and value._is_valid():
if (refmode == "auto"
or refmode == "relative"):
is_relative, value = self.get_relative_interface(
subspace, space.own_refs[name])
ref = subspace.on_change_ref(name, value,
is_derived=True, refmode=refmode,
is_relative=is_relative)
ref.is_relative = is_relative
def _check_sanity(self):
# both graph must have the same nodes
assert self._inheritance.nodes == self._graph.nodes
nodes = set(self._graph.nodes)
spaces = dict(self.model._all_spaces)
# consistency between spaces and nodes
while spaces:
k, v = spaces.popitem()
assert k == v.name
assert v.idstr in nodes
assert v is self._graph.nodes[v.idstr]["space"]
nodes.remove(v.idstr)
spaces.update(v.named_spaces)
assert not nodes # Check all nodes are reached
class SpaceUpdater(SharedSpaceOperations):
def __init__(self, manager):
self.manager = manager
super().__init__(manager.model)
self.oldsubg_inherit = None
self.oldsubg = None
self._instructions = InstructionList()
def _init_subgraphs(self, spaces, copy_derived=False):
nodes = [s.idstr for s in spaces]
self.oldsubg_inherit = self.manager._inheritance.subgraph_from_nodes(
nodes)
self.oldsubg = self.oldsubg_inherit.get_derived_graph()
self._inheritance = self.oldsubg_inherit.copy_as_spacegraph(
self.oldsubg_inherit)
if copy_derived:
self._graph = self.oldsubg.copy_as_spacegraph(self.oldsubg)
def _update_manager(self):
self._inheritance.remove_nodes_from(
set(n for n in self._inheritance if n not in self._graph))
# Add derived spaces back to self._inheritance
created = self._graph.subgraph_from_state("created")
if created:
created.remove_edges_from(list(created.edges))
self._inheritance = nx.compose(self._inheritance, created)
self.manager._inheritance = self.manager._inheritance.get_updated(
self._inheritance,
nodeset=self.oldsubg_inherit,
keep_self=False
)
self.manager._graph = self.manager._graph.get_updated(
self._graph,
nodeset=self.oldsubg,
keep_self=False
)
def _new_derived_space(self, node):
parent_node, name = split_node(node)
if parent_node:
parent = self._graph.to_space(parent_node)
else:
parent =self.model
space = UserSpaceImpl(
parent,
name,
container=parent._named_spaces,
is_derived=True
# formula=formula,
# refs=refs,
# source=source,
# doc=doc
)
self._graph.nodes[node]["space"] = space
self._graph.nodes[node]["state"] = "created"
def _update_derived_space(self, node):
space = self._graph.to_space(node)
bases = self._get_space_bases(space, self._graph)
space.on_inherit(self, bases, 'cells')
self._instructions.append(
Instruction(self._update_derived_refs, (node,))
)
def _update_derived_refs(self, node):
space = self._graph.to_space(node)
bases = self._get_space_bases(space, self._graph)
space.on_inherit(self, bases, 'own_refs')
def _derive_hook(self, graph, edge):
"""Callback passed as on_edge parameter"""
_, head = edge
mode = graph.nodes[head]["mode"]
state = graph.nodes[head]["state"]
if mode == "derived" and state == "defined":
self._instructions.append(
Instruction(self._new_derived_space, (head,))
)
self._instructions.append(
Instruction(self._update_derived_space, (head,))
)
def _remove_hook(self, graph, node):
parent_node, name = split_node(node)
if parent_node in self.manager._graph:
parent = self.manager._graph.to_space(parent_node)
elif parent_node:
parent = graph.to_space(parent_node)
else:
parent = self.model
method = parent.on_del_space
self._instructions.append(
Instruction(method, (name,))
)
def new_space(
self,
parent,
name=None,
bases=None,
formula=None,
refs=None,
source=None,
is_derived=False,
prefix="",
doc=None,
container=None
):
"""Create a new child space.
Args:
name (str): Name of the space. If omitted, the space is
created automatically.
bases: If specified, the new space becomes a derived space of
the `base` space.
formula: Function whose parameters used to set space parameters.
refs: a mapping of refs to be added.
source: A source module from which cell definitions are read.
prefix: Prefix to the autogenerated name when name is None.
"""
if name is None:
while True:
name = parent.spacenamer.get_next(parent.namespace, prefix)
if self.manager._can_add(parent, name, UserSpaceImpl):
break
elif not self.manager._can_add(parent, name, UserSpaceImpl):
raise ValueError("Cannot create space '%s'" % name)
if not prefix and not is_valid_name(name):
raise ValueError("Invalid name '%s'." % name)
if bases is None:
bases = []
elif isinstance(bases, UserSpaceImpl):
bases = [bases]
node = name if parent.is_model() else parent.idstr + "." + name
spaces = [s for s in bases]
if not parent.is_model():
spaces.insert(0, parent)
self._init_subgraphs(spaces, copy_derived=True)
for g in (self._inheritance, self._graph):
g.add_node(
node, mode="defined", state="defined")
for b in bases:
base = b.idstr
g.add_edge(
base, node,
mode="defined",
level=0,
index=g.max_index(node) + 1
)
for pnode in g.get_parent_nodes(node):
g.nodes[pnode]["mode"] = "defined"
if not nx.is_directed_acyclic_graph(self._inheritance):
raise ValueError("cyclic inheritance")
if not self._inheritance.check_cyclic(node, node):
raise ValueError("cyclic inheritance through composition")
self._inheritance.get_mro(node) # Check if MRO is possible
start = [
(tail, node) for tail in self._inheritance.ordered_preds(node)]
self._graph = self._graph.get_derived_graph(
on_edge=self._derive_hook, start=start)
if not nx.is_directed_acyclic_graph(self._graph):
raise ValueError("cyclic inheritance")
# Check if MRO is possible for each node in sub graph
for n in nx.descendants(self._graph, node):
self._graph.get_mro(n)
if not parent.is_model():
parent.set_defined()
if container is None:
container = parent._named_spaces
space = UserSpaceImpl(
parent,
name,
container,
is_derived,
formula=formula,
refs=refs,
source=source,
doc=doc
)
self._graph.nodes[node]["space"] = space
self._graph.nodes[node]["state"] = "created"
try:
self._instructions.execute()
except BaseException:
container.del_item(name)
raise
self._update_manager()
return space
def add_bases(self, space, bases):
"""Add bases to space in graph
"""
node = space.idstr
basenodes = [base.idstr for base in bases]
for base in [node] + basenodes:
if base not in self.manager._inheritance:
raise ValueError("Space '%s' not found" % base)
self._init_subgraphs([space] + bases)
for b in basenodes:
self._inheritance.add_edge(
b,
node,
mode="defined",
level=0,
index=self._inheritance.max_index(node) + 1
)
for p in self._inheritance.get_parent_nodes(node):
self._inheritance.nodes[p]["mode"] = "defined"
if not nx.is_directed_acyclic_graph(self._inheritance):
raise ValueError("cyclic inheritance")
for n in itertools.chain({node}, nx.descendants(
self._inheritance, node)):
self._inheritance.get_mro(n)
self._graph = self._inheritance.get_derived_graph(
on_edge=self._derive_hook)
if not nx.is_directed_acyclic_graph(self._graph):
raise ValueError("cyclic inheritance")
for desc in itertools.chain(
{node},
nx.descendants(self._graph, node)):
mro = self._graph.get_mro(desc)
# Check name conflict between spaces, cells, refs
members = {}
for attr in ["spaces", "cells", "refs"]:
namechain = []
for sname in mro:
space = self._graph.to_space(sname)
namechain.append(set(getattr(space, attr).keys()))
members[attr] = set().union(*namechain)
conflict = set().intersection(*[n for n in members.values()])
if conflict:
raise NameError("name conflict: %s" % conflict)
self._instructions.execute()
self._update_manager()
def remove_bases(self, space, bases):
node = space.idstr
basenodes = [base.idstr for base in bases]
for base in [node] + basenodes:
if base not in self.manager._inheritance:
raise ValueError("Space '%s' not found" % base)
self._init_subgraphs([space] + bases)
for b in basenodes:
self._inheritance.remove_edge(b, node)
if not nx.is_directed_acyclic_graph(self._inheritance):
raise ValueError("cyclic inheritance")
for n in itertools.chain({node}, nx.descendants(
self._inheritance, node)):
self._inheritance.get_mro(n)
start = self._inheritance.get_absbases()
start.insert(0, ("", node))
self._graph = self._inheritance.get_derived_graph(
on_edge=self._derive_hook,
on_remove=self._remove_hook,
start=start
)
if not nx.is_directed_acyclic_graph(self._graph):
raise ValueError("cyclic inheritance")
for desc in itertools.chain(
{node},
nx.descendants(self._graph, node)):
mro = self._graph.get_mro(desc)
# Check name conflict between spaces, cells, refs
members = {}
for attr in ["spaces", "cells", "refs"]:
namechain = []
for sname in mro:
space = self._graph.to_space(sname)
namechain.append(set(getattr(space, attr).keys()))
members[attr] = set().union(*namechain)
conflict = set().intersection(*[n for n in members.values()])
if conflict:
raise NameError("name conflict: %s" % conflict)
self._instructions.execute()
self._update_manager()
def del_defined_space(self, space):
if space.is_derived():
raise ValueError(
"%s has derived spaces" % repr(space.interface)
)
node = space.idstr
if node not in self.manager._inheritance:
raise ValueError("Space '%s' not found" % node)
elif self.manager._inheritance.nodes[node]["mode"] == "derived":
raise ValueError("cannot delete derived space")
self._init_subgraphs([space])
succs = list(self._inheritance.successors(node))
# Remove node and its child tree
nodes_removed = list()
for child in self._inheritance.visit_treenodes(node):
nodes_removed.append(child)
self._remove_hook(self._inheritance, child)
self._inheritance.remove_nodes_from(nodes_removed)
self._graph = self._inheritance.get_derived_graph(
on_edge=self._derive_hook,
on_remove=self._remove_hook,
start=[("", node) for node in succs]
)
for n in set(self._inheritance.nodes):
if n not in self._graph:
self._inheritance.remove_node(n)
self._instructions.execute()
self._update_manager()
if space is self.model.currentspace:
self.model.currentspace = None
def copy_space(
self,
parent: EditableParentImpl,
source: UserSpaceImpl,
name=None,
defined_only=False
):
if parent.has_ascendant(source):
raise ValueError("Cannot copy to child")
if parent.model is not self.model:
return parent.model.updater.copy_space(
parent, source, name, defined_only)
if name is None:
name = source.name
if self.manager._can_add(
parent, name, EditableParentImpl, overwrite=False):
return self._copy_space_recursively(
parent, source, name, defined_only
)
else:
raise ValueError("Cannot create space '%s'" % name)
def _copy_space_recursively(
self, parent, source, name, defined_only):
if source.is_derived():
return
space = self.new_space(
parent,
name=name,
bases=None,
formula=source.formula,
refs={k: v.interface for k, v in source.own_refs.items()},
source=source.source,
is_derived=False,
prefix="",
doc=source.doc,
container=None
)
for cells in source.cells.values():
if cells.is_defined():
self.manager.copy_cells(space, cells)
for child in source.named_spaces.values():
self._copy_space_recursively(
space, child, child.name, defined_only)
return space
class ReferenceManager:
def __init__(self, model, iomanager):
self._model = model
self._manager = iomanager
self._valid_to_refs = {} # id(value) -> [refs]
def _check_sanity(self):
for refs in self._valid_to_refs.values():
for r in refs:
spec = self._manager.get_spec_from_value(
io_group=self._model.interface,
value=r.interface)
if spec is not None:
assert r.interface is spec.value
spec._check_sanity()
def has_spec(self, value):
spec = self._manager.get_spec_from_value(self._model.interface, value)
return spec is not None
def get_spec(self, value):
return self._manager.get_spec_from_value(self._model.interface, value)
@property
def values(self):
return list(ref[0].interface for ref in self._valid_to_refs.values())
@property
def specs(self):
result = []
for r in self._valid_to_refs.values():
spec = self.get_spec(r[0].interface)
if spec is not None:
result.append(spec)
return result
def new_ref(self, impl, name, value, refmode):
if isinstance(impl, ModelImpl):
ref = impl.new_ref(name, value)
elif isinstance(impl, UserSpaceImpl):
ref = impl.model.spmgr.new_ref(impl, name, value, refmode)
else:
raise RuntimeError("must not happen")
if not isinstance(value, Interface):
id_ = id(value)
if id_ in self._valid_to_refs:
refs = self._valid_to_refs[id_]
assert all(ref is not r for r in refs)
refs.append(ref)
else:
self._valid_to_refs[id_] = [ref]
def del_ref(self, impl, name):
refdict = impl.own_refs
ref = refdict[name]
valid = id(ref.interface)
val = ref.interface
if isinstance(impl, ModelImpl):
impl.del_ref(name)
elif isinstance(impl, UserSpaceImpl):
impl.model.spmgr.del_ref(impl, name)
else:
raise RuntimeError("must not happen")
refs = self._valid_to_refs.get(valid)
assert refs
refs.remove(ref)
if not refs:
del self._valid_to_refs[valid]
spec = self._manager.get_spec_from_value(
io_group=self._model.interface,
value=val
)
if spec:
self._manager.del_spec(spec)
def change_ref(self, impl, name, value, refmode=None):
refdict = impl.own_refs
prev_ref = refdict[name]
prev_valid = id(prev_ref.interface)
prev_val = prev_ref.interface
if isinstance(impl, ModelImpl):
impl.model.change_ref(name, value)
elif isinstance(impl, UserSpaceImpl):
impl.model.spmgr.change_ref(impl, name, value, refmode)
else:
raise RuntimeError("must not happen")
refs = self._valid_to_refs.get(prev_valid, None)
if refs is not None: # None in case prev_ref is derived
if prev_ref in refs:
refs.remove(prev_ref)
if not refs: # ref is empty
del self._valid_to_refs[prev_valid]
spec = self._manager.get_spec_from_value(self._model.interface, prev_val)
if spec:
self._manager.del_spec(spec)
if not isinstance(value, Interface):
valid = id(value)
if valid in self._valid_to_refs:
self._valid_to_refs[valid].append(refdict[name])
else:
self._valid_to_refs[id(value)] = [refdict[name]]
def del_all_spec(self):
specs = self.specs.copy()
while specs:
self._manager.del_spec(specs.pop())
def update_value(self, old_value, new_value=None, **kwargs):
prev_id = id(old_value)
refs = self._valid_to_refs.get(prev_id, None)
spec = self._manager.get_spec_from_value(self._model.interface, old_value)
if refs is None:
raise ValueError("value not referenced")
if new_value is None:
new_value = old_value
if spec is not None:
self._manager.update_spec_value(spec, new_value, kwargs)
new_value = spec.value
newrefs = []
while refs:
ref = refs.pop()
impl = ref.parent
name = ref.name
refmode = ref.refmode
value = new_value
self._impl_change_ref(impl, name, value, refmode)
newrefs.append(impl.own_refs[name])
self._valid_to_refs.pop(prev_id)
self._valid_to_refs[id(new_value)] = newrefs
@staticmethod
def _impl_change_ref(impl, name, value, *refmode):
if isinstance(impl, ModelImpl):
impl.model.change_ref(name, value)
elif isinstance(impl, UserSpaceImpl):
impl.model.spmgr.change_ref(impl, name, value, refmode)
else:
raise RuntimeError("must not happen")
def __getstate__(self):
return {
"model": self._model,
"manager": self._manager,
"refs": list(self._valid_to_refs.values())
}
def __setstate__(self, state):
self._model = state["model"]
self._manager = state["manager"]
self._valid_to_refs = {
id(refs[0].interface): refs for refs in state["refs"]
}