Source code for jwql.website.apps.jwql.data_containers

"""Various functions to collect data to be used by the ``views`` of the
``jwql`` app.

This module contains several functions that assist in collecting and
producing various data to be rendered in ``views.py`` for use by the
``jwql`` app.

Authors
-------

    - Lauren Chambers
    - Matthew Bourque
    - Teagan King

Use
---

    The functions within this module are intended to be imported and
    used by ``views.py``, e.g.:

    ::
        from .data_containers import get_proposal_info
"""

import copy
import glob
import os
import re
import tempfile

from astropy.io import fits
from astropy.table import Table
from astropy.time import Time
from django.conf import settings
import numpy as np
from operator import itemgetter
import pandas as pd

from jwql.database import database_interface as di
from jwql.database.database_interface import load_connection
from jwql.edb.engineering_database import get_mnemonic, get_mnemonic_info
from jwql.instrument_monitors.miri_monitors.data_trending import dashboard as miri_dash
from jwql.instrument_monitors.nirspec_monitors.data_trending import dashboard as nirspec_dash
from jwql.utils.utils import ensure_dir_exists, filesystem_path, filename_parser
from jwql.utils.constants import MONITORS
from jwql.utils.constants import INSTRUMENT_SERVICE_MATCH, JWST_INSTRUMENT_NAMES_MIXEDCASE, JWST_INSTRUMENT_NAMES_SHORTHAND
from jwql.utils.preview_image import PreviewImage
from jwql.utils.credentials import get_mast_token

# Flag whether the code is running on a GitHub Actions runner, detected by
# looking for the runner account's home directory (Linux or macOS layout)
# in the expanded ``~`` path.
ON_GITHUB_ACTIONS = '/home/runner' in os.path.expanduser('~') or '/Users/runner' in os.path.expanduser('~')

# Determine if the code is being run as part of a Readthedocs build
# NOTE: when the environment variable is set, this holds the raw string
# value from the environment (truthy if non-empty), not a bool.
ON_READTHEDOCS = False
if 'READTHEDOCS' in os.environ:
    ON_READTHEDOCS = os.environ['READTHEDOCS']

from jwql.utils.utils import get_config, filename_parser, check_config_for_key
if not ON_GITHUB_ACTIONS and not ON_READTHEDOCS:
    from .forms import MnemonicSearchForm, MnemonicQueryForm, MnemonicExplorationForm
    check_config_for_key('auth_mast')
    auth_mast = get_config()['auth_mast']
    # Drop the first dot-separated label of ``auth_mast`` to get the MAST
    # server name that astroquery should talk to
    mast_flavour = '.'.join(auth_mast.split('.')[1:])
    from astropy import config
    conf = config.get_config('astroquery')
    conf['mast'] = {'server': 'https://{}'.format(mast_flavour)}
# astroquery.mast import that depends on value of auth_mast
# this import has to be made before any other import of astroquery.mast
from astroquery.mast import Mast
from jwedb.edb_interface import mnemonic_inventory

# Absolute directory containing this module
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
# The filesystem locations are only read from the config file when not
# running on GitHub Actions or Readthedocs; in those environments the
# names below are left undefined.
if not ON_GITHUB_ACTIONS and not ON_READTHEDOCS:
    FILESYSTEM_DIR = os.path.join(get_config()['filesystem'])
    PREVIEW_IMAGE_FILESYSTEM = os.path.join(get_config()['preview_image_filesystem'])
    THUMBNAIL_FILESYSTEM = os.path.join(get_config()['thumbnail_filesystem'])
# Part of this module's path that precedes the first 'website' component,
# and the directory that contains it
PACKAGE_DIR = os.path.dirname(__location__.split('website')[0])
REPO_DIR = os.path.split(PACKAGE_DIR)[0]


def build_table(tablename):
    """Create Pandas dataframe from JWQLDB table.

    Parameters
    ----------
    tablename : str
        Name of JWQL database table name.

    Returns
    -------
    table_meta_data : pandas.DataFrame
        Pandas data frame version of JWQL database table.

    Raises
    ------
    KeyError
        If ``tablename`` does not match the ``__tablename__`` of any
        object found in the ``database_interface`` module.
    """

    # Make dictionary of tablename : class object
    # This matches what the user selects in the select element
    # in the webform to the python object on the backend.
    tables_of_interest = {}
    for item in di.__dict__.keys():
        table = getattr(di, item)
        if hasattr(table, '__tablename__'):
            tables_of_interest[table.__tablename__] = table

    session, _, _, _ = load_connection(get_config()['connection_string'])

    # The session is opened just for this query, so make sure it is
    # released again even if the lookup or the query fails.
    try:
        table_object = tables_of_interest[tablename]  # Select table object

        # Turn query result into list of dicts
        rows = [row.__dict__ for row in session.query(table_object).all()]
        column_names = table_object.__table__.columns.keys()

        # Build a mapping of column name -> list of column values
        data = {column: [row[column] for row in rows] for column in column_names}

        # Build table.
        table_meta_data = pd.DataFrame(data)
    finally:
        session.close()

    return table_meta_data
