Source code for tokio.connectors.lmtdb

#!/usr/bin/env python
"""
Interface with an LMT database.  Provides wrappers for common queries using the
CachingDb class.
"""

import os
import datetime
from . import cachingdb

### Names and schemata of all LMT database tables worth caching
LMTDB_TABLES = {
    "FILESYSTEM_INFO": {
        'columns': [
            'FILESYSTEM_ID',
            'FILESYSTEM_NAME',
            'FILESYSTEM_MOUNT_NAME',
            'SCHEMA_VERSION',
        ],
        'primary_key': ['FILESYSTEM_ID'],
    },
    "MDS_DATA": {
        'columns': [
            'MDS_ID',
            'TS_ID',
            'PCT_CPU',
            'KBYTES_FREE',
            'KBYTES_USED',
            'INODES_FREE',
            'INODES_USED',
        ],
        'primary_key': ['MDS_ID', 'TS_ID'],
    },
    "MDS_INFO": {
        'columns': [
            'MDS_ID',
            'FILESYSTEM_ID',
            'MDS_NAME',
            'HOSTNAME',
            'DEVICE_NAME'
        ],
        'primary_key': ['MDS_ID'],
    },
    "MDS_OPS_DATA": {
        'columns': [
            'MDS_ID',
            'TS_ID',
            'OPERATION_ID',
            'SAMPLES',
            'SUM',
            'SUMSQUARES',
        ],
        'primary_key': ['MDS_ID', 'TS_ID', 'OPERATION_ID'],
    },
    "MDS_VARIABLE_INFO": {
        'columns': [
            'VARIABLE_ID',
            'VARIABLE_NAME',
            'VARIABLE_LABEL',
            'THRESH_TYPE',
            'THRESH_VAL1',
            'THRESH_VAL2'
        ],
        'primary_key': ['VARIABLE_ID'],
    },
    "OPERATION_INFO": {
        'columns': [
            'OPERATION_ID',
            'OPERATION_NAME',
            'UNITS'
        ],
        'primary_key': ['OPERATION_ID'],
    },
    "OSS_DATA": {
        'columns': [
            'OSS_ID',
            'TS_ID',
            'PCT_CPU',
            'PCT_MEMORY'
        ],
        'primary_key': ['OSS_ID', 'TS_ID'],
    },
    "OSS_INFO": {
        'columns': [
            'OSS_ID',
            'FILESYSTEM_ID',
            'HOSTNAME',
            'FAILOVERHOST'
        ],
        'primary_key': [
            'OSS_ID',
            'HOSTNAME'
        ],
    },
    "OST_DATA": {
        'columns': [
            'OST_ID',
            'TS_ID',
            'READ_BYTES',
            'WRITE_BYTES',
            'PCT_CPU',
            'KBYTES_FREE',
            'KBYTES_USED',
            'INODES_FREE',
            'INODES_USED'
        ],
        'primary_key': ['OST_ID', 'TS_ID'],
    },
    "OST_INFO": {
        'columns': [
            'OST_ID',
            'OSS_ID',
            'OST_NAME',
            'HOSTNAME',
            'OFFLINE',
            'DEVICE_NAME'
        ],
        'primary_key': ['OST_ID'],
    },
    "OST_VARIABLE_INFO": {
        'columns': [
            'VARIABLE_ID',
            'VARIABLE_NAME',
            'VARIABLE_LABEL',
            'THRESH_TYPE',
            'THRESH_VAL1',
            'THRESH_VAL2'
        ],
        'primary_key': ['VARIABLE_ID'],
    },
    "TIMESTAMP_INFO": {
        'columns': ['TS_ID', 'TIMESTAMP'],
        'primary_key': ['TS_ID'],
    },
}
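
# Illustration, not part of the original module: a minimal sketch of how the
# LMTDB_TABLES schema dictionary above can drive query construction.  The
# helper name `_example_select_statement` is hypothetical and exists only to
# demonstrate the data structure.
def _example_select_statement(table):
    """Build a SELECT statement covering every cached column of a table."""
    schema = LMTDB_TABLES[table.upper()]
    return "SELECT %s FROM %s" % (', '.join(schema['columns']), table.upper())

# For example, _example_select_statement('ost_data') returns
# "SELECT OST_ID, TS_ID, READ_BYTES, ... FROM OST_DATA"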

class LmtDb(cachingdb.CachingDb):
    """
    Class to wrap the connection to an LMT MySQL database or SQLite database
    """

    def __init__(self, dbhost=None, dbuser=None, dbpassword=None, dbname=None,
                 cache_file=None):
        """
        Initialize LmtDb with either a MySQL or SQLite backend
        """
        # Get database parameters
        if dbhost is None:
            dbhost = os.environ.get('PYTOKIO_LMT_HOST')
        if dbuser is None:
            dbuser = os.environ.get('PYTOKIO_LMT_USER')
        if dbpassword is None:
            dbpassword = os.environ.get('PYTOKIO_LMT_PASSWORD')
        if dbname is None:
            dbname = os.environ.get('PYTOKIO_LMT_DB')

        super(LmtDb, self).__init__(
            dbhost=dbhost,
            dbuser=dbuser,
            dbpassword=dbpassword,
            dbname=dbname,
            cache_file=cache_file)

        # The list of OST names is an immutable property of a database, so
        # fetch and cache it here.  Also maintain a mapping of OST_ID to
        # OST_NAME so we don't have to do JOINs in SQL; this will not scale
        # to gargantuan Lustre clusters though.
        self.ost_names = []
        self.ost_id_map = {}
        for row in self.query('SELECT OST_ID, OST_NAME FROM OST_INFO'):
            self.ost_names.append(row[1])
            self.ost_id_map[row[0]] = row[1]
        self.ost_names = tuple(self.ost_names)

        # Do the same for OSSes
        self.oss_names = []
        self.oss_id_map = {}
        for row in self.query('SELECT OSS_ID, HOSTNAME FROM OSS_INFO'):
            self.oss_names.append(row[1])
            self.oss_id_map[row[0]] = row[1]
        self.oss_names = tuple(self.oss_names)

        # Do the same for MDSes
        self.mds_names = []
        self.mds_id_map = {}
        for row in self.query('SELECT MDS_ID, HOSTNAME FROM MDS_INFO'):
            self.mds_names.append(row[1])
            self.mds_id_map[row[0]] = row[1]
        self.mds_names = tuple(self.mds_names)

        # Do the same for MDS operations
        self.mds_op_names = []
        self.mds_op_id_map = {}
        for row in self.query('SELECT OPERATION_ID, OPERATION_NAME FROM OPERATION_INFO'):
            self.mds_op_names.append(row[1])
            self.mds_op_id_map[row[0]] = row[1]
        self.mds_op_names = tuple(self.mds_op_names)

    def get_ts_ids(self, datetime_start, datetime_end):
        """
        Given a starting and ending time, return the lowest and highest ts_id
        values that encompass those dates, inclusive.
        """
        # First figure out the timestamp range
        query_str = "SELECT TIMESTAMP_INFO.TS_ID FROM TIMESTAMP_INFO WHERE TIMESTAMP >= %(ps)s AND TIMESTAMP <= %(ps)s"
        query_variables = (
            datetime_start.strftime("%Y-%m-%d %H:%M:%S"),
            datetime_end.strftime("%Y-%m-%d %H:%M:%S"))

        result = self.query(query_str=query_str, query_variables=query_variables)
        ts_ids = [x[0] for x in result]
        ### TODO: make sure this works (it's templated down in test_bin_cache_lmtdb.py)
        return min(ts_ids), max(ts_ids)

    def get_timeseries_data(self, table, datetime_start, datetime_end,
                            timechunk=datetime.timedelta(hours=1)):
        """
        Break a timeseries query into smaller queries over smaller time
        ranges.  This is an optimization to avoid the O(N*M) scaling of the
        JOINs in the underlying SQL query.
        """
        table_schema = LMTDB_TABLES.get(table.upper())
        if table_schema is None:
            raise KeyError("Table '%s' is not valid" % table)
        else:
            result_columns = ['TIMESTAMP'] + table_schema['columns']
            format_dict = {
                'schema': ', '.join(result_columns).replace("TS_ID,", "TIMESTAMP_INFO.TS_ID,"),
                'table': table,
            }

        index0 = len(self.saved_results.get(table, {'rows': []})['rows'])

        chunk_start = datetime_start
        while chunk_start < datetime_end:
            if timechunk is None:
                chunk_end = datetime_end
            else:
                chunk_end = chunk_start + timechunk
            if chunk_end > datetime_end:
                chunk_end = datetime_end
            start_stamp = chunk_start.strftime("%Y-%m-%d %H:%M:%S")
            end_stamp = chunk_end.strftime("%Y-%m-%d %H:%M:%S")

            query_str = """SELECT
                               %(schema)s
                           FROM
                               %(table)s
                           INNER JOIN TIMESTAMP_INFO ON TIMESTAMP_INFO.TS_ID = %(table)s.TS_ID
                           WHERE
                               TIMESTAMP_INFO.TIMESTAMP >= %%(ps)s
                               AND TIMESTAMP_INFO.TIMESTAMP < %%(ps)s
                           """ % format_dict
            self.query(query_str, (start_stamp, end_stamp), table=table, table_schema=table_schema)
            if timechunk is not None:
                chunk_start += timechunk

        return self.saved_results[table]['rows'][index0:], result_columns

    def get_mds_data(self, datetime_start, datetime_end,
                     timechunk=datetime.timedelta(hours=1)):
        """Schema-agnostic method for retrieving MDS load data.

        Wraps get_timeseries_data() but fills in the exact table name used in
        the LMT database schema.

        Args:
            datetime_start (datetime.datetime): lower bound on time series
                data to retrieve, inclusive
            datetime_end (datetime.datetime): upper bound on time series data
                to retrieve, exclusive
            timechunk (datetime.timedelta): divide time range query into
                sub-ranges of this width to work around N*M scaling of JOINs

        Returns:
            Tuple of (results, column names) where results are tuples of
            tuples as returned by the MySQL query and column names are the
            names of each column expressed in the individual rows of results.
        """
        return self.get_timeseries_data('MDS_DATA', datetime_start, datetime_end,
                                        timechunk=timechunk)

    def get_mds_ops_data(self, datetime_start, datetime_end,
                         timechunk=datetime.timedelta(hours=1)):
        """Schema-agnostic method for retrieving metadata operations data.

        Wraps get_timeseries_data() but fills in the exact table name used in
        the LMT database schema.

        Args:
            datetime_start (datetime.datetime): lower bound on time series
                data to retrieve, inclusive
            datetime_end (datetime.datetime): upper bound on time series data
                to retrieve, exclusive
            timechunk (datetime.timedelta): divide time range query into
                sub-ranges of this width to work around N*M scaling of JOINs

        Returns:
            Tuple of (results, column names) where results are tuples of
            tuples as returned by the MySQL query and column names are the
            names of each column expressed in the individual rows of results.
        """
        return self.get_timeseries_data('MDS_OPS_DATA', datetime_start, datetime_end,
                                        timechunk=timechunk)

    def get_oss_data(self, datetime_start, datetime_end,
                     timechunk=datetime.timedelta(hours=1)):
        """Schema-agnostic method for retrieving OSS data.

        Wraps get_timeseries_data() but fills in the exact table name used in
        the LMT database schema.

        Args:
            datetime_start (datetime.datetime): lower bound on time series
                data to retrieve, inclusive
            datetime_end (datetime.datetime): upper bound on time series data
                to retrieve, exclusive
            timechunk (datetime.timedelta): divide time range query into
                sub-ranges of this width to work around N*M scaling of JOINs

        Returns:
            Tuple of (results, column names) where results are tuples of
            tuples as returned by the MySQL query and column names are the
            names of each column expressed in the individual rows of results.
        """
        return self.get_timeseries_data('OSS_DATA', datetime_start, datetime_end,
                                        timechunk=timechunk)

    def get_ost_data(self, datetime_start, datetime_end,
                     timechunk=datetime.timedelta(hours=1)):
        """Schema-agnostic method for retrieving OST data.

        Wraps get_timeseries_data() but fills in the exact table name used in
        the LMT database schema.

        Args:
            datetime_start (datetime.datetime): lower bound on time series
                data to retrieve, inclusive
            datetime_end (datetime.datetime): upper bound on time series data
                to retrieve, exclusive
            timechunk (datetime.timedelta): divide time range query into
                sub-ranges of this width to work around N*M scaling of JOINs

        Returns:
            Tuple of (results, column names) where results are tuples of
            tuples as returned by the MySQL query and column names are the
            names of each column expressed in the individual rows of results.
        """
        return self.get_timeseries_data('OST_DATA', datetime_start, datetime_end,
                                        timechunk=timechunk)
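
# Usage sketch, not part of the original module: connect and pull one day of
# OST traffic.  This assumes the PYTOKIO_LMT_* environment variables point at
# a live LMT database (or that a cache_file argument names a previously cached
# SQLite file); the dates below are placeholders.
if __name__ == "__main__":
    lmtdb = LmtDb()  # falls back to PYTOKIO_LMT_HOST/USER/PASSWORD/DB

    start = datetime.datetime(2019, 1, 1, 0, 0, 0)
    end = start + datetime.timedelta(days=1)

    # Map the datetime range to LMT's integer timestamp keys
    ts_id_min, ts_id_max = lmtdb.get_ts_ids(start, end)
    print("TS_ID range: %d - %d" % (ts_id_min, ts_id_max))

    # Retrieve OST read/write counters; the query is chunked into one-hour
    # sub-queries internally (pass timechunk=None to issue a single query)
    rows, columns = lmtdb.get_ost_data(start, end)
    read_col = columns.index('READ_BYTES')
    total_read = sum(row[read_col] for row in rows)
    print("Total bytes read: %d" % total_read)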