#!/usr/bin/env python
__version__ = '2.0.4'
__revision__ = '20260210'
import sys
import shutil
import os
import glob
import string
import filecmp
import numpy as np
from astropy.io import ascii, fits
from astropy import units as u
from astropy.coordinates import SkyCoord
from astropy.utils import xml
from datetime import datetime, timedelta
import time
import json
[docs]
class musereduce:
    '''
    Wrapper that drives the ESO MUSE pipeline through ESORex, steered by a
    :obj:`json` configuration file.

    Kwargs:
        configfile : :obj:`str`, (optional), default: :obj:`None`
            A :obj:`json` configfile for musereduce, where all the parameters
            are set. If :obj:`None`, ``config.json`` located next to this
            module is used.
        debug : :obj:`bool`, (optional), default: :obj:`False`
            :obj:`True`: :class:`MUSEreduce.musereduce` runs in debug mode and
            ESORex will not be executed. All products must be available.
            It can be used for testing the creation of folder, and creating
            the ``.sof`` files
    '''

    def __init__(self, configfile=None, debug=False):
        '''
        Read the configuration file and copy its settings onto the instance.
        The ``reduced`` directory below the configured root path is created
        if it does not exist yet.
        '''
        # Reading in the configuration file
        if configfile is None:
            configfile = os.path.join(os.path.dirname(__file__), "config.json")
        with open(configfile, "r") as read_file:
            self.config = json.load(read_file)

        # Setting all the parameters from the configuration file
        global_cfg = self.config['global']
        self.OB_list = np.array(global_cfg['OB_list'])
        self.dithering_multiple_OBs = global_cfg['dither_multiple_OBs']
        self.rootpath = global_cfg['rootpath']
        self.mode = global_cfg['mode']
        self.auto_sort_data = global_cfg['auto_sort_data']
        self.auto_create_OB_folders = global_cfg['auto_create_OB_folders']
        self.using_specific_exposure_time = global_cfg['using_specific_exposure_time']
        self.n_CPU = global_cfg['n_CPU']
        self.using_ESO_calibration = global_cfg['using_ESO_calibration']
        self.dark = self.config['calibration']['dark']
        self.renew_statics = global_cfg['renew_statics']

        sci_basic_cfg = self.config['sci_basic']
        self.skyreject = sci_basic_cfg['skyreject']
        # Optional keys fall back to their defaults when absent.
        self.skylines = sci_basic_cfg.get('skylines', '5577.339,6300.304')
        self.reduce_std = sci_basic_cfg.get('execute_std', True)
        self.reduce_sci = sci_basic_cfg.get('execute_sci', True)

        sky_cfg = self.config['sky']
        self.skyfield = sky_cfg['sky_field']
        self.skyfraction = sky_cfg['fraction']
        self.skyignore = sky_cfg['ignore']
        self.skymethod = sky_cfg['method']

        sci_post_cfg = self.config['sci_post']
        self.skysub = sci_post_cfg['subtract_sky']
        self.autocalib = sci_post_cfg['autocalib']
        self.raman = sci_post_cfg['raman']
        # Raman line removal is only kept enabled for the NFM-AO mode.
        if self.mode != 'NFM-AO':
            self.raman = False

        self.weight = self.config['exp_combine']['weight']
        self.user_list = np.array(global_cfg['exp_list'], dtype=object)

        self.raw_data_dir = os.path.join(self.rootpath, 'raw/')
        self.reduced_dir = os.path.join(self.rootpath, 'reduced/')
        self.working_dir = self.reduced_dir
        self._ensure_dir(self.working_dir)

        # Per-OB locations; filled in by execute().
        self.combining_OBs_dir = None
        self.calibration_dir = None
        self.ESO_calibration_dir = None
        self.static_calibration_dir = None
        self.dithername = None
        self.debug = debug

    @staticmethod
    def _ensure_dir(path):
        '''Create the single directory ``path`` unless it already exists.'''
        if not os.path.exists(path):
            os.mkdir(path)

    def execute(self):
        '''
        This method executes wrapper and starts the data reduction process
        set in the :obj:`json` config file.
        '''
        startime = time.time()

        self._print_banner()
        self._check_configuration()
        self._print_settings()
        self._print_modules()

        print(' ')
        print('')
        print('##############################################################')
        print('##### All parameters set: Starting the data reduction #####')
        print('##############################################################')
        print('')
        print(' >>> Creating OB list')
        _create_ob_folders(self)

        if self.dithering_multiple_OBs:
            # The first OB lends its name to the combined pointing.
            self.dithername = self.OB_list[0]
            print('==> The pointing name is: ' + self.dithername)

        print(' ')
        print('... Creating directories')
        for OB in self.OB_list:
            self._set_ob_directories(OB)
            self._create_ob_directories()

        print('')
        print('... Sorting the data')
        if self.auto_sort_data:
            print(' >>> Sorting the raw data')
            _sort_data(self)
        else:
            print(' >>> raw data will not be sorted')
            print(' >>> MANUAL INTERACTION MAY BE NEEDED')

        for OB in self.OB_list:
            # Re-point every per-OB directory (not only working_dir) so the
            # calibration folders used below belong to the OB being reduced
            # rather than to the last OB of the directory-creation loop.
            self._set_ob_directories(OB)
            self._reduce_ob(OB)
            if self.dithering_multiple_OBs:
                self.multOB_exp_counter += 1

        endtime = time.time()
        print(' >>> Total execution time: ',
              timedelta(seconds=endtime - startime))

    def _print_banner(self):
        '''Print the start-up banner including the wrapper version.'''
        print('')
        print(' ')
        print('##############################################################')
        print('##### #####')
        print('##### MUSE data reduction pipeline wrapper #####')
        print('##### Must be used with ESORex and ESO MUSE pipeline #####')
        print('##### author: Peter Zeidler (zeidler@stsci.edu) #####')
        print('##### Feb 11, 2026 #####')
        print('##### Version: ' + str(__version__) + ' #####')
        print('##### #####')
        print('##############################################################')
        print('')
        print('')

    def _check_configuration(self):
        '''
        Validate the mandatory settings and resolve the sky and static
        calibration options.

        Raises:
            AssertionError: if Python 2 is used or the pipeline path or the
                root path is not set in the configuration.
        '''
        print('... Checking various necessary variables')
        # Explicit raises instead of ``assert`` so the checks are not
        # removed when Python runs with -O; the exception type is kept.
        if not sys.version_info > (3, 0):
            raise AssertionError('YOU ARE NOT USING PYTHON 3.x')
        if not self.config['global']['pipeline_path']:
            raise AssertionError('NO PIPELINE PATH DEFINED')
        if not self.config['global']['rootpath']:
            raise AssertionError('NO ROOTPATH DEFINED')

        if self.config['sky']['modified']:
            self.skymethod = 'subtract-model'
            print("")
            print("WARNING: modified sky is used")
            print("... setting the skymethod to: ", self.skymethod)
            print("")
            try:
                self.modified_continuum = self.config['sky']['modified_continuum']
            except KeyError:
                print("")
                print("WARNING: modified_continuum not provided in config file !!!")
                print("... setting the modified continuum to default [True]")
                print("")
                self.modified_continuum = True
            else:
                print("... setting the modified continuum to: ", self.modified_continuum)

        self.static_calib_path = self.config.get('global', {}).get('static_calib_path', None)
        if not self.static_calib_path:
            # Default: the calibration folder shipped with the pipeline.
            self.static_calib_path = os.path.join(
                self.config['global']['pipeline_path'], 'calib/muse*/')
            print('No specific static calibration path defined')
        print(' >>> Static Calibration path: ' + self.static_calib_path)
        print('')
        print('... Perfect, everything checks out')
        print('')

    def _print_settings(self):
        '''Print a summary of the reduction settings that are in effect.'''
        calibration_cfg = self.config['calibration']

        if self.config['cleanup']['execute']:
            print('... Will cleanup folders first before starting other modules')
        print('')
        print('... Settings for data reduction')
        if self.debug:
            print('##################################################')
            print('##### RUNNING IN DEBUG MODE #####')
            print('##### PRODUCTS MUST BE ALL THERE #####')
            print('##### NO ESORex EXECUTION #####')
            print('##################################################')
            print("")
        print(' >>> Number of cores: ' + str(self.n_CPU) + ' cores')
        print(' >>> Observation mode: ' + self.mode)

        if calibration_cfg['execute']:
            if self.using_ESO_calibration:
                print(' >>> Using ESO calibration files')
                kwargs_keys = ('esorex_kwargs_bias', 'esorex_kwargs_dark',
                               'esorex_kwargs_flat', 'esorex_kwargs_wavecal',
                               'esorex_kwargs_lsf', 'esorex_kwargs_twilight')
                # A list (not a generator) so every key is looked up, as
                # before, and a missing key is reported as KeyError.
                if any([calibration_cfg[key] for key in kwargs_keys]):
                    print('WARNING: KWARGS ARE SET BUT ESO CALIBRATIONS ARE USED')
            else:
                print(' >>> Using self-processed calibration files')
                if self.dark:
                    print(' >>> DARK will be reduced and used')
                else:
                    print(' >>> DARK will not be reduced and used')
        if self.dark:
            print(' >>> DARK will be used: ' + str(self.dark))
        if self.raman:
            print(' >>> Removal of Raman lines: ' + str(self.raman))

        if self.dithering_multiple_OBs:
            print(' >>> Exposures per pointing are spread over multiple OBs')
            self.multOB_exp_counter = 0
            self.multOB_unique_pointings_ID = None
        else:
            print(' >>> All exposures per pointing are located in one OB')

        if len(self.user_list) > 1:
            print(' >>> Dithering exposures input list:')
            for li in self.user_list:
                print(li)
        if len(self.user_list) == 1:
            if self.user_list[0] == 'all':
                print(' >>> Dithering all exposures within OB folder:')

    def _print_modules(self):
        '''Print the list of pipeline modules that will be executed.'''
        print('')
        print('... The following modules will be executed')
        if self.config['calibration']['execute'] and not self.using_ESO_calibration:
            print(' >>> BIAS')
            if self.dark:
                print(' >>> DARK')
            print(' >>> FLAT')
            print(' >>> WAVECAL')
            print(' >>> LSF')
            print(' >>> TWILIGHT')
        if self.config['sci_basic']['execute']:
            print(' >>> SCIBASIC')
            if not self.reduce_sci:
                print(' Caution SCIBASIC will not be executed on SCI exposure!!')
            if not self.reduce_std:
                print(' Caution SCIBASIC will not be executed on STD star!!')
        if self.config['std_flux']['execute']:
            print(' >>> STANDARD')
        if self.config['sky']['execute']:
            print(' >>> CREATE_SKY')
            if self.config['sky']['modified']:
                print(' ... modified execution')
        if self.config['sci_post']['execute']:
            print(' >>> SCI_POST')
        if self.config['exp_align']['execute']:
            print(' >>> EXP_ALIGN')
        if self.config['exp_combine']['execute']:
            print(' >>> EXP_COMBINE')

    def _set_ob_directories(self, OB):
        '''Point all per-OB directory attributes at the folders of ``OB``.'''
        self.working_dir = os.path.join(self.rootpath, 'reduced', OB)
        if self.dithering_multiple_OBs:
            self.combining_OBs_dir = os.path.join(self.rootpath, 'reduced', self.dithername)
        else:
            self.combining_OBs_dir = None
        self.calibration_dir = os.path.join(self.working_dir, 'calibrations')
        self.ESO_calibration_dir = os.path.join(self.working_dir, 'ESO_calibrations')
        self.static_calibration_dir = os.path.join(self.working_dir, 'static_calibration_files')

    def _create_ob_directories(self):
        '''
        Create the folder tree of the current OB and refresh its copy of
        the static calibration files.
        '''
        if self.combining_OBs_dir is not None:
            self._ensure_dir(self.combining_OBs_dir)

        if self.renew_statics and self.auto_sort_data and os.path.exists(self.ESO_calibration_dir):
            shutil.rmtree(self.ESO_calibration_dir)

        self._ensure_dir(self.working_dir)
        self._ensure_dir(os.path.join(self.working_dir, 'std'))
        self._ensure_dir(self.calibration_dir)
        self._ensure_dir(self.ESO_calibration_dir)
        if self.dark:
            self._ensure_dir(os.path.join(self.calibration_dir, 'DARK'))
        self._ensure_dir(os.path.join(self.calibration_dir, 'TWILIGHT'))
        self._ensure_dir(os.path.join(self.calibration_dir, 'SCIENCE'))

        # The static calibration copy is always rebuilt from scratch, which
        # also covers the ``renew_statics`` case.
        if os.path.exists(self.static_calibration_dir):
            shutil.rmtree(self.static_calibration_dir)
        os.mkdir(self.static_calibration_dir)
        for itername in glob.glob(os.path.join(self.static_calib_path, '*.*')):
            shutil.copy(itername, os.path.join(self.static_calibration_dir, '.'))

    def _collect_exposure_lists(self):
        '''
        Gather the sorted ``*_SCI/_SKY``, ``*_DAR`` and ``*_TWI`` list files
        of the current working directory, optionally restricted to the
        configured exposure time.
        '''
        if self.using_specific_exposure_time:
            prefix = '*' + str('{:04d}'.format(self.using_specific_exposure_time)) + '*'
        else:
            prefix = '*'

        def find(suffix):
            return glob.glob(os.path.join(self.working_dir, prefix + suffix))

        exp_list_SCI = sorted(np.concatenate([find('_SCI.list'), find('_SKY.list')]))
        exp_list_DAR = sorted(find('_DAR.list'))
        exp_list_TWI = sorted(find('_TWI.list'))
        return exp_list_SCI, exp_list_DAR, exp_list_TWI

    def _reduce_ob(self, OB):
        '''Run every enabled reduction module on the observing block ``OB``.'''
        exp_list_SCI, exp_list_DAR, exp_list_TWI = self._collect_exposure_lists()

        for exposure in exp_list_SCI:
            # Dropping the 9-character '_SCI.list' / '_SKY.list' suffix
            # gives the per-exposure folder.
            self._ensure_dir(os.path.join(exposure[:-9]))

        cleanup_cfg = self.config['cleanup']
        if cleanup_cfg['execute']:
            print(' ')
            print('... cleaning')
            _cleanup(self, exp_list_SCI,
                     include_std=cleanup_cfg['include_std'],
                     include_combined=cleanup_cfg['include_combined'],
                     include_calibrations=cleanup_cfg['include_calibrations'])

        print(' ')
        print('... reducing OB: ' + OB)
        print(' ')

        ### CALIBRATION PRE-PROCESSING ###
        calibration_cfg = self.config['calibration']
        if calibration_cfg['execute']:
            create_sof = calibration_cfg['create_sof']
            if not self.using_ESO_calibration:
                _bias(self, exp_list_SCI, exp_list_DAR, exp_list_TWI, create_sof,
                      esorex_kwargs=calibration_cfg['esorex_kwargs_bias'])
                if self.dark:
                    _dark(self, exp_list_SCI, exp_list_DAR, create_sof,
                          esorex_kwargs=calibration_cfg['esorex_kwargs_dark'])
                _flat(self, exp_list_SCI, exp_list_TWI, create_sof,
                      esorex_kwargs=calibration_cfg['esorex_kwargs_flat'])
                _wavecal(self, exp_list_SCI, exp_list_TWI, create_sof,
                         esorex_kwargs=calibration_cfg['esorex_kwargs_wavecal'])
                _lsf(self, exp_list_SCI, exp_list_TWI, create_sof,
                     esorex_kwargs=calibration_cfg['esorex_kwargs_lsf'])
                _twilight(self, exp_list_SCI, exp_list_TWI, create_sof,
                          esorex_kwargs=calibration_cfg['esorex_kwargs_twilight'])

        ### OBSERVATION PRE-PROCESSING ###
        if self.config['sci_basic']['execute']:
            create_sof = self.config['sci_basic']['create_sof']
            _science_pre(self, exp_list_SCI, create_sof,
                         esorex_kwargs=self.config['sci_basic']['esorex_kwargs'])

        ### OBSERVATION POST-PROCESSING ###
        if self.config['std_flux']['execute']:
            create_sof = self.config['std_flux']['create_sof']
            _std_flux(self, exp_list_SCI, create_sof,
                      esorex_kwargs=self.config['std_flux']['esorex_kwargs'])

        if self.config['sky']['execute']:
            create_sof = self.config['sky']['create_sof']
            if self.config['sky']['modified']:
                _modified_sky(self, exp_list_SCI, create_sof,
                              esorex_kwargs=self.config['sky']['esorex_kwargs'])
            else:
                _sky(self, exp_list_SCI, create_sof,
                     esorex_kwargs=self.config['sky']['esorex_kwargs'])

        ### SCIENCE POST-PROCESSING ###
        if self.config['sci_post']['execute']:
            create_sof = self.config['sci_post']['create_sof']
            _scipost(self, exp_list_SCI, create_sof, OB,
                     esorex_kwargs=self.config['sci_post']['esorex_kwargs'])

        if self.config['dither_collect']['execute']:
            _dither_collect(self, exp_list_SCI, OB)

        # With exposures spread over several OBs, alignment and combination
        # are triggered only once (while the OB counter is still zero);
        # otherwise they run for every OB.
        if not self.dithering_multiple_OBs or self.multOB_exp_counter == 0:
            if self.config['exp_align']['execute']:
                create_sof = self.config['exp_align']['create_sof']
                _exp_align(self, exp_list_SCI, create_sof, OB,
                           esorex_kwargs=self.config['exp_align']['esorex_kwargs'])
            if self.config['exp_combine']['execute']:
                create_sof = self.config['exp_combine']['create_sof']
                _exp_combine(self, exp_list_SCI, create_sof,
                             esorex_kwargs=self.config['exp_combine']['esorex_kwargs'])