def get_acknowledgements():
    """Returns a list of individuals who are acknowledged on the
    ``about`` page.

    The list is generated by reading in the contents of the ``jwql``
    ``README`` file.  In this way, the website will automatically
    update with updates to the ``README`` file.

    Returns
    -------
    acknowledgements : list
        A list of individuals to be acknowledged.  The list is empty
        if the ``README`` contains no line with ``Acknowledgments``.
    """

    # Locate README file
    readme_file = os.path.join(REPO_DIR, 'README.md')

    # Get contents of the README file
    with open(readme_file, 'r') as f:
        data = f.readlines()

    # Find where the acknowledgements start (the last matching line wins)
    index = None
    for i, line in enumerate(data):
        if 'Acknowledgments' in line:
            index = i

    # Without the heading there is nothing to parse
    if index is None:
        return []

    # Parse out the list of individuals: drop the bullet marker and any
    # trailing " [@handle]" part of each line
    acknowledgements = [
        item.strip().replace('- ', '').split(' [@')[0].strip()
        for item in data[index + 1:]
    ]

    return acknowledgements
def get_all_proposals():
    """Return a list of all proposals that exist in the filesystem.

    Returns
    -------
    proposals : list
        A list of proposal numbers for all proposals that exist in the
        filesystem
    """

    # Collect the unique entries of the public and proprietary areas
    found = set()
    for access in ('public', 'proprietary'):
        found.update(glob.glob(os.path.join(FILESYSTEM_DIR, access, '*')))

    # Keep whatever follows the last "jw" in each path, provided it is
    # five characters long
    proposals = []
    for path in sorted(found):
        number = path.split('jw')[-1]
        if len(number) == 5:
            proposals.append(number)

    return proposals
def get_current_flagged_anomalies(rootname, instrument):
    """Return a list of currently flagged anomalies for the given
    ``rootname``

    Parameters
    ----------
    rootname : str
        The rootname of interest (e.g.
        ``jw86600008001_02101_00001_guider2/``)
    instrument : str
        Name of the instrument; it is lower-cased and used to select
        the ``<Instrument>Anomaly`` table of the database interface.

    Returns
    -------
    current_anomalies : list
        A list of currently flagged anomalies for the given ``rootname``
        (e.g. ``['snowball', 'crosstalk']``)
    """

    # Look up the anomaly table class for this instrument
    table_name = '{}Anomaly'.format(JWST_INSTRUMENT_NAMES_MIXEDCASE[instrument.lower()])
    table = getattr(di, table_name)

    # Only the most recently flagged record for the rootname is used
    query = di.session.query(table)
    query = query.filter(table.rootname == rootname)
    query = query.order_by(table.flag_date.desc()).limit(1)
    all_records = query.data_frame

    if all_records.empty:
        return []

    # Report every column whose summed value is truthy
    column_sums = np.sum(all_records, axis=0)
    return [col for col, val in column_sums.items() if val]
def get_dashboard_components(request):
    """Build and return the dashboard object for the dashboard page.

    Parameters
    ----------
    request : HttpRequest object
        Incoming request from the webpage

    Returns
    -------
    dashboard : GeneralDashboard
        Dashboard restricted to the time span selected in the request,
        or covering all time when no time span was submitted.
    """

    from jwql.website.apps.jwql.bokeh_dashboard import GeneralDashboard

    # Default (also used when coming from home/monitor views): all time
    time_delta = None

    if 'time_delta_value' in request.POST:
        selection = request.POST['timedelta']
        if selection != 'All Time':
            offsets = {'All Time': None,
                       '1 Day': pd.DateOffset(days=1),
                       '1 Month': pd.DateOffset(months=1),
                       '1 Week': pd.DateOffset(weeks=1),
                       '1 Year': pd.DateOffset(years=1)}
            time_delta = offsets[selection]

    return GeneralDashboard(delta_t=time_delta)
