# Source code for mrtinydbutils.tables

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
It implements additional tables for TinyDB.

.. moduleauthor:: Michael Rippstein <michael@anatas.ch>

"""

# -----------------------------------------------------------------------------
# -- Module imports
# - standard library modules
import uuid
from typing import Optional, Mapping, Iterable, List, Union, Callable, cast, Tuple, MutableMapping
from datetime import datetime, timezone

# - third-party modules
from tinydb.table import Table, Document
from tinydb.storages import Storage
from tinydb.queries import Query

# - project-local modules

# -----------------------------------------------------------------------------
__all__ = ('TableUuid', 'TableTimestamp', 'TableUuidTimestamp')


class TableUuid(Table):
    """TinyDB table whose document IDs are random UUID strings.

    Every newly inserted document receives ``str(uuid.uuid4())`` as its
    ``doc_id`` instead of TinyDB's default incrementing integer.
    """

    # Document IDs of this table are strings.  The override lives on the
    # subclass on purpose: assigning ``Table.document_id_class`` (as was done
    # in ``__init__`` before) changes the ID type of *every* TinyDB table in
    # the process, including plain integer-keyed ones.
    document_id_class = str  # type: ignore[assignment]

    def __init__(
        self,
        storage: Storage,
        name: str,
        cache_size: int = Table.default_query_cache_capacity
    ) -> None:
        """Create the table.

        Parameters
        ----------
        storage
            the storage instance the table reads from and writes to
        name
            the name of the table
        cache_size
            capacity of the query cache, forwarded unchanged to ``Table``
        """
        super().__init__(storage, name, cache_size)

    def _get_next_id(self) -> str:
        """Return the ID for a newly inserted document.

        Returns
        -------
        str
            a freshly generated random (version 4) UUID in its string form
        """
        return str(uuid.uuid4())
class TableTimestamp(Table):
    """TinyDB table that stamps every document with timestamps.

    On insert, both :attr:`created_key` and :attr:`updated_key` are set to the
    current time.  On update, :attr:`updated_key` is refreshed while the value
    stored under :attr:`created_key` is preserved, even if the update itself
    tried to change it.  Timestamps are timezone-aware UTC times in ISO 8601
    string form.
    """

    #: Key under which the creation timestamp is stored in each document.
    created_key: str = '_created'
    #: Key under which the timestamp of the last change is stored.
    updated_key: str = '_updated'

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a timezone-aware ISO 8601 string."""
        # ``datetime.now(timezone.utc)`` yields the same value and format as
        # the deprecated ``datetime.utcnow().replace(tzinfo=timezone.utc)``.
        return datetime.now(timezone.utc).isoformat()

    def _apply_timestamped_update(
            self,
            fields: Union[Mapping, Callable[[Mapping], None]],
            table: dict,
            doc_id: int,
            timestamp: str,
    ) -> None:
        """Apply ``fields`` to one stored document and refresh its timestamps.

        Parameters
        ----------
        fields
            either a mapping merged into the document or a callable that
            modifies the document in place
        table
            the table data, mapping document IDs to documents
        doc_id
            ID of the document to update
        timestamp
            value to store under :attr:`updated_key`
        """
        document = table[doc_id]
        # Remember the creation time first so the update cannot overwrite it.
        created_at = document[self.created_key]
        if callable(fields):
            # Update the document by calling the function provided by the user
            fields(document)
        else:
            # Update the document by setting all fields from the provided data
            document.update(fields)
        document.update({self.created_key: created_at, self.updated_key: timestamp})

    def insert(self, document: MutableMapping) -> int:  # type: ignore[override]
        """Insert a new document into the table.

        The creation and update timestamps are written into ``document``
        itself before it is stored, so the caller's object is modified.

        Parameters
        ----------
        document
            the document to insert

        Returns
        -------
        int
            the inserted document's ID

        Raises
        ------
        ValueError
            if ``document`` does not implement the ``Mapping`` interface
        AssertionError
            if a document with the same ID already exists in the table
        """
        # Make sure the document implements the ``Mapping`` interface
        if not isinstance(document, Mapping):
            raise ValueError('Document is not a Mapping')

        if isinstance(document, Document):
            # For a ``Document`` object we use the specified ID
            doc_id = document.doc_id
            # We also reset the stored next ID so the next insert won't
            # re-use document IDs by accident when storing an old value
            self._next_id = None
        else:
            # In all other cases we use the next free ID
            doc_id = self._get_next_id()

        timestamp = self._utc_timestamp()
        document[self.created_key] = timestamp
        document[self.updated_key] = timestamp

        def updater(table: dict) -> None:
            # Raised explicitly instead of using an ``assert`` statement:
            # asserts are stripped under ``python -O``, which would let an
            # insert silently overwrite an existing document.  The exception
            # type and message are unchanged.
            if doc_id in table:
                raise AssertionError('doc_id ' + str(doc_id) + ' already exists')

            # ``dict(document)`` converts the data to a plain ``dict`` even if
            # it was a different class implementing the ``Mapping`` interface
            table[doc_id] = dict(document)

        self._update_table(updater)

        return doc_id

    def insert_multiple(self, documents: Iterable[MutableMapping]) -> List[int]:  # type: ignore[override]
        """Insert multiple documents into the table.

        All documents inserted by one call share the same timestamp.  The
        timestamps are written into the passed documents themselves.

        Parameters
        ----------
        documents
            an iterable of documents to insert

        Returns
        -------
        List[int]
            a list containing the inserted documents' IDs

        Raises
        ------
        ValueError
            if one of the documents does not implement the ``Mapping``
            interface
        """
        doc_ids = []
        timestamp = self._utc_timestamp()

        def updater(table: dict) -> None:
            for document in documents:
                # Make sure the document implements the ``Mapping`` interface
                if not isinstance(document, Mapping):
                    raise ValueError('Document is not a Mapping')

                # Get the document ID for this document and store it so we
                # can return all document IDs later
                doc_id = self._get_next_id()
                doc_ids.append(doc_id)

                document[self.created_key] = timestamp
                document[self.updated_key] = timestamp

                # Convert the document to a plain ``dict`` and store it
                table[doc_id] = dict(document)

        self._update_table(updater)

        return doc_ids

    def update(  # noqa: C901
            self,
            fields: Union[Mapping, Callable[[Mapping], None]],
            cond: Optional[Query] = None,
            doc_ids: Optional[Iterable[int]] = None,
    ) -> List[int]:
        """Update all matching documents to have a given set of fields.

        If ``doc_ids`` is given it takes precedence over ``cond``.  If neither
        is given, every document in the table is updated.

        Parameters
        ----------
        fields
            the fields that the matching documents will have or a method that
            will update the documents
        cond
            which documents to update
        doc_ids
            a list of document IDs

        Returns
        -------
        List[int]
            a list containing the updated document's ID
        """
        timestamp = self._utc_timestamp()

        if doc_ids is not None:
            # Perform the update for documents specified by a list of IDs
            explicit_ids = list(doc_ids)

            def update_selected(table: dict) -> None:
                for doc_id in explicit_ids:
                    self._apply_timestamped_update(fields, table, doc_id, timestamp)

            self._update_table(update_selected)

            return explicit_ids

        # Otherwise update the documents matching ``cond`` or, without a
        # condition, all documents.  Collect the affected IDs on the way.
        matched_ids: List[int] = []

        def update_matching(table: dict) -> None:
            # Iterate over a snapshot of the keys so that changes to ``table``
            # during the loop cannot raise "dictionary changed size during
            # iteration".
            for doc_id in list(table.keys()):
                if cond is None or cond(table[doc_id]):
                    matched_ids.append(doc_id)
                    self._apply_timestamped_update(fields, table, doc_id, timestamp)

        self._update_table(update_matching)

        return matched_ids

    def update_multiple(
            self,
            updates: Iterable[
                Tuple[Union[Mapping, Callable[[Mapping], None]], Query]
            ],
    ) -> List[int]:
        """Apply several conditional updates in a single table write.

        Parameters
        ----------
        updates
            an iterable of ``(fields, cond)`` pairs.  For every document,
            each pair whose ``cond`` matches the document is applied in
            order; ``fields`` is either a mapping merged into the document
            or a callable that modifies the document in place.

        Returns
        -------
        List[int]
            a list containing the updated document's ID; an ID appears once
            for every pair that matched the document
        """
        timestamp = self._utc_timestamp()

        # ``updates`` is walked once per stored document.  Materialise it so
        # that one-shot iterables (e.g. generators) are not exhausted after
        # the first document, which would silently skip all other documents.
        update_pairs = list(updates)

        # Collect affected doc_ids
        updated_ids = []

        def updater(table: dict) -> None:
            # Iterate over a snapshot of the keys so that changes to ``table``
            # during the loop cannot raise "dictionary changed size during
            # iteration".
            for doc_id in list(table.keys()):
                for fields, cond in update_pairs:
                    # Apply the update to every document matching the query
                    if cond(table[doc_id]):
                        updated_ids.append(doc_id)
                        self._apply_timestamped_update(fields, table, doc_id, timestamp)

        self._update_table(updater)

        return updated_ids
class TableUuidTimestamp(TableUuid, TableTimestamp):
    """Table combining UUID document IDs with per-document timestamps.

    The class adds nothing of its own: the UUID ``doc_id`` behaviour is
    inherited from :class:`TableUuid` and the timestamp behaviour from
    :class:`TableTimestamp`.
    """