[docs]
def _get_filelist(self, data_dir, filename_wildcard):
'''
This module collects the necessary file lists from folders.
Args:
data_dir : :obj:`str`
The directory where the files are located.
filename_wildcard : :obj:`str`
The filenames which should be collected
'''
os.chdir(data_dir)
raw_data_list = glob.glob(filename_wildcard)
os.chdir(self.rootpath)
return raw_data_list
[docs]
def _call_esorex(self, exec_dir, esorex_cmd, sof, esorex_kwargs=None):
'''
This module calls the various ESOrex commands and gives it to the
terminal for execution.
Args:
exec_dir : :obj:`str`
The directory where ESOrex should be executed.
esorex_cmd : :obj:`str`
The ESOrex command that needs to be executed
sof : :obj:`str`
The name of the sof file needed for executing ESOrex
Kwargs:
esorex_kwargs : :obj:`str`
Additional keywords that should be passed for special processing. These should be passed
as one string
'''
os.chdir(exec_dir)
os.system('export OMP_NUM_THREADS=' + str(self.n_CPU))
if esorex_kwargs:
print('esorex ' + esorex_cmd + ' ' + esorex_kwargs + ' ' + sof)
os.system('esorex ' + esorex_cmd + ' ' + esorex_kwargs + ' ' + sof)
else:
print('esorex ' + esorex_cmd + ' ' + sof)
os.system('esorex ' + esorex_cmd + ' ' + sof)
os.chdir(self.rootpath)
[docs]
def _create_ob_folders(self):
'''
This module creates the neccessary OB folders, from the raw data
'''
if self.config['global']['OB_list']:
print(' >>> Input OB list is used')
self.OB_list = self.config['global']['OB_list']
else:
print(' >>> The OB list is automatically created from the input raw folder')
file_list = _get_filelist(self, self.raw_data_dir, '*.fits*')
obs_name = np.array([])
for files in file_list:
hdu = fits.open(os.path.join(self.raw_data_dir, files))
dprcatg_exist = hdu[0].header.get('HIERARCH ESO DPR CATG', False)
if dprcatg_exist:
dprcatg = (hdu[0].header['HIERARCH ESO DPR CATG'])
if dprcatg == 'SCIENCE':
obs_name = np.append(obs_name, hdu[0].header['HIERARCH ESO OBS NAME'])
self.OB_list = np.sort(np.unique(obs_name))
print(" >>> The OB list:")
for OBs in self.OB_list:
print(' ', OBs)
if self.dithering_multiple_OBs and len(self.user_list) > 0 and self.user_list != "all":
print('Currently a user list cannot be provided with multiple OBs !!!')
sys.exit()
if self.config['global']['auto_create_OB_folders']:
print(' >>> The OB folders are automatically created from the input OB list.')
for unique_ob in self.config['global']['OB_list']:
if os.path.exists(os.path.join(self.working_dir, unique_ob)):
print(" >>> OB folder ", unique_ob, " already exists. Nothing to do here!")
# shutil.rmtree(os.path.join(self.working_dir, unique_ob))
else:
print("Creating OB folder: ", unique_ob)
os.mkdir(os.path.join(self.working_dir, unique_ob))
[docs]
def _sort_data(self):
    '''
    This module reads the headers of all of the raw data and sorts it
    accordingly. It also assigns the correct calibration files to the exposures

    For every science exposure a ``*_SCI.list`` (or ``*_SKY.list``), a
    ``*_DAR.list`` and a ``*_TWI.list`` file is written into the folder of
    its OB (``self.reduced_dir`` + ``HIERARCH ESO OBS NAME``). Each line of
    these files holds the path of a raw file followed by its category.
    Processed ESO calibration files that are referenced for the exposure are
    copied into the ``ESO_calibrations`` folder of the OB.
    '''
    file_list = _get_filelist(self, self.raw_data_dir, '*.fits*')
    # Parallel arrays: file name and category of every classified file.
    science_files = np.array([])
    calibration_files = np.array([])
    science_type = np.array([])
    calibration_type = np.array([])
    ESO_calibration_files = np.array([])
    ESO_calibration_type = np.array([])
    # NOTE(review): cal_categories is not referenced again in this function.
    cal_categories = ['MASTER_BIAS', 'MASTER_DARK', 'MASTER_FLAT',\
    'TRACE_TABLE', 'WAVECAL_TABLE', 'LSF_PROFILE', 'TWILIGHT_CUBE',\
    'FILTER_LIST', 'EXTINCT_TABLE', 'STD_FLUX_TABLE', 'SKY_LINES',\
    'GEOMETRY_TABLE', 'ASTROMETRY_WCS', 'STD_RESPONSE', 'STD_TELLURIC',\
    'ASTROMETRY_REFERENCE']
    # Pass 1: classify every raw file by its DPR (raw) or PRO (processed)
    # category keyword.
    # NOTE(review): the handles returned by fits.open() are never closed
    # anywhere in this function.
    for files in file_list:
        hdu = fits.open(os.path.join(self.raw_data_dir, files))
        dprcatg_exist = hdu[0].header.get('HIERARCH ESO DPR CATG', False)
        procatg_exist = hdu[0].header.get('HIERARCH ESO PRO CATG', False)
        if dprcatg_exist:
            dprcatg = (hdu[0].header['HIERARCH ESO DPR CATG'])
            dprtype = (hdu[0].header['HIERARCH ESO DPR TYPE'])
            if dprcatg == 'SCIENCE':
                science_files = np.append(science_files, files)
                science_type = np.append(science_type, dprtype)
            if dprcatg == 'CALIB':
                calibration_files = np.append(calibration_files, files)
                # Translate the DPR TYPE into the short tag written to the
                # list files; unknown types are passed through unchanged.
                if dprtype == 'FLAT,LAMP':
                    calibration_type = np.append(calibration_type, 'FLAT')
                elif dprtype == 'FLAT,SKY':
                    calibration_type = np.append(calibration_type, 'SKYFLAT')
                elif dprtype == 'WAVE':
                    calibration_type = np.append(calibration_type, 'ARC')
                elif dprtype == 'WAVE,MASK':
                    calibration_type = np.append(calibration_type, 'MASK')
                elif dprtype == 'FLAT,LAMP,ILLUM':
                    calibration_type = np.append(calibration_type, 'ILLUM')
                else:
                    calibration_type = np.append(calibration_type, dprtype)
        if procatg_exist:
            procatg = (hdu[0].header['HIERARCH ESO PRO CATG'])
            ESO_calibration_files = np.append(ESO_calibration_files, files)
            ESO_calibration_type = np.append(ESO_calibration_type, procatg)
    # Pass 2: per science exposure collect the rotator angle, the OB name
    # and a label built from the coordinates and the exposure time.
    rot_angles = np.zeros(len(science_files))
    points = np.zeros(len(science_files), dtype=object)
    rot_angles_ident = np.zeros(len(science_files))
    OB_ids = np.zeros(len(science_files), dtype=object)
    for sci_file_idx in range(len(science_files)):
        hdu = fits.open(os.path.join(self.raw_data_dir, science_files[sci_file_idx]))[0]
        OB_ids[sci_file_idx] = hdu.header['HIERARCH ESO OBS NAME']
        RA = hdu.header['RA']
        DEC = hdu.header['DEC']
        EXPTIME = hdu.header['EXPTIME']
        points[sci_file_idx] = SkyCoord(ra=RA * u.degree, dec=DEC * u.degree,\
        frame='fk5').to_string('hmsdms', sep='',\
        precision=0).translate(str.maketrans('', '', string.whitespace))\
        + '_' + str(int(EXPTIME)).rjust(4, '0')
        rot_angles[sci_file_idx] = hdu.header['HIERARCH ESO INS DROT POSANG']
    # Exposures that share both the label and the rotator angle receive a
    # running number (0, 1, 2, ...) so their list files get distinct names.
    for idx in range(len(rot_angles)):
        ident = 0
        for idx2 in range(len(rot_angles)):
            if rot_angles[idx] == rot_angles[idx2]\
            and points[idx] == points[idx2]:
                rot_angles_ident[idx2] = ident
                ident += 1
    # Join the text of each *_raw2raw.xml / *_raw2master.xml pair into one
    # string, keyed by the raw2raw file name; the pairs are matched by
    # their position in the two sorted file lists.
    xmlraw_superstring_dict = {}
    xmlraw2raw_filelist = np.sort(glob.glob(os.path.join(self.raw_data_dir,'*_raw2raw.xml')))
    xmlraw2master_filelist = np.sort(glob.glob(os.path.join(self.raw_data_dir,'*_raw2master.xml')))
    for xmlraw2raw_file, xmlraw2master_file in zip(xmlraw2raw_filelist, xmlraw2master_filelist):
        xmlraw2raw = xml.iterparser.xml_readlines(xmlraw2raw_file)
        xmlraw2master = xml.iterparser.xml_readlines(xmlraw2master_file)
        xmlraw_superstring_dict[xmlraw2raw_file.split('/')[-1]] = '\t'.join(xmlraw2raw) + '\t'.join(xmlraw2master)
    # Pass 3: write the list files of every science exposure.
    for sci_file_idx in range(len(science_files)):
        hdu = fits.open(os.path.join(self.raw_data_dir, science_files[sci_file_idx]))[0]
        RA = hdu.header['RA']
        DEC = hdu.header['DEC']
        EXPTIME = hdu.header['EXPTIME']
        ROT = hdu.header['HIERARCH ESO INS DROT POSANG']
        DATE = hdu.header['MJD-OBS']
        # xmlraw2raw = xml.iterparser.xml_readlines(os.path.join(self.raw_data_dir,science_files[sci_file_idx][:-8] + '_raw2raw.xml'))
        # xmlraw2master = xml.iterparser.xml_readlines(os.path.join(self.raw_data_dir,science_files[sci_file_idx][:-8] + '_raw2master.xml'))
        # Pick the XML text that mentions this exposure; the last 8
        # characters of the file name are dropped before the search
        # (presumably a '.fits.fz' suffix -- TODO confirm).
        # NOTE(review): if no XML text matches, xmlraw_superstring keeps the
        # value of the previous exposure, or is undefined for the first one.
        for xml_key, xml_item in xmlraw_superstring_dict.items():
            if science_files[sci_file_idx][:-8] in xml_item:
                xmlraw_superstring = xml_item
        # xmlraw2raw_string = '\t'.join(xmlraw2raw)
        # xmlraw2master_string = '\t'.join(xmlraw2master)
        #
        # xmlraw_superstring = xmlraw2raw_string + xmlraw2master_string
        working_dir_temp = os.path.join(self.reduced_dir, OB_ids[sci_file_idx])
        # List file names: <coordinates>_<exptime, 4 digits>_<rotator angle,
        # 3 digits>_<running number, 2 digits>_<SCI|SKY|DAR|TWI>.list
        c = SkyCoord(ra=RA * u.degree, dec=DEC * u.degree,\
        frame='fk5').to_string('hmsdms', sep='',\
        precision=0).translate(str.maketrans('', '', string.whitespace))
        filelist_science = c + '_' + str(int(EXPTIME)).rjust(4, '0') + '_'\
        + str(int(ROT)).rjust(3, '0') + '_'\
        + str(int(rot_angles_ident[sci_file_idx])).rjust(2, '0')\
        + '_SCI.list'
        filelist_sky = c + '_' + str(int(EXPTIME)).rjust(4, '0') + '_'\
        + str(int(ROT)).rjust(3, '0') + '_'\
        + str(int(rot_angles_ident[sci_file_idx])).rjust(2, '0') + '_SKY.list'
        filelist_dark = c + '_' + str(int(EXPTIME)).rjust(4, '0') + '_'\
        + str(int(ROT)).rjust(3, '0') + '_'\
        + str(int(rot_angles_ident[sci_file_idx])).rjust(2, '0') + '_DAR.list'
        filelist_twilight = c + '_' + str(int(EXPTIME)).rjust(4, '0') + '_'\
        + str(int(ROT)).rjust(3, '0') + '_'\
        + str(int(rot_angles_ident[sci_file_idx])).rjust(2, '0') + '_TWI.list'
        # Unique observing dates (MJD-OBS) of all DARK and SKYFLAT frames.
        # These do not depend on the science exposure but are re-read from
        # the headers on every iteration.
        dark_date = np.array([])
        twilight_date = np.array([])
        for calibfiles in range(len(calibration_files)):
            if calibration_type[calibfiles] == 'DARK':
                hdu_temp = fits.open(os.path.join(self.raw_data_dir, calibration_files[calibfiles]))[0]
                temp_date = hdu_temp.header['MJD-OBS']
                dark_date = np.append(dark_date, temp_date)
            if calibration_type[calibfiles] == 'SKYFLAT':
                hdu_temp = fits.open(os.path.join(self.raw_data_dir, calibration_files[calibfiles]))[0]
                temp_date = hdu_temp.header['MJD-OBS']
                twilight_date = np.append(twilight_date, temp_date)
        dark_date = np.unique(dark_date)
        twilight_date = np.unique(twilight_date)
        # NOTE(review): f_science is only opened for the types 'OBJECT' and
        # 'SKY'; any other science type would reach the write below without
        # a file opened for this exposure.
        if science_type[sci_file_idx] == 'OBJECT':
            f_science = open(os.path.join(working_dir_temp, filelist_science), 'w')
        if science_type[sci_file_idx] == 'SKY':
            f_science = open(os.path.join(working_dir_temp, filelist_sky), 'w')
        f_dark = open(os.path.join(working_dir_temp, filelist_dark), 'w')
        f_twilight = open(os.path.join(working_dir_temp, filelist_twilight), 'w')
        # The exposure itself is always the first entry of its list.
        f_science.write(os.path.join(self.raw_data_dir, science_files[sci_file_idx])\
        + ' ' + science_type[sci_file_idx] + '\n')
        ### Sorting the ESO calibration files
        # Copy each referenced processed calibration as <PRO CATG>.fits,
        # unless a file of that name is already present.
        ESO_calibration_dir_temp = os.path.join(working_dir_temp, "ESO_calibrations")
        if len(ESO_calibration_files) > 0:
            for ifile, ESO_calibration_file in enumerate(ESO_calibration_files):
                ESO_calibration_filename = ESO_calibration_file.split("/")[-1][:-8]
                if ESO_calibration_filename in xmlraw_superstring and not os.path.isfile(os.path.join(ESO_calibration_dir_temp, ESO_calibration_type[ifile] + '.fits')):
                    shutil.copy(os.path.join(self.raw_data_dir, ESO_calibration_file), os.path.join(ESO_calibration_dir_temp, ESO_calibration_type[ifile] + '.fits'))
        # Only raw calibrations named in the XML text of this exposure are
        # considered.
        for calibfiles in range(len(calibration_files)):
            calibfilename = calibration_files[calibfiles].split("/")[-1][:-8]
            if calibfilename in xmlraw_superstring:
                temp_date = fits.open(os.path.join(self.raw_data_dir, calibration_files[calibfiles]))[0].header['MJD-OBS']
                # Calibrations taken within one day (MJD) of the exposure.
                if abs(temp_date - DATE) <= 1.:
                    f_science.write(os.path.join(self.raw_data_dir, calibration_files[calibfiles]) + ' ' +\
                    calibration_type[calibfiles] + '\n')
                # STD and ILLUM frames are used even if they are older.
                if calibration_type[calibfiles] == 'STD'\
                and abs(temp_date - DATE) > 1.:
                    f_science.write(os.path.join(self.raw_data_dir, calibration_files[calibfiles])\
                    + ' ' + calibration_type[calibfiles] + '\n')
                if calibration_type[calibfiles] == 'ILLUM' and abs(temp_date\
                - DATE) > 1.:
                    f_science.write(os.path.join(self.raw_data_dir, calibration_files[calibfiles]) + ' '\
                    + calibration_type[calibfiles] + '\n')
                    print('WARNING: STD and SCI obs more than 24 hours apart')
                # The frame must lie within one day of *all* dark (or
                # twilight) dates; with no such dates at all the test is
                # true for every frame.
                if (abs(temp_date - dark_date) <= 1.).all():
                    f_dark.write(os.path.join(self.raw_data_dir, calibration_files[calibfiles])\
                    + ' ' + calibration_type[calibfiles] + '\n')
                if (abs(temp_date - twilight_date) <= 1.).all():
                    f_twilight.write(os.path.join(self.raw_data_dir, calibration_files[calibfiles])\
                    + ' ' + calibration_type[calibfiles] + '\n')
        f_science.close()
        f_dark.close()
        f_twilight.close()
