Source code for jwql.instrument_monitors.common_monitors.bad_pixel_monitor

#! /usr/bin/env python

"""This module contains code for the bad/dead pixel monitor.

The monitor calls the ``bad_pixel_mask.py`` module in the
``spacetelescope/jwst_reffiles`` package in order to identify bad
pixels. (``https://github.com/spacetelescope/jwst_reffiles``)

The bad pixel mnemonics to be searched for are given
in the jwst package:
``https://jwst-pipeline.readthedocs.io/en/stable/jwst/references_general/
references_general.html#data-quality-flags``

The algorithms used to identify the various types of bad pixels were
developed by the JWST Calibration Reference File Generation Tools working
group, which was under the Calibration Working Group. This working group
contained representatives from all instruments, and worked to identify
common algorithms that all instruments could use to generate information
to be placed in reference files. The specific algorithms used here to
identify bad pixels are described at:
``https://outerspace.stsci.edu/display/JWSTCC/Algorithm+details%3A+DQ+Init``

The bad pixel search is composed of two different parts, each of which
can be run independently.

1. Internal flat field exposures are used to search for ``DEAD``,
``LOW QE``, ``OPEN``, and ``ADJACENT TO OPEN`` pixels.

2. Dark current exposures are used to search for ``NOISY``, ``HOT``,
``RC``, ``TELEGRAPH``, and ``LOW_PEDESTAL`` pixels.
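
Each of these mnemonics corresponds to a single bit in the JWST
calibration pipeline's data quality (DQ) bitmask. As a rough
illustration (not part of this module), the flag values can be looked
up via ``jwst.datamodels.dqflags``:

::

    from jwst.datamodels import dqflags

    # Each mnemonic maps to a power-of-two flag value; a pixel's DQ
    # value is the bitwise OR of all the flags that apply to it.
    for mnemonic in ['DEAD', 'OPEN', 'HOT', 'RC', 'TELEGRAPH']:
        print(mnemonic, dqflags.pixel[mnemonic])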

Both of these searches expect input data in at least two calibration
states. In practice, the bad pixel monitor will search MAST for the
appropriate dark current and flat field files. In both cases, a given
file is considered useful if the uncal (and potentially the rate)
versions of the file are present in the archive. For files where only
the uncal version is found, the pipeline is run to produce the rate
file.
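
File pairing is done by filename suffix: for a raw ``*_uncal.fits``
file, the monitor first looks for a ``*_rateints.fits`` product and
then falls back to ``*_rate.fits`` (see ``locate_rate_files()``
below). For example, with a hypothetical filename:

::

    base = 'jw01234001001_01101_00001_nrca1_uncal.fits'.split('_uncal.fits')[0]
    rate_name = '{}_rate.fits'.format(base)
    # -> 'jw01234001001_01101_00001_nrca1_rate.fits'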

Once a sufficient number of new flats or darks are identified, the bad
pixel monitor is called. The ``bad_pixel_file_thresholds.txt`` file
contains a list of the minimum number of new files necessary to run the
monitor for each instrument and aperture.
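
As a minimal sketch (the aperture name below is just an example), the
thresholds file is read with ``astropy.io.ascii``, exactly as done in
``run()``:

::

    from astropy.io import ascii

    limits = ascii.read('bad_pixel_file_thresholds.txt')

    # Pick out the row for a single aperture and read its thresholds
    match = limits['Aperture'] == 'NRCA1_FULL'
    flat_threshold = limits['FlatThreshold'][match].data[0]
    dark_threshold = limits['DarkThreshold'][match].data[0]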

For the flat field files, the pipeline is run on the uncal files far
enough to produce cosmic ray flagged (jump) files. These are also
needed for the bad pixel search.
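
Within JWQL this is handled by ``run_parallel_pipeline``. A
standalone sketch of an equivalent call to the jwst package (not the
monitor's exact invocation; the input filename is hypothetical) is:

::

    from jwst.pipeline import Detector1Pipeline

    # Run calwebb_detector1 on an uncal file, saving the cosmic-ray
    # flagged (jump) product in addition to the final rate image
    Detector1Pipeline.call('example_uncal.fits',
                           steps={'jump': {'save_results': True},
                                  'ramp_fit': {'save_results': True}})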

The ``jwst_reffiles`` ``bad_pixel_mask.py`` is run, and returns a map of
bad pixels for each of the various bad pixel mnemonics. The
``bad_pixel_monitor`` then downloads the latest bad pixel mask in CRDS
for the given instrument and detector/aperture, and this is compared to
the new map of bad pixels. For each bad pixel mnemonic, any pixels
flagged as bad that are not bad in the current reference file are saved
to the (e.g. ``NIRCamBadPixelStats``) database table.
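
The comparison is bitwise: a pixel is reported only if it carries a
flag bit that is absent from the current reference file. A toy
example of the logic implemented in ``exclude_crds_mask_pix()``
below:

::

    import numpy as np

    new_mask = np.array([[1024, 0], [1025, 2048]])   # newly-found flags
    crds_mask = np.array([[1024, 0], [1024, 0]])     # flags already in CRDS

    # Keep only the flag bits not already present in the reference file
    new_flags = new_mask - (new_mask & crds_mask)
    # -> [[0, 0], [1, 2048]]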

Author
------

    - Bryan Hilbert

Use
---

    This module can be used from the command line as such:

    ::

        python bad_pixel_monitor.py

Notes
-----

The portion of the bad pixel monitor that utilizes flat field ramps
can't be used with NIRCam, since NIRCam has no internal lamp and
therefore will not be taking any more internal flat field images. It
could perhaps be used with a series of external undithered
observations, but that is something to consider later.

Templates to use: ``FGS_INTFLAT``, ``NIS_LAMP``, ``NRS_LAMP``,
``MIR_DARK``
"""

from collections import defaultdict
from copy import deepcopy
import datetime
from glob import glob
import logging
import os
from time import sleep

from astropy.io import ascii, fits
from astropy.time import Time
from jwst.datamodels import dqflags
from jwst_reffiles.bad_pixel_mask import bad_pixel_mask
import numpy as np

from jwql.database.database_interface import engine, session
from jwql.database.database_interface import NIRCamBadPixelQueryHistory, NIRCamBadPixelStats
from jwql.database.database_interface import NIRISSBadPixelQueryHistory, NIRISSBadPixelStats
from jwql.database.database_interface import MIRIBadPixelQueryHistory, MIRIBadPixelStats
from jwql.database.database_interface import NIRSpecBadPixelQueryHistory, NIRSpecBadPixelStats
from jwql.database.database_interface import FGSBadPixelQueryHistory, FGSBadPixelStats
from jwql.instrument_monitors import pipeline_tools
from jwql.shared_tasks.shared_tasks import only_one, run_pipeline, run_parallel_pipeline
from jwql.utils import crds_tools, instrument_properties, monitor_utils
from jwql.utils.constants import DARKS_BAD_PIXEL_TYPES, DARK_EXP_TYPES, FLATS_BAD_PIXEL_TYPES, FLAT_EXP_TYPES
from jwql.utils.constants import JWST_INSTRUMENT_NAMES, JWST_INSTRUMENT_NAMES_MIXEDCASE, ON_GITHUB_ACTIONS
from jwql.utils.constants import ON_READTHEDOCS
from jwql.utils.logging_functions import log_info, log_fail
from jwql.utils.mast_utils import mast_query
from jwql.utils.permissions import set_permissions
from jwql.utils.utils import copy_files, create_png_from_fits, ensure_dir_exists, get_config, filesystem_path


if not ON_GITHUB_ACTIONS and not ON_READTHEDOCS:
    from jwql.website.apps.jwql.monitor_pages.monitor_bad_pixel_bokeh import BadPixelPlots

THRESHOLDS_FILE = os.path.join(os.path.split(__file__)[0], 'bad_pixel_file_thresholds.txt')


def bad_map_to_list(badpix_image, mnemonic):
    """Given a DQ image and a bad pixel mnemonic, create a list of
    (x, y) locations of this type of bad pixel in ``badpix_image``

    Parameters
    ----------
    badpix_image : numpy.ndarray
        2D image of bad pixels (i.e. a DQ array)

    mnemonic : str
        The type of bad pixel to map. The mnemonic must be one of
        those in the JWST calibration pipeline's list of possible
        mnemonics

    Returns
    -------
    x_loc : list
        List of x locations within ``badpix_image`` containing
        ``mnemonic`` pixels.

    y_loc : list
        List of y locations within ``badpix_image`` containing
        ``mnemonic`` pixels.
    """
    mnemonic = mnemonic.upper()
    possible_mnemonics = dqflags.pixel.keys()
    if mnemonic not in possible_mnemonics:
        raise ValueError("ERROR: Unrecognized bad pixel mnemonic: {}".format(mnemonic))

    # Find locations of this type of bad pixel. Parenthesize the bitwise
    # AND so that it is evaluated before the comparison.
    y_loc, x_loc = np.where((badpix_image & dqflags.pixel[mnemonic]) > 0)

    # Convert from numpy int to python native int, in order to avoid SQL
    # error when adding to the database tables.
    y_location = [int(element) for element in y_loc]
    x_location = [int(element) for element in x_loc]

    return x_location, y_location

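# A usage sketch for bad_map_to_list(), with a made-up 4x4 DQ array
# (illustrative only):
#
#     dq = np.zeros((4, 4), dtype=np.uint32)
#     dq[2, 1] = dqflags.pixel['DEAD']
#     x, y = bad_map_to_list(dq, 'DEAD')   # x == [1], y == [2]
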
def check_for_sufficient_files(uncal_files, instrument_name, aperture_name, threshold_value, file_type):
    """From a list of files of a given type (flats or darks), check to
    see if there are enough files to call the bad pixel monitor. The
    number of files must be equal to or greater than the provided
    ``threshold_value``.

    Parameters
    ----------
    uncal_files : list
        List of filenames

    instrument_name : str
        Name of JWST instrument (e.g. ``nircam``) that the data are
        from. This is used only in logging statements

    aperture_name : str
        Name of aperture (e.g. ``NRCA1_FULL``) that the data are
        from. This is used only in logging statements

    threshold_value : int
        Minimum number of files required in order to run the bad
        pixel monitor

    file_type : str
        Either ``darks`` or ``flats``. This is used only in the
        logging statements.

    Returns
    -------
    uncal_files : list
        List of sorted, unique files from the input file list. Set to
        ``None`` if the number of unique files is under the threshold

    run_data : bool
        Whether or not the bad pixel monitor will be called on these
        files.
    """
    if file_type not in ['darks', 'flats']:
        raise ValueError('Input file_type must be "darks" or "flats"')
    file_type_singular = file_type.strip('s')

    if len(uncal_files) > 0:
        uncal_files = sorted(list(set(uncal_files)))

    if len(uncal_files) < threshold_value:
        logging.info(('\tBad pixels from {} skipped. {} new {} files for {}, '
                      '{} found. {} new files are required to run bad pixels '
                      'from {} portion of monitor.')
                     .format(file_type, len(uncal_files), file_type_singular,
                             instrument_name, aperture_name, threshold_value, file_type))
        uncal_files = None
        run_data = False
    else:
        logging.info('\tSufficient new files found for {}, {} to run the '
                     'bad pixel from {} portion of the monitor.'
                     .format(instrument_name, aperture_name, file_type))
        logging.info('\tNew entries: {}'.format(len(uncal_files)))
        run_data = True

    return uncal_files, run_data

def exclude_crds_mask_pix(bad_pix, existing_bad_pix):
    """Find differences between a set of newly-identified bad pixels
    and an existing set. Return a map of the newly-discovered bad
    pixels that are not present in the existing set.

    Parameters
    ----------
    bad_pix : numpy.ndarray
        2D array of bad pixel flags. Flags must correspond to the
        definitions used by the JWST calibration pipeline

    existing_bad_pix : numpy.ndarray
        2D array of bad pixel flags. Flags must correspond to the
        definitions used by the JWST calibration pipeline

    Returns
    -------
    new_bad_pix : numpy.ndarray
        2D array of bad pixel flags contained in ``bad_pix`` but not
        ``existing_bad_pix``
    """
    return bad_pix - (bad_pix & existing_bad_pix)

def locate_rate_files(uncal_files):
    """Given a list of uncal (raw) files, generate a list of
    corresponding rate files. For each uncal file, if the rate file
    is present in the filesystem, add the name of the rate file (if a
    rateints file exists, use that) to the list of files. If no rate
    file is present, add ``None`` to the list.

    Parameters
    ----------
    uncal_files : list
        List of uncal files to use as the basis of the search

    Returns
    -------
    rate_files : list
        List of rate files. This list corresponds 1-to-1 with
        ``uncal_files``. Any missing rate files are listed as None.

    rate_files_to_copy : list
        Same as ``rate_files`` but without the None entries. This is
        a list of only the rate files that exist in the filesystem
    """
    if uncal_files is None:
        return None, None

    rate_files = []
    rate_files_to_copy = []
    for uncal in uncal_files:
        base = uncal.split('_uncal.fits')[0]
        constructed_ratefile = '{}_rateints.fits'.format(base)
        try:
            rate_files.append(filesystem_path(constructed_ratefile))
            rate_files_to_copy.append(filesystem_path(constructed_ratefile))
        except FileNotFoundError:
            constructed_ratefile = '{}_rate.fits'.format(base)
            try:
                rate_files.append(filesystem_path(constructed_ratefile))
                rate_files_to_copy.append(filesystem_path(constructed_ratefile))
            except FileNotFoundError:
                rate_files.append('None')
    return rate_files, rate_files_to_copy

def locate_uncal_files(query_result):
    """Given a MAST query result, locate the raw version
    (``uncal.fits``) of the listed files in the filesystem.

    Parameters
    ----------
    query_result : list
        MAST query results. List of dictionaries

    Returns
    -------
    uncal_files : list
        List of raw file locations within the filesystem
    """
    uncal_files = []
    for entry in query_result:
        filename = entry['filename']
        suffix = filename.split('_')[-1].replace('.fits', '')
        uncal_file = filename.replace(suffix, 'uncal')

        # Look for uncal file
        try:
            uncal_files.append(filesystem_path(uncal_file))
        except FileNotFoundError:
            logging.warning('\t\tUnable to locate {} in filesystem. Not including in processing.'
                            .format(uncal_file))
    return uncal_files

class BadPixels():
    """Class for executing the bad pixel monitor.

    This class will search for new (since the previous instance of
    the class) dark current and internal flat field files in the
    filesystem. It will loop over instrument/aperture combinations
    and find the number of new dark/flat files available. If there
    are enough, it will copy the files over to a working directory
    and run the monitor. This will use the ``jwst_reffiles`` package
    to locate new bad pixels, which will be returned as a map. This
    map will be compared to the current bad pixel reference file
    (``dq_init``) in CRDS, and the coordinates and type of any new
    bad pixels will be saved in a database table.

    Attributes
    ----------
    aperture : str
        Aperture name of the data (e.g. ``NRCA1_FULL``)

    dark_query_start : float
        Date (in MJD) of the ending range of the previous MAST query
        where the bad pixel from darks monitor was run.

    data_dir : str
        Directory that contains the files copied from MAST to be used
        by the bad pixel monitor

    detector : str
        Detector associated with the data (e.g. ``NRCA1``)

    flat_query_start : float
        Date (in MJD) of the ending range of the previous MAST query
        where the bad pixel from flats monitor was run.

    instrument : str
        Name of the JWST instrument the data are from

    nints : int
        Number of integrations in the exposure

    output_dir : str
        Top level output directory associated with the bad pixel
        monitor, as retrieved from the JWQL config file

    pixel_table : sqlalchemy table
        Database table containing lists of bad pixels identified
        during runs of the bad pixel monitor

    query_end : float
        MJD of the execution time of the bad pixel monitor. This is
        used as the ending time of the MAST query.

    query_table : sqlalchemy table
        Database table containing the history of MAST queries for the
        bad pixel monitor.

    Raises
    ------
    ValueError
        If NINTS or DETECTOR is missing from the input file header

    ValueError
        If an unrecognized bad pixel mnemonic is encountered

    ValueError
        If the number of uncal and rate files does not match

    ValueError
        If the most recent query search returns more than one entry
    """

    def __init__(self):
        """Initialize an instance of the ``BadPixels`` class."""

    def add_bad_pix(self, coordinates, pixel_type, files, obs_start_time,
                    obs_mid_time, obs_end_time, baseline_file):
        """Add a set of bad pixels to the bad pixel database table

        Parameters
        ----------
        coordinates : tuple
            Tuple of two lists, containing x, y coordinates of bad
            pixels (output of ``np.where`` call)

        pixel_type : str
            Type of bad pixel. e.g. ``dead``, ``hot``, and ``noisy``

        files : list
            List of fits files which were used to identify the bad
            pixels

        obs_start_time : datetime.datetime
            Observation time of the earliest file in ``files``

        obs_mid_time : datetime.datetime
            Average of the observation times in ``files``

        obs_end_time : datetime.datetime
            Observation time of the latest file in ``files``

        baseline_file : str
            Name of baseline bad pixel file against which the new bad
            pixel population was compared
        """
        logging.info('Adding {} {} pixels to database.'.format(len(coordinates[0]), pixel_type))

        source_files = [os.path.basename(item) for item in files]
        entry = {'detector': self.detector,
                 'x_coord': coordinates[0],
                 'y_coord': coordinates[1],
                 'type': pixel_type,
                 'source_files': source_files,
                 'obs_start_time': obs_start_time,
                 'obs_mid_time': obs_mid_time,
                 'obs_end_time': obs_end_time,
                 'baseline_file': baseline_file,
                 'entry_date': datetime.datetime.now()}
        with engine.begin() as connection:
            connection.execute(self.pixel_table.__table__.insert(), entry)

    def filter_query_results(self, results, datatype):
        """Filter MAST query results. For input flats, keep only
        those with the most common filter/pupil/grating combination.
        For both flats and darks, keep only those with the most
        common readout pattern.

        Parameters
        ----------
        results : list
            List of query results, as returned by ``mast_query()``

        datatype : str
            Type of data being filtered. ``flat`` or ``dark``.

        Returns
        -------
        readpatt_filtered : list
            Filtered list of query results.
        """
        # Need to filter all instruments' results by filter.
        # Choose the filter with the most files. Only for flats.
        if ((datatype == 'flat') and (self.instrument != 'fgs')):
            if self.instrument in ['nircam', 'niriss']:
                filter_on = 'pupil'
            elif self.instrument == 'nirspec':
                filter_on = 'grating'
            elif self.instrument == 'miri':
                filter_on = 'filter'

            filter_list = ['{}:{}'.format(entry['filter'], entry[filter_on]) for entry in results]
            filter_set = list(set(filter_list))

            # Find the filter with the largest number of entries
            maxnum = 0
            maxfilt = ''
            for filt in filter_set:
                if filter_list.count(filt) > maxnum:
                    maxnum = filter_list.count(filt)
                    maxfilt = filt
            filter_name, other_name = maxfilt.split(':')

            filtered = []
            for entry in results:
                if ((str(entry['filter']) == filter_name) and (str(entry[filter_on]) == other_name)):
                    filtered.append(entry)

            results = deepcopy(filtered)

        # All instruments: need to filter by readout pattern.
        # Any pattern name not containing "IRS2" is ok.
        # Choose the readout pattern with the most entries.
        readpatt_list = [entry['readpatt'] for entry in results]
        readpatt_set = list(set(readpatt_list))

        maxnum = 0
        maxpatt = ''
        for patt in readpatt_set:
            if ((readpatt_list.count(patt) > maxnum) and ('IRS2' not in patt)):
                maxnum = readpatt_list.count(patt)
                maxpatt = patt

        # Keep only the entries that use the most common read pattern
        readpatt_filtered = []
        for entry in results:
            if entry['readpatt'] == maxpatt:
                readpatt_filtered.append(entry)

        return readpatt_filtered

    def get_metadata(self, filename):
        """Collect basic metadata from a fits file

        Parameters
        ----------
        filename : str
            Name of fits file to examine
        """
        header = fits.getheader(filename)

        try:
            self.detector = header['DETECTOR']
            self.nints = header['NINTS']
        except KeyError as e:
            logging.error(e)

    def get_possible_apertures(self):
        """Generate a list of possible SIAF apertures for the given
        instrument.

        Returns
        -------
        possible_apertures : list
            List of acceptable apertures for ``self.instrument``
        """
        if self.instrument == 'nircam':
            possible_apertures = []
            for i in range(1, 6):
                possible_apertures.append('NRCA{}_FULL'.format(i))
                possible_apertures.append('NRCB{}_FULL'.format(i))
        if self.instrument == 'niriss':
            possible_apertures = ['NIS_CEN']
        if self.instrument == 'miri':
            # Since MIRI is organized a little bit differently than the
            # other instruments, you can't use aperture names to uniquely
            # identify the full frame darks/flats from a given detector.
            # Instead you must use detector names.
            possible_apertures = [('MIRIMAGE', 'MIRIM_FULL'),
                                  ('MIRIFULONG', 'MIRIM_FULL'),
                                  ('MIRIFUSHORT', 'MIRIM_FULL')]
        if self.instrument == 'fgs':
            possible_apertures = ['FGS1_FULL', 'FGS2_FULL']
        if self.instrument == 'nirspec':
            possible_apertures = ['NRS1_FULL', 'NRS2_FULL']
        return possible_apertures

    def exclude_existing_badpix(self, badpix, pixel_type):
        """Given a set of coordinates of bad pixels, determine which
        of these pixels have been previously identified and remove
        them from the list

        Parameters
        ----------
        badpix : tuple
            Tuple of lists containing x and y pixel coordinates.
            (Output of ``numpy.where`` call)

        pixel_type : str
            Type of bad pixel being examined. Options are ``hot``,
            ``dead``, and ``noisy``

        Returns
        -------
        new_pixels_x : list
            List of x coordinates of new bad pixels

        new_pixels_y : list
            List of y coordinates of new bad pixels
        """
        if pixel_type not in ['hot', 'dead', 'noisy']:
            raise ValueError('Unrecognized bad pixel type: {}'.format(pixel_type))

        db_entries = session.query(self.pixel_table) \
            .filter(self.pixel_table.type == pixel_type) \
            .filter(self.pixel_table.detector == self.detector) \
            .all()

        already_found = []
        if len(db_entries) != 0:
            for _row in db_entries:
                x_coords = _row.x_coord
                y_coords = _row.y_coord
                for x, y in zip(x_coords, y_coords):
                    already_found.append((x, y))

        # Check to see if each pixel already appears in the database for
        # the given bad pixel type
        new_pixels_x = []
        new_pixels_y = []
        for x, y in zip(badpix[0], badpix[1]):
            pixel = (x, y)
            if pixel not in already_found:
                new_pixels_x.append(x)
                new_pixels_y.append(y)

        session.close()
        return (new_pixels_x, new_pixels_y)

    def identify_tables(self):
        """Determine which database tables to use for a run of the
        bad pixel monitor
        """
        mixed_case_name = JWST_INSTRUMENT_NAMES_MIXEDCASE[self.instrument]
        self.query_table = eval('{}BadPixelQueryHistory'.format(mixed_case_name))
        self.pixel_table = eval('{}BadPixelStats'.format(mixed_case_name))

    def map_uncal_and_rate_file_lists(self, uncal_files, rate_files, rate_files_to_copy, obs_type):
        """Copy uncal and rate files from the filesystem to the
        working directory. Any requested files that are not in the
        filesystem are noted and skipped. Return the file lists with
        skipped files removed.

        Parameters
        ----------
        uncal_files : list
            List of raw files to be copied

        rate_files : list
            List of rate (slope) images to be copied. This list
            should correspond 1-to-1 with ``uncal_files``. Any rate
            files that were not found in the MAST query should be set
            to None.

        rate_files_to_copy : list
            Similar to ``rate_files`` but with the None entries
            omitted.

        obs_type : str
            Observation type (``dark`` or ``flat``). Used only for
            logging

        Returns
        -------
        uncal_files : list
            List of the input raw files with any that failed to copy
            removed

        rate_files : list
            List of the input rate files with any that failed to copy
            removed (if the uncal also failed) or set to None (if
            only the rate file failed)
        """
        # Copy files from filesystem
        uncal_copied_files, uncal_not_copied = copy_files(uncal_files, self.working_data_dir)
        rate_copied_files, rate_not_copied = copy_files(rate_files_to_copy, self.working_data_dir)

        # Set any rate files that failed to copy to None so
        # that we can regenerate them
        if len(rate_not_copied) > 0:
            for badfile in rate_not_copied:
                rate_files[rate_files.index(badfile)] = 'None'

        # Any uncal files that failed to copy must be removed
        # entirely from the uncal and rate lists
        if len(uncal_not_copied) > 0:
            for badfile in uncal_not_copied:
                bad_index = uncal_files.index(badfile)
                del uncal_files[bad_index]
                del rate_files[bad_index]

        logging.info('\tNew {} observations: '.format(obs_type))
        logging.info('\tData dir: {}'.format(self.working_data_dir))
        logging.info('\tCopied to data dir: {}'.format(uncal_copied_files))
        logging.info('\tNot copied (failed, or missing from filesystem): {}'.format(uncal_not_copied))

        # After all this, the lists should be the same length
        # and have a 1-to-1 correspondence
        if len(uncal_files) != len(rate_files):
            raise ValueError('Lists of {} uncal and rate files have different lengths!'.format(obs_type))

        return uncal_files, rate_files

    def make_crds_parameter_dict(self):
        """Construct a parameter dictionary to be used for querying
        CRDS

        Returns
        -------
        parameters : dict
            Dictionary of parameters, in the format expected by CRDS
        """
        parameters = {}
        parameters['INSTRUME'] = self.instrument.upper()
        parameters['SUBARRAY'] = 'FULL'
        parameters['DATE-OBS'] = datetime.date.today().isoformat()
        current_date = datetime.datetime.now()
        parameters['TIME-OBS'] = current_date.time().isoformat()
        parameters['DETECTOR'] = self.detector.upper()
        if self.instrument.upper() == 'NIRCAM':
            if parameters['DETECTOR'] in ['NRCALONG', 'NRCBLONG']:
                parameters['CHANNEL'] = 'LONG'
            else:
                parameters['CHANNEL'] = 'SHORT'
        return parameters

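    # Illustrative use of the parameter dictionary (a sketch; mirrors the
    # calls made in process() below, with a hypothetical monitor instance):
    #
    #     params = monitor.make_crds_parameter_dict()
    #     mask_info = crds_tools.get_reffiles(params, ['mask'], download=True)
    #     baseline_file = mask_info['mask']
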
    def process(self, illuminated_raw_files, illuminated_slope_files, flat_file_count_threshold,
                dark_raw_files, dark_slope_files, dark_file_count_threshold):
        """The main method for processing flats and darks. See module
        docstrings for further details.

        Parameters
        ----------
        illuminated_raw_files : list
            List of filenames (including full paths) of raw (uncal)
            flat field files. These should all be for the same
            detector and aperture.

        illuminated_slope_files : list
            List of filenames (including full paths) of flat field
            slope files. These should all be for the same detector
            and aperture and correspond one-to-one with
            ``illuminated_raw_files``. For cases where a raw file
            exists but no slope file, the slope file should be
            ``None``

        flat_file_count_threshold : int
            Minimum number of flat field files required in order to
            run the illuminated portion of the monitor

        dark_raw_files : list
            List of filenames (including full paths) of raw (uncal)
            dark files. These should all be for the same detector and
            aperture.

        dark_slope_files : list
            List of filenames (including full paths) of dark current
            slope files. These should all be for the same detector
            and aperture and correspond one-to-one with
            ``dark_raw_files``. For cases where a raw file exists but
            no slope file, the slope file should be ``None``

        dark_file_count_threshold : int
            Minimum number of dark files required in order to run the
            dark portion of the monitor
        """
        # Illuminated files - run entirety of calwebb_detector1 for uncal
        # files where corresponding rate file is 'None'
        badpix_types = []
        illuminated_obstimes = []
        if illuminated_raw_files:
            logging.info("Found {} uncalibrated flat fields".format(len(illuminated_raw_files)))
            badpix_types.extend(FLATS_BAD_PIXEL_TYPES)
            out_exts = defaultdict(lambda: ['jump', '0_ramp_fit'])
            in_files = []
            for uncal_file, rate_file in zip(illuminated_raw_files, illuminated_slope_files):
                logging.info("\tChecking illuminated raw file {} with rate file {}".format(uncal_file, rate_file))
                self.get_metadata(uncal_file)
                if rate_file == 'None':
                    short_name = os.path.basename(uncal_file).replace('_uncal.fits', '')
                    local_uncal_file = os.path.join(self.working_data_dir, os.path.basename(uncal_file))
                    logging.info('Calling pipeline for {}'.format(uncal_file))
                    logging.info("Copying raw file to {}".format(self.working_data_dir))
                    copy_files([uncal_file], self.working_data_dir)
                    if hasattr(self, 'nints') and self.nints > 1:
                        out_exts[short_name] = ['jump', '1_ramp_fit']
                    needs_calibration = False
                    for file_type in out_exts[short_name]:
                        if not os.path.isfile(local_uncal_file.replace("uncal", file_type)):
                            needs_calibration = True
                    if needs_calibration:
                        in_files.append(local_uncal_file)
                    else:
                        logging.info("\t\tCalibrated files already exist for {}".format(short_name))
                else:
                    logging.info("\tRate file found for {}".format(uncal_file))
                    if os.path.isfile(rate_file):
                        copy_files([rate_file], self.working_data_dir)
                    else:
                        logging.warning("\tRate file {} doesn't actually exist".format(rate_file))
                        short_name = os.path.basename(uncal_file).replace('_uncal.fits', '')
                        local_uncal_file = os.path.join(self.working_data_dir, os.path.basename(uncal_file))
                        logging.info('Calling pipeline for {}'.format(uncal_file))
                        logging.info("Copying raw file to {}".format(self.working_data_dir))
                        copy_files([uncal_file], self.working_data_dir)
                        if hasattr(self, 'nints') and self.nints > 1:
                            out_exts[short_name] = ['jump', '1_ramp_fit']
                        needs_calibration = False
                        for file_type in out_exts[short_name]:
                            if not os.path.isfile(local_uncal_file.replace("uncal", file_type)):
                                needs_calibration = True
                        if needs_calibration:
                            in_files.append(local_uncal_file)
                        else:
                            logging.info("\t\tCalibrated files already exist for {}".format(short_name))

            outputs = {}
            if len(in_files) > 0:
                logging.info("Running pipeline for {} files".format(len(in_files)))
                outputs = run_parallel_pipeline(in_files, "uncal", out_exts, self.instrument, jump_pipe=True)

            index = 0
            logging.info("Checking files post-calibration")
            for uncal_file, rate_file in zip(illuminated_raw_files, illuminated_slope_files):
                logging.info("\tChecking files {}, {}".format(os.path.basename(uncal_file), os.path.basename(rate_file)))
                local_uncal_file = os.path.join(self.working_data_dir, os.path.basename(uncal_file))
                if local_uncal_file in outputs:
                    logging.info("\t\tAdding calibrated file.")
                    illuminated_slope_files[index] = deepcopy(outputs[local_uncal_file][1])
                else:
                    logging.info("\t\tCalibration was skipped for file")
                    self.get_metadata(illuminated_raw_files[index])
                    local_ramp_file = local_uncal_file.replace("uncal", "0_ramp_fit")
                    local_rateints_file = local_uncal_file.replace("uncal", "rateints")
                    if hasattr(self, 'nints') and self.nints > 1:
                        local_ramp_file = local_ramp_file.replace("0_ramp_fit", "1_ramp_fit")
                    if os.path.isfile(local_ramp_file):
                        logging.info("\t\t\tFound local ramp file")
                        illuminated_slope_files[index] = local_ramp_file
                    elif os.path.isfile(local_rateints_file):
                        logging.info("\t\t\tFound local rateints file")
                        illuminated_slope_files[index] = local_rateints_file
                    else:
                        logging.info("\t\t\tNo local files found")
                        illuminated_slope_files[index] = None
                index += 1

                # Get observation time for all files
                illuminated_obstimes.append(instrument_properties.get_obstime(uncal_file))

            logging.info("Trimming unfound files.")
            index = 0
            while index < len(illuminated_raw_files):
                if illuminated_slope_files[index] is None or illuminated_slope_files[index] == 'None':
                    logging.info("\tRemoving {}".format(illuminated_raw_files[index]))
                    del illuminated_raw_files[index]
                    del illuminated_slope_files[index]
                    del illuminated_obstimes[index]
                else:
                    index += 1

        min_illum_time = 0.
        max_illum_time = 0.
        mid_illum_time = 0.
        if len(illuminated_obstimes) > 0:
            min_illum_time = min(illuminated_obstimes)
            max_illum_time = max(illuminated_obstimes)
            mid_illum_time = instrument_properties.mean_time(illuminated_obstimes)

        # Dark files - Run calwebb_detector1 on all uncal files, saving the
        # jump step output. If the corresponding rate file is 'None', then
        # also run the ramp-fit step and save the output
        dark_jump_files = []
        dark_fitopt_files = []
        dark_obstimes = []
        if dark_raw_files:
            logging.info("Found {} uncalibrated darks".format(len(dark_raw_files)))
            index = 0
            badpix_types.extend(DARKS_BAD_PIXEL_TYPES)
            # In this case we need to run the pipeline on all input files,
            # even if the rate file is present, because we also need the jump
            # and fitopt files, which are not saved by default
            in_files = []
            out_exts = defaultdict(lambda: ['jump', 'fitopt', '0_ramp_fit'])
            for uncal_file, rate_file in zip(dark_raw_files, dark_slope_files):
                logging.info("Checking dark file {} with rate file {}".format(uncal_file, rate_file))
                self.get_metadata(uncal_file)
                short_name = os.path.basename(uncal_file).replace('_uncal.fits', '')
                local_uncal_file = os.path.join(self.working_data_dir, os.path.basename(uncal_file))
                if not os.path.isfile(local_uncal_file):
                    logging.info("\tCopying raw file to {}".format(self.working_data_dir))
                    copy_files([uncal_file], self.working_data_dir)
                if hasattr(self, 'nints') and self.nints > 1:
                    out_exts[short_name] = ['jump', 'fitopt', '1_ramp_fit']
                local_processed_files = [local_uncal_file.replace("uncal", x) for x in out_exts[short_name]]
                calibrated_data = [os.path.isfile(x) for x in local_processed_files]
                if not all(calibrated_data):
                    logging.info('\tCalling pipeline for {} {}'.format(uncal_file, rate_file))
                    in_files.append(local_uncal_file)
                    dark_jump_files.append(None)
                    dark_fitopt_files.append(None)
                    dark_slope_files[index] = None
                else:
                    logging.info("\tProcessed files already exist.")
                    dark_jump_files.append(local_processed_files[0])
                    dark_fitopt_files.append(local_processed_files[1])
                    dark_slope_files[index] = deepcopy(local_processed_files[2])
                dark_obstimes.append(instrument_properties.get_obstime(uncal_file))
                index += 1

            outputs = {}
            if len(in_files) > 0:
                logging.info("Running pipeline for {} files".format(len(in_files)))
                outputs = run_parallel_pipeline(in_files, "uncal", out_exts, self.instrument, jump_pipe=True)

            index = 0
            logging.info("Checking files post-calibration")
            for uncal_file, rate_file in zip(dark_raw_files, dark_slope_files):
                logging.info("\tChecking files {}, {}".format(uncal_file, rate_file))
                local_uncal_file = os.path.join(self.working_data_dir, os.path.basename(uncal_file))
                short_name = os.path.basename(uncal_file).replace('_uncal.fits', '')
                if local_uncal_file in outputs:
                    logging.info("\t\tAdding calibrated files")
                    dark_jump_files[index] = outputs[local_uncal_file][0]
                    dark_fitopt_files[index] = outputs[local_uncal_file][1]
                    dark_slope_files[index] = deepcopy(outputs[local_uncal_file][2])
                else:
                    logging.info("\t\tCalibration skipped for file")
                    self.get_metadata(local_uncal_file)
                    local_ramp_file = local_uncal_file.replace("uncal", "0_ramp_fit")
                    if hasattr(self, 'nints') and self.nints > 1:
                        local_ramp_file = local_ramp_file.replace("0_ramp_fit", "1_ramp_fit")
                    if not os.path.isfile(local_uncal_file.replace("uncal", "jump")):
                        logging.info("\t\t\tJump file not found")
                        dark_jump_files[index] = None
                    else:
                        dark_jump_files[index] = local_uncal_file.replace("uncal", "jump")
                    if not os.path.isfile(local_uncal_file.replace("uncal", "fitopt")):
                        logging.info("\t\t\tFitopt file not found")
                        dark_fitopt_files[index] = None
                    else:
                        dark_fitopt_files[index] = local_uncal_file.replace("uncal", "fitopt")
                    if not os.path.isfile(local_ramp_file):
                        if os.path.isfile(local_uncal_file.replace("uncal", "rateints")):
                            dark_slope_files[index] = local_uncal_file.replace("uncal", "rateints")
                        else:
                            logging.info("\t\t\tRate file not found")
                            dark_slope_files[index] = None
                    else:
                        dark_slope_files[index] = local_ramp_file
                index += 1

            index = 0
            logging.info("Trimming unfound files.")
            while index < len(dark_raw_files):
                if dark_jump_files[index] is None or dark_fitopt_files[index] is None or dark_slope_files[index] is None:
                    logging.info("\tRemoving {}".format(dark_raw_files[index]))
                    del dark_raw_files[index]
                    del dark_jump_files[index]
                    del dark_fitopt_files[index]
                    del dark_slope_files[index]
                    del dark_obstimes[index]
                else:
                    index += 1

            if len(dark_slope_files) > 0:
                min_dark_time = min(dark_obstimes)
                max_dark_time = max(dark_obstimes)
                mid_dark_time = instrument_properties.mean_time(dark_obstimes)

        # Check whether there are still enough files left to meet the threshold
        if illuminated_slope_files is None:
            flat_length = 0
        else:
            flat_length = len(illuminated_slope_files)
        if dark_slope_files is None:
            dark_length = 0
        else:
            dark_length = len(dark_slope_files)
        if (flat_length < flat_file_count_threshold) and (dark_length < dark_file_count_threshold):
            logging.info("After removing failed files, not enough new files remain.")
            return

        # For the dead flux check, filter out any files that have fewer than
        # 4 groups
        dead_flux_files = []
        if illuminated_raw_files is not None:
            for illum_file in illuminated_raw_files:
                ngroup = fits.getheader(illum_file)['NGROUPS']
                if ngroup >= 4:
                    dead_flux_files.append(illum_file)
        if len(dead_flux_files) == 0:
            dead_flux_files = None

        # Instrument-specific preferences from jwst_reffiles meetings
        if self.instrument in ['nircam', 'niriss', 'fgs']:
            dead_search_type = 'sigma_rate'
        elif self.instrument in ['miri', 'nirspec']:
            dead_search_type = 'absolute_rate'

        flat_mean_normalization_method = 'smoothed'

        # Call the bad pixel search module from jwst_reffiles. Lots of
        # other possible parameters. Only specify the non-default params
        # in order to make things easier to read.
        query_string = 'darks_{}_flats_{}_to_{}'.format(self.dark_query_start, self.flat_query_start, self.query_end)
        output_file = '{}_{}_{}_bpm.fits'.format(self.instrument, self.aperture, query_string)
        output_file = os.path.join(self.output_dir, output_file)

        bad_pixel_mask.bad_pixels(flat_slope_files=illuminated_slope_files,
                                  dead_search_type=dead_search_type,
                                  flat_mean_normalization_method=flat_mean_normalization_method,
                                  run_dead_flux_check=True,
                                  dead_flux_check_files=dead_flux_files,
                                  flux_check=35000,
                                  dark_slope_files=dark_slope_files,
                                  dark_uncal_files=dark_raw_files,
                                  dark_jump_files=dark_jump_files,
                                  dark_fitopt_files=dark_fitopt_files,
                                  plot=False,
                                  output_file=output_file,
                                  author='jwst_reffiles',
                                  description='A bad pix mask',
                                  pedigree='GROUND',
                                  useafter='2222-04-01 00:00:00',
                                  history='This file was created by JWQL',
                                  quality_check=False)

        # Read in the newly-created bad pixel file
        set_permissions(output_file)
        badpix_map = fits.getdata(output_file)

        # Locate and read in the current bad pixel mask
        parameters = self.make_crds_parameter_dict()
        mask_dictionary = crds_tools.get_reffiles(parameters, ['mask'], download=True)
        baseline_file = mask_dictionary['mask']

        if 'NOT FOUND' in baseline_file:
            logging.warning(('\tNo baseline bad pixel file for {} {}. Any bad '
                             'pixels found as part of this search will be considered new'
                             .format(self.instrument, self.aperture)))
            baseline_file = output_file
            yd, xd = badpix_map.shape
            baseline_badpix_mask = np.zeros((yd, xd), dtype=int)
        else:
            logging.info('\tBaseline bad pixel file is {}'.format(baseline_file))
            baseline_badpix_mask = fits.getdata(baseline_file)

        # Exclude pixels that are already flagged in the current bad pixel mask
        new_since_reffile = exclude_crds_mask_pix(badpix_map, baseline_badpix_mask)

        # Create a list of the new instances of each type of bad pixel
        for bad_type in badpix_types:
            bad_location_list = bad_map_to_list(new_since_reffile, bad_type)

            # Add new hot and dead pixels to the database
            logging.info('\tFound {} new {} pixels'.format(len(bad_location_list[0]), bad_type))
            if bad_type in FLATS_BAD_PIXEL_TYPES:
                self.add_bad_pix(bad_location_list, bad_type, illuminated_slope_files,
                                 min_illum_time, mid_illum_time, max_illum_time, baseline_file)
                flat_png = create_png_from_fits(illuminated_slope_files[0], self.output_dir)
            elif bad_type in DARKS_BAD_PIXEL_TYPES:
                self.add_bad_pix(bad_location_list, bad_type, dark_slope_files,
                                 min_dark_time, mid_dark_time, max_dark_time, baseline_file)
                dark_png = create_png_from_fits(dark_slope_files[0], self.output_dir)
            else:
                raise ValueError("Unrecognized type of bad pixel: {}. Cannot update database table.".format(bad_type))

        # Remove raw files, rate files, and pipeline products in order to
        # save disk space
        files_to_remove = glob(f'{self.working_data_dir}/*.fits')
        for filename in files_to_remove:
            os.remove(filename)

    @log_fail
    @log_info
    @only_one(key="bad_pixel_monitor")
    def run(self):
        """The main method. See module docstrings for further
        details.

        There are 2 parts to the bad pixel monitor:

        1. Bad pixels from illuminated data
        2. Bad pixels from dark data

        For each, we will query MAST, copy new files from the
        filesystem and pass the list of copied files into the
        ``process()`` method.
        """
        logging.info('Begin logging for bad_pixel_monitor')

        # Get the output directory
        self.working_dir = os.path.join(get_config()['working'], 'bad_pixel_monitor')
        self.output_dir = os.path.join(get_config()['outputs'], 'bad_pixel_monitor')

        # Read in the config file that defines the thresholds for the
        # number of dark files that must be present in order for the
        # monitor to run
        limits = ascii.read(THRESHOLDS_FILE)

        # Use the current time as the end time for the MAST query
        self.query_end = Time.now().mjd

        # Loop over all instruments
        updated_instruments = []
        for instrument in JWST_INSTRUMENT_NAMES:
            self.instrument = instrument

            # Identify which database tables to use
            self.identify_tables()

            # Get a list of all possible apertures from pysiaf
            possible_apertures = self.get_possible_apertures()

            for aperture in possible_apertures:
                grating = None
                detector_name = None
                lamp = None

                # NIRSpec flats use the MIRROR grating.
                if self.instrument == 'nirspec':
                    grating = 'MIRROR'

                # MIRI is unlike the other instruments. We basically treat
                # the detector as the aperture name because there is no
                # aperture name for a full frame MRS exposure.
                if self.instrument == 'miri':
                    detector_name, aperture_name = aperture
                    self.aperture = detector_name
                else:
                    self.aperture = aperture
                    aperture_name = aperture

                # In flight, NIRISS plans to take darks using the LINE2 lamp
                if self.instrument == 'niriss':
                    lamp = 'LINE2'

                # What lamp is most appropriate for NIRSpec?
                if self.instrument == 'nirspec':
                    lamp = 'LINE2'

                # What lamp is most appropriate for FGS?
                # if self.instrument == 'fgs':
                #     lamp = 'G2LAMP1'

                logging.info('')
                logging.info('Working on aperture {} in {}'.format(aperture, self.instrument))

                # Find the appropriate threshold for the number of new files needed
                match = self.aperture == limits['Aperture']
                flat_file_count_threshold = limits['FlatThreshold'][match].data[0]
                dark_file_count_threshold = limits['DarkThreshold'][match].data[0]

                # Locate the record of the most recent MAST search
                self.flat_query_start = self.most_recent_search(file_type='flat')
                self.dark_query_start = self.most_recent_search(file_type='dark')
                logging.info('\tFlat field query times: {} {}'.format(self.flat_query_start, self.query_end))
                logging.info('\tDark current query times: {} {}'.format(self.dark_query_start, self.query_end))

                # Query MAST using the aperture and the time of the most
                # recent previous search as the starting time.
                flat_templates = FLAT_EXP_TYPES[instrument]
                dark_templates = DARK_EXP_TYPES[instrument]

                new_flat_entries = mast_query(instrument, flat_templates, self.flat_query_start,
                                              self.query_end, aperture=aperture_name, grating=grating,
                                              detector=detector_name, lamp=lamp)
                new_dark_entries = mast_query(instrument, dark_templates, self.dark_query_start,
                                              self.query_end, aperture=aperture_name, detector=detector_name)

                # Filter the results.
                # Filtering could be different for flats vs darks.
                # Kevin says we shouldn't need to worry about mixing lamps in
                # the data used to create the bad pixel mask.
                # In flight, data will only be taken with LINE2, LEVEL 5.
                # Currently in MAST all lamps are present, but Kevin is
                # not concerned about variations in flat field structure.

                # NIRISS - results can include rate, rateints, trapsfilled
                # MIRI - Jane says they now use illuminated data for dead pixel checks, just like other insts.
                # NIRSpec - can be cal, x1d, rate, rateints. Can have both cal and x1d so filter repeats
                # FGS - rate, rateints, trapsfilled
                # NIRCam - no int flats

                # The query results can contain multiple entries for files
                # in different calibration states (or for different output
                # products), so we need to filter the list for duplicate
                # entries and for the calibration state we are interested
                # in before we know how many new entries there really are.

                # In the end, we need rate files as well as uncal files
                # because we're going to need to create jump files.
                # In order to use a given file we must have at least the
                # uncal version of the file. Get the uncal and rate file
                # lists to align.

                if new_flat_entries:
                    # Exclude ASIC tuning data
                    len_new_flats = len(new_flat_entries)
                    new_flat_entries = monitor_utils.exclude_asic_tuning(new_flat_entries)
                    len_no_asic = len(new_flat_entries)
                    num_asic = len_new_flats - len_no_asic
                    logging.info("\tFiltering out ASIC tuning files removed {} flat files.".format(num_asic))

                    new_flat_entries = self.filter_query_results(new_flat_entries, datatype='flat')
                    apcheck_flat_entries = pipeline_tools.aperture_size_check(new_flat_entries, instrument, aperture)
                    lost_to_bad_metadata = len(new_flat_entries) - len(apcheck_flat_entries)
                    logging.info('\t{} flat field files ignored due to inconsistency in array size and metadata.'.format(lost_to_bad_metadata))
                    flat_uncal_files = locate_uncal_files(apcheck_flat_entries)
                    flat_uncal_files, run_flats = check_for_sufficient_files(flat_uncal_files, instrument, aperture,
                                                                             flat_file_count_threshold, 'flats')
                    flat_rate_files, flat_rate_files_to_copy = locate_rate_files(flat_uncal_files)
                else:
                    run_flats = False
                    flat_uncal_files, flat_rate_files, flat_rate_files_to_copy = None, None, None

                if new_dark_entries:
                    # Exclude ASIC tuning data
                    len_new_darks = len(new_dark_entries)
                    new_dark_entries = monitor_utils.exclude_asic_tuning(new_dark_entries)
                    len_no_asic = len(new_dark_entries)
                    num_asic = len_new_darks - len_no_asic
                    logging.info("\tFiltering out ASIC tuning files removed {} dark files.".format(num_asic))

                    new_dark_entries = self.filter_query_results(new_dark_entries, datatype='dark')
                    apcheck_dark_entries = pipeline_tools.aperture_size_check(new_dark_entries, instrument, aperture)
                    lost_to_bad_metadata = len(new_dark_entries) - len(apcheck_dark_entries)
                    logging.info('\t{} dark files ignored due to inconsistency in array size and metadata.'.format(lost_to_bad_metadata))
                    dark_uncal_files = locate_uncal_files(apcheck_dark_entries)
                    dark_uncal_files, run_darks = check_for_sufficient_files(dark_uncal_files, instrument, aperture,
                                                                             dark_file_count_threshold, 'darks')
                    dark_rate_files, dark_rate_files_to_copy = locate_rate_files(dark_uncal_files)
                else:
                    run_darks = False
                    dark_uncal_files, dark_rate_files, dark_rate_files_to_copy = None, None, None

                # Set up directories for the copied data
                ensure_dir_exists(os.path.join(self.working_dir, 'data'))
                ensure_dir_exists(os.path.join(self.output_dir, 'data'))
                self.working_data_dir = os.path.join(self.working_dir,
                                                     'data/{}_{}'.format(self.instrument.lower(), self.aperture.lower()))
                self.output_data_dir = os.path.join(self.output_dir,
                                                    'data/{}_{}'.format(self.instrument.lower(), self.aperture.lower()))
                ensure_dir_exists(self.working_data_dir)
                ensure_dir_exists(self.output_data_dir)

                # Copy files from filesystem
                if run_flats:
                    flat_uncal_files, flat_rate_files = self.map_uncal_and_rate_file_lists(flat_uncal_files,
                                                                                           flat_rate_files,
                                                                                           flat_rate_files_to_copy,
                                                                                           'flat')
                if run_darks:
                    dark_uncal_files, dark_rate_files = self.map_uncal_and_rate_file_lists(dark_uncal_files,
                                                                                           dark_rate_files,
                                                                                           dark_rate_files_to_copy,
                                                                                           'dark')

                # Run the bad pixel monitor
                if run_flats or run_darks:
                    self.process(flat_uncal_files, flat_rate_files, flat_file_count_threshold,
                                 dark_uncal_files, dark_rate_files, dark_file_count_threshold)
                    updated_instruments.append(self.instrument)

                # Update the query history
                if dark_uncal_files is None:
                    num_dark_files = 0
                else:
                    num_dark_files = len(dark_uncal_files)

                if flat_uncal_files is None:
                    num_flat_files = 0
                else:
                    num_flat_files = len(flat_uncal_files)

                new_entry = {'instrument': self.instrument.upper(),
                             'aperture': self.aperture,
                             'dark_start_time_mjd': self.dark_query_start,
                             'dark_end_time_mjd': self.query_end,
                             'flat_start_time_mjd': self.flat_query_start,
                             'flat_end_time_mjd': self.query_end,
                             'dark_files_found': num_dark_files,
                             'flat_files_found': num_flat_files,
                             'run_bpix_from_darks': run_darks,
                             'run_bpix_from_flats': run_flats,
                             'run_monitor': run_flats or run_darks,
                             'entry_date': datetime.datetime.now()}

                with engine.begin() as connection:
                    connection.execute(self.query_table.__table__.insert(), new_entry)
                logging.info('\tUpdated the query history table')

        # Update the figures to be shown in the web app. Only update figures
        # for instruments where the monitor ran
        for instrument in updated_instruments:
            BadPixelPlots(instrument)

        logging.info(f'Updating web pages for: {updated_instruments}')
        logging.info('Bad Pixel Monitor completed successfully.')

if __name__ == '__main__':

    module = os.path.splitext(os.path.basename(__file__))[0]
    start_time, log_file = monitor_utils.initialize_instrument_monitor(module)

    monitor = BadPixels()
    monitor.run()

    monitor_utils.update_monitor_table(module, start_time, log_file)