def get_edb_components(request):
    """Return dictionary with content needed for the EDB page.

    Depending on which of the three EDB forms was submitted, this runs
    a mnemonic name search, a mnemonic query (writing the resulting
    table to the ``static`` directory for download), or a mnemonic
    exploration (filtering the mnemonic inventory, where each filled
    field is applied as a case-insensitive regular expression).  The
    two forms that were not submitted are returned unbound.

    Parameters
    ----------
    request : HttpRequest object
        Incoming request from the webpage

    Returns
    -------
    edb_components : dict
        Dictionary with the required components
    """

    mnemonic_name_search_result = {}
    mnemonic_query_result = {}
    mnemonic_query_result_plot = None
    mnemonic_exploration_result = None

    # If this is a POST request, we need to process the form data
    if request.method == 'POST':

        if 'mnemonic_name_search' in request.POST.keys():
            # authenticate with astroquery.mast if necessary
            logged_in = log_into_mast(request)

            mnemonic_name_search_form = MnemonicSearchForm(request.POST, logged_in=logged_in,
                                                           prefix='mnemonic_name_search')

            if mnemonic_name_search_form.is_valid():
                mnemonic_identifier = mnemonic_name_search_form['search'].value()
                if mnemonic_identifier is not None:
                    mnemonic_name_search_result = get_mnemonic_info(mnemonic_identifier)

            # create forms for search fields not clicked
            mnemonic_query_form = MnemonicQueryForm(prefix='mnemonic_query')
            mnemonic_exploration_form = MnemonicExplorationForm(prefix='mnemonic_exploration')

        elif 'mnemonic_query' in request.POST.keys():
            # authenticate with astroquery.mast if necessary
            logged_in = log_into_mast(request)

            mnemonic_query_form = MnemonicQueryForm(request.POST, logged_in=logged_in,
                                                    prefix='mnemonic_query')

            # proceed only if entries make sense
            if mnemonic_query_form.is_valid():
                mnemonic_identifier = mnemonic_query_form['search'].value()
                start_time = Time(mnemonic_query_form['start_time'].value(), format='iso')
                end_time = Time(mnemonic_query_form['end_time'].value(), format='iso')

                if mnemonic_identifier is not None:
                    mnemonic_query_result = get_mnemonic(mnemonic_identifier, start_time, end_time)
                    mnemonic_query_result_plot = mnemonic_query_result.bokeh_plot()

                    # generate table download in web app
                    result_table = mnemonic_query_result.data

                    # save file locally to be available for download
                    static_dir = os.path.join(settings.BASE_DIR, 'static')
                    ensure_dir_exists(static_dir)
                    file_name_root = 'mnemonic_query_result_table'
                    file_for_download = '{}.csv'.format(file_name_root)
                    path_for_download = os.path.join(static_dir, file_for_download)

                    # add meta data to saved table
                    comments = []
                    comments.append('DMS EDB query of {}:'.format(mnemonic_identifier))
                    for key, value in mnemonic_query_result.info.items():
                        comments.append('{} = {}'.format(key, str(value)))
                    # the list object itself is stored, so the entries
                    # appended below are part of the table meta data too
                    result_table.meta['comments'] = comments
                    comments.append(' ')
                    comments.append('Start time {}'.format(start_time.isot))
                    comments.append('End time {}'.format(end_time.isot))
                    comments.append('Number of rows {}'.format(len(result_table)))
                    comments.append(' ')
                    result_table.write(path_for_download, format='ascii.fixed_width',
                                       overwrite=True, delimiter=',', bookend=False)
                    mnemonic_query_result.file_for_download = file_for_download

            # create forms for search fields not clicked
            mnemonic_name_search_form = MnemonicSearchForm(prefix='mnemonic_name_search')
            mnemonic_exploration_form = MnemonicExplorationForm(prefix='mnemonic_exploration')

        elif 'mnemonic_exploration' in request.POST.keys():
            mnemonic_exploration_form = MnemonicExplorationForm(request.POST,
                                                                prefix='mnemonic_exploration')
            if mnemonic_exploration_form.is_valid():
                mnemonic_exploration_result, meta = mnemonic_inventory()

                # loop over filled fields and implement simple AND logic
                for field in mnemonic_exploration_form.fields:
                    field_value = mnemonic_exploration_form[field].value()
                    if field_value != '':
                        column_name = mnemonic_exploration_form[field].label

                        # matching indices in table (case-insensitive)
                        index = [
                            i for i, item in enumerate(mnemonic_exploration_result[column_name])
                            if re.search(field_value, item, re.IGNORECASE)
                        ]
                        mnemonic_exploration_result = mnemonic_exploration_result[index]

                mnemonic_exploration_result.n_rows = len(mnemonic_exploration_result)

                # generate tables for display and download in web app
                display_table = copy.deepcopy(mnemonic_exploration_result)

                # temporary html file,
                # see http://docs.astropy.org/en/stable/_modules/astropy/table/
                # The directory only exists to render the table; it is
                # removed again once the HTML has been read back.
                file_name_root = 'mnemonic_exploration_result_table'
                with tempfile.TemporaryDirectory() as tmpdir:
                    path_for_html = os.path.join(tmpdir, '{}.html'.format(file_name_root))
                    with open(path_for_html, 'w') as tmp:
                        display_table.write(tmp, format='jsviewer')
                    with open(path_for_html, 'r') as tmp:
                        mnemonic_exploration_result.html_file_content = tmp.read()

                # pass on meta data to have access to total number of mnemonics
                mnemonic_exploration_result.meta = meta

                # save file locally to be available for download
                static_dir = os.path.join(settings.BASE_DIR, 'static')
                ensure_dir_exists(static_dir)
                file_for_download = '{}.csv'.format(file_name_root)
                path_for_download = os.path.join(static_dir, file_for_download)
                display_table.write(path_for_download, format='ascii.fixed_width',
                                    overwrite=True, delimiter=',', bookend=False)
                mnemonic_exploration_result.file_for_download = file_for_download

                if mnemonic_exploration_result.n_rows == 0:
                    mnemonic_exploration_result = 'empty'

            # create forms for search fields not clicked
            mnemonic_name_search_form = MnemonicSearchForm(prefix='mnemonic_name_search')
            mnemonic_query_form = MnemonicQueryForm(prefix='mnemonic_query')

        else:
            # POST that belongs to none of the three forms: fall back
            # to unbound forms instead of leaving the names undefined
            mnemonic_name_search_form = MnemonicSearchForm(prefix='mnemonic_name_search')
            mnemonic_query_form = MnemonicQueryForm(prefix='mnemonic_query')
            mnemonic_exploration_form = MnemonicExplorationForm(prefix='mnemonic_exploration')

    else:
        mnemonic_name_search_form = MnemonicSearchForm(prefix='mnemonic_name_search')
        mnemonic_query_form = MnemonicQueryForm(prefix='mnemonic_query')
        mnemonic_exploration_form = MnemonicExplorationForm(prefix='mnemonic_exploration')

    edb_components = {'mnemonic_query_form': mnemonic_query_form,
                      'mnemonic_query_result': mnemonic_query_result,
                      'mnemonic_query_result_plot': mnemonic_query_result_plot,
                      'mnemonic_name_search_form': mnemonic_name_search_form,
                      'mnemonic_name_search_result': mnemonic_name_search_result,
                      'mnemonic_exploration_form': mnemonic_exploration_form,
                      'mnemonic_exploration_result': mnemonic_exploration_result}

    return edb_components
