Source code for digikamdb.albumroots

"""
Provides access to Digikam album roots
"""

import logging
import os
import re
import stat
import sys
from warnings import warn
from typing import Dict, Iterable, Mapping, Optional

from sqlalchemy.orm import relationship, validates

from .table import DigikamTable
from .exceptions import DigikamFileError, DigikamVersionError
from .types import AlbumRootStatus as Status, AlbumRootType as Type


log = logging.getLogger(__name__)


def _albumroot_class(dk: 'Digikam') -> type:                # noqa: F821, C901
    """
    Defines the :class:`~digikamdb._sqla.AlbumRoot` class

    The class is derived from ``dk.base`` and returned to the caller.
    """

    class AlbumRoot(dk.base):
        """
        Digikam Album Root

        The location can be accessed with :attr:`abspath`.

        .. versionchanged:: 0.3.5
            *   ``path in root`` checks if a path is a subdirectory (and thus a
                Digikam album) of the album root. The reverse is checked by
                :meth:`issubdir`.

        See also:
            * Class :class:`~digikamdb.albumroots.AlbumRoots`
        """

        __tablename__ = 'AlbumRoots'
        _albums = relationship(
            'Album',
            primaryjoin = 'foreign(Album._albumRoot) == AlbumRoot._id',
            back_populates = '_root',
            lazy = 'dynamic')

        # Relationship to Albums

        @property
        def albums(self) -> Iterable['Album']:                  # noqa: F821
            """The albums belonging to this root (no setter)"""
            return self._albums

        # column properties

        @property
        def id(self) -> int:
            """The album root's id (read-only)"""
            return self._id

        @property
        def label(self) -> str:
            """The album root's label"""
            return self._label

        @label.setter
        def label(self, value):
            self._label = value

        @property
        def status(self) -> Status:
            """
            The album root's status (read-only)

            This property is dynamically set by Digikam and only valid when
            Digikam is running.
            """
            return Status(self._status)

        @property
        def type(self) -> Type:
            """The album root's type (hardwired, removable or network)"""
            return Type(self._type)

        @type.setter
        def type(self, value):
            self._type = value

        @validates('_status', '_type')
        def _convert_to_int(self, key, value):
            """Stores ``_status`` and ``_type`` as plain integers."""
            return int(value)

        @property
        def identifier(self) -> str:
            """
            The album root's identifier

            This often contains the UUID of the file system that contains the
            album root.
            """
            return self._identifier

        @identifier.setter
        def identifier(self, value):
            self._identifier = value

        @validates('_identifier')
        def _val_identifier(self, key: str, value: str):
            """Deletes the cached mountpoint and the cached parsed identifier."""
            if hasattr(self, '_mountpoint'):
                delattr(self, '_mountpoint')
            if hasattr(self, '_parsed_identifier_data'):
                delattr(self, '_parsed_identifier_data')
            return value

        @property
        def specificPath(self) -> str:
            """
            The album root's path relative to :attr:`~AlbumRoot.identifier`,
            starting with ``/``
            """
            return self._specificPath

        @specificPath.setter
        def specificPath(self, value):
            self._specificPath = value

        @property
        def caseSensitivity(self) -> bool:
            """
            Indicates if this album root is case sensitive.

            Always ``False`` on Windows. For DBVersion < 16 (where this method
            does not read the ``_caseSensitivity`` column) the root is
            reported as case sensitive.

            .. versionadded:: 0.2.2

            .. todo:: Take ``caseSensitivity`` into account when accessing images.
            """
            if sys.platform == 'win32':
                return False
            if self.digikam.db_version < 16:
                return True
            return self._caseSensitivity != 0

        # Other properties and methods

        def __contains__(self, path: str) -> bool:
            """
            Checks if `path` is the album root or a subdirectory of it.

            The comparison honours the case sensitivity of the platform and
            of the album root (see :attr:`_cmppath`).
            """
            path = os.path.abspath(path)
            abspath = self._cmppath

            if sys.platform == 'win32':
                # case-insensitive compare
                path = path.lower()
                return os.path.commonpath([abspath, path]) == abspath

            if self.caseSensitivity:
                return os.path.commonpath([abspath, path]) == abspath

            # OS case-sensitive, filesystem case-insensitive:
            # Mountpoint needs exact match, rest case-insensitive

            if os.path.commonpath([self.mountpoint, path]) != self.mountpoint:
                # path not inside mountpoint
                return False

            # relpath() takes (path, start) as two arguments, not a list
            path = os.path.join(
                self.mountpoint,
                os.path.relpath(path, self.mountpoint).lower()
            )
            return os.path.commonpath([abspath, path]) == abspath

        def issubdir(self, path: str) -> bool:
            """
            Checks if album root is a subdir of `path`.

            .. versionadded:: 0.3.5
            """
            path = os.path.abspath(path)
            abspath = self._cmppath

            if sys.platform == 'win32':
                path = path.lower()
                # commonpath() takes a single sequence of paths
                return os.path.commonpath([abspath, path]) == path

            if self.caseSensitivity:
                return os.path.commonpath([abspath, path]) == path

            # OS case-sensitive, filesystem case-insensitive
            common = os.path.commonpath([path, self.mountpoint])
            if common == path:
                # path contains the mountpoint and thus the album root
                return True
            if common != self.mountpoint:
                # path is neither above nor below the mountpoint
                return False

            # path is below the mountpoint: compare the rest in lowercase
            path = os.path.join(
                self.mountpoint,
                os.path.relpath(path, self.mountpoint).lower()
            )
            return os.path.commonpath([abspath, path]) == path

        @property
        def _cmppath(self) -> str:
            """:attr:`abspath` converted to lowercase as needed by system"""
            if sys.platform == 'win32':
                return self.abspath.lower()

            if not self.caseSensitivity:
                return os.path.join(
                    self.mountpoint,
                    self.specificPath.lstrip('/').lower()
                )

            return self.abspath

        @property
        def _parsed_identifier(self) -> Dict[str, str]:
            """
            Returns a parsed version of the identifier

            The identifier is split at ``:?`` into a schema (stored under the
            key ``schema``) and ``&``-separated ``key=value`` parameters. The
            result is cached until the identifier changes.
            """
            if not hasattr(self, '_parsed_identifier_data'):
                schema, params = self.identifier.split(':?', 1)
                data = {'schema': schema}
                for param in params.split('&'):
                    key, value = param.split('=', 1)
                    data[key] = value
                self._parsed_identifier_data = data
            return self._parsed_identifier_data

        @property
        def mountpoint(self) -> str:
            """
            The volume's mount point (read-only)

            The result can be modified if ``root_override`` is specified
            in the :class:`Digikam` constructor.

            Raises:
                DigikamFileError:   If no existing directory can be determined.

            .. versionchanged:: 0.3.4
                * Works if :attr:`identifier` contains multiple parameters.
                * Process ``mountpath`` parameter.
            """

            if hasattr(self, '_mountpoint'):
                return self._mountpoint

            # Check if we have an override option
            if self.override is not None:
                if 'ids' in self.override:
                    if self.id in self.override['ids']:
                        self._mountpoint = self.override['ids'][self.id]
                        log.debug(
                            'Root override: setting mountpoint of %d to %s',
                            self.id,
                            self._mountpoint
                        )
                        return self._mountpoint
                    if self.identifier in self.override['ids']:
                        self._mountpoint = self.override['ids'][self.identifier]
                        log.debug(
                            'Root override: setting mountpoint for %s to %s',
                            self.identifier,
                            self._mountpoint
                        )
                        return self._mountpoint

            if sys.platform != 'linux':
                warn(
                    "Mountpoint detection has only been tested on Linux and will "
                    "probably not work on other operating systems. You can avoid "
                    "related errors by passing a 'root_override' parameter to "
                    "Digikam()",
                    RuntimeWarning
                )

            path = None

            # Determine path by UUID: resolve the device node and look for it
            # in the list of mounted file systems
            if 'uuid' in self._parsed_identifier:
                dev = os.path.realpath(os.path.join(
                    '/dev/disk/by-uuid',
                    self._parsed_identifier['uuid']
                ))
                with open('/proc/mounts', 'r') as mt:
                    for line in mt:
                        mdev, mdir, moptions = line.strip().split(maxsplit=2)

                        if mdev == '/dev/root':
                            mdev = _substitute_device(mdev)

                        if (
                            mdev == 'UUID=' + self._parsed_identifier['uuid'] or
                            mdev == dev
                        ):
                            path = mdir
                            break

            # Fall back to the path given in the identifier
            if path is None:
                path = self._parsed_identifier.get('path')

            if path is None:
                path = self._parsed_identifier.get('mountpath')

            if path is not None and os.path.isdir(path):
                self._mountpoint = path
                log.debug(
                    'Setting mountpoint for %s to %s',
                    self.identifier,
                    self._mountpoint
                )
                return path

            raise DigikamFileError(
                'No path found for {0}, candidate {1}'.format(self.identifier, path)
            )

        @property
        def abspath(self) -> str:
            """
            The album root's absolute path (read-only)

            The result can be modified if ``root_override`` is specified
            in the :class:`Digikam` constructor.

            .. versionchanged:: 0.3.5
                Converted to lowercase for case-insensitive roots (except mountpoint).
            """

            override = self.override
            if override is not None:
                if 'paths' in override:
                    if self.id in override['paths']:
                        log.debug('Overriding path')
                        return override['paths'][self.id]
                    path = (self.identifier + self.specificPath).rstrip('/')
                    if path in override['paths']:
                        log.debug('Overriding path')
                        return override['paths'][path]

            if self.caseSensitivity:
                relpath = self.specificPath.lstrip('/')
            else:
                relpath = self.specificPath.lstrip('/').lower()
            return os.path.abspath(os.path.join(self.mountpoint, relpath))

    return AlbumRoot


