Source code for jwql.instrument_monitors.pipeline_tools

"""Various utility functions related to the JWST calibration pipeline


    - Bryan Hilbert


    This module can be imported as such:

        from jwql.instrument_monitors import pipeline_tools
        pipeline_steps = pipeline_tools.completed_pipeline_steps(filename)

from collections import OrderedDict
import copy
import numpy as np
import os

from import fits
from jwst import datamodels
from jwst.dq_init import DQInitStep
from jwst.dark_current import DarkCurrentStep
from jwst.firstframe import FirstFrameStep
from jwst.gain_scale import GainScaleStep
from jwst.group_scale import GroupScaleStep
from jwst.ipc import IPCStep
from jwst.jump import JumpStep
from jwst.lastframe import LastFrameStep
from jwst.linearity import LinearityStep
from jwst.persistence import PersistenceStep
from jwst.pipeline.calwebb_detector1 import Detector1Pipeline
from jwst.ramp_fitting import RampFitStep
from jwst.refpix import RefPixStep
from jwst.reset import ResetStep
from jwst.rscd import RscdStep
from jwst.saturation import SaturationStep
from jwst.superbias import SuperBiasStep

from jwql.utils.constants import JWST_INSTRUMENT_NAMES_UPPERCASE
import pysiaf

# Define the fits header keyword that accompanies each step
PIPE_KEYWORDS = {'S_GRPSCL': 'group_scale', 'S_DQINIT': 'dq_init', 'S_SATURA': 'saturation',
                 'S_REFPIX': 'refpix', 'S_SUPERB': 'superbias', 'S_RESET': 'reset',
                 'S_PERSIS': 'persistence', 'S_DARK': 'dark_current', 'S_LINEAR': 'linearity',
                 'S_FRSTFR': 'firstframe', 'S_LASTFR': 'lastframe', 'S_RSCD': 'rscd',
                 'S_JUMP': 'jump', 'S_RAMP': 'rate', 'S_GANSCL': 'gain_scale', 'S_IPC': 'ipc'}

PIPELINE_STEP_MAPPING = {'dq_init': DQInitStep, 'dark_current': DarkCurrentStep,
                         'firstframe': FirstFrameStep, 'gain_scale': GainScaleStep,
                         'group_scale': GroupScaleStep, 'ipc': IPCStep, 'jump': JumpStep,
                         'lastframe': LastFrameStep, 'linearity': LinearityStep,
                         'persistence': PersistenceStep, 'rate': RampFitStep,
                         'refpix': RefPixStep, 'reset': ResetStep, 'rscd': RscdStep,
                         'saturation': SaturationStep, 'superbias': SuperBiasStep}

# Readout patterns that have nframes != a power of 2. These readout patterns
# require the group_scale pipeline step to be run.

[docs] def aperture_size_check(mast_dicts, instrument_name, aperture_name): """Check that the aperture size in a science file is consistent with what is listed in the SUBARRAY header keyword. The motivation for this check comes from NIRCam ASIC Tuning data, where file apertures are listed as FULL, but the data are in fact SUBSTRIPE256. Note that at the moment this function will only work for a subset of apertures, because the mapping of SUBARRAY header keyword value to pysiaf-recognized aperture name is not always straightforward. Initially, this function is being built only to support checking files listed as full frame. Parameters ---------- mast_dicts : list List of file metadata dictionaries, as returned from a MAST query instrument_name : str JWST instrument name aperture_name : str Name of the aperture, in order to load the proper SIAF information Returns ------- consistent_files : list List of metadata dictionaries where the array size in the metadata matches that retrieved from SIAF """ consistent_files = [] siaf = pysiaf.Siaf(instrument_name) # If the basic formula for aperture name does not produce a name recognized by # pysiaf, then skip the check and assume the file is ok. This should only be the # case for lesser-used apertures. For our purposes here, where we are focusing # on full frame apertures, it should be ok. try: siaf_ap = siaf[aperture_name] except KeyError: consistent_files.extend(mast_dicts) return consistent_files # Most cases will end up here. Compare SIAF aperture size to that in the metadata for mast_dict in mast_dicts: array_size_y, array_size_x = mast_dict['subsize2'], mast_dict['subsize1'] if ((array_size_y == siaf_ap.YSciSize) & (array_size_x == siaf_ap.XSciSize)): consistent_files.append(mast_dict) return consistent_files
[docs] def completed_pipeline_steps(filename): """Return a list of the completed pipeline steps for a given file. Parameters ---------- filename : str File to examine Returns ------- completed : collections.OrderedDict Dictionary with boolean entry for each pipeline step, indicating which pipeline steps have been run on filename """ # Initialize using PIPE_KEYWORDS so that entries are guaranteed to # be in the correct order completed = OrderedDict({}) for key in PIPE_KEYWORDS.values(): completed[key] = False header = fits.getheader(filename) for key in PIPE_KEYWORDS.keys(): try: value = header.get(key) except KeyError: value == 'NOT DONE' if value == 'COMPLETE': completed[PIPE_KEYWORDS[key]] = True return completed
[docs] def get_pipeline_steps(instrument): """Get the names and order of the ``calwebb_detector1`` pipeline steps for a given instrument. Use values that match up with the values in the ``PIPE_STEP`` defintion in ```` Parameters ---------- instrument : str Name of JWST instrument Returns ------- steps : collections.OrderedDict Dictionary of step names """ # Ensure instrument name is valid instrument = instrument.upper() if instrument not in JWST_INSTRUMENT_NAMES_UPPERCASE.values(): raise ValueError("WARNING: {} is not a valid instrument name.".format(instrument)) # Order is important in 'steps' lists below!! if instrument == 'MIRI': steps = ['group_scale', 'dq_init', 'saturation', 'ipc', 'firstframe', 'lastframe', 'reset', 'linearity', 'rscd', 'dark_current', 'refpix', 'jump', 'rate', 'gain_scale'] else: steps = ['group_scale', 'dq_init', 'saturation', 'ipc', 'superbias', 'refpix', 'linearity', 'persistence', 'dark_current', 'jump', 'rate'] # No persistence correction for NIRSpec if instrument == 'NIRSPEC': steps.remove('persistence') else: # NIRCam, NISISS, FGS all do not need group scale as nframes is # always a multiple of 2 steps.remove('group_scale') # IPC correction currently not done for any instrument steps.remove('ipc') # Initialize using OrderedDict so the steps will be in the right order required_steps = OrderedDict({}) for key in steps: required_steps[key] = True return required_steps
[docs] def image_stack(file_list): """Given a list of fits files containing 2D images, read in all data and place into a 3D stack Parameters ---------- file_list : list List of fits file names Returns ------- cube : numpy.ndarray 3D stack of the 2D images """ exptimes = [] for i, input_file in enumerate(file_list): with as hdu: image = hdu[1].data exptime = hdu[0].header['EFFINTTM'] num_ints = hdu[0].header['NINTS'] # Stack all inputs together into a single 3D image cube if i == 0: ndim_base = image.shape if len(ndim_base) == 3: cube = copy.deepcopy(image) elif len(ndim_base) == 2: cube = np.expand_dims(image, 0) else: ndim = image.shape if ndim_base[-2:] == ndim[-2:]: if len(ndim) == 2: image = np.expand_dims(image, 0) elif len(ndim) > 3: raise ValueError("4-dimensional input slope images not supported.") cube = np.vstack((cube, image)) else: raise ValueError("Input images are of inconsistent size in x/y dimension.") exptimes.append([exptime] * num_ints) return cube, exptimes
[docs] def run_calwebb_detector1_steps(input_file, steps): """Run the steps of ``calwebb_detector1`` specified in the steps dictionary on the input file Parameters ---------- input_file : str File on which to run the pipeline steps steps : collections.OrderedDict Keys are the individual pipeline steps (as seen in the ``PIPE_KEYWORDS`` values above). Boolean values indicate whether a step should be run or not. Steps are run in the official ``calwebb_detector1`` order. """ first_step_to_be_run = True for step_name in steps: if steps[step_name]: if first_step_to_be_run: model = PIPELINE_STEP_MAPPING[step_name].call(input_file) first_step_to_be_run = False else: model = PIPELINE_STEP_MAPPING[step_name].call(model) suffix = step_name output_filename = input_file.replace('.fits', '_{}.fits'.format(suffix)) if suffix != 'rate': # Make sure the dither_points metadata entry is at integer (was a string # prior to jwst v1.2.1, so some input data still have the string entry. # If we don't change that to an integer before saving the new file, the # jwst package will crash. try: model.meta.dither.dither_points = int(model.meta.dither.dither_points) except TypeError: # If the dither_points entry is not populated, then ignore this change pass else: try: model[0].meta.dither.dither_points = int(model[0].meta.dither.dither_points) except TypeError: # If the dither_points entry is not populated, then ignore this change pass model[0].save(output_filename) return output_filename
[docs] def calwebb_detector1_save_jump(input_file, output_dir, ramp_fit=True, save_fitopt=True): """Call ``calwebb_detector1`` on the provided file, running all steps up to the ``ramp_fit`` step, and save the result. Optionally run the ``ramp_fit`` step and save the resulting slope file as well. Parameters ---------- input_file : str Name of fits file to run on the pipeline output_dir : str Directory into which the pipeline outputs are saved ramp_fit : bool If ``False``, the ``ramp_fit`` step is not run. The output file will be a ``*_jump.fits`` file. If ``True``, the ``*jump.fits`` file will be produced and saved. In addition, the ``ramp_fit`` step will be run and a ``*rate.fits`` or ``*_rateints.fits`` file will be saved. (``rateints`` if the input file has >1 integration) save_fitopt : bool If ``True``, the file of optional outputs from the ramp fitting step of the pipeline is saved. Returns ------- jump_output : str Name of the saved file containing the output prior to the ``ramp_fit`` step. pipe_output : str Name of the saved file containing the output after ramp-fitting is performed (if requested). Otherwise ``None``. """ input_file_only = os.path.basename(input_file) # Find the instrument used to collect the data datamodel = datamodels.RampModel(input_file) instrument = # If the data pre-date jwst version 1.2.1, then they will have # the NUMDTHPT keyword (with string value of the number of dithers) # rather than the newer NRIMDTPT keyword (with an integer value of # the number of dithers). If so, we need to update the file here so # that it doesn't cause the pipeline to crash later. Both old and # new keywords are mapped to the model.meta.dither.dither_points # metadata entry. So we should be able to focus on that. if isinstance(datamodel.meta.dither.dither_points, str): # If we have a string, change it to an integer datamodel.meta.dither.dither_points = int(datamodel.meta.dither.dither_points) elif datamodel.meta.dither.dither_points is None: # If the information is missing completely, put in a dummy value datamodel.meta.dither.dither_points = 1 # Switch to calling the pipeline rather than individual steps, # and use the run() method so that we can set parameters # progammatically. model = Detector1Pipeline() # Always true if instrument == 'nircam': model.refpix.odd_even_rows = False # Default CR rejection threshold is too low model.jump.rejection_threshold = 15 # Turn off IPC step until it is put in the right place model.ipc.skip = True model.jump.save_results = True model.jump.output_dir = output_dir jump_output = os.path.join(output_dir, input_file_only.replace('uncal', 'jump')) # Check to see if the jump version of the requested file is already # present run_jump = not os.path.isfile(jump_output) if ramp_fit: model.ramp_fit.save_results = True # model.save_results = True model.output_dir = output_dir # pipe_output = os.path.join(output_dir, input_file_only.replace('uncal', 'rate')) pipe_output = os.path.join(output_dir, input_file_only.replace('uncal', '0_ramp_fit')) run_slope = not os.path.isfile(pipe_output) if save_fitopt: model.ramp_fit.save_opt = True fitopt_output = os.path.join(output_dir, input_file_only.replace('uncal', 'fitopt')) run_fitopt = not os.path.isfile(fitopt_output) else: model.ramp_fit.save_opt = False fitopt_output = None run_fitopt = False else: model.ramp_fit.skip = True pipe_output = None fitopt_output = None run_slope = False run_fitopt = False # Call the pipeline if any of the files at the requested calibration # states are not present in the output directory if run_jump or (ramp_fit and run_slope) or (save_fitopt and run_fitopt): else: print(("Files with all requested calibration states for {} already present in " "output directory. Skipping pipeline call.".format(input_file))) return jump_output, pipe_output, fitopt_output
[docs] def steps_to_run(all_steps, finished_steps): """Given a list of pipeline steps that need to be completed as well as a list of steps that have already been completed, return a list of steps remaining to be done. Parameters ---------- all_steps : collections.OrderedDict A dictionary of all steps that need to be completed finished_steps : collections.OrderedDict A dictionary with keys equal to the pipeline steps and boolean values indicating whether a particular step has been completed or not (i.e. output from ``completed_pipeline_steps``) Returns ------- steps_to_run : collections.OrderedDict A dictionaru with keys equal to the pipeline steps and boolean values indicating whether a particular step has yet to be run. """ torun = copy.deepcopy(finished_steps) for key in all_steps: if all_steps[key] == finished_steps[key]: torun[key] = False elif ((all_steps[key] is True) & (finished_steps[key] is False)): torun[key] = True elif ((all_steps[key] is False) & (finished_steps[key] is True)): print(("WARNING: {} step has been run " "but the requirements say that it should not " "be. Need a new input file.".format(key))) return torun