Source code for digikamdb.tags

"""
Enables access to Digikam tags.
"""

import logging
from typing import Iterable, List, Optional, Union

from sqlalchemy import (
    Column, Integer, ForeignKey, String, Table,
    case, event, inspect, select, text,
)
from sqlalchemy.orm import object_session, relationship, validates
from sqlalchemy.orm.exc import NoResultFound, MultipleResultsFound

from .table import DigikamTable
from .properties import BasicProperties
from .exceptions import (
    DigikamError,
    DigikamAssignmentError,
    DigikamObjectNotFoundError,
    DigikamMultipleObjectsFoundError,
    DigikamDataIntegrityError
)


log = logging.getLogger(__name__)


def _tag_class(dk: 'Digikam') -> type:                      # noqa: F821, C901
    """
    Defines the Tag class
    """
    
    class Tag(dk.base):
        """
        Digikam tag.
        
        Tags in Digikam are hierarchical. ``Tag`` reflects this by providing:
        
        *   The :attr:`parent` and :attr:`children` properties,
        *   ``tag1 in tag2`` can be used to test if ``tag2`` is an ancestor of
            ``tag1``.
        *   The :meth:`hierarchicalname` method.
        
        .. note::
            The tree structure differs between SQLite and MySQL:
            
            *   On SQLite, tags at the top level have a ``pid == 0``,
                and there is no row with ``id == 0``. There can be many
                tags without a parent tag.
            *   On MySQL, there is a tag ``_Digikam_Root_Tag_`` with
                ``id == 0`` and ``pid == -1``, and there is no tag with
                ``id == -1``. All other tags are descendents of
                ``_Digikam_Root_Tag_``.
            *   On MySQL with DBVersion <= 10, the ``Tags`` table implements an
                additional *nested sets* structure with columns ``lft`` and ``rgt``.

        .. seealso::
            * Class :class:`~digikamdb.tags.Tags`
        """
        
        __tablename__ = 'Tags'
        __mapper_args__ = dk.base.__mapper_args__.copy()
        __mapper_args__.update({
            "batch": False  # allows extension to fire for each
                            # instance before going to the next.
        })
        _properties = relationship(
            'TagProperty',
            primaryjoin = 'foreign(TagProperty._tagid) == Tag._id',
            lazy = 'dynamic'
        )
        _iconObj = relationship(
            'Image',
            primaryjoin = 'foreign(Image._id) == Tag._icon',
            viewonly = True,
            uselist = False
        )
        _images = relationship(
            'Image',
            primaryjoin = 'foreign(ImageTags.c.tagid) == Tag._id',
            secondaryjoin = 'foreign(ImageTags.c.imageid) == Image._id',
            secondary = 'ImageTags',
            back_populates = '_tags',
            lazy = 'dynamic'
        )
        
        # Needed to determine if root tag exists
        _is_mysql = dk.is_mysql
        
        #: Needed for nested sets operations
        _has_nested_sets = dk.has_tags_nested_sets
        
        #: Needed for Tagstree operations
        _session = dk.session
        
        @validates('_pid')
        def _check_pid(self, key: str, value: int) -> int:
            if value < 0:
                raise DigikamAssignmentError('Tag parent id cannot be negative')
            return value
        
        # Special functions
        
        def __repr__(self) -> str:
            return '<Digikam Tag (%s, %d, %d)>' % (self.name, self.id, self.pid)
        
        def __contains__(self, obj: 'Tag') -> bool:
            if not isinstance(obj, Tag):
                raise TypeError('A tag can only contain other tags')
            if self._has_nested_sets:
                return self._lft < obj._lft and self._rgt > obj._rgt
            else:
                return self._session.scalars(
                    select(self.digikam.tags.TagsTreeEntry).filter_by(
                        _id = obj.id,
                        _pid = self.id
                    )
                ).one_or_none() is not None
                # return self. id in obj._ancestors
        
        @property
        def id(self) -> int:
            """The tag's id (read-only)"""
            return self._id
        
        @property
        def pid(self) -> int:
            """The parent tag's id (read-only)"""
            return self._pid
        
        @property
        def name(self) -> str:
            """The tag's name"""
            return self._name
        
        @name.setter
        def name(self, value: str):
            self._name = value
        
        @property
        def icon(self) -> Union['Image', str, None]:        # noqa: F821
            """
            Returns the tag's icon.
            
            Possible types are:
            
            :`~_sqla.Image`:class:: The icon is an image from the Digikam
                                    collection. When setting, you can also
                                    specify the image's id.
            :`str`:class::          The icon is a KDE icon string
            :``None``:              No icon is set
            """
            if self._icon is not None:
                return self._iconObj
            return self._iconkde
        
        @icon.setter
        def icon(self, value: Union['Image', str, int, None]):  # noqa: F821
            if isinstance(value, self.digikam.images.Image):
                value = value.id
            if value is None:
                self._icon = None
                self._iconkde = None
            elif isinstance(value, int):
                self._icon = value
                self._iconkde = None
            elif isinstance(value, str):
                self._icon = None
                self._iconkde = value
            else:
                raise DigikamAssignmentError('Tag.icon must be Image, str, or None')
        
        @property
        def images(self) -> Iterable['Image']:                  # noqa: F821
            """Images belonging to the tag (no setter)"""
            return self._images
        
        @property
        def _ancestors(self) -> List:
            """
            Returns the ancestors of a tag.
            
            Returns:
                In SQLite, an unsorted list with the ancestor's ids.
                In MySQL, a sorted list (top-down) with the ancestor objects.
            """
            log.debug('Getting ancestors for tag %d', self.id)
            
            if self._has_nested_sets:
                # MySQL
                return self._session.scalars(
                    select(Tag)
                    .where(Tag._lft < self._lft, Tag._rgt > self._rgt)
                    .order_by(Tag._lft)
                ).all()
            
            # We're on a newer DBVersion or on SQLite
            if self._is_mysql:
                min_pid = 0
            else:
                min_pid = 1
            return [
                row._pid
                for row in self._session.scalars(
                    select(self.digikam.tags.TagsTreeEntry).filter_by(_id = self.id)
                )
                if row._pid >= min_pid
            ]
            
        # Other properties and methods
        
        # Since Digikam doesn't use a foreign key, this is a regular property.
        @property
        def parent(self) -> Optional['Tag']:                # noqa: F821
            """
            Returns the tag's parent object.
            
            Returns ``None`` for
            
            * the root tag on MySQL or
            * tags at top level on SQLite
            
            .. todo:: Support moving tags
            """

            # Tags without a parent
            if self._is_mysql:
                if self.pid < 0:
                    return None
            else:
                if self.pid <= 0:
                    return None

            return self.digikam.tags._select(_id = self.pid).one()
        
        @property
        def children(self) -> Iterable['Tag']:              # noqa: F821
            """
            Returns the tag's children.
            """
            return self.digikam.tags._select(_pid = self.id)
        
        @property
        def properties(self) -> TagProperties:
            """
            Returns the tag's properties
            """
            if not hasattr(self, '_propertiesObj'):
                self._propertiesObj = TagProperties(self)
            return self._propertiesObj
        
        def hierarchicalname(self) -> str:
            """
            Returns the name including parents, separated by ``/``.
            """
            
            # Parent is the root tag or does not exist
            if self.pid <= 0:
                return self.name
            
            # No hierarchical name for internal tags:
            if 'internalTag' in self.properties:
                return self.name
            
            if self._has_nested_sets:
                return '/'.join(
                    [t.name for t in self._ancestors if t.id > 0]
                ) + '/' + self.name
            
            return self.parent.hierarchicalname() + '/' + self.name
        
        def _check(self):
            """
            Checks if the tree is consistent.
            
            Raises:
                DigikamDataIntegrityError
            """
            
            p = self.parent
            if p and self not in p:                         # pragma: no cover
                raise DigikamDataIntegrityError(
                    'Tag table: Tag id=%d is not in children of parent (%d)' % (
                        self.id, p.id
                    )
                )
            while p:
                if not isinstance(p, Tag):                  # pragma: no cover
                    raise DigikamError(
                        'Tag parent is of class %s, not Tag' % p.__class__.__name__
                    )
                if self not in p:                           # pragma: no cover
                    raise DigikamDataIntegrityError(
                        'Tag table inconsistent: Tag id=%d is not in descendents ' +
                        'of ancestor %d' % (self.id, p.id)  # noqa: F507
                    )
                if p.id == self.id:                         # pragma: no cover
                    raise DigikamDataIntegrityError(
                        'Tag table inconsistent: Circular ancestry in id=%d' % self.id
                    )
                p = p.parent
        
        def _check_nested_sets(self):
            """
            Checks if the tree is consistent.
            
            Raises:
                DigikamDataIntegrityError
            """
            d = self._rgt - self._lft
            if d <= 0 or d % 2 == 0:                        # pragma: no cover
                raise DigikamDataIntegrityError(
                    'Tag table: inconsistent lft, rgt in id=%d (%d,%d)' % (
                        self.id, self._lft, self._rgt
                    )
                )
            pos = self._lft
            for ch in self.children.order_by(self.__class__._lft):
                if not (
                    self._lft < ch._lft and self._rgt > ch._rgt
                ):                                          # pragma: no cover
                    raise DigikamDataIntegrityError(
                        'Tag table inconsistent: parent %d (%d,%d), child %d (%d,%d)' % (
                            self.id, self._lft, self._rgt, ch.id, ch._lft, ch._rgt
                        )
                    )
                for ch2 in self.children:
                    if ch == ch2:
                        continue
                    if ch._rgt < ch2._lft or ch._lft > ch2._rgt:
                        continue
                    raise DigikamDataIntegrityError(        # pragma: no cover
                        'Tag table has ' +
                        'overlapping siblings %d (%d,%d), %d (%d,%d)' % (
                            ch.id, ch._lft, ch._rgt, ch2.id, ch2._lft, ch2._rgt
                        )
                    )
                if ch._lft > pos + 1:                       # pragma: no cover
                    raise DigikamDataIntegrityError(
                        'Tag table inconsistent: gap before %d (%d), last pos %d' % (
                            ch.id, ch._lft, pos
                        )
                    )
                pos = ch._rgt
                ch._check_nested_sets()
    
    return Tag


