# Source code for modelx.io.excelio

# Copyright (c) 2017-2022 Fumito Hamamura <>

# This library is free software: you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation version 3.
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with this library.  If not, see <http://www.gnu.org/licenses/>.

import itertools
import pathlib
import re
import string
from collections.abc import Mapping

import openpyxl as opxl
import openpyxl.cell

from .baseio import BaseIOSpec, BaseSharedIO

def _get_col_index(name):
    """Convert column name to index."""

    index = string.ascii_uppercase.index
    col = 0
    for c in name.upper():
        col = col * 26 + index(c) + 1
    return col

# Pattern for an A1-style address: a cell ("A1", "$B$2") or a rectangular
# range ("A1:C3", "$G4:$K10").  Column and row are both mandatory, so a
# plain word such as "ABC" is never taken for an address.
_RANGE_ADDR_RE = re.compile(
    r"""
    [$]?(?P<min_col>[A-Za-z]{1,3})
    [$]?(?P<min_row>[0-9]+)
    (?:
        :
        [$]?(?P<max_col>[A-Za-z]{1,3})
        [$]?(?P<max_row>[0-9]+)
    )?
    """,
    re.VERBOSE,
)

# Worksheet size limits of the xlsx format (last column is "XFD").
_MAX_COL = 16384
_MAX_ROW = 1048576


def _is_range_address(range_addr):
    """Tell whether ``range_addr`` is an A1-style cell or range address.

    Args:
        range_addr (str): Candidate expression such as ``"A1"``,
            ``"$G4:$K10"`` or a defined name like ``"NamedRange1"``.

    Returns:
        bool: ``True`` when the whole string is a cell address or a
        ``top-left:bottom-right`` range whose corners are ordered and lie
        within the worksheet limits; ``False`` otherwise (which callers
        use to treat the string as a named range).
    """
    match = _RANGE_ADDR_RE.fullmatch(range_addr)
    if match is None:
        return False

    min_col = _get_col_index(match.group("min_col"))
    min_row = int(match.group("min_row"))
    if min_col > _MAX_COL or not 1 <= min_row <= _MAX_ROW:
        return False

    # For a single-cell address the optional ":max" part is absent.
    if match.group("max_col") is None:
        return True

    max_col = _get_col_index(match.group("max_col"))
    max_row = int(match.group("max_row"))
    return (
        min_col <= max_col <= _MAX_COL
        and min_row <= max_row <= _MAX_ROW
    )

def _get_range(book, range_, sheet):
    """Return the cells addressed by ``range_`` in ``book``.

    Args:
        book: Either a path string, which is opened with
            ``openpyxl.load_workbook(..., data_only=True)``, or an
            already loaded openpyxl ``Workbook``.
        range_ (str): An A1-style address (``"A1"``, ``"$G4:$K10"``) or
            the name of a defined (named) range.
        sheet (str): Sheet name, matched case-insensitively.  Required
            for an A1-style address; for a named range it selects a
            sheet-local definition, and ``None`` selects the book-wide
            definition.

    Returns:
        Whatever indexing the openpyxl worksheet with the address yields
        (for a named range, the value returned by ``_get_namedrange``).

    Raises:
        TypeError: If ``book`` is neither a string nor a ``Workbook``.
        ValueError: If the sheet is missing or unknown for an address,
            or if the named range cannot be found.
    """
    filename = None
    if isinstance(book, str):
        filename = book
        book = opxl.load_workbook(book, data_only=True)
    elif not isinstance(book, opxl.Workbook):
        raise TypeError(
            "book must be a file path or an openpyxl Workbook, not %s"
            % type(book).__name__
        )

    if _is_range_address(range_):
        if sheet is None:
            raise ValueError(
                "sheet must be given for range address '%s'" % range_
            )
        sheet_names = [name.upper() for name in book.sheetnames]
        try:
            index = sheet_names.index(sheet.upper())
        except ValueError:
            raise ValueError(
                "Sheet '%s' not found in %s" % (sheet, filename or book)
            ) from None
        data = book.worksheets[index][range_]
    else:
        data = _get_namedrange(book, range_, sheet)
        if data is None:
            raise ValueError(
                "Named range '%s' not found in %s" % (range_, filename or book)
            )

    return data