[docs]
def _bias(self, exp_list_SCI, exp_list_DAR, exp_list_TWI, create_sof, esorex_kwargs=None):
'''
This module calls *ESORex'* ``muse_bias``
Args:
exp_list_SCI : :obj:`list`
The list of all associated science files including their category
read from the fits header
exp_list_DAR : :obj:`list`:
The list of all associated dark files including their category read
from the fits header
exp_list_TWI : :obj:`list`:
The list of all associated twilight files including their category
read from the fits header
create_sof : :obj:`bool`:
:obj:`True`: ``bias.sof`` is created and populated
:obj:`False`: ``bias.sof`` is not created and needs to be provided
by the user
Kwargs:
esorex_kwargs: :obj:`str`
Additional keywords that should be passed for special processing. These should be passed
as one string
'''
print('... Creating the MASTER BIAS')
esorex_cmd = '--log-file=bias.log --log-level=debug'\
+ ' muse_bias'\
+ ' --nifu=-1'\
+ ' --merge'
sof = 'bias.sof'
if create_sof:
sci_bias_sof = os.path.join(self.calibration_dir, 'SCIENCE/bias.sof')
dar_bias_sof = os.path.join(self.calibration_dir, 'DARK/bias.sof')
twi_bias_sof = os.path.join(self.calibration_dir, 'TWILIGHT/bias.sof')
sci_bias_sof_temp = os.path.join(self.calibration_dir, 'SCIENCE/bias_temp.sof')
dar_bias_sof_temp = os.path.join(self.calibration_dir, 'DARK/bias_temp.sof')
twi_bias_sof_temp = os.path.join(self.calibration_dir, 'TWILIGHT/bias_temp.sof')
if os.path.exists(sci_bias_sof):
os.remove(sci_bias_sof)
if self.dark and os.path.exists(dar_bias_sof):
os.remove( dar_bias_sof)
if os.path.exists(twi_bias_sof):
os.remove(twi_bias_sof)
for exposure_ID in range(len(exp_list_SCI)):
print(' >>> processing exposure: ' + str(exposure_ID + 1) + '/'\
+ str(len(exp_list_SCI)))
print(' >>> processing: ' + exp_list_SCI[exposure_ID])
print(' ')
raw_data_list = ascii.read(exp_list_SCI[exposure_ID],\
format='no_header')
if self.dark:
raw_data_list_DARK = ascii.read(exp_list_DAR[exposure_ID],\
format='no_header')
raw_data_list_TWILIGHT = ascii.read(exp_list_TWI[exposure_ID],\
format='no_header')
f_science = open(sci_bias_sof_temp, 'w')
for i in range(len(raw_data_list)):
if raw_data_list[i][1] == 'BIAS':
f_science.write(raw_data_list[i][0] + ' '\
+ raw_data_list[i][1] + '\n')
f_science.close()
if self.dark:
f_dark = open(dar_bias_sof_temp, 'w')
for i in range(len(raw_data_list_DARK)):
if raw_data_list_DARK[i][1] == 'BIAS':
f_dark.write(raw_data_list_DARK[i][0] + ' '\
+ raw_data_list_DARK[i][1] + '\n')
f_dark.close()
f_twilight = open(twi_bias_sof_temp, 'w')
for i in range(len(raw_data_list_TWILIGHT)):
if raw_data_list_TWILIGHT[i][1] == 'BIAS':
f_twilight.write(raw_data_list_TWILIGHT[i][0] + ' '\
+ raw_data_list_TWILIGHT[i][1] + '\n')
f_twilight.close()
if os.path.isfile(sci_bias_sof):
assert filecmp.cmp(sci_bias_sof,sci_bias_sof_temp),\
'CAUTION CALIBRATION FILES ARE DIFFERENT: PLEASE CHECK'
os.remove(sci_bias_sof_temp)
else:
os.rename(sci_bias_sof_temp, sci_bias_sof)
if not self.debug:
_call_esorex(self, os.path.join(self.calibration_dir, 'SCIENCE'),\
esorex_cmd, sof, esorex_kwargs=esorex_kwargs)
if os.path.isfile(twi_bias_sof):
assert filecmp.cmp(twi_bias_sof, twi_bias_sof_temp),\
'CAUTION TWILIGHT FILES ARE DIFFERENT: PLEASE CHECK'
os.remove(twi_bias_sof_temp)
else:
os.rename(twi_bias_sof_temp, twi_bias_sof)
if not self.debug:
_call_esorex(self, os.path.join(self.calibration_dir, 'TWILIGHT'),\
esorex_cmd, sof, esorex_kwargs=esorex_kwargs)
if self.dark:
if os.path.isfile(dar_bias_sof):
assert filecmp.cmp(dar_bias_sof_temp),\
'CAUTION DARK FILES ARE DIFFERENT: PLEASE CHECK'
os.remove(dar_bias_sof_temp)
else:
os.rename(dar_bias_sof_temp, dar_bias_sof)
if not self.debug:
_call_esorex(self, os.path.join(self.calibration_dir, 'DARK'),
esorex_cmd, sof, esorex_kwargs=esorex_kwargs)
if not create_sof:
if not self.debug:
_call_esorex(self, os.path.join(self.calibration_dir, 'SCIENCE'), esorex_cmd, sof, esorex_kwargs=esorex_kwargs)
_call_esorex(self, os.path.join(self.calibration_dir, 'TWILIGHT'), esorex_cmd, sof, esorex_kwargs=esorex_kwargs)
if self.dark:
if not self.debug:
_call_esorex(self, os.path.join(self.calibration_dir, 'DARK'), esorex_cmd, sof, esorex_kwargs=esorex_kwargs)
[docs]
def _dark(self, exp_list_SCI, exp_list_DAR, create_sof, esorex_kwargs=None):
'''
This module calls *ESORex'* ``muse_dark``
Args:
exp_list_SCI : :obj:`list`
The list of all associated science files including their category
read from the fits header
exp_list_DAR : :obj:`list`:
The list of all associated dark files including their category read
from the fits header
create_sof : :obj:`bool`:
:obj:`True`: ``bias.sof`` is created and populated
:obj:`False`: ``bias.sof`` is not created and needs to be provided
by the user
Kwargs:
esorex_kwargs: :obj:`str`
Additional keywords that should be passed for special processing. These should be passed
as one string
'''
print('... Creating the MASTER DARK')
esorex_cmd = '--log-file=dark.log --log-level=debug'\
+ ' muse_dark'\
+ ' --nifu=-1'\
+ ' --merge'
sof = ' dark.sof'
if create_sof:
dar_dark_sof = os.path.join(self.calibration_dir, 'DARK/dark.sof')
dar_dark_sof_temp = os.path.join(self.calibration_dir, 'DARK/dark_temp.sof')
if os.path.exists(dar_dark_sof):
os.remove(dar_dark_sof)
for exposure_ID in range(len(exp_list_SCI)):
print(' >>> processing exposure: ' + str(exposure_ID + 1) + '/'\
+ str(len(exp_list_SCI)))
print(' >>> processing: ' + exp_list_SCI[exposure_ID])
print(' ')
raw_data_list = ascii.read(exp_list_DAR[exposure_ID],\
format='no_header')
f = open(dar_dark_sof_temp, 'w')
for i in range(len(raw_data_list)):
if raw_data_list[i][1] == 'DARK':
f.write(self.raw_data_list[i][0] + ' '\
+ raw_data_list[i][1] + '\n')
f.write(self.calibration_dir + 'DARK/MASTER_BIAS.fits \
MASTER_BIAS\n')
f.close()
if os.path.isfile(dar_dark_sof):
assert filecmp.cmp(dar_dark_sof, dar_dark_sof_temp),\
'CAUTION DARK FILES ARE DIFFERENT: PLEASE CHECK'
os.remove(dar_dark_sof_temp)
else:
os.rename(dar_dark_sof_temp, dar_dark_sof)
if not self.debug:
_call_esorex(os.path.join(self.calibration_dir, 'DARK'), esorex_cmd, sof, esorex_kwargs=esorex_kwargs)
if not create_sof:
if not self.debug:
_call_esorex(os.path.join(self.calibration_dir, 'DARK'), esorex_cmd, sof, esorex_kwargs=esorex_kwargs)
[docs]
def _flat(self, exp_list_SCI, exp_list_TWI, create_sof, esorex_kwargs=None):
    '''
    This module calls *ESORex'* ``muse_flat`` for the ``SCIENCE`` and the
    ``TWILIGHT`` calibration folders.

    When ``create_sof`` is set, ``flat.sof`` is rebuilt in both folders. The
    first exposure defines the reference file and triggers the recipe; every
    further exposure must produce a byte-identical file.

    Args:
        exp_list_SCI : :obj:`list`
            The list of all associated science files including their category
            read from the fits header
        exp_list_TWI : :obj:`list`:
            The list of all associated twilight files including their category
            read from the fits header
        create_sof : :obj:`bool`:
            :obj:`True`: ``flat.sof`` is created and populated
            :obj:`False`: ``flat.sof`` is not created and needs to be provided
            by the user

    Kwargs:
        esorex_kwargs: :obj:`str`
            Additional keywords that should be passed for special processing.
            These should be passed as one string

    Raises:
        AssertionError : if two exposures resolve to different sets of flats
    '''
    print('... Creating the MASTER FLAT')

    esorex_cmd = ('--log-file=flat.log --log-level=debug'
                  + ' muse_flat'
                  + ' --samples=true'
                  + ' --nifu=-1'
                  + ' --merge')
    sof = ' flat.sof'
    subdirs = ('SCIENCE', 'TWILIGHT')

    def run_recipe(subdir):
        # Debug mode only exercises the folder and .sof handling.
        if not self.debug:
            _call_esorex(self, os.path.join(self.calibration_dir, subdir),
                         esorex_cmd, sof, esorex_kwargs=esorex_kwargs)

    def build_sof(subdir, rows):
        # Write the candidate .sof, then either promote it (first exposure)
        # or verify that it matches the one already in place.
        exec_dir = os.path.join(self.calibration_dir, subdir)
        final_path = os.path.join(exec_dir, 'flat.sof')
        temp_path = os.path.join(exec_dir, 'flat_temp.sof')

        with open(temp_path, 'w') as f:
            for row in rows:
                if row[1] == 'FLAT':
                    f.write(row[0] + ' ' + row[1] + '\n')
            f.write(os.path.join(exec_dir, 'MASTER_BIAS.fits')
                    + ' MASTER_BIAS\n')
            if self.dark:
                f.write(os.path.join(self.calibration_dir, 'DARK',
                                     'MASTER_DARK.fits') + ' MASTER_DARK\n')

        if os.path.isfile(final_path):
            # Raised explicitly so the check also runs under ``python -O``.
            if not filecmp.cmp(final_path, temp_path):
                raise AssertionError('CAUTION ' + subdir
                                     + ' FILES ARE DIFFERENT: PLEASE CHECK')
            os.remove(temp_path)
        else:
            os.rename(temp_path, final_path)
            run_recipe(subdir)

    if not create_sof:
        # The user supplies both flat.sof files; only run the recipe.
        for subdir in subdirs:
            run_recipe(subdir)
        return

    # Drop leftovers so the first exposure becomes the reference.
    for subdir in subdirs:
        stale = os.path.join(self.calibration_dir, subdir, 'flat.sof')
        if os.path.exists(stale):
            os.remove(stale)

    n_exposures = len(exp_list_SCI)
    for exposure_ID, exposure in enumerate(exp_list_SCI):
        print(' >>> processing exposure: ' + str(exposure_ID + 1)
              + '/' + str(n_exposures))
        print(' >>> processing: ' + exposure)
        print(' ')

        # Both lists are read before anything is written, as before.
        raw_data_list = ascii.read(exposure, format='no_header')
        raw_data_list_TWILIGHT = ascii.read(exp_list_TWI[exposure_ID],
                                            format='no_header')

        build_sof('SCIENCE', raw_data_list)
        build_sof('TWILIGHT', raw_data_list_TWILIGHT)
