#! /usr/bin/env python
"""Generate preview images for all files in the ``jwql`` filesystem.
Execution of this script will generate preview images and thumbnail
images for each file in the ``jwql`` filesystem. Preview images have
axes labels, titles, and colorbars, wheras thumbnail images are
smaller and contain no labels. Images are saved into the
``preview_image_filesystem`` and ``thumbnail_filesystem``, organized
by subdirectories pertaining to the ``program_id`` in the filenames.
Authors
-------
- Matthew Bourque
- Bryan Hilbert
Use
---
This script is intended to be executed as such:
::
python generate_preview_images.py
"""
import glob
import logging
import multiprocessing
import os
import re
import numpy as np
from jwql.utils import permissions
from jwql.utils.constants import NIRCAM_LONGWAVE_DETECTORS, NIRCAM_SHORTWAVE_DETECTORS
from jwql.utils.logging_functions import configure_logging, log_info, log_fail
from jwql.utils.preview_image import PreviewImage
from jwql.utils.utils import get_config, filename_parser, initialize_instrument_monitor
from jwql.utils.monitor_utils import update_monitor_table
# Size of NIRCam inter- and intra-module chip gaps
SW_MOD_GAP = 1387 # pixels = int(43 arcsec / 0.031 arcsec/pixel)
LW_MOD_GAP = 741 # pixels = int(46 arcsec / 0.062 arcsec/pixel)
SW_DET_GAP = 145 # pixels = int(4.5 arcsec / 0.031 arcsec/pixel)
FULLX = 2048 # Width of the full detector
FULLY = 2048 # Height of the full detector
[docs]def array_coordinates(channelmod, detector_list, lowerleft_list):
"""Create an appropriately sized ``numpy`` array to contain the
mosaic image given the channel and module of the data.
Parameters
----------
channelmod : str
Indicator of the NIRCam channel/module of the data.
Options are:
``LW`` - for longwave channel data
``SWA`` - for shortwave A module only (4 detectors) data
``SWB`` - for shortwave B module only (4 detectors) data
``SW`` - for shortwave both module data (8 detectors)
detector_list : list
List of detectors used in data to be simulated
lowerleft_list : list
Each element is a tuple giving the (x, y) coordinates
corresponding to the lower left corner of the aperture within
the full frame detector. These values come from the
``SUBSTRT1`` and 2 values in the file headers.
Returns
-------
xdim : int
Length of the output array needed to contain all detectors' data
ydim : int
Height of the output array needed to contain all detectors' data
module_lowerlefts : dict
Dictionary giving the ``(x, y)`` coordinate in the coordinate
system of the full module(s) where the lower left corner of the
data from a given detector will be placed. (e.g.
``NRCA1: (1888, 1888)`` means that the data from detector
``NRCA1`` should be placed into
``[1888: 1888+y_dim_of_data, 1888: 1888+x_dim_of_data]`` within
the final array (which has total dimensions of ``(xdim, ydim)``
"""
# Create dictionary of lower left pixel values for each
# detector as it sits in the MODULE. B1-B4 values here will
# need to have sw_mod_gap added to their x coordinates in
# order to translate to the full A and B module coordinate system.
# The only case where LW data will be in this function is where
# both detectors are used, so set A5/B5 coordinates to be in
# the full module A and B coordinate system. Note that these
# tuples are (x,y), NOT (y,x)
ashort = ["NRCA1", "NRCA2", "NRCA3", "NRCA4"]
bshort = ["NRCB1", "NRCB2", "NRCB3", "NRCB4"]
module_lowerlefts = {"NRCA1": (0, 0),
"NRCA2": (0, FULLY + SW_DET_GAP),
"NRCA3": (FULLX + SW_DET_GAP, 0),
"NRCA4": (FULLX + SW_DET_GAP, FULLY + SW_DET_GAP),
"NRCB1": (FULLX + SW_DET_GAP, FULLY + SW_DET_GAP),
"NRCB2": (FULLX + SW_DET_GAP, 0),
"NRCB3": (0, FULLY + SW_DET_GAP),
"NRCB4": (0, 0),
"NRCA5": (0, 0),
"NRCB5": (FULLX + LW_MOD_GAP, 0)}
# If both module A and B are used, then shift the B module
# coordinates to account for the intermodule gap
if channelmod == "SW":
mod_delta = (FULLX * 2 + SW_DET_GAP + SW_MOD_GAP, 0)
for b_detector in bshort:
module_lowerlefts[b_detector] = tuple([sum(x) for x in
zip(module_lowerlefts[b_detector],
mod_delta)])
# The only subarrays we need to worry about are the SW SUBXXX
# All other subarrays are on a single detector.
# All we need to do is find the lower left values for NRCA1 and/or NRCB4.
# From that we can calculate the rest. Also, if observing with both
# A and B modules, only full frame is allowed, so no need to worry about
# subarrays in both modules simultaneously.
subx = 1
suby = 1
if 'SW' in channelmod:
detector_list = np.array(detector_list)
a1 = detector_list == 'NRCA1'
b4 = detector_list == 'NRCB4'
lowerleft_list = np.array(lowerleft_list)
if np.sum(a1) == 1:
subx, suby = lowerleft_list[a1][0]
elif np.sum(b4) == 1:
subx, suby = lowerleft_list[b4][0]
else:
missing_a14 = "SW data provided, but neither NRCA1 nor NRCB4 are present."
logging.error(missing_a14)
raise ValueError(missing_a14)
# Adjust the lower left positions of the apertures within
# the module(s) in the case of subarrays
if ((subx != 1) | (suby != 1)):
subarr_delta = {"NRCA1": (0, 0),
"NRCA2": (0, 0 - suby),
"NRCA3": (0 - subx, 0),
"NRCA4": (0 - subx, 0 - suby),
"NRCB1": (0 - subx, 0 - suby),
"NRCB2": (0 - subx, 0),
"NRCB3": (0, 0 - suby),
"NRCB4": (0, 0)}
for det in ashort + bshort:
module_lowerlefts[det] = tuple([sum(x) for x in zip(module_lowerlefts[det],
subarr_delta[det])])
# Dimensions of array to hold all data
# Adjust dimensions of individual detector for subarray if necessary
aperturex = FULLX - (subx - 1)
aperturey = FULLY - (suby - 1)
# Full module(s) dimensions
if channelmod in ['SWA', 'SWB']:
xdim = 2 * aperturex + SW_DET_GAP
ydim = 2 * aperturey + SW_DET_GAP
elif channelmod == 'SW':
xdim = 4 * aperturex + 2 * SW_DET_GAP + SW_MOD_GAP
ydim = 2 * aperturey + SW_DET_GAP
elif channelmod == 'LW':
xdim = 2 * aperturex + LW_MOD_GAP
ydim = aperturey
return xdim, ydim, module_lowerlefts
[docs]def check_existence(file_list, outdir):
"""Given a list of fits files, determine if a preview image has
already been created in ``outdir``.
Parameters
----------
file_list : list
List of fits filenames from which preview image will be
generated
outdir : str
Directory that will contain the preview image if it exists
Returns
-------
exists : bool
``True`` if preview image exists, ``False`` if it does not
"""
# If file_list contains only a single file, then we need to search
# for a preview image name that contains the detector name
if len(file_list) == 1:
filename = os.path.split(file_list[0])[1]
search_string = filename.split('.fits')[0] + '_*.jpg'
else:
# If file_list contains multiple files, then we need to search
# for the appropriately named jpg of the mosaic, which depends
# on the specific detectors in the file_list
file_parts = filename_parser(file_list[0])
if file_parts['detector'].upper() in NIRCAM_SHORTWAVE_DETECTORS:
mosaic_str = "NRC_SW*_MOSAIC_"
elif file_parts['detector'].upper() in NIRCAM_LONGWAVE_DETECTORS:
mosaic_str = "NRC_LW*_MOSAIC_"
search_string = 'jw{}{}{}_{}{}{}_{}_{}{}*.jpg'.format(
file_parts['program_id'], file_parts['observation'],
file_parts['visit'], file_parts['visit_group'],
file_parts['parallel_seq_id'], file_parts['activity'],
file_parts['exposure_id'], mosaic_str, file_parts['suffix'])
current_files = glob.glob(os.path.join(outdir, search_string))
if len(current_files) > 0:
return True
else:
return False
[docs]def create_dummy_filename(filelist):
"""Create a dummy filename indicating the detectors used to create
the mosaic. Check the list of detectors used to determine the proper
text to substitute into the initial filename.
Parameters
----------
filelist : list
List of filenames containing the data used to create the mosaic.
It is assumed these filenames follow JWST filenaming
conventions.
Returns
-------
dummy_name : str
The first filename in ``filelist`` is modified, such that the
detector name is replaced with text indicating the source of the
mosaic data.
"""
det_string_list = []
modules = []
for filename in filelist:
indir, infile = os.path.split(filename)
det_string = filename_parser(infile)['detector']
det_string_list.append(det_string)
modules.append(det_string[3].upper())
# Previous sorting means that either all of the
# input files are LW, or all are SW. So we can check any
# file to determine LW vs SW
if '5' in det_string_list[0]:
suffix = "NRC_LW_MOSAIC"
else:
moda = modules.count('A')
modb = modules.count('B')
if moda > 0:
if modb > 0:
suffix = "NRC_SWALL_MOSAIC"
else:
suffix = "NRC_SWA_MOSAIC"
else:
if modb > 0:
suffix = "NRC_SWB_MOSAIC"
dummy_name = filelist[0].replace(det_string_list[0], suffix)
return dummy_name
[docs]def create_mosaic(filenames):
"""If an exposure comprises data from multiple detectors read in all
the appropriate files and create a mosaic so that the preview image
will show all the data together.
Parameters
----------
filenames : list
List of filenames to be combined into a mosaic
Returns
-------
mosaic_filename : str
Name of fits file containing the mosaicked data
"""
# Use preview_image to load data and create difference image
# for each detector. Save in a list
data = []
detector = []
data_lower_left = []
for filename in filenames:
image = PreviewImage(filename, "SCI") # Now have image.data, image.dq
data_dim = len(image.data.shape)
if data_dim == 4:
diff_im = image.difference_image(image.data)
else:
diff_im = image.data
data.append(diff_im)
detector.append(filename_parser(filename)['detector'].upper())
data_lower_left.append((image.xstart, image.ystart))
# Make sure SW and LW data are not being mixed. Create the
# appropriately sized numpy array to hold all the data based
# on the channel, module, and subarray size
mosaic_channel = find_data_channel(detector)
full_xdim, full_ydim, full_lower_left = array_coordinates(mosaic_channel, detector,
data_lower_left)
# Create the array to hold all the data
datashape = data[0].shape
datadim = len(datashape)
if datadim == 2:
full_array = np.zeros((1, full_ydim, full_xdim)) * np.nan
elif datadim == 3:
full_array = np.zeros((datashape[0], full_ydim, full_xdim)) * np.nan
else:
raise ValueError('Difference image for {} must be either 2D or 3D.'.format(filenames[0]))
# Place the data from the individual detectors in the appropriate
# places in the final image
for pixdata, detect in zip(data, detector):
x0, y0 = full_lower_left[detect]
if datadim == 2:
yd, xd = pixdata.shape
full_array[0, y0: y0 + yd, x0: x0 + xd] = pixdata
elif datadim == 3:
ints, yd, xd = pixdata.shape
full_array[:, y0: y0 + yd, x0: x0 + xd] = pixdata
# Create associated DQ array and set unpopulated pixels to be skipped
# in preview image scaling
full_dq = create_dq_array(full_xdim, full_ydim, full_array[0, :, :], mosaic_channel)
return full_array, full_dq
[docs]def create_dq_array(xd, yd, mosaic, module):
"""Create DQ array that goes with the mosaic image. Set unpopulated
pixels to be skipped in preview image scaling. Same for the
reference pixels for all detectors
Parameters
----------
xd : int
X-coordinate dimension of the DQ array
yd : int
Y-coordinate dimension of the DQ array
mosaic : obj
2D ``numpy`` array containing the mosaic image
module : str
Module used for mosaic. Options are ``LW``,`` SW``, ``SWA``,
``SWB``
Returns
-------
dq : obj
2D ``numpy`` array containing the DQ array. Pixels that are
``True`` are considered science pixels and are used when
scaling the preview image. Pixels that are ``False`` are
skipped.
"""
# Create array
dq = np.ones((yd, xd), dtype="bool")
# Flag inter-chip and inter-module pixels as False
dq[np.isnan(mosaic)] = 0
# Flag reference pixels as False
# Present in all cases other than subarrays
if xd >= FULLX:
dq[0:4, :] = 0
dq[:, 0:4] = 0
dq[2044:2048, :] = 0
dq[:, 2044:2048] = 0
if module == "LW":
lwb_lower = FULLX + LW_MOD_GAP
dq[:, lwb_lower:lwb_lower + 4] = 0
dq[:, lwb_lower + 2044:lwb_lower + 2048] = 0
if module in ["SWA", "SWB", "SW"]:
# Present for full frame single module or both modules
lowerval = FULLX + SW_DET_GAP
dq[lowerval: lowerval + 4, :] = 0
dq[(lowerval + 2044):, :] = 0
dq[:, lowerval: lowerval + 4] = 0
dq[:, (lowerval + 2044):(lowerval + 2048)] = 0
if module == "SW":
# Present only if both modules are used (full frame)
modb_lower = lowerval + FULLX + SW_MOD_GAP
dq[:, modb_lower:(modb_lower + 4)] = 0
dq[:, (modb_lower + 2044):(modb_lower + 2048)] = 0
modb_upper = modb_lower + FULLX + SW_DET_GAP
dq[:, modb_upper:(modb_upper + 4)] = 0
dq[:, (modb_upper + 2044):(modb_upper + 2048)] = 0
else:
# Subarrays: expand the pixels flagged due to chip gaps
# by one row and column
nan_indexes = np.where(np.isnan(mosaic))
match = nan_indexes[0] == yd - 1
vert_xmin = np.min(nan_indexes[1][match])
vert_xmax = vert_xmin + SW_DET_GAP - 1
match2 = nan_indexes[1] == xd - 1
horiz_ymin = np.min(nan_indexes[0][match2])
horiz_ymax = horiz_ymin + SW_DET_GAP - 1
dq[:, vert_xmin - 4:vert_xmin] = 0
dq[:, vert_xmax + 1:vert_xmax + 5] = 0
dq[horiz_ymin - 4:horiz_ymin, :] = 0
dq[horiz_ymax + 1:horiz_ymax + 5, :] = 0
return dq
[docs]def detector_check(detector_list, search_string):
"""Search a given list of detector names for the provided regular
expression sting.
Parameters
----------
detector_list : list
List of detector names (e.g. ``NRCA5``)
search_string : str
Regular expression string to use for search
Returns
-------
total : int
Number of detectors in ``detector_list`` that match
``search_string``
"""
pattern = re.compile(search_string, re.IGNORECASE)
match = [pattern.match(detector) for detector in detector_list]
total = np.sum(np.array([m is not None for m in match]))
return total
[docs]def find_data_channel(detectors):
"""Using a list of detectors, identify the channel(s) that the data
are from.
Parameters
----------
detectors : list
List of detector names
Returns
-------
channel : str
Identifier noting which channels the given detectors are in.
Can be ``SWA`` for shortwave, module A only, ``SWB`` for
shortwave, module B only, ``SW``, for shortwave modules A and B,
and ``LW`` for longwave.
"""
# Check the detector names for all files.
nrc_swa_total = detector_check(detectors, "NRCA[1-4]")
nrc_swb_total = detector_check(detectors, "NRCB[1-4]")
nrc_lw_total = detector_check(detectors, "NRC[AB]5")
both_channels = "Can't mix NIRCam SW and LW data in same mosaic."
if nrc_swa_total != 0:
if nrc_lw_total != 0:
raise ValueError(both_channels)
else:
if nrc_swb_total != 0:
channel = "SW"
else:
channel = "SWA"
else:
if nrc_swb_total != 0:
if nrc_lw_total != 0:
raise ValueError(both_channels)
else:
channel = "SWB"
else:
if nrc_lw_total != 0:
channel = "LW"
else:
raise ValueError("No NIRCam SW nor LW data")
return channel
[docs]def get_base_output_name(filename_dict):
"""Returns the base output name used for preview images and
thumbnails.
Parameters
----------
filename_dict : dict
A dictionary containing parsed filename parts via
``filename_parser``
Returns
-------
base_output_name : str
The base output name, e.g. ``jw96090001002_03101_00001_nrca2_rate``
"""
base_output_name = 'jw{}{}{}_{}{}{}_{}_'.format(
filename_dict['program_id'], filename_dict['observation'],
filename_dict['visit'], filename_dict['visit_group'],
filename_dict['parallel_seq_id'], filename_dict['activity'],
filename_dict['exposure_id'])
return base_output_name
[docs]@log_fail
@log_info
def generate_preview_images():
"""The main function of the ``generate_preview_image`` module.
See module docstring for further details."""
# Process programs in parallel
program_list = [os.path.basename(item) for item in glob.glob(os.path.join(get_config()['filesystem'], 'public', 'jw*'))]
program_list.extend([os.path.basename(item) for item in glob.glob(os.path.join(get_config()['filesystem'], 'proprietary', 'jw*'))])
program_list = list(set(program_list))
pool = multiprocessing.Pool(processes=int(get_config()['cores']))
pool.map(process_program, program_list)
pool.close()
pool.join()
# Complete logging:
logging.info("Completed.")
[docs]def group_filenames(filenames):
"""Given a list of JWST filenames, group together files from the
same exposure. These files will share the same ``program_id``,
``observation``, ``visit``, ``visit_group``, ``parallel_seq_id``,
``activity``, ``exposure``, and ``suffix``. Only the ``detector``
will be different. Currently only NIRCam files for a given exposure
will be grouped together. For other instruments multiple files for
a given exposure will be kept separate from one another and no
mosaic will be made. Stage 3 files will remain as individual
files, and will not be grouped together with any other files.
Parameters
----------
filenames : list
list of filenames
Returns
-------
grouped : list
grouped list of filenames where each element is a list and
contains the names of filenames with matching exposure
information.
"""
# Some initializations
grouped, matched_names = [], []
filenames.sort()
# Loop over each file in the list of good files
for filename in filenames:
# Holds list of matching files for exposure
subgroup = []
# Generate string to be matched with other filenames
try:
filename_dict = filename_parser(os.path.basename(filename))
except ValueError:
logging.warning('Could not parse filename for {}'.format(filename))
break
# If the filename was already involved in a match, then skip
if filename not in matched_names:
# For stage 3 filenames, treat individually
if 'stage_3' in filename_dict['filename_type']:
matched_names.append(filename)
subgroup.append(filename)
# Group together stage 1 and 2 filenames
elif filename_dict['filename_type'] == 'stage_1_and_2':
# Determine detector naming convention
if filename_dict['detector'].upper() in NIRCAM_SHORTWAVE_DETECTORS:
detector_str = 'NRC[AB][1234]'
elif filename_dict['detector'].upper() in NIRCAM_LONGWAVE_DETECTORS:
detector_str = 'NRC[AB]5'
else: # non-NIRCam detectors
detector_str = filename_dict['detector'].upper()
# Build pattern to match against
base_output_name = get_base_output_name(filename_dict)
match_str = '{}{}_{}.fits'.format(base_output_name, detector_str, filename_dict['suffix'])
match_str = os.path.join(os.path.dirname(filename), match_str)
pattern = re.compile(match_str, re.IGNORECASE)
# Try to match the substring to each good file
for file_to_match in filenames:
if pattern.match(file_to_match) is not None:
matched_names.append(file_to_match)
subgroup.append(file_to_match)
else:
# filename_dict['filename_type'] may be 'guider' or 'time_series', for instance. Treat individually.
matched_names.append(filename)
subgroup.append(filename)
if len(subgroup) > 0:
grouped.append(subgroup)
return grouped
[docs]def process_program(program):
"""Generate preview images and thumbnails for the given program.
Parameters
----------
program : str
The program identifier (e.g. ``88600``)
"""
logging.info('')
logging.info('Processing {}'.format(program))
# Gather files to process
filenames = glob.glob(os.path.join(get_config()['filesystem'], 'public', program, '*/*.fits'))
filenames.extend(glob.glob(os.path.join(get_config()['filesystem'], 'proprietary', program, '*/*.fits')))
filenames = list(set(filenames))
# Ignore "original" files
filenames = [item for item in filenames if '_original.fits' not in item]
# Group together common exposures
grouped_filenames = group_filenames(filenames)
logging.info('Found {} filenames'.format(len(filenames)))
logging.info('')
for file_list in grouped_filenames:
filename = file_list[0]
# Determine the save location
try:
identifier = 'jw{}'.format(filename_parser(filename)['program_id'])
except ValueError:
identifier = os.path.basename(filename).split('.fits')[0]
preview_output_directory = os.path.join(get_config()['preview_image_filesystem'], identifier)
thumbnail_output_directory = os.path.join(get_config()['thumbnail_filesystem'], identifier)
# Check to see if the preview images already exist and skip if they do
file_exists = check_existence(file_list, preview_output_directory)
if file_exists:
logging.info("\tJPG already exists for {}, skipping.".format(filename))
continue
# Create the output directories if necessary
if not os.path.exists(preview_output_directory):
os.makedirs(preview_output_directory)
permissions.set_permissions(preview_output_directory)
logging.info('\tCreated directory {}'.format(preview_output_directory))
if not os.path.exists(thumbnail_output_directory):
os.makedirs(thumbnail_output_directory)
permissions.set_permissions(thumbnail_output_directory)
logging.info('\tCreated directory {}'.format(thumbnail_output_directory))
# If the exposure contains more than one file (because more
# than one detector was used), then create a mosaic
max_size = 8
numfiles = len(file_list)
if numfiles > 1:
try:
mosaic_image, mosaic_dq = create_mosaic(file_list)
logging.info('\tCreated mosiac for:')
for item in file_list:
logging.info('\t{}'.format(item))
except (ValueError, FileNotFoundError) as error:
mosaic_image, mosaic_dq = None, None
logging.error(error)
dummy_file = create_dummy_filename(file_list)
if numfiles in [2, 4]:
max_size = 16
elif numfiles in [8]:
max_size = 32
# Create the nominal preview image and thumbnail
try:
im = PreviewImage(filename, "SCI")
im.clip_percent = 0.01
im.scaling = 'log'
im.cmap = 'viridis'
im.output_format = 'jpg'
im.preview_output_directory = preview_output_directory
im.thumbnail_output_directory = thumbnail_output_directory
# If a mosaic was made from more than one file
# insert it and it's associated DQ array into the
# instance of PreviewImage. Also set the input
# filename to indicate that we have mosaicked data
if numfiles > 1 and mosaic_image is not None:
im.data = mosaic_image
im.dq = mosaic_dq
im.file = dummy_file
im.make_image(max_img_size=max_size)
logging.info('\tCreated preview image and thumbnail for: {}'.format(filename))
except (ValueError, AttributeError) as error:
logging.warning(error)
if __name__ == '__main__':
module = os.path.basename(__file__).strip('.py')
start_time, log_file = initialize_instrument_monitor(module)
generate_preview_images()
update_monitor_table(module, start_time, log_file)