def _get_namedrange(book, rangename, sheetname=None):
    """Get range from a workbook.

    A workbook can contain multiple definitions for a single name,
    as a name can be defined for the entire book or for
    a particular sheet.

    If sheet is None, the book-wide def is searched,
    otherwise sheet-local def is looked up.

    Args:
        book: An openpyxl workbook object.
        rangename (str): Name of the defined range, such as
            "NamedRange1" (matched case-insensitively).
        sheetname (str, optional): None for book-wide name def,
            sheet name for sheet-local named range.

    Returns:
        Range object specified by the name: the single range when the
        definition has exactly one destination, otherwise a list of
        ranges.  ``None`` if no matching definition exists.

    Raises:
        ValueError: If ``sheetname`` is given but is not a sheet of
            ``book`` (raised by the list lookup of the sheet name).
    """
    sheetnames_upper = [name.upper() for name in book.sheetnames]
    target_name = rangename.upper()

    def cond(namedef):
        """Tell whether ``namedef`` is the definition being looked for."""
        if namedef.type.upper() != "RANGE":
            return False
        if namedef.name.upper() != target_name:
            return False

        if sheetname is None:
            # Book-wide names carry no sheet id.  Compare against None
            # explicitly: id 0 is the first sheet, not "no sheet".
            return namedef.localSheetId is None

        # sheet local name
        sheet_id = sheetnames_upper.index(sheetname.upper())
        return namedef.localSheetId == sheet_id

    def get_destinations(name_def):
        """Workaround for the bug in DefinedName.destinations"""

        from openpyxl.formula import Tokenizer
        from openpyxl.utils.cell import SHEETRANGE_RE

        if name_def.type == "RANGE":
            tok = Tokenizer("=" + name_def.value)
            for part in tok.items:
                if part.subtype == "RANGE":
                    m = SHEETRANGE_RE.match(part.value)
                    # The sheet part is captured under a different group
                    # depending on whether it was written in quotes.
                    if m.group("quoted"):
                        sheet_name = m.group("quoted")
                    else:
                        sheet_name = m.group("notquoted")

                    yield sheet_name, m.group("cells")

    namedef = next(
        (item for item in book.defined_names.definedName if cond(item)), None
    )

    if namedef is None:
        return None

    xlranges = []
    for sht, addr in get_destinations(namedef):
        if sheetname:
            # A sheet-local lookup always resolves on the requested sheet.
            sht = sheetname
        index = sheetnames_upper.index(sht.upper())
        xlranges.append(book.worksheets[index][addr])

    if len(xlranges) == 1:
        return xlranges[0]
    else:
        return xlranges

def _redirect_merged(cells):
    """Return the cell that actually carries the value for ``cells``.

    An ordinary ``Cell`` is returned unchanged.  A ``MergedCell`` is
    mapped to the top-left cell of the merged range of its worksheet that
    contains it.  Any other object yields ``None``.  If no merged range
    contains the merged cell, the ``StopIteration`` raised by ``next``
    propagates to the caller.
    """
    if isinstance(cells, openpyxl.cell.Cell):
        return cells
    if not isinstance(cells, openpyxl.cell.MergedCell):
        return None

    worksheet = cells.parent
    enclosing = next(
        span
        for span in worksheet.merged_cells.ranges
        if span.min_row <= cells.row <= span.max_row
        and span.min_col <= cells.column <= span.max_col
    )
    return worksheet.cell(enclosing.min_row, enclosing.min_col)

