#!/usr/bin/env python
"""Connect to Darshan logs.
This connector provides an interface into Darshan logs created by Darshan 3.0 or
higher and represents the counters and data contained therein as a Python
dictionary. This dictionary has the following structure, where ``block``
denote literal key names.
* ``header`` which contains key-value pairs corresponding to each line in the
header. ``exe`` and ``metadata`` are lists; the other keys correspond to a
single scalar value.
* ``compression``, ``end_time``, ``end_time_string``, ``exe``, etc
* ``counters``
* `modulename` which is ``posix``, ``lustre``, ``stdio``, etc
* `recordname`, which is usually the full path to a file opened by the
profiled application _or_ ``_perf`` (contains performance summary metrics)
or ``_total`` (contains aggregate file statistics)
* ranknum which is a string (``0``, ``1``, etc or ``-1``)
* `counternames`, which depends on the Darshan module defined by
`modulename` above
* ``mounts`` which is the mount table with keys of a path to a mount location
and values of the file system type
The `counternames` are module-specific and have their module name prefix
stripped off. The following counter names are examples of what a Darshan log
may expose through this connector for the ``posix`` module:
* ``BYTES_READ`` and ``BYTES_WRITTEN`` - number of bytes read/written to the file
* ``MAX_BYTE_WRITTEN`` and ``MAX_BYTE_READ`` - highest byte written/read; useful if an application re-reads or re-writes a lot of data
* ``WRITES`` and ``READS`` - number of write and read ops issued
* ``F_WRITE_TIME`` and ``F_READ_TIME`` - amount of time spent inside write and read calls (in seconds)
* ``F_META_TIME`` - amount of time spent in metadata (i.e., non-read/write) calls
Similarly the ``lustre`` module provides the following counter keys:
* ``MDTS`` - number of MDTs in the underlying file system
* ``OSTS`` - number of OSTs in the underlying file system
* ``OST_ID_0`` - the OBD index for the 0th OST over which the file is striped
* ``STRIPE_OFFSET`` - the setting used to define stripe offset when the file was created
* ``STRIPE_SIZE`` - the size, in bytes, of each stripe
* ``STRIPE_WIDTH`` - how many OSTs the file touches
Note:
This connector presently relies on ``darshan-parser`` to convert the binary
logs to ASCII, then convert the ASCII into Python objects. In the future,
we plan on using the Python API provided by darshan-utils to circumvent the
ASCII translation.
"""
import os
import re
import json
import errno
import subprocess
import warnings
from .common import SubprocessOutputDict
from ..common import isstr
DARSHAN_PARSER_BIN = 'darshan-parser'
DARSHAN_FILENAME_REX = re.compile(r'([^_%s]+)_([^%s]*?)_id(\d+)_(\d+)-(\d+)-(\d+)-(\d+)_(\d+).darshan' % (os.path.sep, os.path.sep))
[docs]class Darshan(SubprocessOutputDict):
[docs] def __init__(self, log_file=None, *args, **kwargs):
"""Initialize the object from either a Darshan log or a cache file.
Configures the object's internal state to operate on a Darshan
log file or a cached JSON representation of a previously processed
Darshan log.
Args:
log_file (str, optional): Path to a Darshan log to be processed
cache_file (str, optional): Path to a Darshan log's contents cached
*args: Passed to tokio.connectors.common.SubprocessOutputDict
*kwargs: Passed to tokio.connectors.common.SubprocessOutputDict
Attributes:
log_file (str): Path to the Darshan log file to load
"""
super(Darshan, self).__init__(*args, **kwargs)
self.log_file = log_file
self._parser_mode = None
self._only_modules = None
self._only_counters = None
self.subprocess_cmd = [DARSHAN_PARSER_BIN]
self.filename_metadata = {}
if log_file is None:
self.load()
else:
self.filename_metadata = parse_filename_metadata(log_file)
[docs] def __repr__(self):
"""Serialize self into JSON.
Returns:
str: JSON representation of the object
"""
return json.dumps(list(self.values()))
[docs] def load(self):
if self.from_string is not None:
self.load_str(self.from_string)
elif self.cache_file:
self.load_cache()
elif self.log_file is None:
raise Exception("parameters should be provided (at least log_file or cache_file)")
[docs] def darshan_parser_base(self, modules=None, counters=None):
"""Populate data produced by ``darshan-parser --base``
Runs the ``darshan-parser --base`` and convert all results into
key-value pairs which are inserted into the object.
Args:
modules (list of str): If specified, only return data from the given
Darshan modules
counters (list of str): If specified, only return data for the
given counters
Returns:
dict: Dictionary containing all key-value pairs generated by running
``darshan-parser --base``. These values are also accessible via the
`BASE` key in the object.
"""
self._parser_mode = "BASE"
self._only_modules = set(modules) if modules else None
self._only_counters = set(counters) if counters else None
return self._darshan_parser()
[docs] def darshan_parser_total(self, modules=None, counters=None):
"""Populate data produced by ``darshan-parser --total``
Runs the ``darshan-parser --total`` and convert all results into
key-value pairs which are inserted into the object.
Args:
modules (list of str): If specified, only return data from the given
Darshan modules
counters (list of str): If specified, only return data for the
given counters
Returns:
dict: Dictionary containing all key-value pairs generated by running
``darshan-parser --total``. These values are also accessible via
the `TOTAL` key in the object.
"""
self._parser_mode = "TOTAL"
self._only_modules = set() if not modules else set(modules)
self._only_counters = set() if not counters else set(counters)
return self._darshan_parser()
[docs] def darshan_parser_perf(self, modules=None, counters=None):
"""Populate data produced by ``darshan-parser --perf``
Runs the ``darshan-parser --perf`` and convert all results into
key-value pairs which are inserted into the object.
Args:
modules (list of str): If specified, only return data from the given
Darshan modules
counters (list of str): If specified, only return data for the
given counters
Returns:
dict: Dictionary containing all key-value pairs generated by running
``darshan-parser --perf``. These values are also accessible via the
`PERF` key in the object.
"""
self._parser_mode = "PERF"
self._only_modules = set() if not modules else set(modules)
self._only_counters = set() if not counters else set(counters)
return self._darshan_parser()
[docs] def _darshan_parser(self):
"""Call darshan-parser to initialize values in self
"""
if self.log_file is None:
return self
if self._parser_mode in ["BASE", "TOTAL", "PERF"]:
darshan_flag = "--" + self._parser_mode.lower()
else:
self._parser_mode = "BASE"
darshan_flag = ""
args = [darshan_flag, self.log_file]
# this loads the entire stdout into memory at once. is a problem for
# Darshan logs that expand to tens of gigabytes of ascii
#
# self._load_subprocess(*args)
# this loads stdout line-by-line from a pipe and is memory-efficient
# but may suffer from weird buffering effects on some platforms
#
self._load_subprocess_iter(*args)
return self
[docs] def _load_subprocess_iter(self, *args):
"""Run a subprocess and pass its stdout to a self-initializing parser
"""
cmd = self.subprocess_cmd
if args:
cmd += args
try:
if self.silent_errors:
dparser = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
else:
dparser = subprocess.Popen(cmd, stdout=subprocess.PIPE)
except OSError as error:
if error.errno == errno.ENOENT:
raise type(error)(error.errno, "%s command not found" % self.subprocess_cmd[0])
raise
# Python 2 - subprocess.check_output returns a string
self.load_str(iter(dparser.stdout.readline, ''))
dparser.stdout.close()
dparser.wait()
retcode = dparser.returncode
if retcode != 0:
warnings.warn("%s returned nonzero exit code (%d)" % (cmd, retcode))
[docs] def load_str(self, input_str):
"""Load from either a json cache or the output of darshan-parser
Args:
input_str: Either (1) stdout of the darshan-parser command as a
string, (2) the json-encoded representation of a Darshan
object that can be deserialized to initialize self, or (3)
an iterator that produces the output of darshan-parser
line-by-line
"""
if isstr(input_str):
loaded_data = None
try:
loaded_data = json.loads(input_str)
except ValueError:
pass
if loaded_data:
# if we could successfully load json, store it in self
self.__setitem__(loaded_data)
else:
# otherwise, treat input_str as the raw stdout of darshan-parser
self._parse_darshan_parser(input_str.splitlines())
else:
# treat input_str as an iterator that will produce lines of
# darshan-parser
self._parse_darshan_parser(input_str)
[docs] def _parse_darshan_parser(self, lines):
"""Load values from output of darshan-parser
Args:
lines: Any iterable that produces lines of darshan-parser output
"""
def is_valid_counter(counter):
"""
if counter is not None, this line is valid (return True)
if counter is None but we can identify a module section, return it
if counter is None but we cannot identify a module section, return False
"""
if counter is None:
match = module_rex.search(line)
if match is not None:
module_section = match.group(1)
module_section = module_section.replace('-', '') # because of "MPI-IO" and "MPIIO"
module_section = module_section.replace('/', '') # because of "BG/Q" and "BGQ"
return False, module_section
return False, None
return True, None
def insert_record(section, module, file_name, rank, counter, value, counter_prefix=None):
"""
Embed a counter=value pair deep within the darshan_data structure based
on a bunch of nested keys.
"""
# Force the local shadow of 'module' to lowercase
module = module.lower()
# Assert that the counter actually belongs to the current module
if counter_prefix is not None:
if counter.startswith(counter_prefix):
# Strip off the counter_prefix from the counter name
counter = counter[len(counter_prefix):]
if (self._only_modules and module not in self._only_modules) \
or (self._only_counters and counter not in self._only_counters):
return # don't insert
# Otherwise insert the record--this logic should be made more flexible
if section not in self:
self[section] = {}
if module not in self[section]:
self[section][module] = {}
if file_name not in self[section][module]:
self[section][module][file_name] = {}
if rank is None:
insert_base = self[section][module][file_name]
else:
if rank not in self[section][module][file_name]:
self[section][module][file_name][rank] = {}
insert_base = self[section][module][file_name][rank]
if counter in insert_base:
raise Exception("Duplicate counter %s found in %s->%s->%s (rank=%s)" % (counter, section, module, file_name, rank))
if '.' in value:
value = float(value)
else:
value = int(value)
insert_base[counter] = value
section = None
counter = None
counter_prefix = None
module_section = None
# This regex must match every possible module name
module_rex = re.compile(r'^# ([A-Z\-0-9/]+) module data\s*$')
for line in lines:
if not line:
break
if not isstr(line):
# Python 3 - subprocess.check_output returns encoded bytes
line = line.decode()
# Is this the start of a new section?
# Why do we look at section, refactorize failed
if section is None and line.startswith("# darshan log version:"):
section = "header"
if section not in list(self.keys()):
self[section] = {}
elif section == "header" and line.startswith("# mounted file systems"):
section = "mounts"
if section not in list(self.keys()):
self[section] = {}
elif section == "mounts" and line.startswith("# **********************"): # understand the utility of these stars
section = "counters"
if section not in list(self.keys()):
self[section] = {}
# otherwise use the appropriate parser for this section
if section == "header":
key, val = parse_header(line)
if key is None:
pass
elif key == "metadata":
if key not in self[section]:
self[section][key] = []
self[section][key].append(val)
else:
self[section][key] = val
elif section == 'mounts':
key, val = parse_mounts(line)
if key is not None:
self[section][key] = val
elif section == 'counters':
if self._parser_mode == "BASE":
# module, rank, record_id, counter, value, file_name, mount_pt, fs_type = parse_base_counters(line)
_, rank, _, counter, value, file_name, _, _ = parse_base_counters(line)
if module_section is not None:
# If it is none, is_valid_counter check below will bail
counter_prefix = module_section + "_"
elif self._parser_mode == "TOTAL":
counter, value = parse_total_counters(line)
file_name = '_total'
rank = None
if module_section is not None:
counter_prefix = 'total_%s_' % module_section
elif self._parser_mode == "PERF":
counter, value = parse_perf_counters(line)
file_name = '_perf'
rank = None
counter_prefix = None
else:
continue
# If no valid counter found, is this the start of a new module?
valid, new_module_section = is_valid_counter(counter)
if new_module_section is not None:
module_section = new_module_section
if valid is False:
continue
# Reminder: section is already defined as the global parser state
insert_record(section=section,
module=module_section,
file_name=file_name,
rank=rank,
counter=counter,
value=value,
counter_prefix=counter_prefix)
return self
[docs]def parse_mounts(line):
"""Parse a mount table line from ``darshan-parser``.
Accepts a line that may or may not be a mount table entry from
``darshan-parser``. Such lines take the form::
# mount entry: /usr/lib64/libibverbs.so.1.0.0 dvs
If `line` is a valid mount table entry, return a key-value
representation of its contents.
Args:
line (str): A single line of output from ``darshan-parser``
Returns:
tuple: Returns a (key, value) corresponding to the mount table
entry, or ``(None, None)`` if the line is not a valid mount
table entry.
"""
if line.startswith("# mount entry:"):
key, val = line.split('\t')[1:3]
return key, val.strip()
return None, None
[docs]def parse_base_counters(line):
"""Parse a counter line from ``darshan-parser --base``.
Parse the line containing an actual counter's data. It is a tab-delimited
line of the form
module, rank, record_id, counter, value, file_name, mount_pt, fs_type
Args:
line (str): A single line of output from ``darshan-parser --base``
Returns:
tuple: Returns a tuple containing eight values. If `line` is not a
valid counter line, all values will be None. The returned values are::
0. module name
1. MPI rank
2. record id
3. counter name
4. counter value
5. file name
6. mount point
7. file system type
"""
if not line.startswith("#"):
args = line.split('\t')
if len(args) == 8:
return tuple(args)
return None, None, None, None, None, None, None, None
[docs]def parse_total_counters(line):
"""Parse a counter line from ``darshan-parser --total``.
Parse a line containing counter data from ``darshan-parser --total``.
Such lines are of the form:
total_MPIIO_F_READ_END_TIMESTAMP: 0.000000
Args:
line (str): A single line of output from ``darshan-parser --total``
Returns:
tuple: Returns a single (key, value) pair corresponding to a
counted metric and its total value. If `line` is not a valid
counter line, ``(None, None)`` are returned.
"""
if not line.startswith("#"):
args = line.split(':')
if len(args) == 2:
return args[0].strip(), args[1].strip()
return None, None
[docs]def parse_perf_counters(line):
"""Parse a counter line from ``darshan-parser --perf``.
Parse a line containing counter data from ``darshan-parser --perf``.
Such lines look like::
# total_bytes: 2199023259968
# unique files: slowest_rank_io_time: 0.000000
# shared files: time_by_cumul_io_only: 39.992327
# agg_perf_by_slowest: 28670.996545
Args:
line (str): A single line of output from ``darshan-parser --perf``
Returns:
tuple: Returns a single (key, value) pair corresponding to the
performance metric encoded in `line`. If `line` is not a valid
performance counter line, ``(None, None)`` is returned.
"""
if line.startswith('# total_bytes:') or line.startswith('# agg_perf_by'):
key, value = line[2:].split(':')
elif line.startswith('# unique files:') or line.startswith('# shared files:'):
key_suffix, key, value = line[2:].split(':')
key += '_%s' % key_suffix.replace(' ', '_')
else:
return None, None
return key.strip(), value.strip()