class AlbumRoots(DigikamTable):
    """
    Offers access to the album roots in the Digikam instance.

    ``AlbumRoots`` represents all album roots present in the Digikam database.
    It is usually accessed through the :class:`~digikamdb.connection.Digikam`
    property :attr:`~digikamdb.connection.Digikam.albumroots`.

    Usage:

    .. code-block:: python

        dk = Digikam(...)
        myroot = dk.albumroots[2]       # by id
        for root in dk.albumroots:      # iterate
            print(root.specificPath)

    Parameters:
        digikam:    :class:`~digikamdb.conn.Digikam` object
        override:   Dict containing override information
                    (:class:`~digikamdb.conn.Digikam` passes its parameter
                    ``root_override`` here)

    See also:
        * Class :class:`~_sqla.AlbumRoot`
    """

    _class_function = _albumroot_class

    def __init__(
        self,
        digikam: 'Digikam',                                 # noqa: F821
        override: Optional[Mapping] = None
    ):
        super().__init__(digikam)
        # The override is stored on the mapped class so that every AlbumRoot
        # object can read it
        self.Class.override = override
        if override is not None:
            log.debug('Root override specified')

    @classmethod
    def _get_mountpoints(cls) -> Mapping[str, str]:
        """
        Returns a mapping of mount directory -> device from ``/proc/mounts``.

        The result is cached on the class after the first call.
        """
        if hasattr(cls, '_mountpoints'):
            return cls._mountpoints

        log.debug('Reading mountpoints')
        mountpoints = {}
        with open('/proc/mounts', 'r') as mt:
            for line in mt:
                dev, mdir, _fstype, _options = line.strip().split(maxsplit=3)
                # Resolve /dev/root for some installations
                if dev == '/dev/root':
                    dev = _substitute_device(dev)
                mountpoints[mdir] = dev

        cls._mountpoints = mountpoints
        return mountpoints

    @classmethod
    def _get_uuids(cls) -> Mapping[str, str]:
        """
        Returns a mapping of device path -> UUID from ``/dev/disk/by-uuid``.

        The result is cached on the class after the first call.
        """
        if hasattr(cls, '_uuids'):
            return cls._uuids

        log.debug('Reading disk UUIDs')
        uuids = {}
        with os.scandir('/dev/disk/by-uuid') as entries:
            for f in entries:
                if f.is_symlink():
                    uuids[os.path.realpath(f.path)] = f.name

        cls._uuids = uuids
        return uuids

    def add(
        self,
        path: str,
        label: Optional[str] = None,
        status: Status = Status.LocationAvailable,
        type_: Type = Type.UndefinedType,
        check_dir: bool = True,
        use_uuid: bool = True
    ) -> 'AlbumRoot':                                       # noqa: F821
        """
        Adds a new album root.

        If ``check_dir`` is False, the identifier will be of ``path=`` type,
        and ``use_uuid`` is ignored.

        The :attr:`~AlbumRoot.identifier` and :attr:`~AlbumRoot.specificPath`
        are derived from ``path``. To add albums and images in the new album
        root, start Digikam and scan for new objects.

        Args:
            path:       Path to new album root.
            label:      Label of the new album root.
            status:     The new root's status.
            type_:      The new root's type. If left at
                        ``Type.UndefinedType``, the value ``1`` is stored.
            check_dir:  Check if the directory exists and is not a subdir of
                        another album root or vice versa.
            use_uuid:   Use UUID of filesystem as identifier. If false, the
                        path is used as identifier.
        Returns:
            The newly created AlbumRoot object.
        Raises:
            DigikamFileError:   If the directory does not exist, overlaps with
                                an existing album root, or no mounted volume
                                with a UUID can be found for it.

        .. note::
            The :class:`~digikamdb.conn.Digikam` parameter ``root_override``
            is ignored by this method.
        """
        log.debug('Adding album root for dir %s (%s)', path, label)

        if check_dir:
            if not os.path.isdir(path):
                raise DigikamFileError('Directory %s not found' % path)

            # Compare normalized absolute paths, so that relative paths and
            # trailing slashes do not defeat the overlap check
            abspath = os.path.abspath(path)
            for r in self:
                log.debug('Checking overlap with root %d (%s)', r.id, r.label)
                common = os.path.commonpath([abspath, r.abspath])
                if common == r.abspath:
                    raise DigikamFileError(
                        '%s is a subdir of %s (albumroot %s)' % (
                            path, r.abspath, r.label
                        )
                    )
                if common == abspath:
                    raise DigikamFileError(
                        '%s (albumroot %s) is a subdir of %s' % (
                            r.abspath, r.label, path
                        )
                    )

        if check_dir and use_uuid:
            mountpoints = self._get_mountpoints()
            uuids = self._get_uuids()

            realpath = os.path.realpath(path)
            mpt = realpath
            while True:
                while not os.path.ismount(mpt):
                    log.debug('%s is not a mountpoint', mpt)
                    mpt = os.path.dirname(mpt)
                dev = mountpoints.get(mpt)
                if dev in uuids:
                    ident = 'volumeid:?uuid=' + uuids[dev]
                    rel = os.path.relpath(realpath, mpt)
                    spath = '/' if rel == os.curdir else '/' + rel
                    break
                if mpt == '/':                              # pragma: no cover
                    raise DigikamFileError('No mountpoint found for ' + path)
                # This mount point is unknown or has no UUID: continue with
                # the enclosing file system instead of looping forever
                mpt = os.path.dirname(mpt)
        else:
            ident = 'volumeid:?path=' + path
            spath = '/'

        log.debug('Creating mountpoint with ident=%s and spath=%s', ident, spath)
        return self._insert(
            _label = label,
            _status = status,
            # 1 is the type this method stored before ``type_`` was honoured
            _type = 1 if type_ == Type.UndefinedType else type_,
            _identifier = ident,
            _specificPath = spath
        )