[docs]
def _wavecal(self, exp_list_SCI, exp_list_TWI, create_sof, esorex_kwargs=None):
    '''
    This module calls *ESORex'* ``muse_wavecal`` for the ``SCIENCE`` and the
    ``TWILIGHT`` calibration folders.

    When ``create_sof`` is set, ``wavecal.sof`` is rebuilt in both folders.
    The first exposure defines the reference file and triggers the recipe;
    every further exposure must produce a byte-identical file.

    Args:
        exp_list_SCI : :obj:`list`
            The list of all associated science files including their category
            read from the fits header
        exp_list_TWI : :obj:`list`:
            The list of all associated twilight files including their category
            read from the fits header
        create_sof : :obj:`bool`:
            :obj:`True`: ``wavecal.sof`` is created and populated
            :obj:`False`: ``wavecal.sof`` is not created and needs to be
            provided by the user

    Kwargs:
        esorex_kwargs: :obj:`str`
            Additional keywords that should be passed for special processing.
            These should be passed as one string

    Raises:
        AssertionError : if two exposures resolve to different sets of arcs
    '''
    print('... Creating the WAVELENGTH CALIBRATION')

    esorex_cmd = ('--log-file=wavecal.log --log-level=debug'
                  + ' muse_wavecal'
                  + ' --nifu=-1'
                  + ' --residuals'
                  + ' --merge')
    sof = ' wavecal.sof'
    subdirs = ('SCIENCE', 'TWILIGHT')

    def run_recipe(subdir):
        # Debug mode only exercises the folder and .sof handling.
        if not self.debug:
            _call_esorex(self, os.path.join(self.calibration_dir, subdir),
                         esorex_cmd, sof, esorex_kwargs=esorex_kwargs)

    def build_sof(subdir, rows):
        # Write the candidate .sof, then either promote it (first exposure)
        # or verify that it matches the one already in place.
        exec_dir = os.path.join(self.calibration_dir, subdir)
        final_path = os.path.join(exec_dir, 'wavecal.sof')
        temp_path = os.path.join(exec_dir, 'wavecal_temp.sof')

        with open(temp_path, 'w') as f:
            for row in rows:
                if row[1] == 'ARC':
                    f.write(row[0] + ' ' + row[1] + '\n')
            f.write(os.path.join(self.static_calibration_dir,
                                 'line_catalog.fits') + ' LINE_CATALOG\n')
            f.write(os.path.join(exec_dir, 'MASTER_BIAS.fits')
                    + ' MASTER_BIAS\n')
            f.write(os.path.join(exec_dir, 'TRACE_TABLE.fits')
                    + ' TRACE_TABLE\n')
            if self.dark:
                f.write(os.path.join(self.calibration_dir, 'DARK',
                                     'MASTER_DARK.fits') + ' MASTER_DARK\n')

        # The test must look at the final file: the temporary one was just
        # written and therefore always exists.
        if os.path.isfile(final_path):
            # Raised explicitly so the check also runs under ``python -O``.
            if not filecmp.cmp(final_path, temp_path):
                raise AssertionError('CAUTION ' + subdir
                                     + ' FILES ARE DIFFERENT: PLEASE CHECK')
            os.remove(temp_path)
        else:
            os.rename(temp_path, final_path)
            run_recipe(subdir)

    if not create_sof:
        # The user supplies both wavecal.sof files; only run the recipe.
        for subdir in subdirs:
            run_recipe(subdir)
        return

    # Drop leftovers so the first exposure becomes the reference.
    for subdir in subdirs:
        stale = os.path.join(self.calibration_dir, subdir, 'wavecal.sof')
        if os.path.exists(stale):
            os.remove(stale)

    n_exposures = len(exp_list_SCI)
    for exposure_ID, exposure in enumerate(exp_list_SCI):
        print(' >>> processing exposure: ' + str(exposure_ID + 1)
              + '/' + str(n_exposures))
        print(' >>> processing: ' + exposure)
        print(' ')

        # Both lists are read before anything is written, as before.
        raw_data_list = ascii.read(exposure, format='no_header')
        raw_data_list_TWILIGHT = ascii.read(exp_list_TWI[exposure_ID],
                                            format='no_header')

        build_sof('SCIENCE', raw_data_list)
        build_sof('TWILIGHT', raw_data_list_TWILIGHT)
