Source code for whisker.convenience

#!/usr/bin/env python

"""
whisker/convenience.py

===============================================================================

    Copyright © 2011-2020 Rudolf Cardinal (rudolf@pobox.com).

    This file is part of the Whisker Python client library.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.

===============================================================================

**Convenience functions for simple Whisker clients.**

"""

import logging
from datetime import datetime
from tkinter import filedialog, Tk
import os
import sys
from typing import Any, Dict, Iterable, List, Optional, Type, Union

import arrow
from attrdict import AttrDict
import colorama
from colorama import Fore, Style
import dataset
# noinspection PyPackageRequirements
import yaml  # from pyyaml

from whisker.constants import FILENAME_SAFE_ISOFORMAT

log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler())
colorama.init()

DEFAULT_PK_FIELD = "id"


[docs]def load_config_or_die(config_filename: str = None, mandatory: Iterable[Union[str, List[Any]]] = None, defaults: Dict[str, Any] = None, log_config: bool = False) -> AttrDict: """ Offers a GUI file prompt; loads a YAML config from it; or exits. Args: config_filename: Optional filename. If not supplied, a GUI interface will be used to ask the user. mandatory: List of mandatory items; if one of these is itself a list, that list is used as a hierarchy of attributes. defaults: A dict-like object of defaults. log_config: Report the config to the Python log? Returns: an class:`AttrDict` containing the config """ def _fail_mandatory(attrname_): errmsg = "Setting '{}' missing from config file".format(attrname_) log.critical(errmsg) sys.exit(1) mandatory = mandatory or [] # type: List[str] defaults = defaults or {} # type: Dict[str, Any] defaults = AttrDict(defaults) Tk().withdraw() # we don't want a full GUI; remove root window config_filename = config_filename or filedialog.askopenfilename( title='Open configuration file', filetypes=[('YAML files', '.yaml'), ('All files', '*.*')]) if not config_filename: log.critical("No config file given; exiting.") sys.exit(1) log.info("Loading config from: {}".format(config_filename)) with open(config_filename) as infile: config = AttrDict(yaml.safe_load(infile)) for attr in mandatory: if len(attr) > 1 and not isinstance(attr, str): # attr is a list of attributes, e.g. ['a', 'b', 'c'] for a.b.c obj = config so_far = [] for attrname in attr: so_far.append(attrname) if attrname not in obj: _fail_mandatory(".".join(so_far)) obj = obj[attrname] else: # attr is a string if attr not in config: _fail_mandatory(attr) config = defaults + config # use AttrDict to update if log_config: log.debug("config: {}".format(repr(config))) return config
[docs]def connect_to_db_using_attrdict(database_url: str, show_url: bool = False, engine_kwargs: Dict[str, Any] = None): """ Connects to a ``dataset`` database, and uses :class:`AttrDict` as the row type, so :class:`AttrDict` objects come back out again. """ if show_url: log.info("Connecting to database: {}".format(database_url)) else: log.info("Connecting to database") return dataset.connect(database_url, row_type=AttrDict, engine_kwargs=engine_kwargs)
# noinspection PyShadowingBuiltins
[docs]def ask_user(prompt: str, default: Any = None, type: Type[Any] = str, min: Any = None, max: Any = None, options: List[Any] = None, allow_none: bool = True) -> Any: """ Prompts the user via the command line, optionally with a default, range or set of options. Asks for user input. Coerces the return type. Args: prompt: prompt to display default: default value if the user just presses Enter type: type to coerce return value to (default ``str``) min: if not ``None``, enforce this minimum value max: if not ``None``, enforce this maximum value options: permitted options (if this is truthy and the user enters an option that's not among then, raise :exc:`ValueError`) allow_none: permit the return of ``None`` values (otherwise, if a ``None`` value would be returned, raise :exc:`ValueError`) Returns: user-supplied value """ options = options or [] defstr = "" minmaxstr = "" optionstr = "" if default is not None: type(default) # will raise if the user has passed a dumb default defstr = " [{}]".format(str(default)) if min is not None or max is not None: minmaxstr = " ({} to {})".format( min if min is not None else '–∞', max if max is not None else '+∞') if options: optionstr = " {{{}}}".format(", ".join(str(x) for x in options)) for o in options: type(o) # will raise if the user has passed a dumb option prompt = "{c}{p}{m}{o}{d}: {r}".format( c=Fore.YELLOW + Style.BRIGHT, p=prompt, m=minmaxstr, o=optionstr, d=defstr, r=Style.RESET_ALL, ) while True: try: str_answer = input(prompt) or default value = type(str_answer) if str_answer is not None else None if value is None and not allow_none: raise ValueError() if ((min is not None and value < min) or (max is not None and value > max)): raise ValueError() if options and value not in options: raise ValueError() return value except (TypeError, ValueError): print("Bad input value; try again.")
[docs]def save_data(tablename: str, results: List[Dict[str, Any]], taskname: str, timestamp: Union[arrow.Arrow, datetime] = None, output_format: str = "csv") -> None: """ Saves a ``dataset`` result set to a suitable output file. Args: tablename: table name, used only for creating the filename results: results to save taskname: task name, used only for creating the filename timestamp: timestamp, used only for creating the filename; if ``None``, the current date/time is used output_format: one of: ``"csv"``, ``"json"``, ``"tabson"`` (see https://dataset.readthedocs.org/en/latest/api.html#dataset.freeze) """ if timestamp is None: timestamp = datetime.utcnow() filename = "{taskname}_{datetime}_{tablename}.{output_format}".format( taskname=taskname, tablename=tablename, datetime=timestamp.strftime(FILENAME_SAFE_ISOFORMAT), output_format=output_format ) log.info("Saving {tablename} data to {filename}".format( tablename=tablename, filename=filename)) dataset.freeze(results, format=output_format, filename=filename) if not os.path.isfile(filename): log.error( "save_data: file {} not created; empty results?".format(filename))
[docs]def insert_and_set_id(table: dataset.Table, obj: Dict[str, Any], idfield: str = DEFAULT_PK_FIELD) -> Any: # but typically int # noqa """ Inserts an object into a :class:`dataset.Table` and ensures that the primary key (PK) is written back to the object, in its ``idfield``. (The :func:`dataset.Table.insert` command returns the primary key. However, it doesn't store that back, and we want users to do that consistently.) Args: table: the database table in which to insert obj: the dict-like record object to be added to the database idfield: the name of the primary key (PK) to be created within ``obj``, containing the record's PK in the database Returns: the primary key (typically integer) """ pk = table.insert(obj) obj[idfield] = pk return pk
[docs]def update_record(table: dataset.Table, obj: Dict[str, Any], newvalues: Dict[str, Any], idfield: str = DEFAULT_PK_FIELD) -> Optional[int]: """ Updates an existing record, using its primary key. Args: table: the database table to update obj: the dict-like record object to be updated newvalues: a dictionary of new values to write to ``obj`` and the database idfield: the name of the primary key (PK, ID) within ``obj``, used for the SQL ``WHERE`` clause to determine which record to update Returns: the number of rows updated, or None """ keys = [idfield] # for the WHERE clause obj.update(**newvalues) return table.update(obj, keys)