_device_regex = re.compile(r'(sd[a-z]\d*|nvme\d+n\d+(p\d+)?)')


def _substitute_device(dev: str) -> str:
    """
    Substitutes the standard device in /dev for the given device.

    The entries of ``/dev`` whose names match ``_device_regex`` are searched
    for a block device with the same device number (``st_rdev``) as ``dev``.

    Args:
        dev:    Device path, e.g. ``/dev/root``.
    Returns:
        Path of the matching entry in ``/dev``. If ``dev`` cannot be
        examined, is not a block device, or no match is found, the resolved
        ``dev`` is returned unchanged.
    """
    dev = os.path.realpath(dev)
    try:
        st1 = os.stat(dev)
    except OSError as e:
        # /proc/mounts may list a device that has no node in /dev
        log.warning('Cannot stat %s: %s', dev, e)
        return dev

    if not stat.S_ISBLK(st1.st_mode):
        log.warning('%s is not a block device', dev)
        return dev

    with os.scandir('/dev') as sc:
        for f in sc:
            if not _device_regex.match(f.name):
                # log.debug('%s does not match disk regex', f.name)
                continue
            try:
                st2 = f.stat()
            except OSError:
                # Entry vanished or is a dangling symlink
                continue
            if not stat.S_ISBLK(st2.st_mode):
                log.debug('%s is not a block device', f.path)
                continue
            if st1.st_rdev != st2.st_rdev:
                log.debug(
                    'Device numbers differ between %s(%d) and %s(%d)',
                    dev, st1.st_rdev, f.path, st2.st_rdev
                )
                continue
            log.debug('Replacing %s with %s', dev, f.path)
            return f.path

    log.warning('No replacement device found for %s', dev)
    return dev