Source code for caproto.server.autosave

import datetime
import json
import logging
import operator
import pathlib
import tempfile
import typing

from . import PVGroup, pvproperty

MODULE_LOGGER = logging.getLogger(__name__)


[docs]class RotatingFileManager: """ Save to a primary filename, while rotating out old copies. Automatically prunes old copies of the file beyond a certain age or a maximum file count (overall or per day). Files will first be pruned by the overall age cutoff, then on a per day basis. Parameters ---------- filename : str The primary filename that should be used. max_files_per_day : int, optional Remove the oldest files in a given day. max_file_age : int, optional Prune files beyond this cutoff. date_suffix : str, optional Defaults to "_%Y%m%d-%H%M%S" - used by strftime/strptime. logger : logging.Logger, optional The logger instance to use for messages. Defaults to the module logger. """ date_suffix: str filename: pathlib.Path log: logging.Logger max_file_age: int max_files_per_day: int def __init__(self, filename, *, max_files_per_day=5, max_file_age=30, date_suffix="_%Y%m%d-%H%M%S", logger=MODULE_LOGGER): self.log = logger self.date_suffix = date_suffix self.filename = pathlib.Path(filename) self.max_files_per_day = max_files_per_day self.max_file_age = max_file_age self._date_format = ''.join((self.filename.stem, self.date_suffix, self.filename.suffix, )) self.files = self._get_file_dictionary() def _filename_to_date(self, fn: pathlib.Path ) -> typing.Optional[datetime.datetime]: """Using the date format, determine the date of a file.""" try: return datetime.datetime.strptime(fn.name, self._date_format) except ValueError: ... def _get_file_dictionary(self) -> typing.Dict[pathlib.Path, datetime.datetime]: """Get {filename: datetime} dictionary of existing files.""" files_and_dates = [ (fn, self._filename_to_date(fn)) for fn in self.directory.glob(self.glob_string) if fn != self.filename ] return { fn: dt for fn, dt in files_and_dates if dt is not None } @property def directory(self) -> pathlib.Path: """The directory holding the save file.""" return self.filename.parent @property def glob_string(self) -> str: """The glob string which can be used to find matching files.""" return f'{self.filename.stem}*{self.filename.suffix}' @property def files_by_age(self) -> typing.Dict[datetime.timedelta, pathlib.Path]: """Get a dictionary of {age_timedelta: filename}.""" now = datetime.datetime.now() return { (now - file_date).seconds: fn for fn, file_date in self.files.items() } def prune_files(self) -> set: """ Prune files that are outside of the configured maximum times. Returns ------- pruned : set The set of pruned files. """ files = self.files_by_age if not files: return set() seconds_per_day = 24 * 60 * 60 seconds_cutoff = self.max_file_age * seconds_per_day pruned = set() def get_next_to_remove(files) -> int: if not files: return # Remove really old files first, if beyond the cutoff: max_age = max(files) if max_age > seconds_cutoff: return max_age # Remove files from earlier today: today_files = {delta: fn for delta, fn in files.items() if delta < seconds_per_day} if len(today_files) > self.max_files_per_day: return max(today_files) while True: age = get_next_to_remove(files) if age is None: break fn = files.pop(age) self.files.pop(fn) self.log.info('Removing file %s (%s days old)', fn, age / seconds_per_day) try: fn.unlink() except Exception: self.log.exception('Failed to remove %s', fn) else: self.log.info('Removed %s', fn) pruned.add(fn) return pruned def rotate_in_file(self, fn: pathlib.Path) -> set: """ Move a valid file from another location to `self.filename` and prune. Returns ------- pruned : set The set of pruned files. """ if self.filename.exists(): now = datetime.datetime.now() rename_to = now.strftime(self._date_format) self.log.info('Renaming %s to %s', self.filename, rename_to) rename_to = pathlib.Path(self.filename.parent, rename_to) try: self.filename.rename(rename_to) except Exception: self.log.exception('Failed to rotate out file') else: self.files[rename_to] = now pathlib.Path(fn).rename(self.filename) return self.prune_files()
def _to_json_data(value): """Ensure any numpy values don't leak through.""" if hasattr(value, 'tolist'): # This works for scalars (e.g., np.uint32(5).tolist() = int(5)) # along with ndarrays) return value.tolist() return value def get_autosave_fields(pvprop, channeldata): """Get all autosaved fields from a pvproperty.""" for name in (pvprop.autosave.get('fields', None) or []): field = getattr(channeldata.field_inst, name, None) if field is not None: yield name, _to_json_data(field.value)
[docs]class AutosaveHelper(PVGroup): filename = 'autosave.json' period = 30 autosave_hook = pvproperty( read_only=True, name=':__autosave_hook__', doc='Internal hook which handles autosave functionality', ) def __init__(self, *args, file_manager: RotatingFileManager = None, **kwargs): super().__init__(*args, **kwargs) if file_manager is None: file_manager = RotatingFileManager(self.filename) self.file_manager = file_manager @autosave_hook.startup async def autosave_hook(self, instance, async_lib): """ A startup hook which lives until the IOC exits. Initially restores values from the autosave file `self.filename`, then periodically - at `self.period` seconds - saves autosave data to `self.filename`. """ await self.restore_from_file(self.filename) while True: await async_lib.library.sleep(self.period) await self.save() def find_autosave_properties(self): """Yield (pvprop, channeldata) for all tagged with `autosaved`.""" ioc = self.parent for dotted_attr, pvprop in ioc._pvs_.items(): if hasattr(pvprop, 'autosave'): channeldata = operator.attrgetter(dotted_attr)(ioc) yield pvprop, channeldata def prepare_data(self): """Generate the autosave dictionary.""" return { channeldata.pvname: { 'value': _to_json_data(channeldata.value), 'fields': dict(get_autosave_fields(pvprop, channeldata)) } for pvprop, channeldata in self.find_autosave_properties() } async def restore_from_file(self, filename): """Restore from the autosave file.""" filename = pathlib.Path(filename) if not filename.exists(): self.log.warning('Autosave file does not exist: %s', filename) return try: with open(filename, 'rt') as f: data = json.load(f) except Exception: self.log.exception('Failed to load JSON from %s', filename) return try: await self.restore_values(data) except Exception: self.log.exception('Failed to restore values from %s', filename) return async def restore_values(self, data): """Restore given the autosave file ``data`` dictionary.""" for pvname, info in data.items(): try: pvprop = self.parent.pvdb[pvname] except KeyError: self.log.error('Autosave pvname not in database: %s', pvname) continue try: await pvprop.write(info['value']) except Exception as ex: self.log.exception('Autosave restore failed: %s %s', pvname, ex) else: self.log.info('Restored %s => %s', pvname, pvprop.value) fields = info.get('fields', None) or {} for field_name, field_value in fields.items(): try: field = getattr(pvprop.field_inst, field_name) await field.write(field_value) except Exception as ex: self.log.exception( 'Autosave restore failed: %s field %s = %s %s', pvname, field_name, field_value, ex) else: self.log.info('Restored %s field %s => %s', pvname, field_name, field.value) async def save(self, data=None): """Save autosave ``data`` dictionary with the file manager.""" try: if data is None: data = self.prepare_data() with tempfile.NamedTemporaryFile(mode='wt', delete=False, dir=self.file_manager.directory, ) as stream: json.dump(data, stream) filename = stream.name except Exception: self.log.exception('Failed to save the current state') return try: self.file_manager.rotate_in_file(pathlib.Path(filename)) except Exception: self.log.exception('Rotating in save file failed')
def autosaved(pvprop, fields=None): """ `pvproperty` wrapper which tags it as something that should be autosaved. Parameters ---------- pvprop : pvproperty The pvproperty to wrap. fields : list, optional The list of fields to save, if using records. Returns ------- pvproperty The pvproperty passed in. Examples -------- :: value = autosaved(pvproperty(1.0)) """ if fields is None: fields = {'description'} pvprop.autosave = {'fields': fields} return pvprop