[docs]
def _lsf(self, exp_list_SCI, exp_list_TWI, create_sof, esorex_kwargs=None):
    '''
    This module calls *ESORex'* ``muse_lsf`` for the ``SCIENCE`` and the
    ``TWILIGHT`` calibration folders.

    When ``create_sof`` is set, ``lsf.sof`` is rebuilt in both folders. The
    first exposure defines the reference file and triggers the recipe; every
    further exposure must produce a byte-identical file.

    Args:
        exp_list_SCI : :obj:`list`
            The list of all associated science files including their category
            read from the fits header
        exp_list_TWI : :obj:`list`:
            The list of all associated twilight files including their category
            read from the fits header
        create_sof : :obj:`bool`:
            :obj:`True`: ``lsf.sof`` is created and populated
            :obj:`False`: ``lsf.sof`` is not created and needs to be provided
            by the user

    Kwargs:
        esorex_kwargs: :obj:`str`
            Additional keywords that should be passed for special processing.
            These should be passed as one string

    Raises:
        AssertionError : if two exposures resolve to different sets of arcs
    '''
    print('... Creating the LINE SPREAD FUNCTION')

    esorex_cmd = ('--log-file=lsf.log --log-level=debug'
                  + ' muse_lsf'
                  + ' --nifu=-1'
                  + ' --merge'
                  + ' --save_subtracted')
    sof = ' lsf.sof'
    subdirs = ('SCIENCE', 'TWILIGHT')

    def run_recipe(subdir):
        # Debug mode only exercises the folder and .sof handling.
        if not self.debug:
            _call_esorex(self, os.path.join(self.calibration_dir, subdir),
                         esorex_cmd, sof, esorex_kwargs=esorex_kwargs)

    def build_sof(subdir, rows):
        # Write the candidate .sof, then either promote it (first exposure)
        # or verify that it matches the one already in place.
        exec_dir = os.path.join(self.calibration_dir, subdir)
        final_path = os.path.join(exec_dir, 'lsf.sof')
        temp_path = os.path.join(exec_dir, 'lsf_temp.sof')

        with open(temp_path, 'w') as f:
            for row in rows:
                if row[1] == 'ARC':
                    f.write(row[0] + ' ' + row[1] + '\n')
            f.write(os.path.join(self.static_calibration_dir,
                                 'line_catalog.fits') + ' LINE_CATALOG\n')
            f.write(os.path.join(exec_dir, 'MASTER_BIAS.fits')
                    + ' MASTER_BIAS\n')
            f.write(os.path.join(exec_dir, 'TRACE_TABLE.fits')
                    + ' TRACE_TABLE\n')
            f.write(os.path.join(exec_dir, 'WAVECAL_TABLE.fits')
                    + ' WAVECAL_TABLE\n')
            if self.dark:
                # The master dark is produced in <calibration_dir>/DARK,
                # the same location the flat and wavecal steps read it from.
                f.write(os.path.join(self.calibration_dir, 'DARK',
                                     'MASTER_DARK.fits') + ' MASTER_DARK\n')
            f.write(os.path.join(exec_dir, 'MASTER_FLAT.fits')
                    + ' MASTER_FLAT\n')

        if os.path.isfile(final_path):
            # Raised explicitly so the check also runs under ``python -O``.
            if not filecmp.cmp(final_path, temp_path):
                raise AssertionError('CAUTION ' + subdir
                                     + ' FILES ARE DIFFERENT: PLEASE CHECK')
            os.remove(temp_path)
        else:
            os.rename(temp_path, final_path)
            run_recipe(subdir)

    if not create_sof:
        # The user supplies both lsf.sof files; only run the recipe.
        for subdir in subdirs:
            run_recipe(subdir)
        return

    # Drop leftovers so the first exposure becomes the reference.
    for subdir in subdirs:
        stale = os.path.join(self.calibration_dir, subdir, 'lsf.sof')
        if os.path.exists(stale):
            os.remove(stale)

    n_exposures = len(exp_list_SCI)
    for exposure_ID, exposure in enumerate(exp_list_SCI):
        print(' >>> processing exposure: ' + str(exposure_ID + 1)
              + '/' + str(n_exposures))
        print(' >>> processing: ' + exposure)
        print(' ')

        # Both lists are read before anything is written, as before.
        raw_data_list = ascii.read(exposure, format='no_header')
        raw_data_list_TWILIGHT = ascii.read(exp_list_TWI[exposure_ID],
                                            format='no_header')

        build_sof('SCIENCE', raw_data_list)
        build_sof('TWILIGHT', raw_data_list_TWILIGHT)
[docs]
def _twilight(self, exp_list_SCI, exp_list_TWI, create_sof, esorex_kwargs=None):
    '''
    This module calls *ESORex'* ``muse_twilight`` in
    ``<calibration_dir>/TWILIGHT``.

    When ``create_sof`` is set, ``twilight.sof`` is rebuilt. The first
    exposure defines the reference file and triggers the recipe; every
    further exposure must produce a byte-identical file. The ILLUM exposure
    closest in time to the earliest SKYFLAT is added to the file.

    Args:
        exp_list_SCI : :obj:`list`
            The list of all associated science files including their category
            read from the fits header
        exp_list_TWI : :obj:`list`:
            The list of all associated twilight files including their category
            read from the fits header
        create_sof : :obj:`bool`:
            :obj:`True`: ``twilight.sof`` is created and populated
            :obj:`False`: ``twilight.sof`` is not created and needs to be
            provided by the user

    Kwargs:
        esorex_kwargs: :obj:`str`
            Additional keywords that should be passed for special processing.
            These should be passed as one string

    Raises:
        ValueError : if a twilight list contains no ``SKYFLAT`` entry
        AssertionError : if two exposures resolve to different twilight sets
    '''
    print('... Creating the TWILIGHT FLAT')

    esorex_cmd = ('--log-file=twilight.log --log-level=debug'
                  + ' muse_twilight')
    sof = ' twilight.sof'
    twilight_dir = os.path.join(self.calibration_dir, 'TWILIGHT')

    if not create_sof:
        # The user supplies twilight.sof; only the recipe has to be executed.
        if not self.debug:
            _call_esorex(self, twilight_dir, esorex_cmd, sof,
                         esorex_kwargs=esorex_kwargs)
        return

    sof_file = os.path.join(twilight_dir, 'twilight.sof')
    sof_file_temp = os.path.join(twilight_dir, 'twilight_temp.sof')

    # Drop any leftover file so the first exposure becomes the reference.
    if os.path.exists(sof_file):
        os.remove(sof_file)

    n_exposures = len(exp_list_SCI)
    for exposure_ID, exposure in enumerate(exp_list_SCI):
        print(' >>> processing exposure: ' + str(exposure_ID + 1)
              + '/' + str(n_exposures))
        print(' >>> processing: ' + exposure)
        print(' ')

        raw_data_list_TWILIGHT = ascii.read(exp_list_TWI[exposure_ID],
                                            format='no_header')

        # Collect the observation dates of the sky flats and of the ILLUM
        # exposures (the latter together with their row index).
        skyflat_mjds = []
        illum_mjds = []
        illum_index = []
        for i, row in enumerate(raw_data_list_TWILIGHT):
            if row[1] not in ('SKYFLAT', 'ILLUM'):
                continue
            with fits.open(row[0]) as hdul:
                mjd = hdul[0].header['MJD-OBS']
            if row[1] == 'SKYFLAT':
                skyflat_mjds.append(mjd)
            else:
                illum_mjds.append(mjd)
                illum_index.append(i)

        if not skyflat_mjds:
            raise ValueError('no SKYFLAT exposure listed in '
                             + str(exp_list_TWI[exposure_ID]))
        MJDsskyflat = np.array(skyflat_mjds, dtype=float)

        if illum_index:
            # Use the ILLUM taken closest in time to the earliest sky flat.
            closest = np.argmin(np.abs(np.array(illum_mjds, dtype=float)
                                       - MJDsskyflat.min()))
            choosen_illum = illum_index[int(closest)]
        else:
            print("WARNING: No ILLUM exposure was given !!!")
            choosen_illum = None

        with open(sof_file_temp, 'w') as f:
            for row in raw_data_list_TWILIGHT:
                if row[1] == 'SKYFLAT':
                    f.write(row[0] + ' ' + row[1] + '\n')
            if choosen_illum is not None:
                illum_row = raw_data_list_TWILIGHT[choosen_illum]
                f.write(illum_row[0] + ' ' + illum_row[1] + '\n')
            # NOTE(review): NFM-AO receives the WFM geometry table as well,
            # exactly as before -- confirm this is intended.
            if self.mode in ('WFM-AO', 'WFM-NOAO', 'NFM-AO'):
                f.write(os.path.join(self.static_calibration_dir,
                                     'geometry_table_wfm.fits')
                        + ' GEOMETRY_TABLE\n')
            f.write(os.path.join(twilight_dir, 'MASTER_BIAS.fits')
                    + ' MASTER_BIAS\n')
            f.write(os.path.join(twilight_dir, 'MASTER_FLAT.fits')
                    + ' MASTER_FLAT\n')
            if self.dark:
                # The master dark is produced in <calibration_dir>/DARK.
                f.write(os.path.join(self.calibration_dir, 'DARK',
                                     'MASTER_DARK.fits') + ' MASTER_DARK\n')
            f.write(os.path.join(twilight_dir, 'TRACE_TABLE.fits')
                    + ' TRACE_TABLE\n')
            f.write(os.path.join(twilight_dir, 'WAVECAL_TABLE.fits')
                    + ' WAVECAL_TABLE\n')
            # The vignetting mask only applies when every sky flat predates
            # MJD 57823.5; compare element-wise before reducing.
            if (MJDsskyflat < 57823.5).all():
                f.write(os.path.join(self.static_calibration_dir,
                                     'vignetting_mask.fits')
                        + ' VIGNETTING_MASK\n')

        if os.path.isfile(sof_file):
            # Raised explicitly so the check also runs under ``python -O``.
            if not filecmp.cmp(sof_file, sof_file_temp):
                raise AssertionError(
                    'CAUTION TWILIGHT FILES ARE DIFFERENT: PLEASE CHECK')
            os.remove(sof_file_temp)
        else:
            os.rename(sof_file_temp, sof_file)
            if not self.debug:
                _call_esorex(self, twilight_dir, esorex_cmd, sof,
                             esorex_kwargs=esorex_kwargs)
[docs]
def _science_pre(self, exp_list_SCI, create_sof, esorex_kwargs=None):
    '''
    This module calls *ESORex'* ``muse_scibasic`` for the science exposures
    and for the standard star.

    For every exposure the standard-star file ``sci_basic_std.sof`` is
    regenerated in ``<working_dir>/std``; the first exposure defines the
    reference and triggers the standard-star reduction, every further
    exposure must produce a byte-identical file. The ILLUM exposure closest
    in time to the object (respectively the standard star) is added to the
    corresponding file.

    Args:
        exp_list_SCI : :obj:`list`
            The list of all associated science files including their category
            read from the fits header
        create_sof : :obj:`bool`:
            :obj:`True`: ``sci_basic_object.sof`` is created and populated
            in each exposure folder
            :obj:`False`: ``sci_basic_object.sof`` is not created and needs
            to be provided by the user

    Kwargs:
        esorex_kwargs: :obj:`str`
            Additional keywords that should be passed for special processing.
            These should be passed as one string

    Raises:
        AssertionError : if the exposures point to different standard stars
    '''
    print('... Science PREPROCESSING')

    esorex_cmd = ('--log-file=sci_basic_object.log --log-level=debug'
                  + ' muse_scibasic'
                  + ' --nifu=-1'
                  + ' --resample'
                  + ' --saveimage=true'
                  + ' --skyreject=' + self.skyreject
                  + ' --skylines=' + self.skylines
                  + ' --merge')
    sof = ' sci_basic_object.sof'
    esorex_cmd_std = ('--log-file=sci_basic_std.log --log-level=debug'
                      + ' muse_scibasic'
                      + ' --nifu=-1'
                      + ' --resample'
                      + ' --saveimage=true'
                      + ' --skyreject=15.,15.,1'
                      + ' --merge')
    sof_std = ' sci_basic_std.sof'

    std_dir = os.path.join(self.working_dir, 'std')
    sci_basic_std_sof = os.path.join(std_dir, 'sci_basic_std.sof')
    sci_basic_std_sof_temp = os.path.join(std_dir, 'sci_basic_std_temp.sof')

    # Drop any leftover file so the first exposure becomes the reference.
    if os.path.exists(sci_basic_std_sof):
        os.remove(sci_basic_std_sof)

    # Calibration entries are identical for the STD and the OBJECT file and
    # do not depend on the exposure, so they are assembled once.
    if self.using_ESO_calibration:
        science_cal = twilight_cal = dark_cal = self.ESO_calibration_dir
    else:
        science_cal = os.path.join(self.calibration_dir, 'SCIENCE')
        twilight_cal = os.path.join(self.calibration_dir, 'TWILIGHT')
        dark_cal = os.path.join(self.calibration_dir, 'DARK')
    calibration_lines = [
        os.path.join(science_cal, 'MASTER_BIAS.fits') + ' MASTER_BIAS\n',
        os.path.join(science_cal, 'MASTER_FLAT.fits') + ' MASTER_FLAT\n',
        os.path.join(twilight_cal, 'TWILIGHT_CUBE.fits') + ' TWILIGHT_CUBE\n',
        os.path.join(science_cal, 'TRACE_TABLE.fits') + ' TRACE_TABLE\n',
        os.path.join(science_cal, 'WAVECAL_TABLE.fits') + ' WAVECAL_TABLE\n',
    ]
    if self.dark:
        calibration_lines.append(
            os.path.join(dark_cal, 'MASTER_DARK.fits') + ' MASTER_DARK\n')
    # NOTE(review): NFM-AO receives the WFM geometry table as well, exactly
    # as before -- confirm this is intended.
    if self.mode in ('WFM-AO', 'WFM-NOAO', 'NFM-AO'):
        calibration_lines.append(
            os.path.join(self.static_calibration_dir,
                         'geometry_table_wfm.fits') + ' GEOMETRY_TABLE\n')
    calibration_lines.append(
        os.path.join(self.static_calibration_dir, 'badpix_table.fits')
        + ' BADPIX_TABLE\n')

    def closest_illum(illum_rows, illum_mjds, reference_mjd):
        # Row index of the ILLUM nearest in time to reference_mjd, or None
        # when there is no ILLUM or no reference date.
        if not illum_rows or reference_mjd is None:
            return None
        offsets = np.abs(np.array(illum_mjds, dtype=float) - reference_mjd)
        return illum_rows[int(np.argmin(offsets))]

    n_exposures = len(exp_list_SCI)
    for exposure_ID, exposure in enumerate(exp_list_SCI):
        print(' >>> processing exposure: '
              + str(exposure_ID + 1) + '/' + str(n_exposures))
        print(' >>> processing: ' + exposure)
        print(' ')

        raw_data_list = ascii.read(exposure, format='no_header')
        exposure_dir = exposure[:-9]

        # Reset per exposure so that no date leaks in from the previous one.
        MJDobject = None
        MJDstd = None
        illum_mjds = []
        illum_index = []
        for i, row in enumerate(raw_data_list):
            category = row[1]
            if category not in ('OBJECT', 'SKY', 'STD', 'ILLUM'):
                continue
            with fits.open(row[0]) as hdul:
                mjd = hdul[0].header['MJD-OBS']
            if category == 'STD':
                MJDstd = mjd
            elif category == 'ILLUM':
                illum_mjds.append(mjd)
                illum_index.append(i)
            else:
                MJDobject = mjd

        if not illum_index:
            print("WARNING: No ILLUM exposure was given !!!")
        choosen_illum_object = closest_illum(illum_index, illum_mjds,
                                             MJDobject)
        choosen_illum_std = closest_illum(illum_index, illum_mjds, MJDstd)

        with open(sci_basic_std_sof_temp, 'w') as f_std:
            for row in raw_data_list:
                if row[1] == 'STD':
                    f_std.write(row[0] + ' ' + row[1] + '\n')
            # ``is not None``: row index 0 is a valid ILLUM position.
            if choosen_illum_std is not None:
                illum_row = raw_data_list[choosen_illum_std]
                f_std.write(illum_row[0] + ' ' + illum_row[1] + '\n')
            f_std.writelines(calibration_lines)

        if create_sof:
            sci_basic_obj_sof = os.path.join(exposure_dir,
                                             'sci_basic_object.sof')
            if os.path.exists(sci_basic_obj_sof):
                os.remove(sci_basic_obj_sof)
            with open(sci_basic_obj_sof, 'w') as f_object:
                for row in raw_data_list:
                    if row[1] == 'OBJECT' or row[1] == 'SKY':
                        f_object.write(row[0] + ' ' + row[1] + '\n')
                if choosen_illum_object is not None:
                    illum_row = raw_data_list[choosen_illum_object]
                    f_object.write(illum_row[0] + ' ' + illum_row[1] + '\n')
                f_object.writelines(calibration_lines)

        if not self.debug and self.reduce_sci:
            _call_esorex(self, exposure_dir, esorex_cmd, sof,
                         esorex_kwargs=esorex_kwargs)

        if os.path.isfile(sci_basic_std_sof):
            # Raised explicitly so the check also runs under ``python -O``.
            if not filecmp.cmp(sci_basic_std_sof, sci_basic_std_sof_temp):
                raise AssertionError(
                    'CAUTION DIFFERENT STD STARS FOR VARIOUS FIELDS: '
                    'PLEASE CHECK')
            os.remove(sci_basic_std_sof_temp)
        else:
            os.rename(sci_basic_std_sof_temp, sci_basic_std_sof)
            if not self.debug and self.reduce_std:
                _call_esorex(self, std_dir, esorex_cmd_std, sof_std,
                             esorex_kwargs=esorex_kwargs)