def get_expstart(instrument, rootname):
    """Return the exposure start time (``expstart``) for the given
    ``rootname``.

    The ``expstart`` is gathered from a query to the
    ``astroquery.mast`` service.

    Parameters
    ----------
    instrument : str
        The instrument of interest (e.g. `FGS`).
    rootname : str
        The rootname of the observation of interest
        (e.g. ``jw86700006001_02101_00006_guider1``).

    Returns
    -------
    expstart : float
        The exposure start time of the observation (in MJD).  ``0`` is
        returned (and a warning printed) if the query yields no data.
    """

    # The file set name is the rootname without its last "_" component
    name_parts = rootname.split('_')
    file_set_name = '_'.join(name_parts[:-1])

    service = INSTRUMENT_SERVICE_MATCH[instrument]
    name_filter = {'paramName': 'fileSetName', 'values': [file_set_name]}
    params = {'columns': 'filename, expstart', 'filters': [name_filter]}

    response = Mast.service_request_async(service, params)
    records = response[0].json()['data']

    if records == []:
        print("WARNING: no data")
        return 0

    # Earliest start time among all files of the file set
    return min(record['expstart'] for record in records)
def get_filenames_by_instrument(instrument, restriction='all'):
    """Returns a list of filenames that match the given ``instrument``.

    Parameters
    ----------
    instrument : str
        The instrument of interest (e.g. `FGS`).
    restriction : str
        If ``all``, all filenames will be returned.  If ``public``,
        only publicly-available filenames will be returned.
        If ``proprietary``, only proprietary filenames will be returned.

    Returns
    -------
    filenames : list
        A list of files that match the given instrument.

    Raises
    ------
    KeyError
        If ``restriction`` is not one of the three supported values.
    """

    service = INSTRUMENT_SERVICE_MATCH[instrument]

    # Query for filenames
    params = {"columns": "filename, isRestricted", "filters": []}
    response = Mast.service_request_async(service, params)
    records = response[0].json()['data']

    # Determine filenames to return based on restriction parameter
    if restriction == 'all':
        return [record['filename'] for record in records]

    if restriction in ('public', 'proprietary'):
        # public files have isRestricted False, proprietary ones True
        wanted = restriction == 'proprietary'
        return [record['filename'] for record in records
                if record['isRestricted'] is wanted]

    raise KeyError('{} is not a valid restriction level.  Use "all", "public", or "proprietary".'.format(restriction))
def get_filenames_by_proposal(proposal):
    """Return a list of filenames that are available in the filesystem
    for the given ``proposal``.

    Parameters
    ----------
    proposal : str
        The one- to five-digit proposal number (e.g. ``88600``).

    Returns
    -------
    filenames : list
        A list of filenames associated with the given ``proposal``.
    """

    # Proposal directories are named "jw" + zero-padded 5-digit number
    proposal_dir = 'jw{:05d}'.format(int(proposal))

    matches = []
    for access in ('public', 'proprietary'):
        pattern = os.path.join(FILESYSTEM_DIR, access, proposal_dir, '*/*')
        matches.extend(glob.glob(pattern))

    return sorted(os.path.basename(match) for match in matches)
def get_filenames_by_rootname(rootname):
    """Return a list of filenames that are part of the given
    ``rootname``.

    Parameters
    ----------
    rootname : str
        The rootname of interest (e.g.
        ``jw86600008001_02101_00007_guider2``).

    Returns
    -------
    filenames : list
        A list of filenames associated with the given ``rootname``.
    """

    # First seven characters give the proposal directory; everything
    # before the first underscore gives the observation directory
    proposal_dir = rootname[:7]
    observation_dir = rootname.split('_')[0]
    file_pattern = '{}*'.format(rootname)

    matches = []
    for access in ('public', 'proprietary'):
        pattern = os.path.join(FILESYSTEM_DIR, access, proposal_dir,
                               observation_dir, file_pattern)
        matches.extend(glob.glob(pattern))

    return sorted(os.path.basename(match) for match in matches)
def get_header_info(filename):
    """Return the header information for a given ``filename``.

    Parameters
    ----------
    filename : str
        The name of the file of interest, without the extension
        (e.g. ``'jw86600008001_02101_00007_guider2_uncal'``).

    Returns
    -------
    header_info : dict
        The FITS headers of the extensions in the given ``file``, keyed
        by extension index.  Each entry holds the extension name
        (``EXTNAME``), the ``keywords`` and ``values`` lists, and a
        ``table`` (``pandas.DataFrame``) with its ``table_rows`` and
        ``table_columns``.
    """

    # Initialize dictionary to store header information
    header_info = {}

    # Keywords that are not shown
    exclude_list = ['', 'COMMENT']

    # Open the file; the context manager closes it again even if
    # reading one of the headers fails
    fits_filepath = filesystem_path(filename, search='*_rate.fits')
    with fits.open(fits_filepath) as hdulist:

        # Extract header information from file
        for ext, hdu in enumerate(hdulist):
            header = hdu.header

            # Determine the extension name
            extname = 'PRIMARY' if ext == 0 else header['EXTNAME']

            # Get list of keywords and values
            keywords = [item for item in list(header.keys()) if item not in exclude_list]
            values = [header[key] for key in keywords]

            header_info[ext] = {'EXTNAME': extname,
                                'keywords': keywords,
                                'values': values}

    # Build tables
    for ext in header_info:
        table = pd.DataFrame({'Keyword': header_info[ext]['keywords'],
                              'Value': header_info[ext]['values']})
        header_info[ext]['table'] = table
        header_info[ext]['table_rows'] = table.values
        header_info[ext]['table_columns'] = table.columns.values

    return header_info