[docs] class Tags(DigikamTable): """ Offers access to the tags in the Digikam instance. ``Tags`` represents all tags present in the Digikam database. It is usually accessed through the :class:`~digikamdb.connection.Digikam` property :attr:`~digikamdb.connection.Digikam.tags`. Basic usage: .. code-block:: python dk = Digikam(...) mytag = dk.tags['My Tag'] # access by name mytag2 = dk.tags['parent/child'] # access by hierarchical name mytag3 = dk.tags[42] # access by id newtag = dk.tags.add('New Tag', 0) # creates new tag with name 'New Tag' Access via ``[]`` raises an exception if the name or id cannot be found, or if there are multiple matches. Parameters: digikam: Digikam object for access to database and other classes. Raises: DigikamObjectNotFoundError: No matching tag was found. DigikamMultipleObjectsFoundError: Multiple tags where found when one was expected. See also: * Class :class:`~_sqla.Tag` """ _class_function = _tag_class def __init__( self, digikam: 'Digikam', # noqa: F821 ): super().__init__(digikam) self._define_helper_tables() def _define_helper_tables(self): """Defines the classes for helper tables.""" class TagProperty(self.digikam.base): """ Tag Properties This table should be accessed via Class :class:`~digikamdb.tags.TagProperties`. """ __tablename__ = 'TagProperties' _tagid = Column('tagid', Integer, primary_key = True) _property = Column('property', String, primary_key = True) self.TagProperty = TagProperty if not self.digikam.has_tags_nested_sets: class TagsTreeEntry(self.digikam.base): """ Class for the tags tree This is a view on MySQL with DBVersion <= 10. .. versionchanged:: 0.2.2 Also defined for MySQL. """ __tablename__ = 'TagsTree' _id = Column('id', Integer, primary_key = True) _pid = Column('pid', Integer, primary_key = True) self.TagsTreeEntry = TagsTreeEntry def _before_insert( self, mapper: 'Mapper', # noqa: F821 connection: 'Connection', # noqa: F821 instance: 'Tag' # noqa: F821 ): """ Adjusts the lft and rgt columns on insert. """ if not self._do_before_insert: # pragma: no cover return if instance.pid < 0: # pragma: no cover # This will never happen, but we keep it for safety raise DigikamAssignmentError('Parent must be >= 0') log.debug('Reordering nested sets for tags before insert') tags = mapper.persist_selectable if instance.pid == 0: new_position = connection.scalar( select(tags.c.lft).where( tags.c.name == '_Digikam_Internal_Tags_')) else: new_position = connection.scalar( select(tags.c.rgt).where( tags.c.id == instance.pid)) connection.execute( tags.update().where(tags.c.rgt >= new_position).values( lft = case( ( tags.c.lft >= new_position, tags.c.lft + 2, ), else_ = tags.c.lft ), rgt = tags.c.rgt + 2, ) ) instance._lft = new_position instance._rgt = new_position + 1 # before_update() would be needed to support moving of nodes def _before_update( self, mapper: 'Mapper', # noqa: F821 connection: 'Connection', # noqa: F821 instance: 'Tag' # noqa: F821 ): if not self._do_before_update: # pragma: no cover return log.debug('Reordering nested sets for tags before update') attrs = inspect(instance).attrs if ( attrs._pid.history.has_changes() or attrs._lft.history.has_changes() or attrs._rgt.history.has_changes() ): raise NotImplementedError('Moving tags is not implemented') # after_delete() would be needed to support removal of nodes. def _after_delete( self, mapper: 'Mapper', # noqa: F821 connection: 'Connection', # noqa: F821 instance: 'Tag' # noqa: F821 ): """ Adjusts the lft and rgt columns on delete. """ if not self._do_after_delete: # pragma: no cover return if instance._rgt - instance._lft > 1: # pragma: no cover raise DigikamError('Cannot delete tag with sub-tags') log.debug('Reordering nested sets for tags after delete') tags = mapper.persist_selectable right = instance._rgt connection.execute( tags.update().where(tags.c.rgt > right).values( lft = case( ( tags.c.lft > right, tags.c.lft - 2, ), else_ = tags.c.lft ), rgt = tags.c.rgt - 2 ) )
[docs] def setup(self): """ Sets the event listeners for nested sets. Called by Digikam constructor. """ self._do_before_insert = False self._do_before_update = False self._do_after_delete = False self._has_nested_sets = False if self.digikam.has_tags_nested_sets: self._do_before_insert = True self._do_before_update = True self._do_after_delete = True event.listen(self.Class, 'before_insert', self._before_insert) event.listen(self.Class, 'before_update', self._before_update) event.listen(self.Class, 'after_delete', self._after_delete)
def __getitem__(self, key): if isinstance(key, str): if '/' in key: name = key.split('/')[-1] else: name = key q = self._select(_name = name) num = q.count() if num == 0: raise DigikamObjectNotFoundError('No Tag for name=' + key) if num == 1: tag = q.one() if '/' not in key: return tag if tag.hierarchicalname() == key: return tag raise DigikamObjectNotFoundError('No Tag for name=' + key) for tag in q: if tag.hierarchicalname() == key: return tag if '/' in key: raise DigikamObjectNotFoundError('No Tag for name=' + key) else: raise DigikamMultipleObjectsFoundError('Multiple tags for name=' + key) return super().__getitem__(key) @property def _root(self) -> 'Tag': # noqa: F821 """ Returns the root tag when on MySQL. Raises: DigikamObjectNotFoundError: When called on SQLite. """ try: return self._select(_pid = -1).one() except NoResultFound: raise DigikamObjectNotFoundError('No tag for pid=-1')
[docs] def add( self, name: str, parent: Union[int, 'Tag'], # noqa: F821 icon: Optional[Union['Image', str, int]] = None, # noqa: F821 ) -> 'Tag': # noqa: F821 """ Adds a new tag. To create a Tag at the root of the tag tree, set ``parent`` to 0. ``name`` can be a hierarchical name (e.g. "locations/cities/Amsterdam"), in which case the intermediate tags ("locations" and "cities" in the given example) are created too, without setting an icon if one is specified. Parameters: name: The new tag's name parent: The new tag's parent as an id or a Tag object icon: The new tag's icon. If given as an Image, the icon is set to this Image from the Digikam collection. If given as an int, the icon is set to the image with the id **icon**. If given as a str, the icon is set to the corresponding KDE icon. Returns: The newly created tag object. .. versionchange:: 0.3.1 Accepts hierarchical tag names. """ if isinstance(parent, self.Class): pid = parent.id elif isinstance(parent, int): pid = parent else: raise TypeError( 'Parent must be int or Tag, not {}'.format(parent.__class__.__name__) ) if '/' in name: names = name.split('/') name = names.pop() if pid > 0 and isinstance(parent, int): parent = self[parent] pname = parent.abspath + '/' + '/'.join(names) else: pname = '/'.join(names) try: ptag = self[pname] except DigikamObjectNotFoundError: ptag = self.add(pname, pid) pid = ptag.id options = {} if icon is not None: if isinstance(icon, self.digikam.images.Class): options['icon'] = icon.id elif isinstance(icon, int): options['icon'] = icon elif isinstance(icon, str): options['iconkde'] = icon else: raise TypeError('Icon must be int, str or Image') return self._insert(name = name, pid = pid, **options)
[docs] def remove(self, tag: Union[int, 'Tag']): # noqa: F821 """ Removes a tag. Parameters: tag: the tag to delete. Can be a :class:`Tag` object or an id. """ if isinstance(tag, int): tag = self[tag] elif not isinstance(tag, self.Class): raise TypeError('Tag must be int or ' + self.Class.__name__) for p in tag.properties._select_self().all(): self._session.delete(p) self._session.delete(tag)
[docs] def check(self): """ Checks the integrity of the *Tags* table. Checks that * each tag is among the children of its parent * each tag is contained in its ancestors * there are no circular parent-child relations * the nested sets and adjacency list structures are consistent (MySQL with DBVersion <= 10 only) Raises: DigikamDataIntegrityError: Table is in an inconsistent state. .. versionchanged:: 0.2.2 Do not check nested sets for DBVersion > 10 """ for tag in self: tag._check() if self.digikam.has_tags_nested_sets: self._root._check_nested_sets()
def _tagproperty_class(dk: 'Digikam') -> type: # noqa: F821 """Defines the TagProperty class.""" return dk.tags.TagProperty
[docs] class TagProperties(BasicProperties): """ Tag Properties Args: digikam(Digikam): Digikam object. parent(Tag): The corresponding ``Tag`` object. """ # Funktion returning the table class _class_function = _tagproperty_class # Parent id column _parent_id_col = '_tagid' # Key column _key_col = '_property' # Value column _value_col = '_value' _tagid = Column('tagid', Integer, primary_key = True) _property = Column('tagid', String, primary_key = True)