[docs]
def _std_flux(self, exp_list_SCI, create_sof, esorex_kwargs=None):
    '''
    This module calls *ESORex'* ``muse_standard`` in ``<working_dir>/std``.

    Args:
        exp_list_SCI : :obj:`list`
            The list of all associated science files including their category
            read from the fits header (not used by this step)
        create_sof : :obj:`bool`:
            :obj:`True`: ``std_flux.sof`` is created and populated
            :obj:`False`: ``std_flux.sof`` is not created and needs to be
            provided by the user

    Kwargs:
        esorex_kwargs: :obj:`str`
            Additional keywords that should be passed for special processing.
            These should be passed as one string
    '''
    print('... FLUX CALIBRATION')

    esorex_cmd = (' --log-file=std_flux.log --log-level=debug'
                  + ' muse_standard'
                  + ' --filter=white')
    sof = ' std_flux.sof'
    std_dir = os.path.join(self.working_dir, 'std')

    PIXTABLE_STD_list = _get_filelist(self, std_dir, 'PIXTABLE_STD*.fits')

    if create_sof:
        std_flux_sof = os.path.join(std_dir, 'std_flux.sof')
        if os.path.exists(std_flux_sof):
            os.remove(std_flux_sof)

        with open(std_flux_sof, 'w') as f:
            for pixtable in PIXTABLE_STD_list:
                f.write(os.path.join(std_dir, pixtable) + ' PIXTABLE_STD\n')
            f.write(os.path.join(self.static_calibration_dir,
                                 'extinct_table.fits') + ' EXTINCT_TABLE\n')
            f.write(os.path.join(self.static_calibration_dir,
                                 'std_flux_table.fits') + ' STD_FLUX_TABLE\n')
            f.write(os.path.join(self.static_calibration_dir,
                                 'filter_list.fits') + ' FILTER_LIST\n')

    # The recipe runs whether the .sof was generated or supplied by the user.
    if not self.debug:
        _call_esorex(self, std_dir, esorex_cmd, sof,
                     esorex_kwargs=esorex_kwargs)
[docs]
def _sky(self, exp_list_SCI, create_sof, esorex_kwargs=None):
    '''
    This module calls *ESORex'* ``muse_create_sky``.

    Exposure lists are classified by the three characters in front of the
    ``.list`` suffix (``SKY`` or ``SCI``). If ``skyfield`` is ``'auto'`` and
    at least one ``SKY`` list exists, the sky is created from the dedicated
    sky exposures only and the resulting ``SKY_*.fits`` products of the sky
    exposure closest in time are copied into every ``SCI`` exposure folder.
    Otherwise the sky is created from each exposure's own object pixel
    tables.

    Args:
        exp_list_SCI : :obj:`list`
            The list of all associated science files including their category
            read from the fits header
        create_sof : :obj:`bool`:
            :obj:`True`: ``sky.sof`` is created and populated
            :obj:`False`: ``sky.sof`` is not created and needs to be provided
            by the user

    Kwargs:
        esorex_kwargs: :obj:`str`
            Additional keywords that should be passed for special processing.
            These should be passed as one string
    '''
    print('... SKY CREATION')

    def read_mjd_obs(path):
        # Read MJD-OBS from the primary header and close the file again.
        with fits.open(path) as hdul:
            return hdul[0].header['MJD-OBS']

    sky = np.zeros_like(exp_list_SCI, dtype=bool)
    sci = np.zeros_like(exp_list_SCI, dtype=bool)
    for idx, exposure in enumerate(exp_list_SCI):
        if exposure[-8:-5] == 'SKY':
            sky[idx] = True
        if exposure[-8:-5] == 'SCI':
            sci[idx] = True

    use_sky_fields = self.skyfield == 'auto' and sky.any()
    if use_sky_fields:
        exp_list_SCI_sky = np.array(exp_list_SCI)[sky]
        pixtable_pattern = 'PIXTABLE_SKY*.fits'
    else:
        exp_list_SCI_sky = np.array(exp_list_SCI)
        pixtable_pattern = 'PIXTABLE_OBJECT*.fits'

    esorex_cmd = ('--log-file=sky.log --log-level=debug'
                  + ' muse_create_sky'
                  + ' --fraction=' + str(self.skyfraction)
                  + ' --ignore=' + str(self.skyignore))
    sof = ' sky.sof'

    n_exposures = len(exp_list_SCI_sky)
    for exposure_ID, exposure in enumerate(exp_list_SCI_sky):
        print(' >>> processing exposure: '
              + str(exposure_ID + 1) + '/' + str(n_exposures))
        print(' >>> processing: ' + exposure)
        print(' ')

        exposure_dir = exposure[:-9] + '/'
        PIXTABLE_SKY_list = _get_filelist(self, exposure_dir,
                                          pixtable_pattern)

        if create_sof:
            sky_sof = os.path.join(exposure_dir, 'sky.sof')
            if os.path.exists(sky_sof):
                os.remove(sky_sof)

            if self.using_ESO_calibration:
                lsf_profile = os.path.join(self.ESO_calibration_dir,
                                           'LSF_PROFILE.fits')
            else:
                lsf_profile = os.path.join(self.calibration_dir, 'SCIENCE',
                                           'LSF_PROFILE.fits')

            with open(sky_sof, 'w') as f:
                # Object pixel tables are tagged PIXTABLE_SKY too when no
                # dedicated sky exposure is used.
                for pixtable in PIXTABLE_SKY_list:
                    f.write(os.path.join(exposure_dir, pixtable)
                            + ' PIXTABLE_SKY\n')
                f.write(lsf_profile + ' LSF_PROFILE\n')
                f.write(os.path.join(self.working_dir, 'std',
                                     'STD_RESPONSE_0001.fits')
                        + ' STD_RESPONSE\n')
                f.write(os.path.join(self.working_dir, 'std',
                                     'STD_TELLURIC_0001.fits')
                        + ' STD_TELLURIC\n')
                f.write(os.path.join(self.static_calibration_dir,
                                     'extinct_table.fits')
                        + ' EXTINCT_TABLE\n')
                f.write(os.path.join(self.static_calibration_dir,
                                     'sky_lines.fits') + ' SKY_LINES\n')

        # The command line is the same with and without dedicated sky fields.
        if not self.debug:
            _call_esorex(self, exposure_dir, esorex_cmd, sof,
                         esorex_kwargs=esorex_kwargs)

    if use_sky_fields:
        # Give every science exposure the sky products of the sky exposure
        # observed closest in time.
        skydate = np.ones_like(exp_list_SCI_sky, dtype=float)
        for idx, exps in enumerate(exp_list_SCI_sky):
            skydate[idx] = read_mjd_obs(
                os.path.join(exps[:-9], 'PIXTABLE_SKY_0001-01.fits'))

        for exps in np.array(exp_list_SCI)[sci]:
            scidate = read_mjd_obs(
                os.path.join(exps[:-9], 'PIXTABLE_OBJECT_0001-01.fits'))
            ind = np.argmin(abs(skydate - scidate))
            flist = glob.glob(os.path.join(exp_list_SCI_sky[ind][:-9],
                                           'SKY_*.fits'))
            for sky_product in flist:
                shutil.copy(sky_product, os.path.join(exps[:-9], '.'))