class ExcelWorkbook(BaseSharedIO):
    """Shared I/O object that holds an openpyxl workbook.

    The workbook is loaded once from ``load_from`` with
    ``data_only=True`` and kept in the ``book`` attribute, from which
    ranges are looked up with :meth:`get_range`.
    """

    def __init__(self, path, manager, load_from):
        """Initialize the base I/O object and load the workbook.

        Args:
            path: Passed through to the base class.
            manager: Passed through to the base class.
            load_from: Location the workbook is read from.
        """
        super().__init__(path, manager, load_from=load_from)
        self.book = opxl.load_workbook(load_from, data_only=True)

    def _on_write(self, path):
        """Save the workbook to ``path``."""
        self.book.save(path)

    def _on_update_value(self, value, kwargs):
        """Do nothing; there is nothing to refresh on a value update."""
        pass

    def get_range(self, range_, sheet):
        """Return the cells for ``range_`` on ``sheet`` of this workbook."""
        return _get_range(self.book, range_, sheet)

    def __getstate__(self):
        """Extend the base-class state with the workbook object."""
        state = super().__getstate__()
        state["book"] = self.book
        return state

    def __setstate__(self, state):
        """Restore the base-class state and the workbook object."""
        super().__setstate__(state)
        self.book = state["book"]

class _RangeType:

    CELL = 1
    ROW = 2
    COL = COLUMN = 3
    TABLE = 4

