Source code for secretstorage.collection

# SecretStorage module for Python
# Access passwords using the SecretService DBus API
# Author: Dmitry Shachnev, 2013-2022
# License: 3-clause BSD, see LICENSE file

"""Collection is a place where secret items are stored. Normally, only
the default collection should be used, but this module allows to use any
registered collection. Use :func:`get_default_collection` to get the
default collection (and create it, if necessary).

Collections are usually automatically unlocked when user logs in, but
collections can also be locked and unlocked using
:meth:`Collection.lock` and :meth:`Collection.unlock` methods (unlocking
requires showing the unlocking prompt to user and can be synchronous or
asynchronous). Creating new items and editing existing ones is possible
only in unlocked collection."""

from typing import Dict, Iterator, Optional
from jeepney.io.blocking import DBusConnection
from secretstorage.defines import SS_PREFIX, SS_PATH
from secretstorage.dhcrypto import Session
from secretstorage.exceptions import LockedException, ItemNotFoundException, \
 PromptDismissedException
from secretstorage.item import Item
from secretstorage.util import DBusAddressWrapper, exec_prompt, \
 format_secret, open_session, unlock_objects

COLLECTION_IFACE = SS_PREFIX + 'Collection'
SERVICE_IFACE = SS_PREFIX + 'Service'
DEFAULT_COLLECTION = '/org/freedesktop/secrets/aliases/default'
SESSION_COLLECTION = '/org/freedesktop/secrets/collection/session'