[docs]
def _modified_sky(self, exp_list_SCI, create_sof, esorex_kwargs=None):
'''
This module calls *ESORex'* ``muse_create_sky`` twice per exposure, with the
modified continuum and line subtraction as described in
`Zeidler et al. 2019`_.
The first call runs on a ``sky.sof`` without a ``SKY_CONTINUUM`` entry. The
resulting ``SKY_CONTINUUM.fits`` is backed up and a copy with the second
table column set to zero is written as ``SKY_CONTINUUM_zero.fits``. The
second call runs on a ``sky.sof`` that additionally lists a
``SKY_CONTINUUM`` file (the zeroed copy if ``modified_continuum`` is set,
the original otherwise). Afterwards ``SKY_LINES.fits`` is reduced to the
entries whose name starts with ``O2`` or ``OH``.
Args:
exp_list_SCI : :obj:`list`
The list of all associated science files including their category
read from the fits header
create_sof : :obj:`bool`:
:obj:`True`: ``sky.sof`` is created and populated
:obj:`False`: ``sky.sof`` is not created and needs to be provided
by the user
Kwargs:
esorex_kwargs: :obj:`str`
Additional keywords that should be passed for special processing. These should be passed
as one string
'''
print('... SKY CREATION MODIFIED')
sof = 'sky.sof'
# Classify every exposure list by the three characters in front of the
# '.list' suffix.
sky = np.zeros_like(exp_list_SCI, dtype=bool)
sci = np.zeros_like(exp_list_SCI, dtype=bool)
for idx, exposure in enumerate(exp_list_SCI):
if exposure[-8:-5] == 'SKY':
sky[idx] = True
if exposure[-8:-5] == 'SCI':
sci[idx] = True
# With skyfield == 'auto' and at least one SKY list only the dedicated sky
# exposures are processed; otherwise every exposure is.
if self.skyfield == 'auto' and (sky == True).any():
exp_list_SCI_sky = np.array(exp_list_SCI)[sky]
else:
exp_list_SCI_sky = np.array(exp_list_SCI)
for exposure_ID in range(len(exp_list_SCI_sky)):
print(' >>> processing exposure: ' + str(exposure_ID + 1)\
+ '/' + str(len(exp_list_SCI_sky)))
print(' >>> processing: ' + exp_list_SCI_sky[exposure_ID])
print(' ')
# NOTE(review): raw_data_list is not used again in this function.
raw_data_list = ascii.read(exp_list_SCI_sky[exposure_ID],\
format='no_header')
exposure_dir = exp_list_SCI_sky[exposure_ID][:-9]
if self.skyfield == 'auto' and (sky == True).any():
PIXTABLE_SKY_list = _get_filelist(self, exposure_dir, 'PIXTABLE_SKY*.fits')
else:
PIXTABLE_SKY_list = _get_filelist(self, exposure_dir, 'PIXTABLE_OBJECT*.fits')
# First pass: sky.sof without a SKY_CONTINUUM entry.
if create_sof:
sky_sof = os.path.join(exposure_dir, 'sky.sof')
if os.path.exists(sky_sof):
os.remove(sky_sof)
f = open(sky_sof, 'w')
for i in range(len(PIXTABLE_SKY_list)):
f.write(os.path.join(exposure_dir, PIXTABLE_SKY_list[i]) + ' PIXTABLE_SKY\n')
if not self.using_ESO_calibration:
f.write(os.path.join(self.calibration_dir, 'SCIENCE', 'LSF_PROFILE.fits') + ' LSF_PROFILE\n')
if self.using_ESO_calibration:
f.write(os.path.join(self.ESO_calibration_dir, 'LSF_PROFILE.fits') + ' LSF_PROFILE\n')
f.write(os.path.join(self.working_dir, 'std', 'STD_RESPONSE_0001.fits') + ' STD_RESPONSE\n')
f.write(os.path.join(self.working_dir, 'std', 'STD_TELLURIC_0001.fits') + ' STD_TELLURIC\n')
f.write(os.path.join(self.static_calibration_dir, 'extinct_table.fits') + ' EXTINCT_TABLE\n')
f.write(os.path.join(self.static_calibration_dir, 'sky_lines.fits') + ' SKY_LINES\n')
f.close()
# Both branches below issue the same muse_create_sky command.
if self.skyfield == 'auto' and (sky == True).any():
if not self.debug:
_call_esorex(self, exposure_dir,\
'--log-file=sky.log --log-level=debug'\
+ ' muse_create_sky'\
+ ' --fraction=' + str(self.skyfraction)\
+ ' --ignore=' + str(self.skyignore),\
sof , esorex_kwargs=esorex_kwargs)
else:
if not self.debug:
_call_esorex(self, exposure_dir,\
'--log-file=sky.log --log-level=debug'\
+ ' muse_create_sky'\
+ ' --fraction=' + str(self.skyfraction)\
+ ' --ignore=' + str(self.skyignore),\
sof, esorex_kwargs=esorex_kwargs)
# The FITS handling below uses relative file names, hence the chdir into
# the exposure folder and back to rootpath afterwards.
os.chdir(exposure_dir)
print('create temporary backup of the SKY_CONTINUUM.fits ==> SKY_CONTINUUM_temp.fits')
shutil.copy('SKY_CONTINUUM.fits', 'SKY_CONTINUUM_temp.fits')
# Write a copy of the continuum table (HDU 1) with field 1 of every row
# set to zero.
# NOTE(review): the HDUList opened here is never closed explicitly.
sky_cont_hdu = fits.open('SKY_CONTINUUM.fits', checksum=True)
sky_cont = sky_cont_hdu[1].data
for i in range(len(sky_cont)):
sky_cont[i][1] = 0.
sky_cont_hdu[1].data = sky_cont
sky_cont_hdu.writeto('SKY_CONTINUUM_zero.fits',\
overwrite=True, checksum=True)
os.chdir(self.rootpath)
# Second pass: sky.sof additionally lists a SKY_CONTINUUM file.
if create_sof:
sky_sof = os.path.join(exposure_dir, 'sky.sof')
if os.path.exists(sky_sof):
os.remove(sky_sof)
f = open(sky_sof, 'w')
for i in range(len(PIXTABLE_SKY_list)):
f.write(os.path.join(exposure_dir, PIXTABLE_SKY_list[i]) + ' PIXTABLE_SKY\n')
if not self.using_ESO_calibration:
f.write(os.path.join(self.calibration_dir, 'SCIENCE', 'LSF_PROFILE.fits') + ' LSF_PROFILE\n')
if self.using_ESO_calibration:
f.write(os.path.join(self.ESO_calibration_dir, 'LSF_PROFILE.fits') + ' LSF_PROFILE\n')
f.write(os.path.join(self.working_dir, 'std', 'STD_RESPONSE_0001.fits') + ' STD_RESPONSE\n')
f.write(os.path.join(self.working_dir, 'std', 'STD_TELLURIC_0001.fits') + ' STD_TELLURIC\n')
if self.modified_continuum:
f.write(os.path.join(exposure_dir, 'SKY_CONTINUUM_zero.fits') + ' SKY_CONTINUUM\n')
else:
f.write(os.path.join(exposure_dir, 'SKY_CONTINUUM.fits') + ' SKY_CONTINUUM\n')
f.write(os.path.join(self.static_calibration_dir, 'extinct_table.fits') + ' EXTINCT_TABLE\n')
f.write(os.path.join(self.static_calibration_dir, 'sky_lines.fits') + ' SKY_LINES\n')
f.close()
# NOTE(review): the two branches differ only in a leading blank of the
# command string in the else branch.
if self.skyfield == 'auto' and (sky == True).any():
if not self.debug:
_call_esorex(self, exposure_dir,\
'--log-file=sky.log --log-level=debug'\
+ ' muse_create_sky'\
+ ' --fraction=' + str(self.skyfraction)\
+ ' --ignore=' + str(self.skyignore),\
sof, esorex_kwargs=esorex_kwargs)
else:
if not self.debug:
_call_esorex(self, exposure_dir,\
' --log-file=sky.log --log-level=debug'\
+ ' muse_create_sky'\
+ ' --fraction=' + str(self.skyfraction)\
+ ' --ignore=' + str(self.skyignore),\
sof, esorex_kwargs=esorex_kwargs)
os.chdir(exposure_dir)
# Restore the backed-up continuum after the second pass.
# NOTE(review): this listing lost its indentation, so it cannot be told
# from here whether the os.remove and the SKY_LINES filtering below are
# part of this ``if`` -- confirm against the repository.
if self.modified_continuum:
print('SKY_CONTINUUM_temp.fits ==> SKY_CONTINUUM.fits')
shutil.copy('SKY_CONTINUUM_temp.fits', 'SKY_CONTINUUM.fits')
os.remove('SKY_CONTINUUM_temp.fits')
# Keep only the SKY_LINES rows whose first field starts with 'O2' or 'OH'
# and overwrite the file in place.
hdu = fits.open('SKY_LINES.fits', checksum=True)
data = hdu[1].data
lines_to_keep = [i for i, sky_line in enumerate(data) \
if (sky_line[0][:2] == 'O2' or sky_line[0][:2] == 'OH')]
data = data[lines_to_keep]
hdu[1].data = data
hdu.writeto('SKY_LINES.fits', overwrite=True, checksum=True)
os.chdir(self.rootpath)
# Copy the SKY_*.fits products of the sky exposure closest in MJD-OBS into
# every SCI exposure folder.
if self.skyfield == 'auto' and (sky == True).any():
skydate = np.ones_like(exp_list_SCI_sky, dtype=float)
for idx, exps in enumerate(exp_list_SCI_sky):
skydate[idx] = fits.open(os.path.join(exps[:-9], 'PIXTABLE_SKY_0001-01.fits'))[0].header['MJD-OBS']
for idx, exps in enumerate(np.array(exp_list_SCI)[sci]):
scidate = fits.open(os.path.join(exps[:-9], 'PIXTABLE_OBJECT_0001-01.fits'))[0].header['MJD-OBS']
ind = np.argmin(abs(skydate - scidate))
flist = glob.glob(os.path.join(exp_list_SCI_sky[ind][:-9], 'SKY_*.fits'))
for f in flist:
shutil.copy(f, os.path.join(exps[:-9], '.'))
[docs]
def _scipost(self, exp_list_SCI, create_sof, OB, esorex_kwargs=None):
    '''
    This module calls *ESORex'* ``muse_scipost``

    Every science exposure of every pointing is processed on its own: its
    ``scipost.sof`` is (optionally) written into the exposure folder and
    ``muse_scipost`` is executed there unless the debug mode is active.

    Args:
        exp_list_SCI : :obj:`list`
            The list of all associated science files including their category
            read from the fits header

        create_sof : :obj:`bool`:
            :obj:`True`: ``scipost.sof`` is created and populated

            :obj:`False`: ``scipost.sof`` is not created and needs to be
            provided by the user

        OB : :obj:`str`:
            The specific ``OB`` to be reduced. It is not used by this step.

    Kwargs:
        esorex_kwargs: :obj:`str`
            Additional keywords that should be passed for special processing.
            These should be passed as one string
    '''
    print('... SCIENCE POST-PROCESSING')

    sof = 'scipost.sof'

    # Only entries whose name ends in ``_SCI.list`` are science exposures.
    science_lists = [exposure for exposure in exp_list_SCI
                     if exposure[-8:-5] == 'SCI']
    # A pointing is the list name without its last 16 characters.
    # dict.fromkeys() drops exact duplicates and keeps the first-seen order.
    unique_pointings = list(dict.fromkeys(
        exposure[:-16] for exposure in science_lists))

    for pointing_num, sec in enumerate(unique_pointings):
        print(' ')
        print(' >>> processing pointing: '
              + str(pointing_num + 1) + '/' + str(len(unique_pointings)))
        print(' ')

        exp_list = glob.glob(sec + '*SCI.list')

        for exp_num, exp_file in enumerate(exp_list):
            print(' ')
            print(' >>> processing exposure: '
                  + str(exp_num + 1) + '/' + str(len(exp_list)))
            print(' ')

            # The exposure folder is the list name without ``_SCI.list``.
            exposure_dir = exp_file[:-9]
            pixtable_list = _get_filelist(self, exposure_dir,
                                          'PIXTABLE_OBJECT*.fits')

            if create_sof:
                scipost_sof = os.path.join(exposure_dir, 'scipost.sof')

                entries = [os.path.join(exposure_dir, pixtable)
                           + ' PIXTABLE_OBJECT\n'
                           for pixtable in pixtable_list]

                if self.using_ESO_calibration:
                    entries.append(os.path.join(self.ESO_calibration_dir, 'LSF_PROFILE.fits') + ' LSF_PROFILE\n')
                    entries.append(os.path.join(self.ESO_calibration_dir, 'ASTROMETRY_WCS.fits') + ' ASTROMETRY_WCS\n')
                else:
                    entries.append(os.path.join(self.calibration_dir, 'SCIENCE', 'LSF_PROFILE.fits') + ' LSF_PROFILE\n')
                    entries.append(os.path.join(self.calibration_dir, 'ASTROMETRY_WCS_0001.fits') + ' ASTROMETRY_WCS\n')

                entries.append(os.path.join(self.working_dir, 'std', 'STD_RESPONSE_0001.fits') + ' STD_RESPONSE\n')
                entries.append(os.path.join(self.working_dir, 'std', 'STD_TELLURIC_0001.fits') + ' STD_TELLURIC\n')

                if self.skysub:
                    entries.append(os.path.join(exposure_dir, 'SKY_LINES.fits') + ' SKY_LINES\n')
                    if self.modified_continuum:
                        entries.append(os.path.join(exposure_dir, 'SKY_CONTINUUM_zero.fits') + ' SKY_CONTINUUM\n')
                    else:
                        entries.append(os.path.join(exposure_dir, 'SKY_CONTINUUM.fits') + ' SKY_CONTINUUM\n')

                if self.autocalib != 'none':
                    entries.append(os.path.join(exposure_dir, 'SKY_MASK.fits') + ' SKY_MASK\n')
                if self.autocalib == 'user':
                    entries.append(os.path.join(exposure_dir, 'AUTOCAL_FACTORS.fits') + ' AUTOCAL_FACTORS\n')

                entries.append(os.path.join(self.static_calibration_dir, 'extinct_table.fits') + ' EXTINCT_TABLE\n')
                entries.append(os.path.join(self.static_calibration_dir, 'filter_list.fits') + ' FILTER_LIST\n')

                if self.raman:
                    entries.append(os.path.join(self.static_calibration_dir, 'raman_lines.fits') + ' RAMAN_LINES\n')

                if os.path.exists(scipost_sof):
                    os.remove(scipost_sof)
                # The context manager closes the file even if writing fails.
                with open(scipost_sof, 'w') as f:
                    f.writelines(entries)

            if self.skysub:
                print('with sky subtraction...')
            else:
                print('without sky subtraction...')

            if not self.debug:
                _call_esorex(self, exposure_dir,
                             '--log-file=scipost.log --log-level=debug'
                             + ' muse_scipost'
                             + ' --save=cube,skymodel,individual,raman,autocal'
                             + ' --skymethod=' + self.skymethod
                             + ' --autocalib=' + str(self.autocalib).lower()
                             + ' --filter=white',
                             sof, esorex_kwargs=esorex_kwargs)