class ExcelRange(BaseIOSpec, Mapping):
    """Mapping class for accessing Excel ranges

    An ExcelRange is a dict-like object that represents a range in an
    Excel file. The user can read values from the range or write values
    to it by the subscription operator ``[]``.
    ExcelRange is a mapping class, thus it implements all the mapping
    methods and operations.

    ExcelRange objects can only be created by the
    :meth:`Model.new_excel_range<modelx.core.model.Model.new_excel_range>`
    or
    :meth:`UserSpace.new_excel_range<modelx.core.space.UserSpace.new_excel_range>`
    method.

    :class:`ExcelRange` is a subclass of the :class:`BaseIOSpec`
    abstract class.
    The :attr:`~modelx.core.model.Model.iospecs` property lists all the
    :class:`BaseIOSpec` instances held in the Model, including
    :class:`ExcelRange` objects.

    See Also:

        * :meth:`UserSpace.new_excel_range<modelx.core.space.UserSpace.new_excel_range>`
        * :meth:`Model.new_excel_range<modelx.core.model.Model.new_excel_range>`
        * :attr:`~modelx.core.model.Model.iospecs`

    .. versionadded:: 0.9.0
    """

    io_class = ExcelWorkbook

    def __init__(self, range_, sheet=None, keyids=None):
        """
        Args:
            range_(str): Range expression, such as "A1", "$G4:$K10",
                or the name of a named range.
            sheet(optional): Name of the sheet the range is on. ``None``
                selects a book-wide named range.
            keyids(optional): sequence of strings to specify rows and
                columns to be interpreted as keys.
                E.g. ``["r0", "c1"]`` means keys are pairs of values
                taken from the 1st row and the second column in the
                ``range_``.
        """
        BaseIOSpec.__init__(self)
        self.range = range_
        self.sheet = sheet
        self.keyids = tuple(keyids) if keyids else None

    def _on_load_value(self):
        self._load_cells(self.keyids)

    def _can_update_value(self, value, kwargs):
        return False

    def _on_pickle(self, state):
        state.update({
            "range": self.range,
            "sheet": self.sheet,
            "keyids": self.keyids,
            "_cells": self._cells,
            "_datasize": self._datasize,
            "_key_to_index": self._key_to_index,
            "_keysize": self._keysize
        })
        return state

    def _on_unpickle(self, state):
        self.range = state["range"]
        self.sheet = state["sheet"]
        self.keyids = state["keyids"]
        self._cells = state["_cells"]
        self._datasize = state["_datasize"]
        self._key_to_index = state["_key_to_index"]
        self._keysize = state["_keysize"]

    def _on_serialize(self, state):
        state.update({
            "range": self.range,
            "sheet": self.sheet,
            "keyids": self.keyids
        })
        return state

    def _on_unserialize(self, state):
        self.range = state["range"]
        self.sheet = state["sheet"]
        self.keyids = state["keyids"]
        self._load_cells(self.keyids)

    def _load_cells(self, keys):
        """Fetch the cells of the range and build the key lookup table."""
        cells = self._io.get_range(self.range, self.sheet)
        if not isinstance(cells, (tuple, list)):
            # A single-cell address yields a bare cell object rather than
            # rows of cells; normalize it to a 1x1 grid.
            cells = ((cells,),)
        self._cells = cells
        self._datasize = (len(self._cells), len(self._cells[0]))
        self._key_to_index = self._create_key_to_index(keys)

    def _can_add_other(self, other):
        """Check if self and other have no overlapping cells

        False if:
            self and other on the same sheet AND
            overlapping rows AND
            overlapping cols
        """
        if self._cells[0][0].parent != other._cells[0][0].parent:
            return True
        elif self._cells[-1][0].row < other._cells[0][0].row:
            return True
        elif other._cells[-1][0].row < self._cells[0][0].row:
            return True
        elif self._cells[0][-1].column < other._cells[0][0].column:
            return True
        elif other._cells[0][-1].column < self._cells[0][0].column:
            return True
        else:
            return False

    def _create_key_to_index(self, keyarg):
        """Initialize ``self._size`` and ``self._keysize`` and return the
        value for ``self._key_to_index``.

        Args:
            keyarg: Sequence of key ids such as ``["r0", "c1"]`` (``r``
                or ``c`` followed by a 0-based index into the range),
                or ``None`` for no keys.

        Returns:
            dict mapping each key to its ``(row, column)`` position in
            ``self._cells``, or ``None`` when no key ids are given and
            the range is addressed by position.

        Raises:
            ValueError: For a malformed or out-of-range key id, for
                duplicate key values, or when the key rows/columns leave
                no data row/column.
        """
        nrows, ncols = self._datasize
        keys = [(spec[:1].lower(), int(spec[1:])) for spec in keyarg or ()]

        key_rows = []
        key_cols = []
        for axis, idx in keys:
            if axis == "r":
                if idx >= nrows:
                    raise ValueError("invalid row index: %s" % idx)
                key_rows.append(idx)
            elif axis == "c":
                if idx >= ncols:
                    raise ValueError("invalid column index: %s" % idx)
                key_cols.append(idx)
            else:
                # repr() keeps the message valid for tuple key ids.
                raise ValueError("invalid params: %s" % repr(keyarg))

        # Upper bounds were checked above; only negatives remain.
        for r in key_rows:
            if r < 0:
                raise ValueError("invalid key row: %s" % r)
        for c in key_cols:
            if c < 0:
                raise ValueError("invalid key columns: %s" % c)

        self._size = (nrows - len(set(key_rows)),
                      ncols - len(set(key_cols)))

        if not keys:    # No key rows and columns
            if nrows > 1 and ncols > 1:
                self._keysize = 2
            elif nrows > 1 and ncols == 1:
                self._keysize = 1
            elif nrows == 1 and ncols > 1:
                self._keysize = 1
            elif len(self) == 1:
                self._keysize = 0
            else:
                raise RuntimeError("must not happen")
            return None

        data_rows = [r for r in range(nrows) if r not in key_rows]
        data_cols = [c for c in range(ncols) if c not in key_cols]

        def cell_value(r, c):
            return _redirect_merged(self._cells[r][c]).value

        # Values read from the key rows identify a data column.
        rkeys_to_col = {}
        if key_rows:
            for c in data_cols:
                key = tuple(cell_value(r, c) for r in key_rows)
                if key in rkeys_to_col:
                    raise ValueError("duplicate row key: %s" % repr(key))
                rkeys_to_col[key] = c

        # Values read from the key columns identify a data row.
        ckeys_to_row = {}
        if key_cols:
            for r in data_rows:
                key = tuple(cell_value(r, c) for c in key_cols)
                if key in ckeys_to_row:
                    raise ValueError("duplicate column key: %s" % repr(key))
                ckeys_to_row[key] = r

        result = {}
        if key_rows and key_cols:
            # Set up front so it exists even when there is no data cell.
            self._keysize = len(key_rows) + len(key_cols)

            # Positions in the combined key follow the order of keyids.
            row_slots = [i for i, k in enumerate(keys) if k[0] == "r"]
            col_slots = [i for i, k in enumerate(keys) if k[0] == "c"]

            for rks, cks in itertools.product(rkeys_to_col, ckeys_to_row):
                key = [None] * len(keys)
                for slot, k in zip(row_slots, rks):
                    key[slot] = k
                for slot, k in zip(col_slots, cks):
                    key[slot] = k
                result[tuple(key)] = (ckeys_to_row[cks], rkeys_to_col[rks])

        elif key_rows:
            if self._size[0] > 1:
                # Several data rows: append the data-row ordinal to the key.
                self._keysize = len(key_rows) + 1
                for rkey, c in rkeys_to_col.items():
                    for i, r in enumerate(data_rows):
                        result[rkey + (i,)] = (r, c)
            else:
                self._keysize = len(key_rows)
                if not data_rows:
                    raise ValueError(
                        "no data row left outside key rows: %s"
                        % repr(key_rows))
                r = data_rows[0]
                for rkey, c in rkeys_to_col.items():
                    result[rkey] = (r, c)

        else:   # key_cols
            if self._size[1] > 1:
                # Several data columns: append the column ordinal to the key.
                self._keysize = len(key_cols) + 1
                for ckey, r in ckeys_to_row.items():
                    for i, c in enumerate(data_cols):
                        result[ckey + (i,)] = (r, c)
            else:
                self._keysize = len(key_cols)
                if not data_cols:
                    raise ValueError(
                        "no data column left outside key columns: %s"
                        % repr(key_cols))
                c = data_cols[0]
                for ckey, r in ckeys_to_row.items():
                    result[ckey] = (r, c)

        return result

    def _get_index(self, key):
        """Translate a mapping key into a ``(row, column)`` position.

        Raises:
            KeyError: If the key is not valid for this range.
        """
        if self._key_to_index is not None:
            if self._keysize == 1:
                key = (key,)
            return self._key_to_index[key]
        elif self._datasize[0] > 1 and self._datasize[1] > 1:
            return key
        elif self._datasize[0] > 1 and self._datasize[1] == 1:
            return key, 0
        elif self._datasize[0] == 1 and self._datasize[1] > 1:
            return 0, key
        elif len(self) == 1:
            # The only cell is addressed by an empty key.
            if not bool(key):
                return 0, 0
            raise KeyError("invalid key: %s" % repr(key))
        else:
            raise KeyError("invalid key: %s" % repr(key))

    @property
    def value(self):
        return self

    def __getitem__(self, key):
        r, c = self._get_index(key)
        return _redirect_merged(self._cells[r][c]).value

    def __setitem__(self, key, value):
        r, c = self._get_index(key)
        _redirect_merged(self._cells[r][c]).value = value

    def __len__(self):
        return self._size[0] * self._size[1]

    def __iter__(self):
        nrows, ncols = self._datasize
        if self._key_to_index is not None:
            for k in self._key_to_index:
                # Single-element keys are exposed as bare values.
                if self._keysize == 1:
                    yield k[0]
                else:
                    yield k
        elif nrows == 1 and ncols == 1:
            yield ()
        else:
            if nrows > 1 and ncols > 1:
                keygen = itertools.product(range(nrows), range(ncols))
            elif nrows > 1 and ncols == 1:
                keygen = range(nrows)
            elif nrows == 1 and ncols > 1:
                keygen = range(ncols)
            else:
                raise RuntimeError("must not happen")

            for key in keygen:
                yield key

    def __repr__(self):
        return (
            "<ExcelRange "
            + "path=%s "
            + "range=%s "
            + "sheet=%s>"
        ) % (repr(str(self._io.path.as_posix())),
             repr(self.range),
             repr(self.sheet))

    def _get_attrdict(self, extattrs=None, recursive=True):
        result = super()._get_attrdict(extattrs=extattrs, recursive=recursive)
        result.update({
            "range": self.range,
            "sheet": self.sheet,
            "keyids": self.keyids})
        return result