import datetime
import inspect
import linecache
import logging
import os
import pathlib
import platform
import sys
import threading
import tracemalloc
import typing
from typing import List, Optional
from .. import ChannelType, __version__
from . import PVGroup, SubGroup, pvproperty
from .autosave import autosaved
# psutil is an optional third-party dependency; when it cannot be imported
# the name is bound to None so the rest of the module can test for it.
try:
    import psutil
except ImportError:
    psutil = None

# ``resource`` ships with the standard library, but it may be missing on
# some platforms (Windows is the suspected case), so it gets the same
# optional-import treatment as psutil.
try:
    import resource
except ImportError:
    resource = None

# Module-wide logger, named after this module.
MODULE_LOGGER = logging.getLogger(__name__)
def _find_top_level_pvgroup(group: PVGroup) -> PVGroup:
ancestor = group
while ancestor.parent is not None:
ancestor = ancestor.parent
return ancestor
def get_source_file(obj: typing.Any) -> pathlib.Path:
    """
    Get the source filename of `obj` as a :class:`~pathlib.Path`.

    The filename reported by :func:`inspect.getsourcefile` is wrapped in a
    path and resolved.  No special handling is done for objects without a
    Python source file: whatever exception ``inspect`` or ``pathlib`` raises
    (``TypeError`` for built-ins, for example) propagates to the caller.
    """
    source_name = inspect.getsourcefile(obj)
    source_path = pathlib.Path(source_name)
    return source_path.resolve()
class BasicStatusHelper(PVGroup):
    """
    An IocStats-like tool for caproto IOCs: static process/host information.

    The PVs of this group are written once, by the startup hook attached to
    ``PROCESS_ID``; nothing here is refreshed afterwards.  ``ENGINEER`` and
    ``LOCATION`` are taken from the environment variables of the same name
    (empty string when unset).
    """

    _root_pvgroup: PVGroup

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # The outermost group is needed to report its defining source file.
        self._root_pvgroup = _find_top_level_pvgroup(self)

    process_id = pvproperty(
        value=0,
        name='PROCESS_ID',
        record='ai',
        read_only=True,
    )

    parent_pid = pvproperty(
        name='PARENT_ID',
        record='ai',
        doc="Parent Process ID",
        read_only=True,
    )

    cpu_count = pvproperty(
        value=0,
        name='CPU_CNT',
        record='longin',
        read_only=True,
        doc='Number of CPUs',
    )

    kernel_version = pvproperty(
        value=str(platform.uname().version),
        name='KERNEL_VERS',
        record='stringin',
        doc='OS/Kernel Version',
        read_only=True,
    )

    version = pvproperty(
        value=f'{__version__} on {sys.version}',
        name='EPICS_VERSION',
        record='stringin',
        doc='EPICS (caproto) version',
        read_only=True,
    )

    engineer = pvproperty(
        value='',
        max_length=40,
        name='ENGINEER',
        record='stringin',
        read_only=True,
        doc='Who is responsible for this abomination',
    )

    location = pvproperty(
        value='',
        max_length=40,
        name='LOCATION',
        record='stringin',
        read_only=True,
    )

    hostname = pvproperty(
        value='',
        max_length=255,
        name='HOSTNAME',
        record='stringin',
        read_only=True,
    )

    application_directory = pvproperty(
        value='',
        name='APP_DIR',
        record='waveform',
        max_length=255,
        read_only=True,
        doc='Startup directory (__main__)',
    )

    source_filename = pvproperty(
        value='',
        name='SOURCE_FILE',
        record='waveform',
        max_length=255,
        read_only=True,
        doc='Top-level PVGroup source filename',
    )

    sysreset = pvproperty(
        name='SYSRESET',
        record='sub',
        doc='IOC exit / restart (if using procServ)',
    )

    @sysreset.putter
    async def sysreset(self, instance, value):
        """Exit the process (status 0) when ``1`` is written to SYSRESET."""
        if value == 1:
            self.log.warning('Exit requested using StatusHelper!')
            sys.exit(0)

    @process_id.startup
    async def process_id(self, instance, async_lib):
        """Fill in all of the static PVs of this group at startup."""
        await self.process_id.write(value=os.getpid())
        await self.parent_pid.write(value=os.getppid())
        await self.location.write(value=os.environ.get('LOCATION', ''))
        await self.engineer.write(value=os.environ.get('ENGINEER', ''))
        # HOSTNAME was previously declared but never populated.
        # platform.node() yields '' if the name cannot be determined.
        await self.hostname.write(value=platform.node())

        try:
            root_source = get_source_file(type(self._root_pvgroup))
            await self.source_filename.write(value=str(root_source))
        except Exception:
            self.log.exception('Unable to determine source path')

        try:
            main_path = get_source_file(sys.modules['__main__'])
            await self.application_directory.write(value=str(main_path.parent))
        except TypeError:
            # Built-in is OK
            ...
        except Exception:
            self.log.exception('Unable to determine startup directory')

        # Prefer psutil when it is installed, otherwise fall back to the
        # standard library.  Either call may return None when the count is
        # undetermined; in that case the PV keeps its initial value.
        cpus = psutil.cpu_count() if psutil is not None else None
        if cpus is None:
            cpus = os.cpu_count()
        if cpus is not None:
            await self.cpu_count.write(value=cpus)