[docs]
def _dither_collect(self, exp_list_SCI, OB):
    '''
    This module collects the individual dither exposures for one OB to be
    combined in the steps: :mod:`MUSEreduce._exp_align` and
    :mod:`MUSEreduce._exp_combine`

    For every exposure the products ``DATACUBE_FINAL.fits``,
    ``IMAGE_FOV_0001.fits`` and ``PIXTABLE_REDUCED_0001.fits`` are copied into
    the combining folder. The copies are renamed with the ``OB``, an 8
    character key cut from the exposure list name and a running number that
    counts the exposures sharing that key.

    Args:
        exp_list_SCI : :obj:`list`
            The list of all associated science files including their category
            read from the fits header

        OB : :obj:`str`:
            The specific ``OB`` to be reduced.
    '''
    print('... COLLECT DITHER POSTITIONS')

    # Only entries whose name ends in ``_SCI.list`` are science exposures.
    science_lists = [exposure for exposure in exp_list_SCI
                     if exposure[-8:-5] == 'SCI']

    print(' ')
    print(' >>> Copying files:')
    print(' ')

    n_user = len(self.user_list)
    unique_pointings = []
    if n_user == 0:
        # A pointing is the list name without its last 16 characters.
        unique_pointings = list(dict.fromkeys(
            exposure[:-16] for exposure in science_lists))
    elif n_user == 1:
        if self.user_list[0] == 'all':
            unique_pointings = [os.path.join(self.working_dir,
                                             science_lists[0][:-16])]
    else:
        unique_pointings = [os.path.join(self.working_dir,
                                         self.user_list[0][:18])]

    # First pass: prepare the combining folders and remove products with
    # ``FOV_0001`` in their name that are left over from an earlier run.
    for sec in unique_pointings:
        if self.dithering_multiple_OBs:
            if self.multOB_exp_counter == 0:
                self.multOB_unique_pointings_ID = sec[-18:]
            combining_exposure_dir = os.path.join(
                self.combining_OBs_dir, self.multOB_unique_pointings_ID)
        else:
            combining_exposure_dir = sec

        os.makedirs(combining_exposure_dir, exist_ok=True)
        for stale_file in glob.glob(os.path.join(combining_exposure_dir,
                                                 '*FOV_0001*')):
            os.remove(stale_file)

    # Second pass: copy the products of every exposure.
    for sec in unique_pointings:
        if self.dithering_multiple_OBs:
            combining_exposure_dir = os.path.join(
                self.combining_OBs_dir, self.multOB_unique_pointings_ID)
        else:
            combining_exposure_dir = sec

        if n_user == 0:
            exp_list = glob.glob(sec + '*SCI.list')
        elif n_user == 1:
            # Only reached for user_list == ['all'], otherwise there is no
            # pointing to loop over.
            exp_list = science_lists
        else:
            exp_list = [os.path.join(self.working_dir,
                                     user_list_element + '_SCI.list')
                        for user_list_element in self.user_list]

        # Running number per key, so exposures sharing a key get 00, 01, ...
        copies_per_key = {}
        for exp_file in exp_list:
            exposure_dir = exp_file[:-9]
            exp_key = exp_file[-20:-12]
            ident = copies_per_key.get(exp_key, 0)
            copies_per_key[exp_key] = ident + 1

            suffix = '_' + OB + '_' + exp_key + '_' \
                + str(ident).rjust(2, '0') + '.fits'

            for origin_name, product in (
                    ('DATACUBE_FINAL.fits', 'DATACUBE_FINAL'),
                    ('IMAGE_FOV_0001.fits', 'IMAGE_FOV'),
                    ('PIXTABLE_REDUCED_0001.fits', 'PIXTABLE_REDUCED')):
                origin_file = os.path.join(exposure_dir, origin_name)
                destination_file = os.path.join(combining_exposure_dir,
                                                product + suffix)
                print(origin_file + ' ==> ' + destination_file)
                shutil.copy(origin_file, destination_file)
[docs]
def _exp_align(self, exp_list_SCI, create_sof, OB, esorex_kwargs=None):
    '''
    This module calls *ESORex'* ``muse_exp_align``

    For every pointing the ``IMAGE_FOV_*.fits`` files found in the combining
    folder are listed in ``exp_align.sof`` and ``muse_exp_align`` is executed
    in that folder unless the debug mode is active. Each pointing is
    announced and processed exactly once.

    Args:
        exp_list_SCI : :obj:`list`
            The list of all associated science files including their category
            read from the fits header

        create_sof : :obj:`bool`:
            :obj:`True`: ``exp_align.sof`` is created and populated

            :obj:`False`: ``exp_align.sof`` is not created and needs to be
            provided by the user

        OB : :obj:`str`:
            The specific ``OB`` to be reduced. It is not used by this step.

    Kwargs:
        esorex_kwargs: :obj:`str`
            Additional keywords that should be passed for special processing.
            These should be passed as one string
    '''
    print('... CUBE ALIGNMENT')

    esorex_cmd = '--log-file=exp_align.log --log-level=debug'\
        + ' muse_exp_align'\
        + ' --srcmin=' + str(self.config['exp_align']['srcmin'])\
        + ' --srcmax=' + str(self.config['exp_align']['srcmax'])
    sof = ' exp_align.sof'

    # Only entries whose name ends in ``_SCI.list`` are science exposures.
    science_lists = [exposure for exposure in exp_list_SCI
                     if exposure[-8:-5] == 'SCI']

    n_user = len(self.user_list)
    unique_pointings = []
    if n_user == 0:
        # A pointing is the list name without its last 16 characters.
        unique_pointings = list(dict.fromkeys(
            exposure[:-16] for exposure in science_lists))
    elif n_user == 1:
        if self.user_list[0] == 'all':
            unique_pointings = [os.path.join(self.working_dir,
                                             science_lists[0][:-16])]
    else:
        unique_pointings = [os.path.join(self.working_dir,
                                         self.user_list[0][:18])]

    # When several OBs are dithered, the shared combining folder ID is fixed
    # while the first OB is handled (counter == 0). The ID of the last
    # pointing is the one that is kept.
    if unique_pointings and self.dithering_multiple_OBs \
            and self.multOB_exp_counter == 0:
        self.multOB_unique_pointings_ID = unique_pointings[-1][-18:]

    for pointing_num, sec in enumerate(unique_pointings):
        print(' ')
        print(' >>> processing pointing: ' + str(pointing_num + 1)
              + '/' + str(len(unique_pointings)))
        print(' ')

        if self.dithering_multiple_OBs:
            unique_pointings_ID = self.multOB_unique_pointings_ID
            print(unique_pointings_ID)
            combining_exposure_dir = os.path.join(self.combining_OBs_dir,
                                                  unique_pointings_ID)
        else:
            combining_exposure_dir = sec

        image_list = _get_filelist(self, combining_exposure_dir,
                                   'IMAGE_FOV_*.fits')

        if create_sof:
            sof_file = os.path.join(combining_exposure_dir, 'exp_align.sof')
            if os.path.exists(sof_file):
                os.remove(sof_file)
            # The context manager closes the file even if writing fails.
            with open(sof_file, 'w') as f:
                for image in image_list:
                    f.write(os.path.join(combining_exposure_dir, image)
                            + ' IMAGE_FOV\n')

        if not self.debug:
            _call_esorex(self, combining_exposure_dir, esorex_cmd, sof,
                         esorex_kwargs=esorex_kwargs)
[docs]
def _exp_combine(self, exp_list_SCI, create_sof, esorex_kwargs=None):
    '''
    This module calls *ESORex'* ``muse_exp_combine``

    For every pointing the ``PIXTABLE_REDUCED_*.fits`` files found in the
    combining folder are listed in ``exp_combine.sof`` together with the
    offset list and the filter list, and ``muse_exp_combine`` is executed in
    that folder unless the debug mode is active. Each pointing is announced
    and processed exactly once.

    Args:
        exp_list_SCI : :obj:`list`
            The list of all associated science files including their category
            read from the fits header

        create_sof : :obj:`bool`:
            :obj:`True`: ``exp_combine.sof`` is created and populated

            :obj:`False`: ``exp_combine.sof`` is not created and needs to be
            provided by the user

    Kwargs:
        esorex_kwargs: :obj:`str`
            Additional keywords that should be passed for special processing.
            These should be passed as one string
    '''
    print('... EXPOSURE COMBINATION')

    esorex_cmd = '--log-file=exp_combine.log --log-level=debug'\
        + ' muse_exp_combine'\
        + ' --filter=white'\
        + ' --save=cube'\
        + ' --crsigma=5.'\
        + ' --weight=' + str(self.weight)
    sof = ' exp_combine.sof'

    # Only entries whose name ends in ``_SCI.list`` are science exposures.
    science_lists = [exposure for exposure in exp_list_SCI
                     if exposure[-8:-5] == 'SCI']

    n_user = len(self.user_list)
    unique_pointings = []
    if n_user == 0:
        # A pointing is the list name without its last 16 characters.
        unique_pointings = list(dict.fromkeys(
            exposure[:-16] for exposure in science_lists))
    elif n_user == 1:
        if self.user_list[0] == 'all':
            unique_pointings = [os.path.join(self.working_dir,
                                             science_lists[0][:-16])]
    else:
        unique_pointings = [os.path.join(self.working_dir,
                                         self.user_list[0][:18])]

    for pointing_num, sec in enumerate(unique_pointings):
        print(' ')
        print(' >>> processing pointing: ' + str(pointing_num + 1)
              + '/' + str(len(unique_pointings)))
        print(' ')

        if self.dithering_multiple_OBs:
            # The shared folder ID is only (re)defined while the first OB is
            # handled; later OBs reuse the stored one.
            if self.multOB_exp_counter == 0:
                self.multOB_unique_pointings_ID = sec[-18:]
            combining_exposure_dir = os.path.join(
                self.combining_OBs_dir, self.multOB_unique_pointings_ID)
        else:
            combining_exposure_dir = sec

        pixtable_list = _get_filelist(self, combining_exposure_dir,
                                      'PIXTABLE_REDUCED_*.fits')

        if create_sof:
            sof_file = os.path.join(combining_exposure_dir, 'exp_combine.sof')
            if os.path.exists(sof_file):
                os.remove(sof_file)
            # The context manager closes the file even if writing fails.
            with open(sof_file, 'w') as f:
                for pixtable in pixtable_list:
                    f.write(os.path.join(combining_exposure_dir, pixtable)
                            + ' PIXTABLE_REDUCED\n')
                f.write(os.path.join(combining_exposure_dir, 'OFFSET_LIST.fits') + ' OFFSET_LIST\n')
                f.write(os.path.join(self.static_calibration_dir, 'filter_list.fits') + ' FILTER_LIST\n')

        if not self.debug:
            _call_esorex(self, combining_exposure_dir, esorex_cmd, sof,
                         esorex_kwargs=esorex_kwargs)
[docs]
def _cleanup(self, exp_list_SCI, include_std=False, include_combined=False, include_calibrations=False):
    '''
    This module cleans up intermediate products to save disk space

    The folder of every science exposure is always removed. The other folders
    are only removed on request. Folders that do not exist are skipped.

    Args:
        exp_list_SCI : :obj:`list`
            The list of all associated science files including their category
            read from the fits header

    Kwargs:
        include_std: :obj:`bool`
            If :obj:`True` the standard star folder is also removed

        include_combined: :obj:`bool`
            If :obj:`True` the combined cube folder of every pointing is also
            removed

        include_calibrations: :obj:`bool`
            If :obj:`True` the folders ``ESO_calibrations``, ``calibrations``
            and ``static_calibration_files`` in the working directory are
            also removed
    '''
    def _remove_folder(folder):
        # Announce the folder and delete it, tolerating a missing folder.
        print("Removing: ", folder)
        if os.path.exists(folder):
            shutil.rmtree(folder)

    print('... CLEANING UP THE FOLDERS')

    # Only entries whose name ends in ``_SCI.list`` are science exposures.
    science_lists = [exposure for exposure in exp_list_SCI
                     if exposure[-8:-5] == 'SCI']

    n_user = len(self.user_list)
    unique_pointings = []
    if n_user == 0:
        # A pointing is the list name without its last 16 characters.
        unique_pointings = list(dict.fromkeys(
            exposure[:-16] for exposure in science_lists))
    elif n_user == 1:
        if self.user_list[0] == 'all':
            unique_pointings = [os.path.join(self.working_dir,
                                             science_lists[0][:-16])]
    else:
        unique_pointings = [os.path.join(self.working_dir,
                                         self.user_list[0][:18])]

    if include_combined:
        # A pointing folder may never have been created (or is already
        # gone), so it gets the same existence check as every other folder.
        for unique_pointing in unique_pointings:
            _remove_folder(unique_pointing)

    if include_std:
        _remove_folder(os.path.join(self.working_dir, 'std'))

    if include_calibrations:
        for folder_name in ('ESO_calibrations', 'calibrations',
                            'static_calibration_files'):
            _remove_folder(os.path.join(self.working_dir, folder_name))

    for exp in science_lists:
        # The exposure folder is the list name without ``_SCI.list``.
        _remove_folder(exp[:-9])