Source code for jwql.tests.test_edb_telemetry_monitor

#! /usr/bin/env python

"""Tests for the EDB telemetry

Authors
-------

    - Bryan Hilbert

Use
---

    These tests can be run via the command line (omit the ``-s`` to
    suppress verbose output to stdout):
    ::

        pytest -s test_edb_telemetry_monitor.py
"""
from collections import defaultdict
from copy import deepcopy
import os
import pytest
from types import SimpleNamespace

from astropy.stats import sigma_clipped_stats
from astropy.table import Table
from astropy.table.column import Column
from astropy.time import Time, TimeDelta
import astropy.units as u
import datetime
import numpy as np

from jwql.database.database_interface import session
from jwql.edb.engineering_database import EdbMnemonic
from jwql.instrument_monitors.common_monitors import edb_telemetry_monitor as etm
from jwql.instrument_monitors.common_monitors.edb_telemetry_monitor_utils import condition as cond
from jwql.instrument_monitors.common_monitors.edb_telemetry_monitor_utils import utils as etm_utils
from jwql.tests.resources import has_test_db
from jwql.utils.constants import MIRI_POS_RATIO_VALUES


[docs] def test_add_every_change_history(): """Test that every_change data is correctly combined with an existing set of every_change data """ dates1 = np.array([datetime.datetime(2022, 3, 4, 1, 5, i) for i in range(10)]) data1 = np.array([0.1, 0.2, 0.1, 0.2, 0.1, 0.2, 0.1, 0.2, 0.1, 0.2]) means1 = 0.15 devs1 = 0.07 dates2 = np.array([dates1[-1] + datetime.timedelta(seconds=1 * i) for i in range(1, 11)]) data2 = np.array([0.3, 0.4, 0.3, 0.4, 0.3, 0.4, 0.3, 0.4, 0.3, 0.4]) means2 = 0.35 devs2 = 0.07 ec1 = {'0.15': (dates1, data1, means1, devs1), '0.35': (dates2, data2, means2, devs2) } ec2 = {'0.15': (dates1, data1, means1, devs1)} combine1 = etm.add_every_change_history(ec1, ec2) expected1 = defaultdict(list) expected1['0.15'] = (np.append(dates1, dates1), np.append(data1, data1), np.append(means1, means1), np.append(devs1, devs1)) expected1['0.35'] = (dates2, data2, means2, devs2) for key in combine1: print('compare ', key) for i, cele in enumerate(combine1[key]): assert np.all(cele == expected1[key][i]) dates3 = np.array([dates2[-1] + datetime.timedelta(seconds=1 * i) for i in range(1, 11)]) ec3 = {'0.55': (dates3, data2 + 0.2, means2 + 0.2, devs2)} combine2 = etm.add_every_change_history(ec1, ec3) expected2 = defaultdict(list) expected2['0.15'] = (dates1, data1, means1, devs1) expected2['0.35'] = (dates2, data2, means2, devs2) expected2['0.55'] = (dates3, data2 + 0.2, means2 + 0.2, devs2) for key in combine2: print('compare ', key) for i, cele in enumerate(combine2[key]): assert np.all(cele == expected2[key][i])
[docs] def test_change_only_add_points(): """Make sure we convert change-only data to AllPoints data correctly """ dates = [datetime.datetime(2021, 7, 14, 5, 24 + i, 0) for i in range(3)] values = np.arange(3) data = Table([dates, values], names=('dates', 'euvalues')) mnem = EdbMnemonic('SOMETHING', datetime.datetime(2021, 7, 14), datetime.datetime(2021, 7, 14, 6), data, {}, {}) mnem.meta = {'TlmMnemonics': [{'TlmMnemonic': 'SOMETHING', 'AllPoints': 0}]} mnem.change_only_add_points() expected_dates = [datetime.datetime(2021, 7, 14, 5, 24, 0), datetime.datetime(2021, 7, 14, 5, 24, 59, 999999), datetime.datetime(2021, 7, 14, 5, 25, 0), datetime.datetime(2021, 7, 14, 5, 25, 59, 999999), datetime.datetime(2021, 7, 14, 5, 26, 0)] expected_values = [0, 0, 1, 1, 2] expected = Table([expected_dates, expected_values], names=('dates', 'euvalues')) assert np.all(expected["dates"] == mnem.data["dates"]) assert np.all(expected["euvalues"] == mnem.data["euvalues"])
[docs] def test_conditions(): """Test the extraction of data using the ```equal``` class. """ # Create data for mnemonic of interest # start_time = Time('2022-02-02') # end_time = Time('2022-02-03') start_time = datetime.datetime(2022, 2, 2) end_time = datetime.datetime(2022, 2, 3) temp_data = Table() temp_data["euvalues"] = np.array([35., 35.1, 35.2, 36., 36.1, 36.2, 37.1, 37., 36., 36.]) # temp_data["dates"] = np.array([Time('2022-02-02') + TimeDelta(0.1 * i, format='jd') for i in range(10)]) temp_data["dates"] = np.array([start_time + datetime.timedelta(days=0.1 * i) for i in range(10)]) meta = {} info = {} temperature = EdbMnemonic("TEMPERATURE", start_time, end_time, temp_data, meta, info) # Create conditional data current_data = {} current_data["euvalues"] = np.array([1., 1., 1., 2.5, 2.5, 2.5, 5.5, 5.5, 2.5, 2.5]) current_data["dates"] = np.array([start_time + datetime.timedelta(days=0.1001 * i) for i in range(10)]) # Using a single relation class eq25 = cond.relation_test(current_data, '==', 2.5) condition_list = [eq25] condition_1 = cond.condition(condition_list) # Extract the good data condition_1.extract_data(temperature.data) # Expected results expected_table = Table() frac_days = [0.4, 0.5, 0.9] expected_table["dates"] = [start_time + datetime.timedelta(days=frac) for frac in frac_days] expected_table["euvalues"] = [36.1, 36.2, 36.0] assert np.all(condition_1.extracted_data == expected_table) assert condition_1.block_indexes == [0, 2, 3] grt0 = cond.relation_test(current_data, '>', 0) condition_list.append(grt0) condition_2 = cond.condition(condition_list) condition_2.extract_data(temperature.data) assert np.all(condition_2.extracted_data == expected_table) assert condition_2.block_indexes == [0, 2, 3] less10 = cond.relation_test(current_data, '<', 10) condition_list.append(less10) condition_3 = cond.condition(condition_list) condition_3.extract_data(temperature.data) assert np.all(condition_3.extracted_data == expected_table) assert condition_3.block_indexes == [0, 2, 3]
[docs] def test_find_all_changes(): inst = etm.EdbMnemonicMonitor() # Create test data start_time = Time('2022-02-02') end_time = Time('2022-02-03') temp_data = Table() temp_data["euvalues"] = [350., 350.1, 350.2, 360., 360.1, 360.2, 370.1, 370., 360., 360.] temp_data["dates"] = np.array([datetime.datetime(2022, 2, 2) + datetime.timedelta(days=0.1 * i) for i in range(10)]) meta = {'TlmMnemonics': [{'AllPoints': 1}]} info = {} temperature = EdbMnemonic("TEMPERATURE", start_time, end_time, temp_data, meta, info) temperature.blocks = [] # Create dictionary of dependency info dependency = [{"name": "CURRENT", "relation": "none", "threshold": 0}] # Create dependency data current_data = Table() current_data["euvalues"] = ['LOW', 'LOW', 'LOW', 'MEDIUM', 'MEDIUM', 'MEDIUM', 'HIGH', 'HIGH', 'MEDIUM', 'MEDIUM'] current_data["dates"] = np.array([datetime.datetime(2022, 2, 2) + datetime.timedelta(days=0.1001 * i) for i in range(10)]) inst.query_results[dependency[0]["name"]] = EdbMnemonic("CURRENT", start_time, end_time, current_data, meta, info) vals = inst.find_all_changes(temperature, dependency) assert np.isclose(vals.mean[0], 359.07) assert np.isclose(vals.median[0], 360.0) assert np.isclose(vals.stdev[0], 6.9818407314976785)
[docs] def test_get_averaging_time_duration(): """Test that only allowed string formats are used for averaging time duration """ in_strings = ["5_minute", "45_second", "10_day", "2_hour"] expected_vals = [5 * u.minute, 45 * u.second, 10 * u.day, 2 * u.hour] for inval, outval in zip(in_strings, expected_vals): output = etm_utils.get_averaging_time_duration(inval) assert output == outval bad_strings = ["7_years", "nonsense"] for inval in bad_strings: with pytest.raises(ValueError) as e_info: output = etm_utils.get_averaging_time_duration(inval)
[docs] def test_get_query_duration(): """Test that the correct query duration is found """ in_strings = ['daily_means', "every_change", "block_means", "time_interval", "all"] expected_vals = [datetime.timedelta(days=0.01041667), datetime.timedelta(days=1), datetime.timedelta(days=1), datetime.timedelta(days=1), datetime.timedelta(days=1)] for inval, outval in zip(in_strings, expected_vals): output = etm_utils.get_query_duration(inval) assert output == outval with pytest.raises(ValueError) as e_info: output = etm_utils.get_query_duration("bad_string")
[docs] def test_key_check(): """Test the dictionary key checker """ d = {'key1': [1, 2, 3], 'key4': 'a'} assert etm_utils.check_key(d, 'key1') == d['key1'] assert etm_utils.check_key(d, 'key2') is None
[docs] def test_multiple_conditions(): """Test that filtering using multiple conditions is working as expected. """ # Create data for mnemonic of interest start_time = datetime.datetime(2022, 2, 2) end_time = datetime.datetime(2022, 2, 3) temp_data = Table() temp_data["euvalues"] = Column(np.array([35., 35.1, 35.2, 36., 36.1, 36.2, 37.1, 37., 36., 36.])) temp_data["dates"] = Column(np.array([start_time + datetime.timedelta(days=0.1 * i) for i in range(10)])) meta = {} info = {} temperature = EdbMnemonic("TEMPERATURE", start_time, end_time, temp_data, meta, info) # Create conditional data current_data = {} current_data["euvalues"] = Column(np.array([1., 2.5, 2.5, 2.5, 2.5, 2.5, 5.5, 5.5, 2.5, 2.5])) current_data["dates"] = Column(np.array([start_time + datetime.timedelta(days=0.1001 * i) for i in range(10)])) element_data = {} element_data["euvalues"] = Column(np.repeat("OFF", 20)) element_data["euvalues"][13:] = "ON" element_data["dates"] = Column(np.array([start_time + datetime.timedelta(days=0.06 * i) for i in range(20)])) grt35 = cond.relation_test(temp_data, '>', 35.11) eq25 = cond.relation_test(current_data, '==', 2.5) off = cond.relation_test(element_data, '=', 'OFF') condition_list = [grt35, eq25, off] condition = cond.condition(condition_list) condition.extract_data(temperature.data) # Compare to expectations expected_table = temp_data[2:6] assert np.all(condition.extracted_data["euvalues"] == expected_table["euvalues"]) assert np.all(condition.extracted_data["dates"] == expected_table["dates"]) assert condition.block_indexes == [0, 4]
[docs] def test_organize_every_change(): """Test the reorganization of every_change data from an EdbMnemonic into something easier to plot """ basetime = datetime.datetime(2021, 4, 6, 14, 0, 0) dates = np.array([basetime + datetime.timedelta(seconds=600 * i) for i in range(20)]) # dates = np.array([basetime + TimeDelta(600 * i, format='sec') for i in range(20)]) vals = np.array([300.5, 310.3, -250.5, -500.9, 32.2, 300.1, 310.8, -250.2, -500.2, 32.7, 300.2, 310.4, -250.6, -500.8, 32.3, 300.4, 310.5, -250.4, -500.1, 32.9]) ec_vals = ["F2550W", 'F560W', 'F770W', 'F1000W', 'F1500W', "F2550W", 'F560W', 'F770W', 'F1000W', 'F1500W', "F2550W", 'F560W', 'F770W', 'F1000W', 'F1500W', "F2550W", 'F560W', 'F770W', 'F1000W', 'F1500W'] m = Table() m["dates"] = dates m["euvalues"] = vals mnem = EdbMnemonic('IMIR_HK_FW_POS_RATIO', Time('2021-04-06T00:00:00'), Time('2021-04-06T23:00:00'), m, {}, {"unit": "Pos Ratio"}) mnem.every_change_values = ec_vals data = etm.organize_every_change(mnem) f2550_idx = [0, 5, 10, 15] f560_idx = [1, 6, 11, 16] f770_idx = [2, 7, 12, 17] f1000_idx = [3, 8, 13, 18] f1500_idx = [4, 9, 14, 19] f2550_vals = vals[f2550_idx] f560_vals = vals[f560_idx] f770_vals = vals[f770_idx] f1000_vals = vals[f1000_idx] f1500_vals = vals[f1500_idx] f2550mean, _, _ = sigma_clipped_stats(f2550_vals, sigma=3) f560mean, _, _ = sigma_clipped_stats(f560_vals, sigma=3) f770mean, _, _ = sigma_clipped_stats(f770_vals, sigma=3) f1000mean, _, _ = sigma_clipped_stats(f1000_vals, sigma=3) f1500mean, _, _ = sigma_clipped_stats(f1500_vals, sigma=3) expected = {'F2550W': (np.array(dates[f2550_idx]), f2550_vals, MIRI_POS_RATIO_VALUES['FW']['F2550W'][0]), 'F560W': (np.array(dates[f560_idx]), f560_vals, MIRI_POS_RATIO_VALUES['FW']['F560W'][0]), 'F770W': (np.array(dates[f770_idx]), f770_vals, MIRI_POS_RATIO_VALUES['FW']['F770W'][0]), 'F1000W': (np.array(dates[f1000_idx]), f1000_vals, MIRI_POS_RATIO_VALUES['FW']['F1000W'][0]), 'F1500W': (np.array(dates[f1500_idx]), f1500_vals, MIRI_POS_RATIO_VALUES['FW']['F1500W'][0])} for key, val in expected.items(): assert np.all(val[0] == data[key][0]) assert np.all(val[1] == data[key][1]) assert np.all(val[2] == data[key][2])
[docs] def test_remove_outer_points(): """Test that points outside the requested time are removed for change-only data """ data = Table() data["dates"] = [datetime.datetime(2014, 12, 8) + datetime.timedelta(days=0.5 * (i + 1)) for i in range(5)] data["euvalues"] = [1, 2, 3, 4, 5] mnem = EdbMnemonic('TEST', datetime.datetime(2022, 12, 9), datetime.datetime(2022, 12, 10), data, {}, {}) orig = deepcopy(mnem) etm_utils.remove_outer_points(mnem) assert all(orig.data['dates'][1:-1] == mnem.data['dates']) assert all(orig.data['euvalues'][1:-1] == mnem.data['euvalues'])
[docs] @pytest.mark.skipif(not has_test_db(), reason='Modifies test database.') def test_add_new_block_db_entry(): query_time = datetime.datetime.now() # mock mnem structure dates = SimpleNamespace(data=datetime.datetime.now()) euvalues = SimpleNamespace(data=1) mnem = SimpleNamespace(data={'dates': dates, 'euvalues': euvalues}, stdev=0, median=1, max=2, min=1, mnemonic_identifier='test') monitor = etm.EdbMnemonicMonitor() monitor.identify_tables('nircam', 'daily') try: monitor.add_new_block_db_entry(mnem, query_time) new_entries = session.query(monitor.history_table).filter( monitor.history_table.mnemonic == 'test') assert new_entries.count() == 1 finally: # clean up session.query(monitor.history_table).filter( monitor.history_table.mnemonic == 'test').delete() session.commit() assert session.query(monitor.history_table).filter( monitor.history_table.mnemonic == 'test').count() == 0
[docs] @pytest.mark.skipif(not has_test_db(), reason='Modifies test database.') def test_add_new_every_change_db_entry(): # mock mnem dict mnem_dict = {'test1': (datetime.datetime.now(), 1., 1., 1.), 'test2': (datetime.datetime.now(), 1., 1., 1.)} mnem = 'test_mnem' dependency_name = 'test_dependency' query_time = datetime.datetime.now() monitor = etm.EdbMnemonicMonitor() monitor.identify_tables('nircam', 'every_change') try: monitor.add_new_every_change_db_entry( mnem, mnem_dict, dependency_name, query_time) new_entries = session.query(monitor.history_table).filter( monitor.history_table.mnemonic == 'test_mnem') assert new_entries.count() == 2 finally: # clean up session.query(monitor.history_table).filter( monitor.history_table.mnemonic == 'test_mnem').delete() session.commit() assert session.query(monitor.history_table).filter( monitor.history_table.mnemonic == 'test_mnem').count() == 0