class PeriodicStatusHelper(PVGroup):
    """
    An IocStats-like tool for caproto IOCs.  Includes values which update
    on a periodic basis (:attr:`update_period`, PV ``UPD_TIME``).

    Process statistics (memory, CPU load, file descriptors) are only
    updated when the optional ``psutil`` library is importable.  Several
    PVs are placeholders and are marked "not implemented" in their
    descriptions.
    """

    _process: Optional['psutil.Process']
    _root_pvgroup: PVGroup

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._root_pvgroup = _find_top_level_pvgroup(self)
        self._process = None
        if psutil is None:
            self.log.warning(
                "The Python library psutil is not installed, so MEM_USED will "
                "not be reported by StatsHelper."
            )
        else:
            self._process = psutil.Process()

    access = pvproperty(
        # Not implemented:
        name='ACCESS',
        doc='CA Security access level to this IOC',
        enum_strings=['Running', 'Maintenance', 'Test', 'OFFLINE'],
        record='mbbo',
        value='Running',
        dtype=ChannelType.ENUM,
    )

    start_tod = pvproperty(
        value='',
        max_length=100,
        name='STARTTOD',
        record='stringin',
        doc="Time and date of startup",
        read_only=True,
    )

    @start_tod.startup
    async def start_tod(self, instance, async_lib):
        """Record the (local, naive) time at which the server started."""
        await self.start_tod.write(value=str(datetime.datetime.now()))

    time_of_day = pvproperty(
        value='',
        name='TOD',
        max_length=100,
        record='stringin',
        doc="Current time and date",
        read_only=True
    )

    @time_of_day.scan(period=1)
    async def time_of_day(self, instance, async_lib):
        """Update time-of-day."""
        await self.time_of_day.write(value=str(datetime.datetime.now()))

    heartbeat = pvproperty(
        value=0,
        name='HEARTBEAT',
        record='calcout',
        doc='1 Hz counter since startup',
        read_only=True,
    )

    @heartbeat.scan(period=1)
    async def heartbeat(self, instance, async_lib):
        """Increment the heartbeat counter, wrapping at 2**31."""
        next_beat = (self.heartbeat.value + 1) % (2 ** 31)
        await self.heartbeat.write(value=next_beat)

    start_count = pvproperty(
        value=0,
        name='START_CNT',
        record='calcout',
        doc='Startup count, if autosave is working',
        read_only=True,
    )
    autosaved(start_count)

    @start_count.startup
    async def start_count(self, instance, async_lib):
        """Increment the startup counter once per server start."""
        # Give autosave some time to load
        await async_lib.library.sleep(3)
        await self.start_count.write(value=self.start_count.value + 1)

    # TODO: any way we can find this information?
    ca_client_count = pvproperty(
        value=0,
        name='CA_CLNT_CNT',
        record='longin',
        read_only=True,
        doc='Number of CA clients [not implemented]',
    )

    ca_connection_count = pvproperty(
        value=0,
        name='CA_CONN_CNT',
        record='longin',
        read_only=True,
        doc='Number of CA Connections [not implemented]',
    )

    record_count = pvproperty(
        name='RECORD_CNT',
        record='ai',
        read_only=True,
        doc='Number of records',
    )

    fd_max = pvproperty(
        value=0,
        name='FD_MAX',
        record='ai',
        doc='Max File Descriptors',
        read_only=True,
    )

    fd_count = pvproperty(
        value=0,
        doc='Allocated File Descriptors',
        name='FD_CNT',
        record='ai',
        read_only=True,
    )

    fd_free = pvproperty(
        name='FD_FREE',
        record='calc',
        read_only=True,
        doc='Available FDs',
    )

    sys_cpu_load = pvproperty(
        value=0.0,
        name='SYS_CPU_LOAD',
        record='ai',
        lower_ctrl_limit=0.0,
        upper_ctrl_limit=100.0,
        units='%',
        read_only=True,
        doc='CPU load',
    )

    ioc_cpu_load = pvproperty(
        value=0.0,
        name='IOC_CPU_LOAD',
        record='ai',
        lower_ctrl_limit=0.0,
        upper_ctrl_limit=100.0,
        units='%',
        read_only=True,
    )

    susp_task_count = pvproperty(
        value=0,
        name='SUSP_TASK_CNT',
        record='longin',
        read_only=True,
        doc='Number of Suspended Tasks [not implemented]',
    )

    mem_used = pvproperty(
        value=0,
        name='MEM_USED',
        record='ai',
        units='KByte',
        read_only=True,
        doc='Memory used.',
    )

    mem_free = pvproperty(
        value=0,
        name='MEM_FREE',
        record='ai',
        units='KByte',
        read_only=True,
        doc='Memory free (including swap).',
    )

    mem_max = pvproperty(
        value=0,
        name='MEM_MAX',
        record='ai',
        units='KByte',
        read_only=True,
    )

    uptime = pvproperty(
        value=0,
        name='UPTIME',
        record='longin',
        units='s',
        doc='Elapsed time since start',
        read_only=True,
    )

    num_threads = pvproperty(
        value=0,
        name='NumThreads',
        record='longin',
        read_only=True,
        doc='Number of threads in use',
    )

    update_period = pvproperty(
        value=15.0,
        name='UPD_TIME',
        record='ao',
        lower_ctrl_limit=1,
        upper_ctrl_limit=60,
        doc='Basic stats update rate',
    )

    async def _update_psutil_status(self):
        """Update process information, if psutil is available."""
        if self._process is None:
            return

        process = typing.cast(psutil.Process, self._process)

        # Memory usage (resident set size, bytes -> KiB)
        memory_info = process.memory_info()
        await self.mem_used.write(value=memory_info.rss // 1024)

        # Memory available (physical + swap, bytes -> KiB)
        vmem = psutil.virtual_memory()
        swap = psutil.swap_memory()
        await self.mem_free.write(value=(vmem.available + swap.free) // 1024)
        await self.mem_max.write(value=(vmem.total + swap.total) // 1024)

        # CPU usage
        await self.ioc_cpu_load.write(value=process.cpu_percent())
        await self.sys_cpu_load.write(value=psutil.cpu_percent())

        # File descriptor information.  psutil only provides num_fds() on
        # UNIX platforms; skip it elsewhere instead of raising, which would
        # abort the remainder of every periodic update.
        if hasattr(process, 'num_fds'):
            await self.fd_count.write(value=process.num_fds())
            await self.fd_free.write(
                value=self.fd_max.value - self.fd_count.value
            )

    async def _update(self):
        """Periodic updates happen here."""
        await self.record_count.write(value=len(self._root_pvgroup.pvdb))
        await self._update_psutil_status()
        await self.num_threads.write(value=threading.active_count())

        # Uptime since our startup method was first called.  UPTIME is an
        # integer (longin) record, so report whole seconds.
        elapsed = datetime.datetime.now() - self._startup_time
        await self.uptime.write(value=int(elapsed.total_seconds()))

    @update_period.startup
    async def update_period(self, instance, async_lib):
        """
        Record the startup time and FD limit, then run the update loop.

        The loop never exits on its own: a failing update is logged as a
        warning and retried after the next sleep of ``UPD_TIME`` seconds.
        """
        self._startup_time = datetime.datetime.now()

        if resource is not None:
            try:
                # Only the soft limit is reported; the hard limit is unused.
                soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
                await self.fd_max.write(value=soft_limit)
            except Exception:
                self.log.warning('Failed to get maximum file descriptors')

        while True:
            try:
                await self._update()
            except Exception as ex:
                self.log.warning('Status update failure: %s', ex)
            await async_lib.library.sleep(self.update_period.value)
# Default exclusion filters for tracemalloc snapshots: the frozen import
# bootstrap, frames with an unknown filename, and tracemalloc's own module.
TRACEMALLOC_FILTERS = tuple(
    tracemalloc.Filter(False, excluded_pattern)
    for excluded_pattern in (
        "<frozen importlib._bootstrap>",
        "<unknown>",
        tracemalloc.__file__,
    )
)
def get_top_allocation_info(
    snapshot: tracemalloc.Snapshot, *,
    key_type: str = 'lineno',
    cumulative: bool = False,
    limit: int = 20,
    filters: Optional[typing.Sequence[tracemalloc.Filter]] = None,
) -> typing.Tuple[List[str], List[tracemalloc.Statistic]]:
    """
    Get the top allocations from a given tracemalloc snapshot in a list.

    Parameters
    ----------
    snapshot : tracemalloc.Snapshot
        Snapshot to get information from.

    key_type : str, optional
        Key for the snapshot statistics.

    cumulative : bool, optional
        Cumulative statistics.

    limit : int, optional
        Limit the number of results.

    filters : sequence of tracemalloc.Filter, optional
        Filters to apply to the snapshot.  Defaults to TRACEMALLOC_FILTERS
        when ``None``; an empty sequence applies no filtering.

    Returns
    -------
    text_lines : list of str
        One summary line per reported statistic, followed by the stripped
        source line when it can be read.

    top_stats : list of tracemalloc.Statistic
        All statistics of the filtered snapshot (not truncated to
        ``limit``).
    """
    if filters is None:
        filters = TRACEMALLOC_FILTERS

    snapshot = snapshot.filter_traces(filters)
    top_stats = snapshot.statistics(key_type, cumulative=cumulative)

    result = []
    for index, stat in enumerate(top_stats[:limit], 1):
        frame = stat.traceback[0]
        kbytes = stat.size / 1024
        result.append(
            f"#{index}: {frame.filename}:{frame.lineno}: {kbytes:.1f} KiB"
        )
        # linecache returns '' for unreadable files, so no line is added.
        line = linecache.getline(frame.filename, frame.lineno).strip()
        if line:
            result.append(f' {line}')

    return result, top_stats
class MemoryTracingHelper(PVGroup):
    """
    A helper which quickly allows for tracing memory usage and allocations on a
    caproto server instance.

    While ``EnableTracing`` is set to ``Enable``, a tracemalloc snapshot is
    taken on every scan (every 10 seconds).  The top allocations of that
    snapshot and its difference to the previous snapshot are published as
    text.

    Parameters
    ----------
    prefix : str
        Prefix for all PVs in the group

    macros : dict, optional
        Dictionary of macro name to value

    parent : PVGroup, optional
        Parent PVGroup

    name : str, optional
        Name for the group, defaults to the class name

    states : dict, optional
        A dictionary of states used for channel filtering. See
        https://epics.anl.gov/base/R3-15/5-docs/filters.html

    filters : list of tracemalloc.Filter, optional
        Filters to apply to the snapshot.  Defaults to TRACEMALLOC_FILTERS.
    """

    # Previous (already filtered) snapshot; None until the first scan with
    # tracing enabled, and reset to None whenever tracing is stopped.
    _old_snapshot: Optional[tracemalloc.Snapshot]
    filters: List[tracemalloc.Filter]

    def __init__(self, *args,
                 filters: Optional[List[tracemalloc.Filter]] = None,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self._old_snapshot = None
        if filters is None:
            filters = TRACEMALLOC_FILTERS
        self.filters = filters

    enable_tracing = pvproperty(
        name='EnableTracing',
        value='Disable',
        doc='Enable/disable in-depth memory analysis',
        record='bo',
        enum_strings=['Disable', 'Enable'],
        dtype=ChannelType.ENUM,
    )

    diff_results = pvproperty(
        name='TraceDiffResults',
        value='Unset',
        max_length=20000,
        read_only=True,
        doc='Trace diff from snapshot to snapshot',
        record='waveform',
    )

    top_allocations = pvproperty(
        name='TraceTopAllocations',
        value='Unset',
        max_length=20000,
        read_only=True,
        doc='Top allocations in snapshot',
        record='waveform',
    )

    trace_count = pvproperty(
        name='TraceCount',
        value=10,
        read_only=False,
        doc='Number of top allocations to view',
        record='ao',
    )

    @enable_tracing.scan(period=10)
    async def enable_tracing(self, instance, async_lib):
        """Take a snapshot and publish top allocations and the diff."""
        if self.enable_tracing.value == 'Disable':
            if tracemalloc.is_tracing():
                self._old_snapshot = None
                tracemalloc.stop()
            return

        if not tracemalloc.is_tracing():
            tracemalloc.start()

        # Slicing needs a non-negative integer; a negative value would
        # otherwise silently mean "all but the last N".
        count = max(0, int(self.trace_count.value))

        # Apply this instance's filters exactly once.  The filtered snapshot
        # feeds both reports and becomes the next baseline, so the diff
        # always compares like with like.
        snapshot = tracemalloc.take_snapshot().filter_traces(self.filters)

        # The snapshot is already filtered: pass an empty filter list so the
        # module-level defaults are not applied on top.
        top_lines, _ = get_top_allocation_info(
            snapshot, limit=count, filters=[]
        )
        await self.top_allocations.write(
            value='\n'.join(top_lines)[:self.top_allocations.max_length]
        )

        if self._old_snapshot is not None:
            comparison = snapshot.compare_to(self._old_snapshot, 'lineno')
            self.log.debug('** %s **', datetime.datetime.now())
            for stat in comparison[:count]:
                self.log.debug(stat)

            status = '\n'.join(str(stat) for stat in comparison[:count])
            await self.diff_results.write(
                value=status[:self.diff_results.max_length]
            )

        self._old_snapshot = snapshot
class StatusHelper(PVGroup):
    """
    IocStats-style status reporting for caproto IOCs.

    Combines every PV from :class:`BasicStatusHelper` and
    :class:`PeriodicStatusHelper` by including each as a subgroup with an
    empty prefix.
    """

    basic = SubGroup(
        BasicStatusHelper,
        prefix='',
    )
    periodic = SubGroup(
        PeriodicStatusHelper,
        prefix='',
    )