Source code for jwql.tests.test_bad_pixel_monitor

#! /usr/bin/env python

"""Tests for the bad pixel monitor module.

Authors
-------

    - Bryan Hilbert

Use
---

    These tests can be run via the command line (omit the ``-s`` to
    suppress verbose output to stdout):
    ::

        pytest -s test_badpix_monitor.py
"""
import datetime

import numpy as np
import os
import pytest

from jwst.datamodels import dqflags

from jwql.database.database_interface import engine, session
from jwql.database.database_interface import NIRCamBadPixelQueryHistory, NIRCamBadPixelStats
from jwql.database.database_interface import NIRISSBadPixelQueryHistory, NIRISSBadPixelStats
from jwql.database.database_interface import MIRIBadPixelQueryHistory, MIRIBadPixelStats
from jwql.database.database_interface import NIRSpecBadPixelQueryHistory, NIRSpecBadPixelStats
from jwql.database.database_interface import FGSBadPixelQueryHistory, FGSBadPixelStats
from jwql.instrument_monitors.common_monitors import bad_pixel_monitor
from jwql.tests.resources import has_test_db
from jwql.utils.constants import ON_GITHUB_ACTIONS


[docs] def test_bad_map_to_list(): """Check that bad pixel locations are correctly retrieved from an image """ # Create empty DQ image image = np.zeros((7, 7), dtype=np.uint16) # Set bad pixel locations do_not_use = ([0, 2, 4, 6], [0, 2, 4, 6]) hot = ([0, 1, 3, 5], [0, 2, 4, 6]) rc = ([0, 2, 4, 6], [0, 1, 3, 5]) # Populate with bad pixels image[do_not_use] = dqflags.pixel['DO_NOT_USE'] image[hot] += dqflags.pixel['HOT'] image[rc] += dqflags.pixel['RC'] # Call the bad pixel monitor x_do_not_use, y_do_not_use = bad_pixel_monitor.bad_map_to_list(image, 'DO_NOT_USE') x_hot, y_hot = bad_pixel_monitor.bad_map_to_list(image, 'HOT') x_rc, y_rc = bad_pixel_monitor.bad_map_to_list(image, 'RC') # Test assert do_not_use == (y_do_not_use, x_do_not_use) assert hot == (y_hot, x_hot) assert rc == (y_rc, x_rc)
[docs] def test_check_for_sufficient_files(): """Be sure that the file threshold values are being used correctly """ input_files = ['file1.fits', 'file2.fits', 'file1.fits', 'file3.fits', 'file1.fits'] instrument = 'nircam' aperture = 'NRCA1_FULL' threshold = 2 file_type = 'darks' files, to_run = bad_pixel_monitor.check_for_sufficient_files(input_files, instrument, aperture, threshold, file_type) assert files == ['file1.fits', 'file2.fits', 'file3.fits'] assert to_run is True threshold = 4 files, to_run = bad_pixel_monitor.check_for_sufficient_files(input_files, instrument, aperture, threshold, file_type) assert files is None assert to_run is False
[docs] def test_exclude_crds_mask_pix(): """Test that bad pixel images are differentiated correctly """ common_bad = ([0, 1, 2, 3, 4], [0, 1, 2, 3, 4]) bad1_only = ([0, 1, 3, 4], [4, 3, 1, 0]) bad2_only = ([3, 3, 3, 3], [0, 1, 2, 4]) bad1 = np.zeros((5, 5), dtype=np.uint8) bad1[common_bad] = 1 bad1[bad1_only] = 2 bad2 = np.zeros((5, 5), dtype=np.uint8) bad2[common_bad] = 1 bad2[bad2_only] = 4 # Create a mask to help with indexing mask = np.zeros(bad1.shape, dtype=bool) mask[bad1_only] = True diff = bad_pixel_monitor.exclude_crds_mask_pix(bad1, bad2) assert np.all(diff[mask] == 2) assert np.all(diff[~mask] == 0) # Test the reverse case mask = np.zeros(bad2.shape, dtype=bool) mask[bad2_only] = True diff = bad_pixel_monitor.exclude_crds_mask_pix(bad2, bad1) assert np.all(diff[mask] == 4) assert np.all(diff[~mask] == 0)
[docs] def test_filter_query_results(): """Test MAST query filtering to extract most common filter/pupil and acceptable readout patterns """ badpix = bad_pixel_monitor.BadPixels() badpix.instrument = 'nircam' dict1 = {'filter': 'F070W', 'pupil': 'CLEAR', 'readpatt': 'RAPID'} dict2 = {'filter': 'F090W', 'pupil': 'CLEAR', 'readpatt': 'BRIGHT1'} dict3 = {'filter': 'F070W', 'pupil': 'CLEAR', 'readpatt': 'BRIGHT1'} query_results = [dict1, dict2, dict1, dict1, dict3, dict1] filtered = badpix.filter_query_results(query_results, 'flat') assert filtered == [dict1, dict1, dict1, dict1] # NIRSpec badpix.instrument = 'nirspec' dict1 = {'filter': 'F070W', 'grating': 'CLEAR', 'readpatt': 'NRSRAPID'} dict2 = {'filter': 'F070W', 'grating': 'CLEAR', 'readpatt': 'NRSIRS2'} dict3 = {'filter': 'F070W', 'grating': 'CLEAR', 'readpatt': 'NRSRAPID'} query_results = [dict1, dict2, dict1, dict1, dict3, dict1] filtered = badpix.filter_query_results(query_results, 'flat') assert filtered == [dict1, dict1, dict1, dict3, dict1]
nrc_list = ['NRCA1_FULL', 'NRCB1_FULL', 'NRCA2_FULL', 'NRCB2_FULL', 'NRCA3_FULL', 'NRCB3_FULL', 'NRCA4_FULL', 'NRCB4_FULL', 'NRCA5_FULL', 'NRCB5_FULL'] nis_list = ['NIS_CEN'] nrs_list = ['NRS1_FULL', 'NRS2_FULL'] miri_list = [('MIRIMAGE', 'MIRIM_FULL'), ('MIRIFULONG', 'MIRIM_FULL'), ('MIRIFUSHORT', 'MIRIM_FULL')] fgs_list = ['FGS1_FULL', 'FGS2_FULL']
[docs] @pytest.mark.parametrize("instrument,expected_list", [("nircam", nrc_list), ("niriss", nis_list), ("nirspec", nrs_list), ("miri", miri_list), ("fgs", fgs_list)]) def test_get_possible_apertures(instrument, expected_list): """Make sure the correct apertures are returned for the given instrument """ badpix = bad_pixel_monitor.BadPixels() badpix.instrument = instrument ap_list = badpix.get_possible_apertures() assert ap_list == expected_list
[docs] @pytest.mark.skipif(ON_GITHUB_ACTIONS, reason='Requires access to central storage.') def test_identify_tables(): """Be sure the correct database tables are identified """ badpix = bad_pixel_monitor.BadPixels() badpix.instrument = 'nircam' badpix.identify_tables() assert badpix.query_table == eval('NIRCamBadPixelQueryHistory') assert badpix.pixel_table == eval('NIRCamBadPixelStats')
[docs] @pytest.mark.skipif(ON_GITHUB_ACTIONS, reason='Requires access to central storage.') def test_locate_rate_files(): """Test that rate files are found in filesystem""" uncal_files = ['jw02733002001_02101_00001_mirimage_uncal.fits', 'jw02733002001_02101_00002_mirimage_uncal.fits'] ratefiles, ratefiles2copy = bad_pixel_monitor.locate_rate_files(uncal_files) rates = [os.path.basename(entry) for entry in ratefiles] rates2copy = [os.path.basename(entry) for entry in ratefiles2copy] expected = ['jw02733002001_02101_00001_mirimage_rateints.fits', 'jw02733002001_02101_00002_mirimage_rateints.fits'] assert rates == expected assert rates2copy == expected
[docs] @pytest.mark.skipif(ON_GITHUB_ACTIONS, reason='Requires access to central storage.') def test_locate_uncal_files(): """Test the filesystem search for uncal files """ file1 = 'jw02733002001_02101_00001_mirimage_rate.fits' file2 = 'jw02733002001_02101_00002_mirimage_uncal.fits' query_results = [{'filename': file1}, {'filename': file2}] found = bad_pixel_monitor.locate_uncal_files(query_results) found_base = [os.path.basename(entry) for entry in found] assert found_base[0] == file1.replace('rate', 'uncal') assert found_base[1] == file2
[docs] def test_make_crds_parameter_dict(): """Test that the dictionary to be used for CRDS queries is properly created """ badpix = bad_pixel_monitor.BadPixels() badpix.instrument = 'nircam' badpix.detector = 'nrcalong' params = badpix.make_crds_parameter_dict() assert params['INSTRUME'] == 'NIRCAM' assert params['DETECTOR'] == 'NRCALONG' assert params['CHANNEL'] == 'LONG'
[docs] @pytest.mark.skipif(not has_test_db(), reason='Modifies test database.') def test_add_bad_pix(): coord = ([1, 2, 3], [4, 5, 6]) pixel_type = 'test_new_pixel_type' files = ['test.fits'] obs_start = obs_mid = obs_end = datetime.datetime.now() baseline = 'baseline.fits' badpix = bad_pixel_monitor.BadPixels() badpix.instrument = 'nircam' badpix.detector = 'nrcalong' badpix.identify_tables() try: badpix.add_bad_pix(coord, pixel_type, files, obs_start, obs_mid, obs_end, baseline) new_entries = session.query(badpix.pixel_table).filter( badpix.pixel_table.type == pixel_type) assert new_entries.count() == 1 assert new_entries[0].baseline_file == baseline assert np.all(new_entries[0].x_coord == coord[0]) assert np.all(new_entries[0].y_coord == coord[1]) finally: # clean up session.query(badpix.pixel_table).filter( badpix.pixel_table.type == pixel_type).delete() session.commit() assert session.query(badpix.pixel_table).filter( badpix.pixel_table.type == pixel_type).count() == 0
[docs] @pytest.mark.skipif(not has_test_db(), reason='Modifies test database.') def test_exclude_existing_badpix(): coord = ([9999], [9999]) pixel_type = 'hot' badpix = bad_pixel_monitor.BadPixels() badpix.instrument = 'nircam' badpix.detector = 'nrcalong' badpix.identify_tables() # bad pixel type should raise error with pytest.raises(ValueError) as err: badpix.exclude_existing_badpix(coord, 'test_bad_type') assert 'bad pixel type' in str(err) # new pixel should not be found new_x, new_y = badpix.exclude_existing_badpix(coord, pixel_type) assert new_x == [9999] assert new_y == [9999] # add pixel, test again files = ['test.fits'] obs_start = obs_mid = obs_end = datetime.datetime.now() baseline = 'test_baseline.fits' try: badpix.add_bad_pix(coord, pixel_type, files, obs_start, obs_mid, obs_end, baseline) new_entries = session.query(badpix.pixel_table).filter( badpix.pixel_table.baseline_file == baseline) assert new_entries.count() == 1 # new pixel should be found new_x, new_y = badpix.exclude_existing_badpix(coord, pixel_type) assert new_x == [] assert new_y == [] finally: # clean up session.query(badpix.pixel_table).filter( badpix.pixel_table.baseline_file == baseline).delete() session.commit() assert session.query(badpix.pixel_table).filter( badpix.pixel_table.baseline_file == baseline).count() == 0