def get_image_info(file_root, rewrite):
    """Build and return a dictionary containing information for a given
    ``file_root``.

    Parameters
    ----------
    file_root : str
        The rootname of the file of interest (e.g.
        ``jw86600008001_02101_00007_guider2``).
    rewrite : bool
        ``True`` if the corresponding JPEG needs to be rewritten,
        ``False`` if not.

    Returns
    -------
    image_info : dict
        A dictionary containing various information for the given
        ``file_root``: ``all_files`` (matching FITS files),
        ``suffixes``, ``all_jpegs`` (path of the first-integration JPEG
        of each file) and ``num_ints`` (number of integration JPEGs per
        suffix).
    """

    # Initialize dictionary to store information
    image_info = {}
    image_info['all_jpegs'] = []
    image_info['suffixes'] = []
    image_info['num_ints'] = {}

    # Find all of the matching files
    proposal_dir = file_root[:7]
    observation_dir = file_root[:13]
    fits_pattern = '{}*.fits'.format(file_root)
    filenames = glob.glob(os.path.join(FILESYSTEM_DIR, 'public', proposal_dir, observation_dir, fits_pattern))
    filenames.extend(glob.glob(os.path.join(FILESYSTEM_DIR, 'proprietary', proposal_dir, observation_dir, fits_pattern)))
    image_info['all_files'] = filenames

    # All JPEGs of a proposal live in one directory of the preview
    # image filesystem
    jpg_dir = os.path.join(get_config()['preview_image_filesystem'], proposal_dir)

    for filename in image_info['all_files']:

        # Get suffix information
        suffix = os.path.basename(filename).split('_')[4].split('.')[0]
        image_info['suffixes'].append(suffix)

        # Determine JPEG file location
        jpg_filename = os.path.basename(os.path.splitext(filename)[0] + '_integ0.jpg')
        jpg_filepath = os.path.join(jpg_dir, jpg_filename)

        # Only (re)create the JPEG if it is missing or a rewrite was
        # requested; otherwise the existing file is used as is
        if rewrite or not os.path.exists(jpg_filepath):
            os.makedirs(jpg_dir, exist_ok=True)
            im = PreviewImage(filename, 'SCI')
            im.output_directory = jpg_dir
            im.make_image()

        # Record how many integrations there are per filetype.  The
        # search has to look in the directory the JPEGs are written to.
        search_jpgs = os.path.join(jpg_dir, '{}_{}_integ*.jpg'.format(file_root, suffix))
        image_info['num_ints'][suffix] = len(glob.glob(search_jpgs))

        image_info['all_jpegs'].append(jpg_filepath)

    return image_info
def get_instrument_proposals(instrument):
    """Return a list of proposals for the given instrument

    Parameters
    ----------
    instrument : str
        Name of the JWST instrument, with first letter capitalized
        (e.g. ``Fgs``)

    Returns
    -------
    proposals : list
        List of proposals for the given instrument
    """

    # Ask MAST for the program of every record of the instrument
    service = "Mast.Jwst.Filtered.{}".format(instrument)
    params = {"columns": "program", "filters": []}
    response = Mast.service_request_async(service, params)
    records = response[0].json()['data']

    # Reduce to the unique program values
    unique_programs = {record['program'] for record in records}

    return list(unique_programs)
def get_jwqldb_table_view_components(request):
    """Collect the data needed by the JWQLDB table viewer.

    Parameters
    ----------
    request : HttpRequest object
        Incoming request from the webpage

    Returns
    -------
    table_data : pandas.DataFrame
        Pandas data frame of JWQL database table, or ``None`` if no
        table was requested
    table_name : str
        Name of database table selected by user, or ``None`` if no
        table was requested
    """

    # When coming from home/monitor views there is nothing to show
    if 'make_table_view' not in request.POST:
        return None, None

    table_name = request.POST['db_table_select']

    return build_table(table_name), table_name
def get_preview_images_by_instrument(inst):
    """Return a list of preview images available in the filesystem for
    the given instrument.

    Parameters
    ----------
    inst : str
        The instrument of interest (e.g. ``NIRCam``).

    Returns
    -------
    preview_images : list
        A list of preview images available in the filesystem for the
        given instrument.
    """

    # Make sure the instrument is of the proper format (e.g. "Nircam")
    instrument = inst[0].upper() + inst[1:].lower()

    # Query MAST for all rootnames for the instrument
    service = "Mast.Jwst.Filtered.{}".format(instrument)
    params = {"columns": "filename", "filters": []}
    response = Mast.service_request_async(service, params)
    results = response[0].json()['data']

    # Parse the results to get the rootnames.  A set keeps the
    # membership test below cheap for long result lists.
    filenames = {result['filename'].split('.')[0] for result in results}

    # Get list of all preview_images
    preview_images = glob.glob(os.path.join(PREVIEW_IMAGE_FILESYSTEM, '*', '*.jpg'))

    # Get subset of preview images that match the filenames
    preview_images = [os.path.basename(item) for item in preview_images
                      if os.path.basename(item).split('_integ')[0] in filenames]

    return preview_images