[docs]class Collection: """Represents a collection.""" def __init__(self, connection: DBusConnection, collection_path: str = DEFAULT_COLLECTION, session: Optional[Session] = None) -> None: self.connection = connection self.session = session self.collection_path = collection_path self._collection = DBusAddressWrapper( collection_path, COLLECTION_IFACE, connection) self._collection.get_property('Label')
[docs] def is_locked(self) -> bool: """Returns :const:`True` if item is locked, otherwise :const:`False`.""" return bool(self._collection.get_property('Locked'))
[docs] def ensure_not_locked(self) -> None: """If collection is locked, raises :exc:`~secretstorage.exceptions.LockedException`.""" if self.is_locked(): raise LockedException('Collection is locked!')
[docs] def unlock(self) -> bool: """Requests unlocking the collection. Returns a boolean representing whether the prompt has been dismissed; that means :const:`False` on successful unlocking and :const:`True` if it has been dismissed. .. versionchanged:: 3.0 No longer accepts the ``callback`` argument. """ return unlock_objects(self.connection, [self.collection_path])
[docs] def lock(self) -> None: """Locks the collection.""" service = DBusAddressWrapper(SS_PATH, SERVICE_IFACE, self.connection) service.call('Lock', 'ao', [self.collection_path])
[docs] def delete(self) -> None: """Deletes the collection and all items inside it.""" self.ensure_not_locked() prompt, = self._collection.call('Delete', '') if prompt != "/": dismissed, _result = exec_prompt(self.connection, prompt) if dismissed: raise PromptDismissedException('Prompt dismissed.')
[docs] def get_all_items(self) -> Iterator[Item]: """Returns a generator of all items in the collection.""" for item_path in self._collection.get_property('Items'): yield Item(self.connection, item_path, self.session)
[docs] def search_items(self, attributes: Dict[str, str]) -> Iterator[Item]: """Returns a generator of items with the given attributes. `attributes` should be a dictionary.""" result, = self._collection.call('SearchItems', 'a{ss}', attributes) for item_path in result: yield Item(self.connection, item_path, self.session)
[docs] def get_label(self) -> str: """Returns the collection label.""" label = self._collection.get_property('Label') assert isinstance(label, str) return label
[docs] def set_label(self, label: str) -> None: """Sets collection label to `label`.""" self.ensure_not_locked() self._collection.set_property('Label', 's', label)
[docs] def create_item(self, label: str, attributes: Dict[str, str], secret: bytes, replace: bool = False, content_type: str = 'text/plain') -> Item: """Creates a new :class:`~secretstorage.item.Item` with given `label` (unicode string), `attributes` (dictionary) and `secret` (bytestring). If `replace` is :const:`True`, replaces the existing item with the same attributes. If `content_type` is given, also sets the content type of the secret (``text/plain`` by default). Returns the created item.""" self.ensure_not_locked() if not self.session: self.session = open_session(self.connection) _secret = format_secret(self.session, secret, content_type) properties = { SS_PREFIX + 'Item.Label': ('s', label), SS_PREFIX + 'Item.Attributes': ('a{ss}', attributes), } item_path, prompt = self._collection.call( 'CreateItem', 'a{sv}(oayays)b', properties, _secret, replace ) if len(item_path) > 1: return Item(self.connection, item_path, self.session) dismissed, result = exec_prompt(self.connection, prompt) if dismissed: raise PromptDismissedException('Prompt dismissed.') signature, item_path = result assert signature == 'o' return Item(self.connection, item_path, self.session)
[docs]def create_collection(connection: DBusConnection, label: str, alias: str = '', session: Optional[Session] = None) -> Collection: """Creates a new :class:`Collection` with the given `label` and `alias` and returns it. This action requires prompting. :raises: :exc:`~secretstorage.exceptions.PromptDismissedException` if the prompt is dismissed. """ if not session: session = open_session(connection) properties = {SS_PREFIX + 'Collection.Label': ('s', label)} service = DBusAddressWrapper(SS_PATH, SERVICE_IFACE, connection) collection_path, prompt = service.call('CreateCollection', 'a{sv}s', properties, alias) if len(collection_path) > 1: return Collection(connection, collection_path, session=session) dismissed, result = exec_prompt(connection, prompt) if dismissed: raise PromptDismissedException('Prompt dismissed.') signature, collection_path = result assert signature == 'o' return Collection(connection, collection_path, session=session)
[docs]def get_all_collections(connection: DBusConnection) -> Iterator[Collection]: """Returns a generator of all available collections.""" service = DBusAddressWrapper(SS_PATH, SERVICE_IFACE, connection) for collection_path in service.get_property('Collections'): yield Collection(connection, collection_path)
[docs]def get_default_collection(connection: DBusConnection, session: Optional[Session] = None) -> Collection: """Returns the default collection. If it doesn't exist, creates it.""" try: return Collection(connection) except ItemNotFoundException: return create_collection(connection, 'Default', 'default', session)
[docs]def get_any_collection(connection: DBusConnection) -> Collection: """Returns any collection, in the following order of preference: - The default collection; - The "session" collection (usually temporary); - The first collection in the collections list.""" try: return Collection(connection) except ItemNotFoundException: pass try: # GNOME Keyring provides session collection where items # are stored in process memory. return Collection(connection, SESSION_COLLECTION) except ItemNotFoundException: pass collections = list(get_all_collections(connection)) if collections: return collections[0] else: raise ItemNotFoundException('No collections found.')
[docs]def get_collection_by_alias(connection: DBusConnection, alias: str) -> Collection: """Returns the collection with the given `alias`. If there is no such collection, raises :exc:`~secretstorage.exceptions.ItemNotFoundException`.""" service = DBusAddressWrapper(SS_PATH, SERVICE_IFACE, connection) collection_path, = service.call('ReadAlias', 's', alias) if len(collection_path) <= 1: raise ItemNotFoundException('No collection with such alias.') return Collection(connection, collection_path)
[docs]def search_items(connection: DBusConnection, attributes: Dict[str, str]) -> Iterator[Item]: """Returns a generator of items in all collections with the given attributes. `attributes` should be a dictionary.""" service = DBusAddressWrapper(SS_PATH, SERVICE_IFACE, connection) locked, unlocked = service.call('SearchItems', 'a{ss}', attributes) for item_path in locked + unlocked: yield Item(connection, item_path)