""" Definition of Super and Subclasses for the mosaic output image_list
Classes which define the total ("white light" image), filter, and exposure
drizzle products.
These products represent different levels of processing with the levels noted in the
'HAPLEVEL' keyword. The 'HAPLEVEL' values are:
* 1 : calibrated (FLT/FLC) input images and exposure level drizzle products with improved astrometry
* 2 : filter and total products combined using the improved astrometry, consistent pixel scale, and oriented to North
* 3 : unused
* 4 : multi-visit mosaics aligned to a common tangent plane
"""
import copy
import logging
import sys
import os
import traceback
import shutil
from astropy.io import fits
from collections import OrderedDict
from stsci.tools import logutil
from stwcs import updatewcs
import numpy as np
from .. import align
from .. import astrodrizzle
from .. import wcs_functions
from . import align_utils
from . import astrometric_utils as amutils
from . import cell_utils
from . import processing_utils
MSG_DATEFMT = "%Y%j%H%M%S"
SPLUNK_MSG_FORMAT = "%(asctime)s %(levelname)s src=%(name)s- %(message)s"
log = logutil.create_logger(
__name__,
level=logutil.logging.NOTSET,
stream=sys.stdout,
format=SPLUNK_MSG_FORMAT,
datefmt=MSG_DATEFMT,
)
# Define keywords to be added to SVM products to describe the overlap of
# exposures onto the output WCS footprint as determined from the SkyFootprint masks
MASK_KWS = {
"NPIXFRAC": [None, "Fraction of pixels with data"],
"MEANEXPT": [None, "Mean exposure time per pixel with data"],
"MEDEXPT": [None, "Median exposure time per pixel with data"],
"MEANNEXP": [None, "Mean number of exposures per pixel with data"],
"MEDNEXP": [None, "Median number of exposures per pixel with data"],
}
MASK_WHTKWS = {
"MEANWHT": [None, "Mean weight (typically exposure time) over pixels with data"],
"MEDWHT": [None, "Median weight (typically exposure time) over pixels with data"],
}
[docs]
class HAPProduct:
"""HAPProduct is the base class for the various products generated during the
astrometry update and mosaicing of image data.
"""
def __init__(
self, prop_id, obset_id, instrument, detector, aperture_from_poller, filename, filetype, log_level
):
# set logging level to user-specified level
log.setLevel(log_level)
self.log_level = log_level
# Special logic to specify the WFPC2 detector name always to PC (as opposed to WFPC2) for filenaming.
if instrument =='wfpc2':
detector = 'pc'
# Make sure the proposal ID is a 5-character string
self.prop_id = prop_id.zfill(5)
self.obset_id = obset_id
self.instrument = instrument
self.detector = detector
self.aperture_from_poller = aperture_from_poller
self.filetype = filetype
self.rules_file = None
self.basename = (
"hst_" + "_".join(map(str, [prop_id, obset_id, instrument, detector])) + "_"
)
# exposure_name is the ipppssoo or a portion thereof
self.exposure_name = filename[0:8]
# TO DO: update this variable
self.mjdutc = None
# HAPConfig objects are created after these Product objects have been instantiated so
# this attribute is updated in the hapsequncer.py module (run_hla_processing()).
self.configobj_pars = None
# Define HAPLEVEL value for this product
self.haplevel = 1
# Initialize attributes for use in generating the output products
self.meta_wcs = None
self.mask = None
self.mask_kws = copy.deepcopy(MASK_KWS)
self.mask_whtkws = copy.deepcopy(MASK_WHTKWS)
self.mask_computed = False
# attribute to use for identifying whether or not this product is valid
# can be set depending on any number of conditions, such as no member
# overlapping the WCS.
self.valid_product = True
# filenames of all reference catalogs generated for alignment attempts
self.refcat_filenames = []
# def print_info(self):
""" Generic print at this time to indicate the information used in the
construction of the object for debug purposes.
"""
# print("Object information: {}".format(self.info))
def generate_footprint_mask(self, save_mask=True):
"""Create a footprint mask for a set of exposure images
Create a mask which is True/1/on for the illuminated portion of the image, and
False/0/off for the remainder of the image.
"""
footprint = cell_utils.SkyFootprint(self.meta_wcs)
exposure_names = [element.full_filename for element in self.edp_list]
footprint.build(exposure_names, scale=True)
if footprint.bounded_wcs is None:
log.warning(
f"These exposures do not overlap this WCS:\n{exposure_names}\n{self.meta_wcs}"
)
self.valid_product = False
del footprint
return
# This mask actually represents the number of chips per pixel, not True/False.
# To have the True/False mask it should be self.mask = footprint.footprint.
# Do not fix this until it can be verified that a change will not have repercussions.
if save_mask:
self.mask = copy.deepcopy(footprint.total_mask)
# Compute footprint-based SVM-specific keywords for product image header
good_pixels = footprint.total_mask > 0
self.mask_computed = True
self.mask_kws["NPIXFRAC"][0] = good_pixels.sum() / footprint.total_mask.size
self.mask_kws["MEANEXPT"][0] = np.mean(footprint.scaled_mask[good_pixels])
self.mask_kws["MEDEXPT"][0] = np.median(footprint.scaled_mask[good_pixels])
self.mask_kws["MEANNEXP"][0] = np.mean(footprint.total_mask[good_pixels])
self.mask_kws["MEDNEXP"][0] = np.median(footprint.total_mask[good_pixels])
del footprint
def generate_metawcs(self):
"""A method to build a unique WCS for each TotalProduct product which is
generated based upon the merging of all the ExposureProducts
which comprise the specific TotalProduct. This
is done on a per TotalProduct basis as the Single
Visit Mosaics need to be represented in their native scale.
"""
exposure_filenames = [element.full_filename for element in self.edp_list]
log.debug("\n\nRun make_mosaic_wcs to create a common WCS.")
log.debug("The following images will be used: ")
log.debug("{}\n".format(exposure_filenames))
# Extract final output rot and scale from AstroDrizzle parameters to be used
final_rot = self.configobj_pars.pars["astrodrizzle"].outpars["final_rot"]
final_scale = self.configobj_pars.pars["astrodrizzle"].outpars["final_scale"]
# Set the rotation to 0.0 to force North as up
if exposure_filenames:
meta_wcs = wcs_functions.make_mosaic_wcs(
exposure_filenames, rot=final_rot, scale=final_scale
)
# Used in generation of SkyFootprints
self.meta_wcs = meta_wcs
return meta_wcs
def align_to_gaia(
self,
catalog_list=[],
output=True,
process_type="SVM",
fit_label="SVM",
align_table=None,
fitgeom="",
):
"""Extract the flt/flc filenames from the exposure product list, as
well as the corresponding headerlet filenames to use legacy alignment
routine.
"""
# Make a deep copy of the alignment parameters for safe keeping
alignment_pars = self.configobj_pars.get_pars("alignment")
orig_alignment_pars = copy.deepcopy(alignment_pars)
exposure_filenames = []
headerlet_filenames = {}
align_table = None
crclean = []
# If no catalog list has been provided, use the list defined in the configuration file
if not catalog_list:
mosaic_catalog_list = alignment_pars["run_align"]["mosaic_catalog_list"]
else:
mosaic_catalog_list = catalog_list
num_cat = len(mosaic_catalog_list)
# Fitting methods
mosaic_method_list = alignment_pars["run_align"]["mosaic_fit_list"]
if len(self.edp_list) == 1:
# Remove 'match_relative_fit' (first entry) since there is only 1 image to align
del mosaic_method_list[0]
num_method = len(mosaic_method_list)
# Fit geometry methods
# mosaic_fitgeom_list = list(alignment_pars['run_align']['mosaic_fitgeom_list'][0].items())
mosaic_fitgeom_list = list(
alignment_pars["run_align"]["mosaic_fitgeom_list"].items()
)
for edp in self.edp_list:
exposure_filenames.append(edp.full_filename)
headerlet_filenames[edp.full_filename] = edp.headerlet_filename
crclean.append(edp.crclean)
try:
# Proceed only if there is data to process
if self.edp_list:
# If necessary, generate the alignment table only once
if align_table is None:
align_table = align_utils.AlignmentTable(
exposure_filenames,
process_type=process_type,
log_level=self.log_level,
**alignment_pars,
)
align_table.find_alignment_sources(output=output, crclean=crclean)
is_good_fit = False
# determine minimum number of sources available for alignment
source_nums = []
for img_cats in align_table.extracted_sources.values():
sources = 0
for chip in img_cats:
sources += len(img_cats[chip])
source_nums.append(sources)
min_sources = min(source_nums)
# Loop for available catlogs, the first successful fit for a
# (catalog, fitting method, and fit geometry) is satisfactory to break out of the looping.
for index_cat, catalog_item in enumerate(mosaic_catalog_list):
more_catalogs = True
if (index_cat + 1) == num_cat:
more_catalogs = False
# Override the default self.refname as it really needs to be
# catalog-specific to be useful
refname = (
self.product_basename + "_" + catalog_item + "_ref_cat.ecsv"
)
log.info(
"Starting alignment to absolute astrometric reference frame '{}'.".format(
catalog_item
)
)
log.debug("Creating reference catalog {}".format(self.refname))
ref_catalog = amutils.create_astrometric_catalog(
align_table.process_list,
catalog=catalog_item,
output=self.refname,
gaia_only=False,
full_catalog=True,
)
# Add a weight column which is based upon proper motion measurements
if (
"converted" in ref_catalog.meta
and ref_catalog.meta["converted"]
and ref_catalog["RA_error"][0] != np.nan
and ref_catalog["RA_error"][0] != 0
):
ref_weight = 1 / (
ref_catalog["RA_error"] ** 2 + ref_catalog["DEC_error"] ** 2
)
ref_weight = np.nan_to_num(
ref_weight, copy=False, nan=0.0, posinf=0.0, neginf=0.0
)
else:
ref_weight = np.ones_like(ref_catalog["RA"])
ref_catalog.add_column(ref_weight, name="weight")
self.refname = (
refname # Name of reference file actually used for alignment
)
self.refcat_filenames.append(refname)
log.debug(
"Abbreviated reference catalog displayed below\n{}".format(
ref_catalog
)
)
align_table.reference_catalogs[self.refname] = ref_catalog
# Will need to satisfy the minimum criterion of found sources for the proposed
# fit geometry, or the fit geometry must be downgraded
num_ref_sources = len(ref_catalog)
log.info(
"Number of sources for reference catalog '{}' is {}.".format(
catalog_item, num_ref_sources
)
)
# Loop for specified fit methods
for index_method, full_method_name in enumerate(mosaic_method_list):
method_name = full_method_name.split("_")[1]
# If fitgeom is not provided via a parameter, get it from the configuration.
# This is the requested (from parameter) or default (from configuration)
# fit geometry, but it may be downgraded depending upon the number of
# sources available.
if not fitgeom.strip():
mosaic_fitgeom = alignment_pars[full_method_name]["fitgeom"]
else:
mosaic_fitgeom = fitgeom
# Get the index of the fitgeom in the list of all possible fitgeoms
for index, mode in enumerate(mosaic_fitgeom_list):
if mode[0].lower().strip() == mosaic_fitgeom:
mosaic_fitgeom_index = index
break
# If there are not enough references sources for the specified fitgeom,
# downgrade the fitgeom until a valid one is found. Also, if the fit done
# with the fitgeom was unsatisfactory, downgrade if possible and try again
while (
num_ref_sources
< mosaic_fitgeom_list[mosaic_fitgeom_index][1]
):
log.warning(
"Not enough reference sources for alignment using catalog '{}' with fit method '{}' and fit geometry '{}'.".format(
catalog_item,
method_name,
mosaic_fitgeom_list[mosaic_fitgeom_index][0],
)
)
mosaic_fitgeom_index -= 1
if mosaic_fitgeom_index < 0:
log.warning(
"No further fit geometries to try. Proceeding to try another fit method."
)
break
while mosaic_fitgeom_index > -1:
# In case of a downgrade, make sure the fit geometry "name" is based upon the index
mosaic_fitgeom = mosaic_fitgeom_list[mosaic_fitgeom_index][0]
log.info(
"Proceeding to use the '{}' fit geometry.".format(
mosaic_fitgeom
)
)
try:
# Always perform the configuration to ensure there is no residual cruft
# from any possible previous fit
align_table.configure_fit()
log.info(
"Trying '{}' wth method '{}' using fit geometry '{}'.".format(
catalog_item, method_name, mosaic_fitgeom
)
)
align_table.imglist = align_table.perform_fit(
method_name,
catalog_item,
ref_catalog,
fitgeom=mosaic_fitgeom,
)
# Define comparison for min cross-matches based on fitgeom used
alignment_pars["determine_fit_quality"]["min_xmatches"] = \
alignment_pars["run_align"]["mosaic_fitgeom_list"][mosaic_fitgeom]
# turn off consistency check for SVM and MVM processing since filter-to-filter
# or visit-to-visit offsets could be large relative to measurement RMS.
alignment_pars["determine_fit_quality"]["do_consistency_check"] = False
# Evaluate the quality of the fit
# Need to create this log file specifically for the PyTest testing environment
# when a single Python session is running and the tests are looking for dataset-specific
# log files.
log_file = "temp_align.log"
(is_good_fit,_,_,_,_,_,) = \
align.determine_fit_quality_mvm_interface(
align_table.imglist,
align_table.filtered_table,
more_catalogs,
num_cat,
alignment_pars,
print_fit_parameters=True,
loglevel=self.log_level,
runfile=log_file,
)
# Clean up the temporary log file as the contents are captured
try:
os.remove(log_file)
except OSError as error:
log.warning(
"Unable to remove file {}.".format(log_file)
)
log.warning(
"Output trailer files may contain duplicate information."
)
# Ensure the original parameters stay intact for the iterations
# as the perform_fit() modifies the fitgeom
alignment_pars = copy.deepcopy(orig_alignment_pars)
# The fit was good...
if is_good_fit:
log.info(
"Fit done for catalog '{}' with method '{}' using fit geometry '{}' was satisfactory.".format(
catalog_item, method_name, mosaic_fitgeom
)
)
# Success - Break out of "fitgeom" while loop
break
# The fit was not good
else:
log.info(
"Fit done for catalog '{}' with method '{}' using fit geometry '{}' was NOT satisfactory.".format(
catalog_item, method_name, mosaic_fitgeom
)
)
except Exception:
log.info(
"Problem with fit done for catalog '{}' with method '{}' using fit geometry '{}'.".format(
catalog_item, method_name, mosaic_fitgeom
)
)
exc_type, exc_value, exc_tb = sys.exc_info()
traceback.print_exception(
exc_type, exc_value, exc_tb, file=sys.stdout
)
logging.exception("message")
# Try again with a different fit geometry algorithm
mosaic_fitgeom_index -= 1
log.info("Resetting mosaic index for next fit method")
# Only if there are too few sources to perform an alignment with the current
# method should the next method be attempted. Otherwise, it is assumed that the
# image sources are either cosmic-rays or features that do not match across
# the filters (e.g., F300X vs F475W from 'id7r03') and can't be aligned. Trying with a
# less strict method (rshift vs rscale) will only increase the probability of
# matching incorrectly and introducing an error to the relative alignment.
if (
min_sources
> mosaic_fitgeom_list[mosaic_fitgeom_index][1]
):
mosaic_fitgeom_index = -1
log.warning(
"Too many (bad?) sources to try any more fitting with this catalog."
)
# Break out of the "fit method" for loop
if is_good_fit:
break
else:
if index_method + 1 < num_method:
log.warning("Proceeding to try the next fit method.")
# Break out of the "catalog" for loop
if is_good_fit:
break
else:
# log.warning("Not enough reference sources for absolute alignment using catalog {}.".format(catalog_item))
if index_cat + 1 < num_cat:
log.info("Proceeding to try the next catalog.")
else:
log.warning(
"Not enough reference sources for absolute alignment in any available catalog."
)
break
# Got a good fit...
if is_good_fit:
align_table.select_fit(catalog_item, method_name)
align_table.apply_fit(
headerlet_filenames=headerlet_filenames, fit_label=fit_label
)
else:
log.warning("No satisfactory fit found for any catalog.")
raise ValueError
except Exception:
# Report a problem with the alignment
if fit_label.upper().strip() == "SVM":
log.warning(
"EXCEPTION encountered in align_to_gaia for the FilteredProduct.\n"
)
else:
log.warning(
"EXCEPTION encountered in align_to_gaia for the SkyCellProduct.\n"
)
log.warning("No correction to absolute astrometric frame applied.\n")
log.warning("Proceeding with previous best solution.\n")
# Only write out the traceback if in "debug" mode since not being able to
# align the data to an absolute astrometric frame is not actually a failure.
log.debug(traceback.format_exc())
align_table = None
# It is necessary to clean-up reference catalogs
# created for alignment of each filter product here.
if self.refname and os.path.exists(self.refname):
self.refname = None
# Return a table which contains data regarding the alignment, as well as the
# list of the flt/flc exposures which were part of the alignment process
# TODO: This does not account for individual exposures which might have been
# excluded from alignment.
return align_table, exposure_filenames
[docs]
class TotalProduct(HAPProduct):
"""A Total Detection Product is a 'white' light mosaic comprised of
images acquired with one instrument, one detector, all filters, and all
exposure times. The exposure time characteristic may change - TBD.
The "tdp" is short hand for TotalProduct.
"""
def __init__(
self, prop_id, obset_id, instrument, detector, aperture_from_poller, filename, filetype, log_level
):
super().__init__(
prop_id, obset_id, instrument, detector, aperture_from_poller, filename, filetype, log_level
)
self.info = "_".join(
[prop_id, obset_id, instrument, detector, aperture_from_poller, filename, filetype]
)
self.exposure_name = filename[0:6]
self.product_basename = self.basename + "total_" + self.exposure_name
self.trl_logname = self.product_basename + "_trl.log"
self.trl_filename = self.product_basename + "_trl.txt"
self.point_cat_filename = self.product_basename + "_point-cat.ecsv"
self.segment_cat_filename = self.product_basename + "_segment-cat.ecsv"
self.drizzle_filename = self.product_basename + "_" + self.filetype + ".fits"
self.ref_drizzle_filename = (
self.product_basename + "_ref_" + self.filetype + ".fits"
)
# Generate the name for the manifest file which is for the entire visit. It is fine
# to create it as an attribute of a TotalProduct as it is independent of
# the detector in use.
# instrument_programID_obsetID_manifest.txt (e.g.,wfc3_b46_06_manifest.txt)
self.manifest_name = "_".join(
[instrument, filename[1:4], obset_id, "manifest.txt"]
)
# Define HAPLEVEL value for this product
self.haplevel = 2
# These attributes will be populated during processing
self.edp_list = []
self.fdp_list = []
self.regions_dict = {}
self.grism_edp_list = []
self.bkg_used = ""
log.debug(
"Total detection object {}/{} created.".format(
self.instrument, self.detector
)
)
def update_drizpars(self):
"""Update ALL products with final name of trailer file"""
# Update this total product
if os.path.exists(self.drizzle_filename):
fits.setval(self.drizzle_filename, "DRIZPARS", value=self.trl_filename)
else:
log.warning(
"File {} not found. DRIZPARS header value update failed.".format(
self.drizzle_filename
)
)
# Update all filter drizzle products that were actually written out
for fdp in self.fdp_list:
if os.path.exists(fdp.drizzle_filename):
fits.setval(fdp.drizzle_filename, "DRIZPARS", value=fdp.trl_filename)
# Update all exposure drizzle products that were actually written out
for edp in self.edp_list:
if os.path.exists(edp.drizzle_filename):
fits.setval(edp.drizzle_filename, "DRIZPARS", value=edp.trl_filename)
def find_member(self, name):
"""Return member instance with filename 'name'"""
desired_member = None
for member in [self] + self.edp_list + self.fdp_list:
if name == member.drizzle_filename:
desired_member = member
break
return desired_member
def add_member(self, edp):
"""Add an ExposureProduct object to the list - composition."""
self.edp_list.append(edp)
def add_grism_member(self, edp):
"""Add an GrismExposureProduct object to the list - composition.
This routine adds Grism or Prism exposures to the exposure list.
"""
self.grism_edp_list.append(edp)
def add_product(self, fdp):
"""Add a FilterProduct object to the list - composition."""
self.fdp_list.append(fdp)
def wcs_drizzle_product(self, meta_wcs):
"""
Create the drizzle-combined total image using the meta_wcs as the reference output
.. note:: Cosmic-ray identification is NOT performed when creating the total detection image.
"""
# This insures that keywords related to the footprint are generated for this
# specific object to use in updating the output drizzle product.
self.meta_wcs = meta_wcs
if self.mask_computed is False:
self.generate_footprint_mask()
# Retrieve the configuration parameters for astrodrizzle
drizzle_pars = self.configobj_pars.get_pars("astrodrizzle")
# ...and set parameters which are computed on-the-fly
drizzle_pars["final_refimage"] = meta_wcs
drizzle_pars["runfile"] = self.trl_logname
# Setting "preserve" to false so the OrIg_files directory is deleted as the purpose
# of this directory is now obsolete.
drizzle_pars["preserve"] = False
drizzle_pars["rules_file"] = self.rules_file
drizzle_pars["resetbits"] = "0"
log.debug(
"The 'final_refimage' ({}) and 'runfile' ({}) configuration variables "
"have been updated for the drizzle step of the total drizzle product.".format(
meta_wcs, self.trl_logname
)
)
# Determine if there are any single exposure filter images which
# should NOT be used in the computation of the total detection image in
# order to minimize cosmic ray contamination
#
# Loop over the exposure objects to collect the exposures for each filter
count_dict = {}
for expo in self.edp_list:
count_dict.setdefault(expo.filters, []).append(expo.full_filename)
# Accumulate the exposure file names for only the filters which have
# more than one exposure
exclude_string = []
edp_filenames = []
for key, value in count_dict.items():
if len(value) > 1:
edp_filenames += value
else:
exclude_string.append("Excluding single exposure filter image {} for {}.".format(value, key))
# However, if all the filters only have one exposure, just use all
# the exposures
if not edp_filenames:
for value in count_dict.values():
edp_filenames += value
log.info("All filters are single exposures, so all exposures are included.")
else:
for entry in exclude_string:
log.info(entry)
astrodrizzle.AstroDrizzle(
input=edp_filenames, output=self.drizzle_filename, **drizzle_pars
)
# Update product with SVM-specific keywords based on the footprint
with fits.open(self.drizzle_filename, mode="update") as hdu:
for kw in self.mask_kws:
hdu[0].header[kw] = tuple(self.mask_kws[kw])
# Now that the drizzle product is created, compute the mean and median
# of the WHT image
wht = hdu["WHT", 1].data
wht_mask = self.mask > 0 if hasattr(self.mask, "shape") else wht > 0.0
self.mask_whtkws["MEANWHT"][0] = np.mean(wht[wht_mask])
self.mask_whtkws["MEDWHT"][0] = np.median(wht[wht_mask])
for kw in self.mask_whtkws:
hdu[0].header[kw] = tuple(self.mask_whtkws[kw])
# Rename Astrodrizzle log file as a trailer file
log.debug(
"Total combined image {} composed of: {}".format(
self.drizzle_filename, edp_filenames
)
)
try:
shutil.move(self.trl_logname, self.trl_filename)
except PermissionError:
pass
[docs]
class FilterProduct(HAPProduct):
"""A Filter Detection Product is a mosaic comprised of images acquired
during a single visit with one instrument, one detector, a single filter,
and all exposure times. The exposure time characteristic may change - TBD.
The "fdp" is short hand for FilterProduct.
"""
def __init__(
self,
prop_id,
obset_id,
instrument,
detector,
aperture_from_poller,
filename,
filters,
filetype,
log_level,
):
super().__init__(
prop_id, obset_id, instrument, detector, aperture_from_poller, filename, filetype, log_level
)
self.info = "_".join(
[prop_id, obset_id, instrument, detector, aperture_from_poller, filename, filters, filetype]
)
if filename[0:7].lower() != "metawcs":
self.exposure_name = filename[0:6]
self.product_basename = self.basename + "_".join(
map(str, [filters, self.exposure_name])
)
else:
self.exposure_name = "metawcs"
self.product_basename = self.basename + "_".join(
map(str, [filetype, self.exposure_name, filters])
)
self.filters = filters
# Trailer names .txt or .log
self.trl_logname = self.product_basename + "_trl.log"
self.trl_filename = self.product_basename + "_trl.txt"
self.point_cat_filename = self.product_basename + "_point-cat.ecsv"
self.segment_cat_filename = self.product_basename + "_segment-cat.ecsv"
self.drizzle_filename = self.product_basename + "_" + self.filetype + ".fits"
# This attribute is reset in align_to_gaia to distinguish different catalogs
# which may be created
self.refname = self.product_basename + "_ref_cat.ecsv"
# Define HAPLEVEL value for this product
self.haplevel = 2
# These attributes will be populated during processing
self.edp_list = []
self.regions_dict = {}
log.info(
"Filter object {}/{}/{} created.".format(
self.instrument, self.detector, self.filters
)
)
def find_member(self, name):
"""Return member instance with filename 'name'"""
desired_member = None
for member in [self] + self.edp_list:
if name == member.drizzle_filename:
desired_member = member
break
return desired_member
def add_member(self, edp):
"""Add an ExposureProduct object to the list - composition."""
self.edp_list.append(edp)
def wcs_drizzle_product(self, meta_wcs):
"""
Create the drizzle-combined filter image using the meta_wcs as the reference output
"""
# This insures that keywords related to the footprint are generated for this
# specific object to use in updating the output drizzle product.
self.meta_wcs = meta_wcs
if self.mask_computed is False:
self.generate_footprint_mask()
# Retrieve the configuration parameters for astrodrizzle
drizzle_pars = self.configobj_pars.get_pars("astrodrizzle")
# ...and set parameters which are computed on-the-fly
# start by pulling out the specific parameters for the final refimage as
# separate input parameters
# rot and scale are specified in the config pars json files
drizzle_pars["final_ra"] = meta_wcs.wcs.crval[0]
drizzle_pars["final_dec"] = meta_wcs.wcs.crval[1]
drizzle_pars["final_crpix1"] = meta_wcs.wcs.crpix[0]
drizzle_pars["final_crpix2"] = meta_wcs.wcs.crpix[1]
drizzle_pars["final_outnx"] = meta_wcs.pixel_shape[0]
drizzle_pars["final_outny"] = meta_wcs.pixel_shape[1]
drizzle_pars["runfile"] = self.trl_logname
# Setting "preserve" to false so the OrIg_files directory is deleted as the purpose
# of this directory is now obsolete.
drizzle_pars["preserve"] = False
drizzle_pars["rules_file"] = self.rules_file
log.debug(
"The 'final_refimage' ({}) and 'runfile' ({}) configuration variables "
"have been updated for the drizzle step of the filter drizzle product.".format(
meta_wcs, self.trl_logname
)
)
edp_filenames = [element.full_filename for element in self.edp_list]
if len(edp_filenames) == 1:
drizzle_pars["resetbits"] = "0" # Use any pixels already flagged as CRs
astrodrizzle.AstroDrizzle(
input=edp_filenames, output=self.drizzle_filename, **drizzle_pars
)
# Update product with SVM-specific keywords based on the footprint
with fits.open(self.drizzle_filename, mode="update") as hdu:
for kw in self.mask_kws:
hdu[0].header[kw] = tuple(self.mask_kws[kw])
# Now that the drizzle product is created, compute the mean and median
# of the WHT image
wht = hdu["WHT", 1].data
wht_mask = self.mask > 0 if hasattr(self.mask, "shape") else wht > 0.0
self.mask_whtkws["MEANWHT"][0] = np.mean(wht[wht_mask])
self.mask_whtkws["MEDWHT"][0] = np.median(wht[wht_mask])
for kw in self.mask_whtkws:
hdu[0].header[kw] = tuple(self.mask_whtkws[kw])
# Rename Astrodrizzle log file as a trailer file
log.debug(
"Filter combined image {} composed of: {}".format(
self.drizzle_filename, edp_filenames
)
)
try:
shutil.move(self.trl_logname, self.trl_filename)
except PermissionError:
pass
[docs]
class ExposureProduct(HAPProduct):
"""An Exposure Product is an individual exposure/image (flt/flc).
The "edp" is short hand for ExposureProduct.
"""
def __init__(
self,
prop_id,
obset_id,
instrument,
detector,
aperture_from_poller,
filename,
filters,
filetype,
log_level,
):
super().__init__(
prop_id, obset_id, instrument, detector, aperture_from_poller, filename, filetype, log_level
)
self.info = "_".join(
[prop_id, obset_id, instrument, detector, aperture_from_poller, filename, filters, filetype]
)
self.filters = filters
self.full_filename = self.copy_exposure(filename)
# Open the input FITS file to mine some header information.
hdu_list = fits.open(filename)
self.mjdutc = hdu_list[0].header["EXPSTART"]
self.exptime = hdu_list[0].header["EXPTIME"]
hdu_list.close()
self.product_basename = self.basename + "_".join(
map(str, [filters, self.exposure_name])
)
self.drizzle_filename = self.product_basename + "_" + self.filetype + ".fits"
self.headerlet_filename = self.product_basename + "_hlet.fits"
self.trl_logname = self.product_basename + "_trl.log"
self.trl_filename = self.product_basename + "_trl.txt"
self.regions_dict = {}
# Define HAPLEVEL value for this product
self.haplevel = 1
# This attribute is set in poller_utils.py
self.is_singleton = False
# Flag whether this exposure is being processed for the 'first' time or not
self.new_process = True
# Flag whether to use single-image CR identification with this exposure
self.crclean = False
log.info("Exposure object {} created.".format(self.full_filename[0:9]))
def find_member(self, name):
"""Return member instance with filename 'name'"""
if name == self.drizzle_filename:
return self
else:
return None
def __getattribute__(self, name):
if name in [
"generate_footprint_mask",
"generate_metawcs",
"meta_wcs",
"mask_kws",
"mask",
]:
raise AttributeError(name)
else:
return super(ExposureProduct, self).__getattribute__(name)
def __dir__(self):
class_set = set(dir(self.__class__)) | set(self.__dict__.keys())
unwanted_set = set(
[
"generate_footprint_mask",
"generate_metawcs",
"meta_wcs",
"mask_kws",
"mask",
]
)
return sorted(class_set - unwanted_set)
def wcs_drizzle_product(self, meta_wcs):
"""
Create the drizzle-combined exposure image using the meta_wcs as the reference output
"""
# Retrieve the configuration parameters for astrodrizzle
drizzle_pars = self.configobj_pars.get_pars("astrodrizzle")
# ...and set parameters which are computed on-the-fly
drizzle_pars["final_refimage"] = meta_wcs
drizzle_pars["runfile"] = self.trl_logname
# Setting "preserve" to false so the OrIg_files directory is deleted as the purpose
# of this directory is now obsolete.
drizzle_pars["preserve"] = False
drizzle_pars["rules_file"] = self.rules_file
drizzle_pars["resetbits"] = "0"
log.debug(
"The 'final_refimage' ({}) and 'runfile' ({}) configuration variables "
"have been updated for the drizzle step of the exposure drizzle product.".format(
meta_wcs, self.trl_logname
)
)
astrodrizzle.AstroDrizzle(
input=self.full_filename, output=self.drizzle_filename, **drizzle_pars
)
# Rename Astrodrizzle log file as a trailer file
log.debug("Exposure image {}".format(self.drizzle_filename))
try:
shutil.move(self.trl_logname, self.trl_filename)
except PermissionError:
pass
def copy_exposure(self, filename):
"""
Create a copy of the original input to be renamed and used for single-visit processing.
New exposure filename needs to follow the convention:
hst_<propid>_<obsetid>_<instr>_<detector>_<filter>_<ipppssoo>_fl[ct].fits
Parameters
----------
filename : str
Original pipeline filename for input exposure
Returns
-------
edp_filename : str
New SVM-compatible HAP filename for input exposure
"""
suffix = filename.split("_")[1]
edp_filename = self.basename + "_".join(
map(str, [self.filters, filename[:8], suffix])
)
log.info("Copying {} to SVM input: \n {}".format(filename, edp_filename))
try:
shutil.copy(filename, edp_filename)
except PermissionError:
pass
# Add HAP keywords as required by pipeline processing
with fits.open(edp_filename, mode="update") as edp_hdu:
edp_hdu[0].header["HAPLEVEL"] = (1, "Classification level of this product")
edp_hdu[0].header["IPPPSSOO"] = edp_hdu[0].header["ROOTNAME"]
edp_hdu[0].header["FILENAME"] = edp_filename
# Ensure the S_REGION keyword is present in the SCI extension(s) and each
# polygon (POLYGON ICRS) in the S_REGION string is closed. As an example,
# S_REGION can be missing for images on the "do not reprocess" list.
processing_utils.compute_sregion(edp_hdu)
return edp_filename
class GrismExposureProduct(HAPProduct):
"""A Grism Exposure Product is an individual Grism/Prism exposure/image (flt/flc).
The "grism_edp" is short hand for GrismExposureProduct.
"""
def __init__(
self,
prop_id,
obset_id,
instrument,
detector,
aperture_from_poller,
filename,
filters,
filetype,
log_level,
):
super().__init__(
prop_id, obset_id, instrument, detector, aperture_from_poller, filename, filetype, log_level
)
self.info = "_".join(
[prop_id, obset_id, instrument, detector, aperture_from_poller, filename, filters, filetype]
)
self.filters = filters
self.full_filename = self.copy_exposure(filename)
# Open the input FITS file to mine some header information.
# and make sure the WCS is up-to-date
hdu_list = fits.open(filename)
self.mjdutc = hdu_list[0].header["EXPSTART"]
self.exptime = hdu_list[0].header["EXPTIME"]
drizcorr = hdu_list[0].header["DRIZCORR"]
if drizcorr == "OMIT":
updatewcs.updatewcs(self.full_filename, use_db=True)
hdu_list.close()
self.product_basename = self.basename + "_".join(
map(str, [filters, self.exposure_name])
)
self.drizzle_filename = self.product_basename + "_" + self.filetype + ".fits"
self.headerlet_filename = self.product_basename + "_hlet.fits"
self.trl_logname = self.product_basename + "_trl.log"
self.trl_filename = self.product_basename + "_trl.txt"
self.regions_dict = {}
# Define HAPLEVEL value for this product
self.haplevel = 1
log.info("Grism Exposure object {} created.".format(self.full_filename))
def copy_exposure(self, filename):
"""
Create a copy of the original input to be renamed and used for single-visit processing.
New exposure filename needs to follow the convention:
hst_<propid>_<obsetid>_<instr>_<detector>_<filter>_<ipppssoo>_fl[ct].fits
Parameters
----------
filename : str
Original pipeline filename for input exposure
Returns
-------
edp_filename : str
New SVM-compatible HAP filename for input exposure
"""
suffix = filename.split("_")[1]
edp_filename = self.basename + "_".join(
map(str, [self.filters, filename[:8], suffix])
)
log.info("Copying {} to SVM input: \n {}".format(filename, edp_filename))
try:
shutil.copy(filename, edp_filename)
except PermissionError:
pass
# Add HAP keywords as required by pipeline processing
with fits.open(edp_filename, mode="update") as edp_hdu:
edp_hdu[0].header["HAPLEVEL"] = (1, "Classification level of this product")
edp_hdu[0].header["IPPPSSOO"] = edp_hdu[0].header["ROOTNAME"]
edp_hdu[0].header["FILENAME"] = edp_filename
# Ensure the S_REGION keyword is present in the SCI extension(s) and each
# polygon (POLYGON ICRS) in the S_REGION string is closed. As an example,
# S_REGION can be missing for images on the "do not reprocess" list.
processing_utils.compute_sregion(edp_hdu)
return edp_filename
[docs]
class SkyCellExposure(HAPProduct):
"""An SkyCell Exposure Product is an individual exposure/image (flt/flc).
The "sce" is short hand for ExposureProduct.
"""
def __init__(
self,
prop_id,
obset_id,
instrument,
detector,
aperture_from_poller,
filename,
layer,
filetype,
log_level,
):
super().__init__(
prop_id, obset_id, instrument, detector, aperture_from_poller, filename, filetype, log_level
)
filter_str = layer[0]
# parse layer information into filename layer_str
# layer: [filter_str, pscale_str, exptime_str, epoch_str]
# e.g.: [f160w, coarse, all, all]
#
if filename.startswith("hst"):
exposure_name = filename.split("_")[-2]
else:
exposure_name = self.exposure_name
if layer[1] == "coarse":
layer_vals = [layer[1], layer[2], exposure_name]
else:
layer_vals = ["all", exposure_name]
layer_str = "-".join(layer_vals)
cell_id = "p{}{}".format(prop_id, obset_id)
self.basename = (
"hst_skycell-" + "_".join(map(str, [cell_id, instrument, detector])) + "_"
)
self.info = self.basename + "_".join(
[filter_str, layer_str, filename, filetype]
)
self.filters = filter_str
# Open the input FITS file to mine some header information.
hdu_list = fits.open(filename)
self.mjdutc = hdu_list[0].header["EXPSTART"]
self.exptime = hdu_list[0].header["EXPTIME"]
hdu_list.close()
self.product_basename = self.basename + "_".join(
map(str, [filter_str, layer_str])
)
self.drizzle_filename = self.product_basename + "_" + self.filetype + ".fits"
self.headerlet_filename = self.product_basename + "_hlet.fits"
self.trl_logname = self.product_basename + "_trl.log"
self.trl_filename = self.product_basename + "_trl.txt"
self.full_filename = self.copy_exposure(filename)
self.regions_dict = {}
# Define HAPLEVEL value for this product
self.haplevel = 1
# This attribute is set in poller_utils.py
self.is_singleton = False
# Flag whether this exposure is being processed for the 'first' time or not
self.new_process = True
# Flag whether to use single-image CR identification with this exposure
self.crclean = False
# Flag to indicate whether input exposure was modified (typically, with new WCS)
self.input_updated = False
log.info("Create SkyCellExposure object:\n {}".format(self.full_filename))
def find_member(self, name):
"""Return member instance with filename 'name'"""
if name == self.drizzle_filename:
return self
else:
return None
def verify_member(self, clean=True):
"""Delete member from disk if it has not been modified during processing
This method not only deletes the file from disk, but also reports the full
filename of that file to allow the calling routine to keep track of what
was deleted.
Parameter
---------
clean : bool
Specify whether or not to remove file from disk. If False,
leave file behind, usually for debugging purposes when
``diagnostic_mode`` is True.
Returns
--------
full_filename : str
Full filename of file that was deleted, IF it was not modified.
This value will be an empty string if the file was modified.
"""
del_file = ""
if not self.input_updated:
if clean and os.path.exists(self.full_filename):
os.remove(self.full_filename)
del_file = self.full_filename
return del_file
def __getattribute__(self, name):
if name in [
"generate_footprint_mask",
"generate_metawcs",
"meta_wcs",
"mask_kws",
"mask",
]:
raise AttributeError(name)
else:
return super(SkyCellExposure, self).__getattribute__(name)
def __dir__(self):
class_set = set(dir(self.__class__)) | set(self.__dict__.keys())
unwanted_set = set(
[
"generate_footprint_mask",
"generate_metawcs",
"meta_wcs",
"mask_kws",
"mask",
]
)
return sorted(class_set - unwanted_set)
def wcs_drizzle_product(self, meta_wcs):
"""
Create the drizzle-combined exposure image using the meta_wcs as the reference output
"""
# This insures that keywords related to the footprint are generated for this
# specific object to use in updating the output drizzle product.
self.meta_wcs = meta_wcs
if self.mask_computed is False:
self.generate_footprint_mask(save_mask=False)
# Retrieve the configuration parameters for astrodrizzle
drizzle_pars = self.configobj_pars.get_pars("astrodrizzle")
# ...and set parameters which are computed on-the-fly
drizzle_pars["final_refimage"] = meta_wcs
drizzle_pars["runfile"] = self.trl_logname
# Setting "preserve" to false so the OrIg_files directory is deleted as the purpose
# of this directory is now obsolete.
drizzle_pars["preserve"] = False
drizzle_pars["rules_file"] = self.rules_file
drizzle_pars["resetbits"] = "0"
# Set 'context' to False in order to turn off creation of CTX extensions in MVM products
# to save on disk space and memory use during processing.
drizzle_pars["context"] = False
log.debug(
"The 'final_refimage' ({}) and 'runfile' ({}) configuration variables "
"have been updated for the drizzle step of the exposure drizzle product.".format(
meta_wcs, self.trl_logname
)
)
astrodrizzle.AstroDrizzle(
input=self.full_filename, output=self.drizzle_filename, **drizzle_pars
)
# Update product with SVM-specific keywords based on the footprint
with fits.open(self.drizzle_filename, mode="update") as hdu:
for kw in self.mask_kws:
hdu[("SCI", 1)].header[kw] = tuple(self.mask_kws[kw])
# Rename Astrodrizzle log file as a trailer file
log.debug("Exposure image {}".format(self.drizzle_filename))
try:
shutil.move(self.trl_logname, self.trl_filename)
except PermissionError:
pass
def copy_exposure(self, filename):
"""
Create a copy of the original input to be renamed and used for multi-visit processing.
New exposure filename needs to follow the MVM naming convention:
hst_skycell-p<PPPP>x<XX>y<YY>_<instr>_<detector>_<filter>-<layer>_<ipppssoo>_fl[ct].fits
Parameters
----------
filename : str
Original pipeline filename for input exposure
Returns
-------
sce_filename : str
New MVM-compatible HAP filename for input exposure
"""
if filename.startswith("hst"):
sce_filename = "{}_{}".format(
self.product_basename, filename.split("_")[-1]
)
else:
suffix = filename.split("_")[1]
sce_filename = "_".join([self.product_basename, suffix])
log.info("Copying {} to MVM input: \n {}".format(filename, sce_filename))
try:
shutil.copy(filename, sce_filename)
except PermissionError:
pass
# Add HAPLEVEL keyword as required by pipeline processing
fits.setval(
sce_filename,
"HAPLEVEL",
value=1,
comment="Classification level of this product",
)
return sce_filename
[docs]
class SkyCellProduct(HAPProduct):
"""A SkyCell Product is a mosaic comprised of images acquired
during a multiple visits with one instrument, one detector, a single filter,
and all exposure times. The exposure time characteristic may change - TBD.
The "scp" is short hand for SkyCellProduct.
"""
def __init__(
self,
prop_id,
obset_id,
instrument,
detector,
aperture_from_poller,
skycell_name,
layer,
filetype,
log_level,
):
super().__init__(
prop_id, obset_id, instrument, detector, aperture_from_poller, skycell_name, filetype, log_level
)
# May need to exclude 'filter' component from layer_str
filter_str = layer[0]
self.filters = filter_str
# parse layer information into filename layer_str
# layer: [filter_str, pscale_str, exptime_str, epoch_str]
# e.g.: [f160w, coarse, all, all]
#
if layer[1] == "coarse":
layer_str = "-".join([layer[1], layer[2]])
else:
layer_str = "all"
layer_scale = layer[1]
self.info = "_".join(
["hst", skycell_name, instrument, detector, filter_str, layer_str]
)
self.exposure_name = skycell_name
self.cell_id = skycell_name.strip("skycell-")
self.product_basename = self.info
# Trailer names .txt or .log
self.trl_logname = self.product_basename + "_trl.log"
self.trl_filename = self.product_basename + "_trl.txt"
# Initialize the output catalog name attributes (for the time when they are created)
self.point_cat_filename = None
self.segment_cat_filename = None
self.drizzle_filename = (
"_".join([self.product_basename, self.filetype]) + ".fits"
)
self.refname = self.product_basename + "_ref_cat.ecsv"
# Generate the name for the manifest file which is for the entire multi-visit. It is fine
# to use only one of the SkyCellProducts to generate the manifest name as the name
# is only dependent on the sky cell.
# Example: skycell-p<PPPP>x<XX>y<YY>_manifest.txt (e.g., skycell-p0797x12y05_manifest.txt)
self.manifest_name = "_".join([skycell_name, "manifest.txt"])
# Define HAPLEVEL value for this product
self.haplevel = 4
# These attributes will be populated during processing
self.edp_list = []
self.new_to_layer = 0
self.regions_dict = {}
self.skycell = cell_utils.SkyCell.from_name(skycell_name, scale=layer_scale)
self.configobj_pars = None
self.meta_wcs = None
self.all_mvm_exposures = []
self.meta_bounded_wcs = None
log.debug(
"SkyCell object {}/{}/{} created.".format(
self.instrument, self.detector, self.filters
)
)
def find_member(self, name):
"""Return member instance with filename 'name'"""
desired_member = None
for member in [self] + self.edp_list:
if name == member.drizzle_filename:
desired_member = member
break
return desired_member
def add_member(self, edp):
"""Add an ExposureProduct object to the list - composition."""
self.edp_list.append(edp)
self.new_to_layer += edp.new_process
def verify_members(self, clean=True):
"""Verify whether input members were modified during processing.
Parameter
---------
clean : bool
Specify whether or not to remove file from disk. If False,
leave file behind, usually for debugging purposes when
``diagnostic_mode`` is True.
Returns
--------
del_files : list
List of input exposure full_filename values for all exposures
which were NOT modified during processing based on the
``input_updated`` attribute of the ``ExposureProduct``. This will be empty
if all exposures had their WCS updated, for example.
"""
del_files = [edp.verify_member(clean=clean) for edp in self.edp_list]
return del_files
def add_all_mvm_exposures_list(self, exp_list):
"""Add a list containing all the MVM FLT or FLC filenames, even the
filenames for exposures which have been previously processed.
"""
self.all_mvm_exposures = exp_list
def generate_metawcs(self, custom_limits=None):
"""Generate a meta wcs
Parameters
----------
custom_limits : list, optional.
a 4-element list containing the mosaic bounding rectangle X min and max and Y min and max values
This input argument is only used for creation of custom mosaics. These coordinates are in the
frame of reference of the projection cell, at fine (platescale = 0.04 arcsec/pixel) resolution.
"""
if (
custom_limits
): # for creation of custom multi-skycell mosaics, base meta_wcs on projection cell WCS
wcs = copy.deepcopy(self.skycell.projection_cell.wcs)
ratio = self.skycell.projection_cell.wcs.pscale / self.skycell.scale
# Store unscaled and scaled versions of the custom mosaic bounding box X and Y limits
xmin_unscaled = int(np.rint(custom_limits[0]))
xmax_unscaled = int(np.rint(custom_limits[1]))
ymin_unscaled = int(np.rint(custom_limits[2]))
ymax_unscaled = int(np.rint(custom_limits[3]))
xmin_scaled = int(np.rint(custom_limits[0] * ratio))
xmax_scaled = int(np.rint(custom_limits[1] * ratio))
ymin_scaled = int(np.rint(custom_limits[2] * ratio))
ymax_scaled = int(np.rint(custom_limits[3] * ratio))
# Adjust WCS values as needed
crpix1 = int(np.rint((wcs.wcs.crpix[0] - xmin_unscaled) * ratio))
crpix2 = int(np.rint((wcs.wcs.crpix[1] - ymin_unscaled) * ratio))
wcs.wcs.crpix = [crpix1, crpix2]
wcs.wcs.crval = wcs.wcs.crval
wcs.wcs.cd = wcs.wcs.cd / ratio
wcs.wcs.ctype = ["RA---TAN", "DEC--TAN"]
wcs.pscale = wcs.pscale / ratio
wcs._naxis[0] = int(np.rint(wcs._naxis[0] * ratio))
wcs._naxis[1] = int(np.rint(wcs._naxis[1] * ratio))
self.bounding_box = [
slice(ymin_scaled, ymax_scaled),
slice(xmin_scaled, xmax_scaled),
]
wcs.pixel_shape = [
xmax_scaled - xmin_scaled + 1,
ymax_scaled - ymin_scaled + 1,
]
else: # For regular MVM processing (single skycell), base meta_wcs on skycell wcs
wcs = copy.deepcopy(self.skycell.wcs)
# This is the exposure-independent WCS.
self.meta_wcs = wcs
# Create footprint on the sky for all input exposures using the skycell wcs
# This footprint includes all the exposures in the visit, NEW exposures, as well
# as exposures which have been previously processed (all are listed in the original
# poller file).
mvm_footprint = cell_utils.SkyFootprint(wcs)
log.debug(self.all_mvm_exposures)
mvm_footprint.build(self.all_mvm_exposures)
# This is the exposure-dependent WCS.
self.meta_bounded_wcs = copy.deepcopy(mvm_footprint.bounded_wcs)
del mvm_footprint
return self.meta_bounded_wcs
def wcs_drizzle_product(self, meta_wcs):
"""
Create the drizzle-combined sky-cell layer image using the meta_wcs as the reference output
"""
# This insures that keywords related to the footprint are generated for this
# specific object to use in updating the output drizzle product.
self.meta_wcs = meta_wcs
if self.mask_computed is False:
self.generate_footprint_mask(save_mask=False)
# Retrieve the configuration parameters for astrodrizzle
drizzle_pars = self.configobj_pars.get_pars("astrodrizzle")
# ...and set parameters which are computed on-the-fly
drizzle_pars["final_refimage"] = meta_wcs
drizzle_pars["runfile"] = self.trl_logname
drizzle_pars["rules_file"] = self.rules_file
# Setting "preserve" to false so the OrIg_files directory is deleted as the purpose
# of this directory is now obsolete.
drizzle_pars["preserve"] = False
# Turn off generation of CTX extension (not useful for these products)
drizzle_pars["context"] = False
log.debug(
"The 'final_refimage' ({}) and 'runfile' ({}) configuration variables "
"have been updated for the drizzle step of the filter drizzle product.".format(
meta_wcs, self.trl_logname
)
)
edp_filenames = [element.full_filename for element in self.edp_list]
astrodrizzle.AstroDrizzle(
input=edp_filenames, output=self.drizzle_filename, **drizzle_pars
)
# Update product with MVM-specific keywords based on the footprint
with fits.open(self.drizzle_filename, mode="update") as hdu:
for kw in self.mask_kws:
hdu[0].header[kw] = tuple(self.mask_kws[kw])
# Add SCELLID keyword to MVM product
hdu[0].header["SCELLID"] = self.cell_id
# Now that the drizzle product is created, compute the mean and median
# of the WHT image
wht = hdu["WHT", 1].data
wht_mask = self.mask > 0 if hasattr(self.mask, "shape") else wht > 0.0
self.mask_whtkws["MEANWHT"][0] = np.mean(wht[wht_mask])
self.mask_whtkws["MEDWHT"][0] = np.median(wht[wht_mask])
for kw in self.mask_whtkws:
hdu[0].header[kw] = tuple(self.mask_whtkws[kw])
# Rename Astrodrizzle log file as a trailer file
log.debug(
"Sky-cell layer image {} composed of: {}".format(
self.drizzle_filename, edp_filenames
)
)
try:
shutil.move(self.trl_logname, self.trl_filename)
except PermissionError:
pass