def get_preview_images_by_proposal(proposal):
    """Return a list of preview images available in the filesystem for
    the given ``proposal``.

    Parameters
    ----------
    proposal : str
        The one- to five-digit proposal number (e.g. ``88600``).

    Returns
    -------
    preview_images : list
        A list of preview images available in the filesystem for the
        given ``proposal``.
    """

    # Proposal directories are named "jw" + zero-padded 5-digit number
    proposal_dir = 'jw{:05d}'.format(int(proposal))
    pattern = os.path.join(PREVIEW_IMAGE_FILESYSTEM, proposal_dir, '*')

    return [os.path.basename(path) for path in glob.glob(pattern)]
def get_preview_images_by_rootname(rootname):
    """Return a list of preview images available in the filesystem for
    the given ``rootname``.

    Parameters
    ----------
    rootname : str
        The rootname of interest (e.g.
        ``jw86600008001_02101_00007_guider2``).

    Returns
    -------
    preview_images : list
        A list of preview images available in the filesystem for the
        given ``rootname``.
    """

    # The proposal number is the first five characters that follow
    # "jw" in the leading component of the rootname
    leading_component = rootname.split('_')[0]
    proposal = leading_component.split('jw')[-1][:5]

    pattern = os.path.join(PREVIEW_IMAGE_FILESYSTEM,
                           'jw{}'.format(proposal),
                           '{}*'.format(rootname))
    matches = glob.glob(pattern)
    matches.sort()

    return [os.path.basename(match) for match in matches]
def get_proposal_info(filepaths):
    """Builds and returns a dictionary containing various information
    about the proposal(s) that correspond to the given ``filepaths``.

    The information returned contains such things as the number of
    proposals, the paths to the corresponding thumbnails, and the total
    number of files.

    Parameters
    ----------
    filepaths : list
        A list of full paths to files of interest.

    Returns
    -------
    proposal_info : dict
        A dictionary containing various information about the
        proposal(s) and files corresponding to the given ``filepaths``.
    """

    # Containers, kept in order of first appearance of each proposal
    proposals = []
    thumbnail_paths = []
    num_files = []

    for filepath in filepaths:

        # Characters 2-6 of the file name hold the proposal number
        basename = filepath.split('/')[-1]
        proposal = basename[2:7]
        if proposal in proposals:
            continue

        prefix = 'jw{}'.format(proposal)
        proposals.append(proposal)
        thumbnail_paths.append(os.path.join(prefix, '{}.thumb'.format(prefix)))

        # Count every path that mentions this proposal
        num_files.append(sum(1 for item in filepaths if prefix in item))

    # Put the various information into a dictionary of results
    return {'num_proposals': len(proposals),
            'proposals': proposals,
            'thumbnail_paths': thumbnail_paths,
            'num_files': num_files}
def get_thumbnails_all_instruments(parameters):
    """Return a list of thumbnails available in the filesystem for all
    instruments given requested MAST parameters and queried anomalies.

    Parameters
    ----------
    parameters: dict
        A dictionary containing the following keys.  ``instruments``
        is an iterable of instrument names; every other entry is a
        dictionary keyed by lower-case instrument name:
            instruments
            apertures
            detectors
            filters
            exposure_types
            read_patterns
            anomalies

    Returns
    -------
    thumbnails : list
        A list of thumbnails available in the filesystem for the
        given instruments (without duplicates, in no particular order).
    """

    anomalies = parameters['anomalies']

    # (key in ``parameters``, MAST parameter name) for the selections
    # that are named the same for every instrument
    common_filters = [('detectors', 'detector'),
                      ('filters', 'filter'),
                      ('exposure_types', 'exp_type'),
                      ('read_patterns', 'readpatt')]

    # A set keeps the membership test against the thumbnails cheap
    filenames = set()

    for inst in parameters['instruments']:
        # Make sure instruments are of the proper format (e.g. "Nircam")
        instrument = inst[0].upper() + inst[1:].lower()
        inst_key = inst.lower()

        # Query MAST for all rootnames for the instrument
        service = "Mast.Jwst.Filtered.{}".format(instrument)

        # Only non-empty selections become query filters; with no
        # selection at all the filter list stays empty
        query_filters = []

        apertures = parameters['apertures'][inst_key]
        if apertures != []:
            # NIRCam apertures are matched on a different MAST column
            aperture_param = "apername" if instrument == "Nircam" else "pps_aper"
            query_filters.append({"paramName": aperture_param, "values": apertures})

        for key, param_name in common_filters:
            values = parameters[key][inst_key]
            if values != []:
                query_filters.append({"paramName": param_name, "values": values})

        params = {"columns": "*", "filters": query_filters}
        response = Mast.service_request_async(service, params)
        results = response[0].json()['data']

        filenames.update(result['filename'].split('.')[0] for result in results)

    # Get list of all thumbnails
    thumbnail_list = glob.glob(os.path.join(THUMBNAIL_FILESYSTEM, '*', '*.thumb'))

    # Get subset of thumbnails that match the filenames, without duplicates
    thumbnails_subset = list({os.path.basename(item) for item in thumbnail_list
                              if os.path.basename(item).split('_integ')[0] in filenames})

    # Determine whether or not queried anomalies are flagged
    final_subset = []
    if anomalies != {'miri': [], 'nirspec': [], 'niriss': [], 'nircam': [], 'fgs': []}:
        for thumbnail in thumbnails_subset:
            components = thumbnail.split('_')
            rootname = ''.join((components[0], '_', components[1], '_', components[2], '_', components[3]))
            try:
                instrument = filename_parser(thumbnail)['instrument']
                thumbnail_anomalies = get_current_flagged_anomalies(rootname, instrument)
                if thumbnail_anomalies:
                    for anomaly in anomalies[instrument.lower()]:
                        if anomaly.lower() in thumbnail_anomalies:
                            # thumbnail contains an anomaly selected in the query;
                            # one match is enough to include it
                            final_subset.append(thumbnail)
                            break
            except KeyError:
                print("Error with thumbnail: ", thumbnail)
    else:
        # if no anomalies are flagged, return all thumbnails from query
        final_subset = thumbnails_subset

    return list(set(final_subset))
def get_thumbnails_by_instrument(inst):
    """Return a list of thumbnails available in the filesystem for the
    given instrument.

    Parameters
    ----------
    inst : str
        The instrument of interest (e.g. ``NIRCam``).

    Returns
    -------
    thumbnails : list
        A list of thumbnails available in the filesystem for the
        given instrument.
    """

    # Make sure the instrument is of the proper format (e.g. "Nircam")
    instrument = inst[0].upper() + inst[1:].lower()

    # Query MAST for all rootnames for the instrument
    service = "Mast.Jwst.Filtered.{}".format(instrument)
    params = {"columns": "filename", "filters": []}
    response = Mast.service_request_async(service, params)
    results = response[0].json()['data']

    # Parse the results to get the rootnames.  A set keeps the
    # membership test below cheap for long result lists.
    filenames = {result['filename'].split('.')[0] for result in results}

    # Get list of all thumbnails
    thumbnails = glob.glob(os.path.join(THUMBNAIL_FILESYSTEM, '*', '*.thumb'))

    # Get subset of thumbnails that match the filenames
    thumbnails = [os.path.basename(item) for item in thumbnails
                  if os.path.basename(item).split('_integ')[0] in filenames]

    return thumbnails
def get_thumbnails_by_proposal(proposal):
    """Return a list of thumbnails available in the filesystem for the
    given ``proposal``.

    Parameters
    ----------
    proposal : str
        The one- to five-digit proposal number (e.g. ``88600``).

    Returns
    -------
    thumbnails : list
        A list of thumbnails available in the filesystem for the given
        ``proposal``.
    """

    # Proposal directories are named "jw" + zero-padded 5-digit number
    proposal_dir = 'jw{:05d}'.format(int(proposal))
    pattern = os.path.join(THUMBNAIL_FILESYSTEM, proposal_dir, '*')

    return [os.path.basename(path) for path in glob.glob(pattern)]
def get_thumbnails_by_rootname(rootname):
    """Return a list of thumbnails available in the filesystem for the
    given ``rootname``.

    Parameters
    ----------
    rootname : str
        The rootname of interest (e.g.
        ``jw86600008001_02101_00007_guider2``).

    Returns
    -------
    thumbnails : list
        A list of thumbnails available in the filesystem for the given
        ``rootname``.
    """

    # The proposal number is the first five characters that follow
    # "jw" in the leading component of the rootname
    leading_component = rootname.split('_')[0]
    proposal = leading_component.split('jw')[-1][:5]

    pattern = os.path.join(THUMBNAIL_FILESYSTEM,
                           'jw{}'.format(proposal),
                           '{}*'.format(rootname))
    matches = glob.glob(pattern)
    matches.sort()

    return [os.path.basename(match) for match in matches]
def log_into_mast(request):
    """Login via astroquery.mast if user authenticated in web app.

    Parameters
    ----------
    request : HttpRequest object
        Incoming request from the webpage

    Returns
    -------
    authenticated : bool
        ``True`` if ``astroquery.mast`` was already authenticated,
        ``False`` if no access token is available for the request,
        otherwise the authentication state reported after logging in
        with the token.
    """

    # Nothing to do if a login already happened
    if Mast.authenticated():
        return True

    # get the MAST access token if present
    access_token = str(get_mast_token(request))
    if access_token == 'None':
        return False

    # authenticate with astroquery.mast
    Mast.login(token=access_token)

    return Mast.authenticated()
def random_404_page():
    """Randomly select one of the various 404 templates for JWQL

    Returns
    -------
    random_template : str
        Filename of the selected template
    """

    templates = ['404_space.html', '404_spacecat.html']

    # Draw a random position within the list and return that template
    return templates[np.random.choice(len(templates))]
def thumbnails_ajax(inst, proposal=None):
    """Generate a page that provides data necessary to render the
    ``thumbnails`` template.

    Parameters
    ----------
    inst : str
        Name of JWST instrument
    proposal : str (optional)
        Number of APT proposal to filter

    Returns
    -------
    data_dict : dict
        Dictionary of data needed for the ``thumbnails`` template
    """

    # Get the available files for the instrument
    filenames = get_filenames_by_instrument(inst)

    # Get set of unique rootnames (file name without its last "_" part)
    rootnames = {'_'.join(f.split('/')[-1].split('_')[:-1]) for f in filenames}

    # If the proposal is specified (i.e. if the page being loaded is
    # an archive page), only collect data for given proposal
    if proposal is not None:
        proposal_string = '{:05d}'.format(int(proposal))
        rootnames = [rootname for rootname in rootnames if rootname[2:7] == proposal_string]

    # Initialize dictionary that will contain all needed data
    data_dict = {}
    data_dict['inst'] = inst
    data_dict['file_data'] = {}

    # Gather data for each rootname
    for rootname in rootnames:

        # Parse filename
        try:
            filename_dict = filename_parser(rootname)
            if 'detector' not in filename_dict.keys():
                # this keyword is expected in thumbnails_query_ajax() for
                # generating filterable dropdown menus
                filename_dict['detector'] = 'Unknown'
        except ValueError:
            # Temporary workaround for noncompliant files in filesystem
            filename_dict = {'activity': rootname[17:19],
                             'detector': rootname[26:],
                             'exposure_id': rootname[20:25],
                             'observation': rootname[7:10],
                             'parallel_seq_id': rootname[16],
                             'program_id': rootname[2:7],
                             'visit': rootname[10:13],
                             'visit_group': rootname[14:16]}

        # Get list of available filenames
        available_files = [item for item in filenames if rootname in item]

        # Add data to dictionary
        file_data = {}
        data_dict['file_data'][rootname] = file_data
        file_data['filename_dict'] = filename_dict
        file_data['available_files'] = available_files
        file_data['suffixes'] = [filename_parser(filename)['suffix'] for filename in available_files]

        # The exposure start time is best effort: a failure for one
        # rootname must not prevent the page from rendering.  Only
        # ordinary errors are swallowed, so KeyboardInterrupt and
        # SystemExit still propagate.
        try:
            file_data['expstart'] = get_expstart(inst, rootname)
            file_data['expstart_iso'] = Time(file_data['expstart'], format='mjd').iso.split('.')[0]
        except Exception:
            print("issue with get_expstart for {}".format(rootname))

    # Extract information for sorting with dropdown menus
    # (Don't include the proposal as a sorting parameter if the proposal has already been specified)
    detectors, proposals = [], []
    for rootname in list(data_dict['file_data'].keys()):
        proposals.append(data_dict['file_data'][rootname]['filename_dict']['program_id'])
        try:  # Some rootnames cannot parse out detectors
            detectors.append(data_dict['file_data'][rootname]['filename_dict']['detector'])
        except KeyError:
            pass

    if proposal is not None:
        dropdown_menus = {'detector': detectors}
    else:
        dropdown_menus = {'detector': detectors,
                          'proposal': proposals}

    data_dict['tools'] = MONITORS
    data_dict['dropdown_menus'] = dropdown_menus
    data_dict['prop'] = proposal

    return data_dict
def thumbnails_query_ajax(rootnames):
    """Collect the data necessary to render the ``thumbnails`` template
    for an explicit list of rootnames.

    Parameters
    ----------
    rootnames : list of strings
        Rootnames to gather data for.  Each one is truncated to its
        first four underscore-separated fields; entries with fewer than
        four fields are skipped.

    Returns
    -------
    data_dict : dict
        Dictionary of data needed for the ``thumbnails`` template, with
        the keys ``inst``, ``file_data`` (one entry per rootname),
        ``tools`` and ``dropdown_menus``.
    """
    # Initialize dictionary that will contain all needed data
    data_dict = {}
    # dummy variable for view_image when thumbnail is selected
    data_dict['inst'] = "all"
    data_dict['file_data'] = {}
    file_data = data_dict['file_data']

    # Gather data for each rootname
    for rootname in rootnames:

        # Fit expected format for get_filenames_by_rootname(): keep only
        # the first four underscore-separated fields
        fields = rootname.split("_")
        if len(fields) < 4:
            continue
        rootname = '_'.join(fields[:4])

        # Parse filename
        try:
            filename_dict = filename_parser(rootname)
        except ValueError:
            # Temporary workaround for noncompliant files in filesystem:
            # slice the fields out of the rootname by fixed position
            filename_dict = {'activity': rootname[17:19],
                             'detector': rootname[26:],
                             'exposure_id': rootname[20:25],
                             'observation': rootname[7:10],
                             'parallel_seq_id': rootname[16],
                             'program_id': rootname[2:7],
                             'visit': rootname[10:13],
                             'visit_group': rootname[14:16]}

        # Get list of available filenames
        available_files = get_filenames_by_rootname(rootname)

        # Add data to dictionary
        entry = {}
        file_data[rootname] = entry

        # Reuse the already-parsed dictionary instead of calling
        # filename_parser() again: a second call would re-raise the
        # ValueError that the workaround above just handled.  The
        # workaround dictionary has no 'instrument' key, so such files
        # take the KeyError branch below.
        try:
            entry['inst'] = JWST_INSTRUMENT_NAMES_MIXEDCASE[filename_dict['instrument']]
        except KeyError:
            entry['inst'] = "MIRI"
            print("Warning: assuming instrument is MIRI")

        entry['filename_dict'] = filename_dict
        entry['available_files'] = available_files
        entry['expstart'] = get_expstart(entry['inst'], rootname)
        entry['suffixes'] = [filename_parser(filename)['suffix'] for filename in available_files]
        entry['prop'] = rootname[2:7]

    # Extract information for sorting with dropdown menus
    try:
        detectors = [entry['filename_dict']['detector'] for entry in file_data.values()]
    except KeyError:
        # At least one rootname has no detector: rebuild the list,
        # collecting each detector once and labelling the missing ones
        # as 'Unknown'
        detectors = []
        for entry in file_data.values():
            try:
                detector = entry['filename_dict']['detector']
            except KeyError:
                detector = 'Unknown'
            if detector not in detectors:
                detectors.append(detector)

    instruments = [entry['inst'].lower() for entry in file_data.values()]
    proposals = [entry['filename_dict']['program_id'] for entry in file_data.values()]

    dropdown_menus = {'instrument': instruments,
                      'detector': detectors,
                      'proposal': proposals}

    data_dict['tools'] = MONITORS
    data_dict['dropdown_menus'] = dropdown_menus

    return data_dict