Source code for MUSEreduce

#!/usr/bin/env python

# Version string of the wrapper; it is printed in the start-up banner of
# musereduce.execute().
__version__ = '2.0.4'

# Revision stamp of this file -- presumably a YYYYMMDD date; TODO confirm.
__revision__ = '20260210'

import sys
import shutil
import os
import glob
import string
import filecmp
import numpy as np
from astropy.io import ascii, fits
from astropy import units as u
from astropy.coordinates import SkyCoord
from astropy.utils import xml
from datetime import datetime, timedelta
import time
import json


class musereduce:
    '''
    Wrapper that reads a :obj:`json` configuration file and runs the selected
    MUSE data reduction modules (ESORex and the ESO MUSE pipeline) for every
    OB.

    Kwargs:
        configfile : :obj:`str`
            A :obj:`json` configfile for musereduce, where all the parameters
            are set. If :obj:`None`, the ``config.json`` located next to this
            module is used.
        debug : :obj:`bool`, (optional), default: :obj:`False`
            :obj:`True`: :class:`MUSEreduce.musereduce` runs in debug mode and
            ESORex will not be executed. All products must be available. It
            can be used for testing the creation of folder, and creating the
            ``.sof`` files
    '''

    def __init__(self, configfile, debug=False):
        # Reading in the configuration file
        if configfile is None:
            configfile = os.path.join(os.path.dirname(__file__), "config.json")

        with open(configfile, "r") as read_file:
            self.config = json.load(read_file)

        # setting all the parameters from the configuration file
        cfg_global = self.config['global']
        cfg_sci_basic = self.config['sci_basic']
        cfg_sky = self.config['sky']
        cfg_sci_post = self.config['sci_post']

        self.OB_list = np.array(cfg_global['OB_list'])
        self.dithering_multiple_OBs = cfg_global['dither_multiple_OBs']
        self.rootpath = cfg_global['rootpath']
        self.mode = cfg_global['mode']
        self.auto_sort_data = cfg_global['auto_sort_data']
        self.auto_create_OB_folders = cfg_global['auto_create_OB_folders']
        self.using_specific_exposure_time = cfg_global['using_specific_exposure_time']
        self.n_CPU = cfg_global['n_CPU']
        self.using_ESO_calibration = cfg_global['using_ESO_calibration']
        self.renew_statics = cfg_global['renew_statics']
        self.user_list = np.array(cfg_global['exp_list'], dtype=object)

        self.dark = self.config['calibration']['dark']

        # Optional sci_basic keys fall back to their defaults when missing
        self.skyreject = cfg_sci_basic['skyreject']
        self.skylines = cfg_sci_basic.get('skylines', '5577.339,6300.304')
        self.reduce_std = cfg_sci_basic.get('execute_std', True)
        self.reduce_sci = cfg_sci_basic.get('execute_sci', True)

        self.skyfield = cfg_sky['sky_field']
        self.skyfraction = cfg_sky['fraction']
        self.skyignore = cfg_sky['ignore']
        self.skymethod = cfg_sky['method']

        self.skysub = cfg_sci_post['subtract_sky']
        self.autocalib = cfg_sci_post['autocalib']
        self.raman = cfg_sci_post['raman']
        # The Raman line removal is only kept for the NFM-AO mode
        if self.mode != 'NFM-AO':
            self.raman = False

        self.weight = self.config['exp_combine']['weight']

        self.raw_data_dir = os.path.join(self.rootpath, 'raw/')
        self.reduced_dir = os.path.join(self.rootpath, 'reduced/')
        self.working_dir = self.reduced_dir
        os.makedirs(self.working_dir, exist_ok=True)

        # These are filled in per OB by execute()
        self.combining_OBs_dir = None
        self.calibration_dir = None
        self.ESO_calibration_dir = None
        self.static_calibration_dir = None
        self.dithername = None

        self.debug = debug

    def _set_OB_directories(self, OB):
        '''
        Points the working, combining and calibration directory attributes to
        the folders that belong to one OB.

        Args:
            OB : :obj:`str`
                The name of the OB, i.e. the sub-folder of ``reduced``.
        '''
        self.working_dir = os.path.join(self.rootpath, 'reduced', OB)
        if self.dithering_multiple_OBs:
            self.combining_OBs_dir = os.path.join(self.rootpath, 'reduced',
                                                  self.dithername)
        else:
            self.combining_OBs_dir = None

        self.calibration_dir = os.path.join(self.working_dir, 'calibrations')
        self.ESO_calibration_dir = os.path.join(self.working_dir,
                                                'ESO_calibrations')
        self.static_calibration_dir = os.path.join(self.working_dir,
                                                   'static_calibration_files')

    def execute(self):
        '''
        This method executes wrapper and starts the data reduction process
        set in the :obj:`json` config file.

        Raises:
            AssertionError : if Python 2 is used, or if no pipeline path or
                no rootpath is defined in the config file.
        '''
        start_time = time.time()

        # NOTE(review): the padding inside the banner strings is reproduced
        # as it is visible in the source listing -- confirm the alignment.
        print('')
        print(' ')
        print('##############################################################')
        print('##### #####')
        print('##### MUSE data reduction pipeline wrapper #####')
        print('##### Must be used with ESORex and ESO MUSE pipeline #####')
        print('##### author: Peter Zeidler (zeidler@stsci.edu) #####')
        print('##### Feb 11, 2026 #####')
        print('##### Version: ' + str(__version__) + ' #####')
        print('##### #####')
        print('##############################################################')
        print('')
        print('')

        print('... Checking various necessary variables')
        # Raised explicitly (instead of ``assert``) so that the checks are
        # not stripped when Python runs with -O; the exception type is kept.
        if not sys.version_info > (3, 0):
            raise AssertionError('YOU ARE NOT USING PYTHON 3.x')
        if not self.config['global']['pipeline_path']:
            raise AssertionError('NO PIPELINE PATH DEFINED')
        if not self.config['global']['rootpath']:
            raise AssertionError('NO ROOTPATH DEFINED')

        if self.config['sky']['modified']:
            self.skymethod = 'subtract-model'
            print("")
            print("WARNING: modified sky is used")
            print("... setting the skymethod to: ", self.skymethod)
            print("")
            try:
                self.modified_continuum = self.config['sky']['modified_continuum']
                print("... setting the modified continuum to: ",
                      self.modified_continuum)
            except KeyError:
                print("")
                print("WARNING: modified_continuum not provided in config file !!!")
                print("... setting the modified continuum to default [True]")
                print("")
                self.modified_continuum = True

        self.static_calib_path = self.config.get('global', {}).get(
            'static_calib_path', None)
        if not self.static_calib_path:
            self.static_calib_path = os.path.join(
                self.config['global']['pipeline_path'], 'calib/muse*/')
            print('No specific static calibration path defined')
        print(' >>> Static Calibration path: ' + self.static_calib_path)

        print('')
        print('... Perfect, everything checks out')
        print('')

        if self.config['cleanup']['execute']:
            print('... Will cleanup folders first before starting other modules')
            print('')

        print('... Settings for data reduction')
        if self.debug:
            print('##################################################')
            print('##### RUNNING IN DEBUG MODE #####')
            print('##### PRODUCTS MUST BE ALL THERE #####')
            print('##### NO ESORex EXECUTION #####')
            print('##################################################')
            print("")

        print(' >>> Number of cores: ' + str(self.n_CPU) + ' cores')
        print(' >>> Observation mode: ' + self.mode)

        if self.config['calibration']['execute']:
            if self.using_ESO_calibration:
                print(' >>> Using ESO calibration files')
                kwargs_keys = ('esorex_kwargs_bias', 'esorex_kwargs_dark',
                               'esorex_kwargs_flat', 'esorex_kwargs_wavecal',
                               'esorex_kwargs_lsf', 'esorex_kwargs_twilight')
                if any(self.config['calibration'][key] for key in kwargs_keys):
                    print('WARNING: KWARGS ARE SET BUT ESO CALIBRATIONS ARE USED')
            else:
                print(' >>> Using self-processed calibration files')
                if self.dark:
                    print(' >>> DARK will be reduced and used')
                else:
                    print(' >>> DARK will not be reduced and used')

        if self.dark:
            print(' >>> DARK will be used: ' + str(self.dark))
        if self.raman:
            print(' >>> Removal of Raman lines: ' + str(self.raman))

        if self.dithering_multiple_OBs:
            print(' >>> Exposures per pointing are spread over multiple OBs')
            self.multOB_exp_counter = 0
            self.multOB_unique_pointings_ID = None
        else:
            print(' >>> All exposures per pointing are located in one OB')
            if len(self.user_list) > 1:
                print(' >>> Dithering exposures input list:')
                for li in self.user_list:
                    print(li)
            if len(self.user_list) == 1:
                if self.user_list[0] == 'all':
                    print(' >>> Dithering all exposures within OB folder:')

        print('')
        print('... The following modules will be executed')
        if self.config['calibration']['execute'] and not self.using_ESO_calibration:
            print(' >>> BIAS')
            if self.dark:
                print(' >>> DARK')
            print(' >>> FLAT')
            print(' >>> WAVECAL')
            print(' >>> LSF')
            print(' >>> TWILIGHT')
        if self.config['sci_basic']['execute']:
            print(' >>> SCIBASIC')
            if not self.reduce_sci:
                print(' Caution SCIBASIC will not be executed on SCI exposure!!')
            if not self.reduce_std:
                print(' Caution SCIBASIC will not be executed on STD star!!')
        if self.config['std_flux']['execute']:
            print(' >>> STANDARD')
        if self.config['sky']['execute']:
            print(' >>> CREATE_SKY')
            if self.config['sky']['modified']:
                print(' ... modified execution')
        if self.config['sci_post']['execute']:
            print(' >>> SCI_POST')
        if self.config['exp_align']['execute']:
            print(' >>> EXP_ALIGN')
        if self.config['exp_combine']['execute']:
            print(' >>> EXP_COMBINE')

        print(' ')
        print('')
        print('##############################################################')
        print('##### All parameters set: Starting the data reduction #####')
        print('##############################################################')
        print('')

        print(' >>> Creating OB list')
        _create_ob_folders(self)

        if self.dithering_multiple_OBs:
            # The first OB gives the combined pointing its name
            self.dithername = self.OB_list[0]
            print('==> The pointing name is: ' + self.dithername)

        print(' ')
        print('... Creating directories')
        for OB in self.OB_list:
            self._set_OB_directories(OB)
            if self.combining_OBs_dir is not None:
                os.makedirs(self.combining_OBs_dir, exist_ok=True)

            # The ESO calibrations are only thrown away on request; the
            # static calibration files are refreshed on every run (below).
            if self.renew_statics and self.auto_sort_data\
                    and os.path.exists(self.ESO_calibration_dir):
                shutil.rmtree(self.ESO_calibration_dir)

            os.makedirs(self.working_dir, exist_ok=True)
            os.makedirs(os.path.join(self.working_dir, 'std'), exist_ok=True)
            os.makedirs(self.calibration_dir, exist_ok=True)
            os.makedirs(self.ESO_calibration_dir, exist_ok=True)
            if self.dark:
                os.makedirs(os.path.join(self.calibration_dir, 'DARK'),
                            exist_ok=True)
            os.makedirs(os.path.join(self.calibration_dir, 'TWILIGHT'),
                        exist_ok=True)
            os.makedirs(os.path.join(self.calibration_dir, 'SCIENCE'),
                        exist_ok=True)

            if os.path.exists(self.static_calibration_dir):
                shutil.rmtree(self.static_calibration_dir)
            os.mkdir(self.static_calibration_dir)
            for itername in glob.glob(os.path.join(self.static_calib_path, '*.*')):
                shutil.copy(itername, os.path.join(self.static_calibration_dir, '.'))

        print('')
        print('... Sorting the data')
        if self.auto_sort_data:
            print(' >>> Sorting the raw data')
            _sort_data(self)
        else:
            print(' >>> raw data will not be sorted')
            print(' >>> MANUAL INTERACTION MAY BE NEEDED')

        for OB in self.OB_list:
            # All per-OB directories are set here, so that the calibration
            # folders always belong to the OB that is being reduced.
            self._set_OB_directories(OB)

            # Optionally restrict the exposure lists to one exposure time,
            # which is part of the list file names as a 4-digit number.
            if self.using_specific_exposure_time:
                stem = '*' + str('{:04d}'.format(self.using_specific_exposure_time)) + '*'
            else:
                stem = '*'

            exp_list_SCI = sorted(
                glob.glob(os.path.join(self.working_dir, stem + '_SCI.list'))
                + glob.glob(os.path.join(self.working_dir, stem + '_SKY.list')))
            exp_list_DAR = sorted(
                glob.glob(os.path.join(self.working_dir, stem + '_DAR.list')))
            exp_list_TWI = sorted(
                glob.glob(os.path.join(self.working_dir, stem + '_TWI.list')))

            for exposure in exp_list_SCI:
                # Exposure folder: list file name without the 9-character
                # '_SCI.list' / '_SKY.list' suffix
                os.makedirs(exposure[:-9], exist_ok=True)

            if self.config['cleanup']['execute']:
                print(' ')
                print('... cleaning')
                _cleanup(self, exp_list_SCI,
                         include_std=self.config['cleanup']['include_std'],
                         include_combined=self.config['cleanup']['include_combined'],
                         include_calibrations=self.config['cleanup']['include_calibrations'])

            print(' ')
            print('... reducing OB: ' + OB)
            print(' ')

            ### CALIBRATION PRE-PROCESSING ###
            if self.config['calibration']['execute']:
                create_sof = self.config['calibration']['create_sof']
                if not self.using_ESO_calibration:
                    calib_cfg = self.config['calibration']
                    _bias(self, exp_list_SCI, exp_list_DAR, exp_list_TWI,
                          create_sof,
                          esorex_kwargs=calib_cfg['esorex_kwargs_bias'])
                    if self.dark:
                        _dark(self, exp_list_SCI, exp_list_DAR, create_sof,
                              esorex_kwargs=calib_cfg['esorex_kwargs_dark'])
                    _flat(self, exp_list_SCI, exp_list_TWI, create_sof,
                          esorex_kwargs=calib_cfg['esorex_kwargs_flat'])
                    _wavecal(self, exp_list_SCI, exp_list_TWI, create_sof,
                             esorex_kwargs=calib_cfg['esorex_kwargs_wavecal'])
                    _lsf(self, exp_list_SCI, exp_list_TWI, create_sof,
                         esorex_kwargs=calib_cfg['esorex_kwargs_lsf'])
                    _twilight(self, exp_list_SCI, exp_list_TWI, create_sof,
                              esorex_kwargs=calib_cfg['esorex_kwargs_twilight'])

            ### OBSERVATION PRE-PROCESSING ###
            if self.config['sci_basic']['execute']:
                create_sof = self.config['sci_basic']['create_sof']
                _science_pre(self, exp_list_SCI, create_sof,
                             esorex_kwargs=self.config['sci_basic']['esorex_kwargs'])

            ### OBSERVATION POST-PROCESSING ###
            if self.config['std_flux']['execute']:
                create_sof = self.config['std_flux']['create_sof']
                _std_flux(self, exp_list_SCI, create_sof,
                          esorex_kwargs=self.config['std_flux']['esorex_kwargs'])

            if self.config['sky']['execute']:
                create_sof = self.config['sky']['create_sof']
                if self.config['sky']['modified']:
                    _modified_sky(self, exp_list_SCI, create_sof,
                                  esorex_kwargs=self.config['sky']['esorex_kwargs'])
                else:
                    _sky(self, exp_list_SCI, create_sof,
                         esorex_kwargs=self.config['sky']['esorex_kwargs'])

            ### SCIENCE POST-PROCESSING ###
            if self.config['sci_post']['execute']:
                create_sof = self.config['sci_post']['create_sof']
                _scipost(self, exp_list_SCI, create_sof, OB,
                         esorex_kwargs=self.config['sci_post']['esorex_kwargs'])

            if self.config['dither_collect']['execute']:
                _dither_collect(self, exp_list_SCI, OB)

            # NOTE(review): nesting reconstructed from the source listing --
            # with exposures spread over multiple OBs the alignment and the
            # combination only run while the OB counter is still zero,
            # otherwise they run for every OB. Please confirm.
            if not self.dithering_multiple_OBs or self.multOB_exp_counter == 0:
                if self.config['exp_align']['execute']:
                    create_sof = self.config['exp_align']['create_sof']
                    _exp_align(self, exp_list_SCI, create_sof, OB,
                               esorex_kwargs=self.config['exp_align']['esorex_kwargs'])
                if self.config['exp_combine']['execute']:
                    create_sof = self.config['exp_combine']['create_sof']
                    _exp_combine(self, exp_list_SCI, create_sof,
                                 esorex_kwargs=self.config['exp_combine']['esorex_kwargs'])

            if self.dithering_multiple_OBs:
                self.multOB_exp_counter += 1

        end_time = time.time()
        print(' >>> Total execution time: ',
              timedelta(seconds=end_time - start_time))
[docs] def _get_filelist(self, data_dir, filename_wildcard): ''' This module collects the necessary file lists from folders. Args: data_dir : :obj:`str` The directory where the files are located. filename_wildcard : :obj:`str` The filenames which should be collected ''' os.chdir(data_dir) raw_data_list = glob.glob(filename_wildcard) os.chdir(self.rootpath) return raw_data_list
[docs] def _call_esorex(self, exec_dir, esorex_cmd, sof, esorex_kwargs=None): ''' This module calls the various ESOrex commands and gives it to the terminal for execution. Args: exec_dir : :obj:`str` The directory where ESOrex should be executed. esorex_cmd : :obj:`str` The ESOrex command that needs to be executed sof : :obj:`str` The name of the sof file needed for executing ESOrex Kwargs: esorex_kwargs : :obj:`str` Additional keywords that should be passed for special processing. These should be passed as one string ''' os.chdir(exec_dir) os.system('export OMP_NUM_THREADS=' + str(self.n_CPU)) if esorex_kwargs: print('esorex ' + esorex_cmd + ' ' + esorex_kwargs + ' ' + sof) os.system('esorex ' + esorex_cmd + ' ' + esorex_kwargs + ' ' + sof) else: print('esorex ' + esorex_cmd + ' ' + sof) os.system('esorex ' + esorex_cmd + ' ' + sof) os.chdir(self.rootpath)
[docs] def _create_ob_folders(self): ''' This module creates the neccessary OB folders, from the raw data ''' if self.config['global']['OB_list']: print(' >>> Input OB list is used') self.OB_list = self.config['global']['OB_list'] else: print(' >>> The OB list is automatically created from the input raw folder') file_list = _get_filelist(self, self.raw_data_dir, '*.fits*') obs_name = np.array([]) for files in file_list: hdu = fits.open(os.path.join(self.raw_data_dir, files)) dprcatg_exist = hdu[0].header.get('HIERARCH ESO DPR CATG', False) if dprcatg_exist: dprcatg = (hdu[0].header['HIERARCH ESO DPR CATG']) if dprcatg == 'SCIENCE': obs_name = np.append(obs_name, hdu[0].header['HIERARCH ESO OBS NAME']) self.OB_list = np.sort(np.unique(obs_name)) print(" >>> The OB list:") for OBs in self.OB_list: print(' ', OBs) if self.dithering_multiple_OBs and len(self.user_list) > 0 and self.user_list != "all": print('Currently a user list cannot be provided with multiple OBs !!!') sys.exit() if self.config['global']['auto_create_OB_folders']: print(' >>> The OB folders are automatically created from the input OB list.') for unique_ob in self.config['global']['OB_list']: if os.path.exists(os.path.join(self.working_dir, unique_ob)): print(" >>> OB folder ", unique_ob, " already exists. Nothing to do here!") # shutil.rmtree(os.path.join(self.working_dir, unique_ob)) else: print("Creating OB folder: ", unique_ob) os.mkdir(os.path.join(self.working_dir, unique_ob))
def _read_primary_header(fits_path):
    '''
    Returns the header of the primary HDU of a FITS file and closes the file
    again.

    Args:
        fits_path : :obj:`str`
            The path of the FITS file.
    '''
    with fits.open(fits_path) as hdu_list:
        return hdu_list[0].header


def _sort_data(self):
    '''
    This module reads the headers of all of the raw data and sorts it
    accordingly. It also assigns the correct calibration files to the
    exposures

    For every science exposure a ``*_SCI.list`` (or ``*_SKY.list``), a
    ``*_DAR.list`` and a ``*_TWI.list`` file is written to the folder of its
    OB. Every line of these files holds the path of a raw file and its type.
    Only calibration files named in the ``*_raw2raw.xml`` /
    ``*_raw2master.xml`` association of the exposure are considered. ESO
    calibration products named in that association are copied to the
    ``ESO_calibrations`` folder of the OB.

    Raises:
        ValueError : if a science file has a ``HIERARCH ESO DPR TYPE`` other
            than ``OBJECT`` or ``SKY``.
    '''
    # DPR TYPE values of calibration frames that are stored with a short
    # tag; every other DPR TYPE is stored as it is.
    calibration_tags = {
        'FLAT,LAMP': 'FLAT',
        'FLAT,SKY': 'SKYFLAT',
        'WAVE': 'ARC',
        'WAVE,MASK': 'MASK',
        'FLAT,LAMP,ILLUM': 'ILLUM',
    }

    science_files, science_type = [], []
    calibration_files, calibration_type = [], []
    ESO_calibration_files, ESO_calibration_type = [], []

    for files in _get_filelist(self, self.raw_data_dir, '*.fits*'):
        header = _read_primary_header(os.path.join(self.raw_data_dir, files))

        if header.get('HIERARCH ESO DPR CATG', False):
            dprcatg = header['HIERARCH ESO DPR CATG']
            dprtype = header['HIERARCH ESO DPR TYPE']
            if dprcatg == 'SCIENCE':
                science_files.append(files)
                science_type.append(dprtype)
            if dprcatg == 'CALIB':
                calibration_files.append(files)
                calibration_type.append(calibration_tags.get(dprtype, dprtype))

        if header.get('HIERARCH ESO PRO CATG', False):
            ESO_calibration_files.append(files)
            ESO_calibration_type.append(header['HIERARCH ESO PRO CATG'])

    # Every science header is read once and reused below
    science_headers = [
        _read_primary_header(os.path.join(self.raw_data_dir, sci_file))
        for sci_file in science_files]

    # Exposures with identical rotation angle, pointing and exposure time
    # get a running number (0, 1, 2, ...) to keep their file names unique.
    coordinates, exptime_tags, rot_angles_ident = [], [], []
    n_identical = {}
    for header in science_headers:
        coordinate = SkyCoord(ra=header['RA'] * u.degree,
                              dec=header['DEC'] * u.degree,
                              frame='fk5').to_string('hmsdms', sep='',
                                                     precision=0)
        coordinate = coordinate.translate(
            str.maketrans('', '', string.whitespace))
        exptime_tag = str(int(header['EXPTIME'])).rjust(4, '0')

        key = (header['HIERARCH ESO INS DROT POSANG'],
               coordinate + '_' + exptime_tag)
        ident = n_identical.get(key, 0)
        n_identical[key] = ident + 1

        coordinates.append(coordinate)
        exptime_tags.append(exptime_tag)
        rot_angles_ident.append(ident)

    # One long string per raw2raw/raw2master pair of association files
    xmlraw_superstring_dict = {}
    xmlraw2raw_filelist = sorted(
        glob.glob(os.path.join(self.raw_data_dir, '*_raw2raw.xml')))
    xmlraw2master_filelist = sorted(
        glob.glob(os.path.join(self.raw_data_dir, '*_raw2master.xml')))
    for xmlraw2raw_file, xmlraw2master_file in zip(xmlraw2raw_filelist,
                                                   xmlraw2master_filelist):
        xmlraw2raw = xml.iterparser.xml_readlines(xmlraw2raw_file)
        xmlraw2master = xml.iterparser.xml_readlines(xmlraw2master_file)
        xmlraw_superstring_dict[os.path.basename(xmlraw2raw_file)] =\
            '\t'.join(xmlraw2raw) + '\t'.join(xmlraw2master)

    if not science_files:
        return

    # Observing dates of the calibration files; each file is opened once
    mjd_cache = {}

    def _mjd_obs(calibration_file):
        if calibration_file not in mjd_cache:
            mjd_cache[calibration_file] = _read_primary_header(
                os.path.join(self.raw_data_dir, calibration_file))['MJD-OBS']
        return mjd_cache[calibration_file]

    # The dark and twilight dates do not depend on the science exposure
    dark_date = np.unique(np.array(
        [_mjd_obs(calib_file) for calib_file, calib_type
         in zip(calibration_files, calibration_type) if calib_type == 'DARK'],
        dtype=float))
    twilight_date = np.unique(np.array(
        [_mjd_obs(calib_file) for calib_file, calib_type
         in zip(calibration_files, calibration_type) if calib_type == 'SKYFLAT'],
        dtype=float))

    for sci_file_idx, sci_file in enumerate(science_files):
        header = science_headers[sci_file_idx]
        ROT = header['HIERARCH ESO INS DROT POSANG']
        DATE = header['MJD-OBS']

        # The association of this exposure; if several associations name
        # the file, the last one is used. It is reset for every exposure so
        # that an exposure never inherits the association of another one.
        xmlraw_superstring = ''
        for xml_item in xmlraw_superstring_dict.values():
            if sci_file[:-8] in xml_item:
                xmlraw_superstring = xml_item
        if not xmlraw_superstring:
            print('WARNING: no raw2raw/raw2master association found for '
                  + sci_file)

        working_dir_temp = os.path.join(self.reduced_dir,
                                        header['HIERARCH ESO OBS NAME'])

        list_stem = coordinates[sci_file_idx] + '_'\
            + exptime_tags[sci_file_idx] + '_'\
            + str(int(ROT)).rjust(3, '0') + '_'\
            + str(int(rot_angles_ident[sci_file_idx])).rjust(2, '0')

        if science_type[sci_file_idx] == 'OBJECT':
            filelist_science = list_stem + '_SCI.list'
        elif science_type[sci_file_idx] == 'SKY':
            filelist_science = list_stem + '_SKY.list'
        else:
            raise ValueError('Unsupported science DPR TYPE "'
                             + str(science_type[sci_file_idx]) + '" in '
                             + sci_file)
        filelist_dark = list_stem + '_DAR.list'
        filelist_twilight = list_stem + '_TWI.list'

        with open(os.path.join(working_dir_temp, filelist_science), 'w') as f_science,\
                open(os.path.join(working_dir_temp, filelist_dark), 'w') as f_dark,\
                open(os.path.join(working_dir_temp, filelist_twilight), 'w') as f_twilight:

            f_science.write(os.path.join(self.raw_data_dir, sci_file)
                            + ' ' + science_type[sci_file_idx] + '\n')

            ### Sorting the ESO calibration files
            ESO_calibration_dir_temp = os.path.join(working_dir_temp,
                                                    "ESO_calibrations")
            for ESO_calibration_file, ESO_type in zip(ESO_calibration_files,
                                                      ESO_calibration_type):
                ESO_calibration_filename =\
                    ESO_calibration_file.split("/")[-1][:-8]
                ESO_target = os.path.join(ESO_calibration_dir_temp,
                                          ESO_type + '.fits')
                # An already copied product of the same category is kept
                if ESO_calibration_filename in xmlraw_superstring\
                        and not os.path.isfile(ESO_target):
                    shutil.copy(os.path.join(self.raw_data_dir,
                                             ESO_calibration_file),
                                ESO_target)

            for calib_file, calib_type in zip(calibration_files,
                                              calibration_type):
                if calib_file.split("/")[-1][:-8] not in xmlraw_superstring:
                    continue

                temp_date = _mjd_obs(calib_file)
                list_entry = os.path.join(self.raw_data_dir, calib_file)\
                    + ' ' + calib_type + '\n'

                # Calibrations within one day of the exposure are always
                # listed; STD and ILLUM are listed even if further apart.
                if abs(temp_date - DATE) <= 1.:
                    f_science.write(list_entry)
                if calib_type == 'STD' and abs(temp_date - DATE) > 1.:
                    f_science.write(list_entry)
                if calib_type == 'ILLUM' and abs(temp_date - DATE) > 1.:
                    f_science.write(list_entry)
                    # NOTE(review): the position of this warning is taken
                    # from the statement order of the source listing; its
                    # text suggests it may belong to the STD case. Confirm.
                    print('WARNING: STD and SCI obs more than 24 hours apart')

                # A file is listed if it lies within one day of all darks
                # (or twilights); this is also true if there are none.
                if (abs(temp_date - dark_date) <= 1.).all():
                    f_dark.write(list_entry)
                if (abs(temp_date - twilight_date) <= 1.).all():
                    f_twilight.write(list_entry)
[docs] def _bias(self, exp_list_SCI, exp_list_DAR, exp_list_TWI, create_sof, esorex_kwargs=None): ''' This module calls *ESORex'* ``muse_bias`` Args: exp_list_SCI : :obj:`list` The list of all associated science files including their category read from the fits header exp_list_DAR : :obj:`list`: The list of all associated dark files including their category read from the fits header exp_list_TWI : :obj:`list`: The list of all associated twilight files including their category read from the fits header create_sof : :obj:`bool`: :obj:`True`: ``bias.sof`` is created and populated :obj:`False`: ``bias.sof`` is not created and needs to be provided by the user Kwargs: esorex_kwargs: :obj:`str` Additional keywords that should be passed for special processing. These should be passed as one string ''' print('... Creating the MASTER BIAS') esorex_cmd = '--log-file=bias.log --log-level=debug'\ + ' muse_bias'\ + ' --nifu=-1'\ + ' --merge' sof = 'bias.sof' if create_sof: sci_bias_sof = os.path.join(self.calibration_dir, 'SCIENCE/bias.sof') dar_bias_sof = os.path.join(self.calibration_dir, 'DARK/bias.sof') twi_bias_sof = os.path.join(self.calibration_dir, 'TWILIGHT/bias.sof') sci_bias_sof_temp = os.path.join(self.calibration_dir, 'SCIENCE/bias_temp.sof') dar_bias_sof_temp = os.path.join(self.calibration_dir, 'DARK/bias_temp.sof') twi_bias_sof_temp = os.path.join(self.calibration_dir, 'TWILIGHT/bias_temp.sof') if os.path.exists(sci_bias_sof): os.remove(sci_bias_sof) if self.dark and os.path.exists(dar_bias_sof): os.remove( dar_bias_sof) if os.path.exists(twi_bias_sof): os.remove(twi_bias_sof) for exposure_ID in range(len(exp_list_SCI)): print(' >>> processing exposure: ' + str(exposure_ID + 1) + '/'\ + str(len(exp_list_SCI))) print(' >>> processing: ' + exp_list_SCI[exposure_ID]) print(' ') raw_data_list = ascii.read(exp_list_SCI[exposure_ID],\ format='no_header') if self.dark: raw_data_list_DARK = ascii.read(exp_list_DAR[exposure_ID],\ format='no_header') raw_data_list_TWILIGHT 
= ascii.read(exp_list_TWI[exposure_ID],\ format='no_header') f_science = open(sci_bias_sof_temp, 'w') for i in range(len(raw_data_list)): if raw_data_list[i][1] == 'BIAS': f_science.write(raw_data_list[i][0] + ' '\ + raw_data_list[i][1] + '\n') f_science.close() if self.dark: f_dark = open(dar_bias_sof_temp, 'w') for i in range(len(raw_data_list_DARK)): if raw_data_list_DARK[i][1] == 'BIAS': f_dark.write(raw_data_list_DARK[i][0] + ' '\ + raw_data_list_DARK[i][1] + '\n') f_dark.close() f_twilight = open(twi_bias_sof_temp, 'w') for i in range(len(raw_data_list_TWILIGHT)): if raw_data_list_TWILIGHT[i][1] == 'BIAS': f_twilight.write(raw_data_list_TWILIGHT[i][0] + ' '\ + raw_data_list_TWILIGHT[i][1] + '\n') f_twilight.close() if os.path.isfile(sci_bias_sof): assert filecmp.cmp(sci_bias_sof,sci_bias_sof_temp),\ 'CAUTION CALIBRATION FILES ARE DIFFERENT: PLEASE CHECK' os.remove(sci_bias_sof_temp) else: os.rename(sci_bias_sof_temp, sci_bias_sof) if not self.debug: _call_esorex(self, os.path.join(self.calibration_dir, 'SCIENCE'),\ esorex_cmd, sof, esorex_kwargs=esorex_kwargs) if os.path.isfile(twi_bias_sof): assert filecmp.cmp(twi_bias_sof, twi_bias_sof_temp),\ 'CAUTION TWILIGHT FILES ARE DIFFERENT: PLEASE CHECK' os.remove(twi_bias_sof_temp) else: os.rename(twi_bias_sof_temp, twi_bias_sof) if not self.debug: _call_esorex(self, os.path.join(self.calibration_dir, 'TWILIGHT'),\ esorex_cmd, sof, esorex_kwargs=esorex_kwargs) if self.dark: if os.path.isfile(dar_bias_sof): assert filecmp.cmp(dar_bias_sof_temp),\ 'CAUTION DARK FILES ARE DIFFERENT: PLEASE CHECK' os.remove(dar_bias_sof_temp) else: os.rename(dar_bias_sof_temp, dar_bias_sof) if not self.debug: _call_esorex(self, os.path.join(self.calibration_dir, 'DARK'), esorex_cmd, sof, esorex_kwargs=esorex_kwargs) if not create_sof: if not self.debug: _call_esorex(self, os.path.join(self.calibration_dir, 'SCIENCE'), esorex_cmd, sof, esorex_kwargs=esorex_kwargs) _call_esorex(self, os.path.join(self.calibration_dir, 'TWILIGHT'), 
esorex_cmd, sof, esorex_kwargs=esorex_kwargs) if self.dark: if not self.debug: _call_esorex(self, os.path.join(self.calibration_dir, 'DARK'), esorex_cmd, sof, esorex_kwargs=esorex_kwargs)
[docs] def _dark(self, exp_list_SCI, exp_list_DAR, create_sof, esorex_kwargs=None): ''' This module calls *ESORex'* ``muse_dark`` Args: exp_list_SCI : :obj:`list` The list of all associated science files including their category read from the fits header exp_list_DAR : :obj:`list`: The list of all associated dark files including their category read from the fits header create_sof : :obj:`bool`: :obj:`True`: ``bias.sof`` is created and populated :obj:`False`: ``bias.sof`` is not created and needs to be provided by the user Kwargs: esorex_kwargs: :obj:`str` Additional keywords that should be passed for special processing. These should be passed as one string ''' print('... Creating the MASTER DARK') esorex_cmd = '--log-file=dark.log --log-level=debug'\ + ' muse_dark'\ + ' --nifu=-1'\ + ' --merge' sof = ' dark.sof' if create_sof: dar_dark_sof = os.path.join(self.calibration_dir, 'DARK/dark.sof') dar_dark_sof_temp = os.path.join(self.calibration_dir, 'DARK/dark_temp.sof') if os.path.exists(dar_dark_sof): os.remove(dar_dark_sof) for exposure_ID in range(len(exp_list_SCI)): print(' >>> processing exposure: ' + str(exposure_ID + 1) + '/'\ + str(len(exp_list_SCI))) print(' >>> processing: ' + exp_list_SCI[exposure_ID]) print(' ') raw_data_list = ascii.read(exp_list_DAR[exposure_ID],\ format='no_header') f = open(dar_dark_sof_temp, 'w') for i in range(len(raw_data_list)): if raw_data_list[i][1] == 'DARK': f.write(self.raw_data_list[i][0] + ' '\ + raw_data_list[i][1] + '\n') f.write(self.calibration_dir + 'DARK/MASTER_BIAS.fits \ MASTER_BIAS\n') f.close() if os.path.isfile(dar_dark_sof): assert filecmp.cmp(dar_dark_sof, dar_dark_sof_temp),\ 'CAUTION DARK FILES ARE DIFFERENT: PLEASE CHECK' os.remove(dar_dark_sof_temp) else: os.rename(dar_dark_sof_temp, dar_dark_sof) if not self.debug: _call_esorex(os.path.join(self.calibration_dir, 'DARK'), esorex_cmd, sof, esorex_kwargs=esorex_kwargs) if not create_sof: if not self.debug: _call_esorex(os.path.join(self.calibration_dir, 
'DARK'), esorex_cmd, sof, esorex_kwargs=esorex_kwargs)
def _flat(self, exp_list_SCI, exp_list_TWI, create_sof, esorex_kwargs=None):
    '''
    This module calls *ESORex'* ``muse_flat``

    The master flat is created in the ``SCIENCE`` and in the ``TWILIGHT``
    calibration folder. ESORex is only executed if the wrapper does not run
    in debug mode.

    Args:
        exp_list_SCI : :obj:`list`
            The list of all associated science files including their category
            read from the fits header
        exp_list_TWI : :obj:`list`:
            The list of all associated twilight files including their
            category read from the fits header
        create_sof : :obj:`bool`:
            :obj:`True`: ``flat.sof`` is created and populated
            :obj:`False`: ``flat.sof`` is not created and needs to be
            provided by the user

    Kwargs:
        esorex_kwargs: :obj:`str`
            Additional keywords that should be passed for special processing.
            These should be passed as one string
    '''
    print('... Creating the MASTER FLAT')

    esorex_cmd = '--log-file=flat.log --log-level=debug muse_flat --samples=true --nifu=-1 --merge'
    sof = ' flat.sof'

    science_dir = os.path.join(self.calibration_dir, 'SCIENCE')
    twilight_dir = os.path.join(self.calibration_dir, 'TWILIGHT')

    if not create_sof:
        # The sof files are provided by the user
        if not self.debug:
            _call_esorex(self, science_dir, esorex_cmd, sof, esorex_kwargs=esorex_kwargs)
            _call_esorex(self, twilight_dir, esorex_cmd, sof, esorex_kwargs=esorex_kwargs)
        return

    # The sof files of a previous run are removed first
    for target_dir in (science_dir, twilight_dir):
        previous_sof = os.path.join(target_dir, 'flat.sof')
        if os.path.exists(previous_sof):
            os.remove(previous_sof)

    master_dark_entry = os.path.join(self.calibration_dir, 'DARK', 'MASTER_DARK.fits') + ' MASTER_DARK\n'
    n_exposures = len(exp_list_SCI)

    for exposure_number, science_list in enumerate(exp_list_SCI, start=1):
        print(' >>> processing exposure: ' + str(exposure_number) + '/' + str(n_exposures))
        print(' >>> processing: ' + science_list)
        print(' ')

        science_table = ascii.read(science_list, format='no_header')
        twilight_table = ascii.read(exp_list_TWI[exposure_number - 1], format='no_header')

        jobs = (
            (science_dir, science_table, 'CAUTION SCIENCE FILES ARE DIFFERENT: PLEASE CHECK'),
            (twilight_dir, twilight_table, 'CAUTION TWILIGHT FILES ARE DIFFERENT: PLEASE CHECK'),
        )
        for target_dir, table, message in jobs:
            final_sof = os.path.join(target_dir, 'flat.sof')
            temp_sof = os.path.join(target_dir, 'flat_temp.sof')

            with open(temp_sof, 'w') as handle:
                for row in table:
                    if row[1] == 'FLAT':
                        handle.write(row[0] + ' ' + row[1] + '\n')
                # Every folder uses its own master bias
                handle.write(os.path.join(target_dir, 'MASTER_BIAS.fits') + ' MASTER_BIAS\n')
                if self.dark:
                    handle.write(master_dark_entry)

            # ESORex only runs when the sof file is written for the first
            # time; later exposures must list the same files.
            if os.path.isfile(final_sof):
                assert filecmp.cmp(final_sof, temp_sof), message
                os.remove(temp_sof)
            else:
                os.rename(temp_sof, final_sof)
                if not self.debug:
                    _call_esorex(self, target_dir, esorex_cmd, sof, esorex_kwargs=esorex_kwargs)
def _wavecal(self, exp_list_SCI, exp_list_TWI, create_sof, esorex_kwargs=None):
    '''
    This module calls *ESORex'* ``muse_wavecal`` in the ``SCIENCE`` and in
    the ``TWILIGHT`` calibration folder.

    Args:
        exp_list_SCI : :obj:`list`
            The list of all associated science files including their
            category read from the fits header

        exp_list_TWI : :obj:`list`:
            The list of all associated twilight files including their
            category read from the fits header

        create_sof : :obj:`bool`:
            :obj:`True`: ``wavecal.sof`` is created and populated

            :obj:`False`: ``wavecal.sof`` is not created and needs to be
            provided by the user

    Kwargs:
        esorex_kwargs: :obj:`str`
            Additional keywords that should be passed for special
            processing. These should be passed as one string

    Raises:
        AssertionError: the ``wavecal.sof`` built for an exposure differs
            from the one already written for an earlier exposure.
    '''

    print('... Creating the WAVELENGTH CALIBRATION')

    esorex_cmd = '--log-file=wavecal.log --log-level=debug'\
        + ' muse_wavecal'\
        + ' --nifu=-1'\
        + ' --residuals'\
        + ' --merge'
    sof = ' wavecal.sof'
    branches = ('SCIENCE', 'TWILIGHT')

    if not create_sof:
        # The user supplies wavecal.sof; only the recipe is executed.
        if not self.debug:
            for branch in branches:
                _call_esorex(self, os.path.join(self.calibration_dir, branch),
                             esorex_cmd, sof, esorex_kwargs=esorex_kwargs)
        return

    # Start from a clean state so the first exposure defines the reference.
    for branch in branches:
        stale_sof = os.path.join(self.calibration_dir, branch, 'wavecal.sof')
        if os.path.exists(stale_sof):
            os.remove(stale_sof)

    for exposure_ID, sci_list in enumerate(exp_list_SCI):
        print(' >>> processing exposure: ' + str(exposure_ID + 1)
              + '/' + str(len(exp_list_SCI)))
        print(' >>> processing: ' + sci_list)
        print(' ')

        # Both lists are read before anything is written, as before.
        tables = {
            'SCIENCE': ascii.read(sci_list, format='no_header'),
            'TWILIGHT': ascii.read(exp_list_TWI[exposure_ID],
                                   format='no_header'),
        }

        for branch in branches:
            branch_dir = os.path.join(self.calibration_dir, branch)
            sof_path = os.path.join(branch_dir, 'wavecal.sof')
            temp_path = os.path.join(branch_dir, 'wavecal_temp.sof')

            entries = [row[0] + ' ' + row[1] + '\n'
                       for row in tables[branch] if row[1] == 'ARC']
            entries.append(os.path.join(self.static_calibration_dir,
                                        'line_catalog.fits')
                           + ' LINE_CATALOG\n')
            entries.append(os.path.join(branch_dir, 'MASTER_BIAS.fits')
                           + ' MASTER_BIAS\n')
            entries.append(os.path.join(branch_dir, 'TRACE_TABLE.fits')
                           + ' TRACE_TABLE\n')
            if self.dark:
                entries.append(os.path.join(self.calibration_dir, 'DARK',
                                            'MASTER_DARK.fits')
                               + ' MASTER_DARK\n')

            # The context manager closes the file even if a write fails.
            with open(temp_path, 'w') as f:
                f.writelines(entries)

            # The existing .sof (not the temp file, which always exists at
            # this point) decides whether this is the first exposure.
            if os.path.isfile(sof_path):
                # Raised explicitly so the check survives ``python -O``.
                if not filecmp.cmp(sof_path, temp_path):
                    raise AssertionError('CAUTION ' + branch
                                         + ' FILES ARE DIFFERENT: PLEASE CHECK')
                os.remove(temp_path)
            else:
                os.rename(temp_path, sof_path)
                # NOTE(review): nesting was reconstructed from a
                # whitespace-collapsed listing; confirm that ESORex is meant
                # to run only when the .sof is first created.
                if not self.debug:
                    _call_esorex(self, branch_dir, esorex_cmd, sof,
                                 esorex_kwargs=esorex_kwargs)
def _lsf(self, exp_list_SCI, exp_list_TWI, create_sof, esorex_kwargs=None):
    '''
    This module calls *ESORex'* ``muse_lsf`` in the ``SCIENCE`` and in the
    ``TWILIGHT`` calibration folder.

    Args:
        exp_list_SCI : :obj:`list`
            The list of all associated science files including their
            category read from the fits header

        exp_list_TWI : :obj:`list`:
            The list of all associated twilight files including their
            category read from the fits header

        create_sof : :obj:`bool`:
            :obj:`True`: ``lsf.sof`` is created and populated

            :obj:`False`: ``lsf.sof`` is not created and needs to be
            provided by the user

    Kwargs:
        esorex_kwargs: :obj:`str`
            Additional keywords that should be passed for special
            processing. These should be passed as one string

    Raises:
        AssertionError: the ``lsf.sof`` built for an exposure differs from
            the one already written for an earlier exposure.
    '''

    print('... Creating the LINE SPREAD FUNCTION')

    esorex_cmd = '--log-file=lsf.log --log-level=debug'\
        + ' muse_lsf'\
        + ' --nifu=-1'\
        + ' --merge'\
        + ' --save_subtracted'
    sof = ' lsf.sof'
    branches = ('SCIENCE', 'TWILIGHT')

    if not create_sof:
        # The user supplies lsf.sof; only the recipe is executed.
        if not self.debug:
            for branch in branches:
                _call_esorex(self, os.path.join(self.calibration_dir, branch),
                             esorex_cmd, sof, esorex_kwargs=esorex_kwargs)
        return

    # Start from a clean state so the first exposure defines the reference.
    for branch in branches:
        stale_sof = os.path.join(self.calibration_dir, branch, 'lsf.sof')
        if os.path.exists(stale_sof):
            os.remove(stale_sof)

    for exposure_ID, sci_list in enumerate(exp_list_SCI):
        print(' >>> processing exposure: ' + str(exposure_ID + 1)
              + '/' + str(len(exp_list_SCI)))
        print(' >>> processing: ' + sci_list)
        print(' ')

        # Both lists are read before anything is written, as before.
        tables = {
            'SCIENCE': ascii.read(sci_list, format='no_header'),
            'TWILIGHT': ascii.read(exp_list_TWI[exposure_ID],
                                   format='no_header'),
        }

        for branch in branches:
            branch_dir = os.path.join(self.calibration_dir, branch)
            sof_path = os.path.join(branch_dir, 'lsf.sof')
            temp_path = os.path.join(branch_dir, 'lsf_temp.sof')

            entries = [row[0] + ' ' + row[1] + '\n'
                       for row in tables[branch] if row[1] == 'ARC']
            entries.append(os.path.join(self.static_calibration_dir,
                                        'line_catalog.fits')
                           + ' LINE_CATALOG\n')
            entries.append(os.path.join(branch_dir, 'MASTER_BIAS.fits')
                           + ' MASTER_BIAS\n')
            entries.append(os.path.join(branch_dir, 'TRACE_TABLE.fits')
                           + ' TRACE_TABLE\n')
            entries.append(os.path.join(branch_dir, 'WAVECAL_TABLE.fits')
                           + ' WAVECAL_TABLE\n')
            if self.dark:
                # The master dark is taken from calibration_dir/DARK, the
                # same location _flat and _wavecal reference.
                # NOTE(review): the previous code used self.exposure_dir
                # here; confirm no such attribute is intended.
                entries.append(os.path.join(self.calibration_dir, 'DARK',
                                            'MASTER_DARK.fits')
                               + ' MASTER_DARK\n')
            # NOTE(review): MASTER_FLAT is written unconditionally; nesting
            # was reconstructed from a whitespace-collapsed listing.
            entries.append(os.path.join(branch_dir, 'MASTER_FLAT.fits')
                           + ' MASTER_FLAT\n')

            # The context manager closes the file even if a write fails.
            with open(temp_path, 'w') as f:
                f.writelines(entries)

            if os.path.isfile(sof_path):
                # Raised explicitly so the check survives ``python -O``.
                if not filecmp.cmp(sof_path, temp_path):
                    raise AssertionError('CAUTION ' + branch
                                         + ' FILES ARE DIFFERENT: PLEASE CHECK')
                os.remove(temp_path)
            else:
                os.rename(temp_path, sof_path)
                # NOTE(review): confirm that ESORex is meant to run only
                # when the .sof is first created.
                if not self.debug:
                    _call_esorex(self, branch_dir, esorex_cmd, sof,
                                 esorex_kwargs=esorex_kwargs)
def _twilight(self, exp_list_SCI, exp_list_TWI, create_sof, esorex_kwargs=None):
    '''
    This module calls *ESORex'* ``muse_twilight``

    Args:
        exp_list_SCI : :obj:`list`
            The list of all associated science files including their
            category read from the fits header

        exp_list_TWI : :obj:`list`:
            The list of all associated twilight files including their
            category read from the fits header

        create_sof : :obj:`bool`:
            :obj:`True`: ``twilight.sof`` is created and populated

            :obj:`False`: ``twilight.sof`` is not created and needs to be
            provided by the user

    Kwargs:
        esorex_kwargs: :obj:`str`
            Additional keywords that should be passed for special
            processing. These should be passed as one string

    Raises:
        ValueError: a twilight list contains no ``SKYFLAT`` exposure.
        AssertionError: the ``twilight.sof`` built for an exposure differs
            from the one already written for an earlier exposure.
    '''

    print('... Creating the TWILIGHT FLAT')

    esorex_cmd = '--log-file=twilight.log --log-level=debug'\
        + ' muse_twilight'
    sof = ' twilight.sof'
    twilight_dir = os.path.join(self.calibration_dir, 'TWILIGHT')

    if not create_sof:
        # os.path.join instead of plain string concatenation, so the folder
        # is found whether or not calibration_dir ends with a separator.
        if not self.debug:
            _call_esorex(self, twilight_dir, esorex_cmd, sof,
                         esorex_kwargs=esorex_kwargs)
        return

    sof_file = os.path.join(twilight_dir, 'twilight.sof')
    sof_file_temp = os.path.join(twilight_dir, 'twilight_temp.sof')
    if os.path.exists(sof_file):
        os.remove(sof_file)

    for exposure_ID, sci_list in enumerate(exp_list_SCI):
        print(' >>> processing exposure: ' + str(exposure_ID + 1)
              + '/' + str(len(exp_list_SCI)))
        print(' >>> processing: ' + sci_list)
        print(' ')

        raw_data_list_TWILIGHT = ascii.read(exp_list_TWI[exposure_ID],
                                            format='no_header')

        # Collect the observation dates of the sky flats and of the ILLUM
        # exposures (together with the row each ILLUM sits in).
        skyflat_mjds = []
        illum_mjds = []
        illum_rows = []
        for idx, row in enumerate(raw_data_list_TWILIGHT):
            if row[1] == 'SKYFLAT':
                with fits.open(row[0]) as hdul:
                    skyflat_mjds.append(hdul[0].header['MJD-OBS'])
            if row[1] == 'ILLUM':
                with fits.open(row[0]) as hdul:
                    illum_mjds.append(hdul[0].header['MJD-OBS'])
                illum_rows.append(idx)

        if not skyflat_mjds:
            raise ValueError('No SKYFLAT exposure found in '
                             + str(exp_list_TWI[exposure_ID]))
        MJDsskyflat = np.array(skyflat_mjds)

        # Pick the ILLUM closest in time to the earliest sky flat.
        if illum_mjds:
            nearest = np.argmin(np.abs(np.array(illum_mjds)
                                       - np.min(MJDsskyflat)))
            choosen_illum = illum_rows[int(nearest)]
        else:
            print("WARNING: No ILLUM exposure was given !!!")
            choosen_illum = None

        entries = [row[0] + ' ' + row[1] + '\n'
                   for row in raw_data_list_TWILIGHT if row[1] == 'SKYFLAT']
        if choosen_illum is not None:
            illum_row = raw_data_list_TWILIGHT[choosen_illum]
            entries.append(illum_row[0] + ' ' + illum_row[1] + '\n')
        # The same geometry table file is referenced for all three modes.
        if self.mode in ('WFM-AO', 'WFM-NOAO', 'NFM-AO'):
            entries.append(os.path.join(self.static_calibration_dir,
                                        'geometry_table_wfm.fits')
                           + ' GEOMETRY_TABLE\n')
        entries.append(os.path.join(twilight_dir, 'MASTER_BIAS.fits')
                       + ' MASTER_BIAS\n')
        entries.append(os.path.join(twilight_dir, 'MASTER_FLAT.fits')
                       + ' MASTER_FLAT\n')
        if self.dark:
            # The master dark is taken from calibration_dir/DARK, the same
            # location _flat and _wavecal reference.
            # NOTE(review): the previous code used self.exposure_dir here;
            # confirm no such attribute is intended.
            entries.append(os.path.join(self.calibration_dir, 'DARK',
                                        'MASTER_DARK.fits')
                           + ' MASTER_DARK\n')
        entries.append(os.path.join(twilight_dir, 'TRACE_TABLE.fits')
                       + ' TRACE_TABLE\n')
        entries.append(os.path.join(twilight_dir, 'WAVECAL_TABLE.fits')
                       + ' WAVECAL_TABLE\n')
        # The vignetting mask is only added when every sky flat was taken
        # before MJD 57823.5; compare first, then reduce with all().
        if (MJDsskyflat < 57823.5).all():
            entries.append(os.path.join(self.static_calibration_dir,
                                        'vignetting_mask.fits')
                           + ' VIGNETTING_MASK\n')

        # The context manager closes the file even if a write fails.
        with open(sof_file_temp, 'w') as f:
            f.writelines(entries)

        if os.path.isfile(sof_file):
            # Raised explicitly so the check survives ``python -O``.
            if not filecmp.cmp(sof_file, sof_file_temp):
                raise AssertionError(
                    'CAUTION TWILIGHT FILES ARE DIFFERENT: PLEASE CHECK')
            os.remove(sof_file_temp)
        else:
            os.rename(sof_file_temp, sof_file)
            # NOTE(review): nesting was reconstructed from a
            # whitespace-collapsed listing; confirm that ESORex is meant to
            # run only when the .sof is first created.
            if not self.debug:
                _call_esorex(self, twilight_dir, esorex_cmd, sof,
                             esorex_kwargs=esorex_kwargs)
def _scibasic_calibration_entries(self):
    '''
    Return the calibration lines shared by ``sci_basic_std.sof`` and
    ``sci_basic_object.sof`` as a :obj:`list` of :obj:`str`.
    '''

    entries = []
    if not self.using_ESO_calibration:
        for subdir, product in (('SCIENCE', 'MASTER_BIAS'),
                                ('SCIENCE', 'MASTER_FLAT'),
                                ('TWILIGHT', 'TWILIGHT_CUBE'),
                                ('SCIENCE', 'TRACE_TABLE'),
                                ('SCIENCE', 'WAVECAL_TABLE')):
            entries.append(os.path.join(self.calibration_dir, subdir,
                                        product + '.fits')
                           + ' ' + product + '\n')
        if self.dark:
            entries.append(os.path.join(self.calibration_dir, 'DARK',
                                        'MASTER_DARK.fits')
                           + ' MASTER_DARK\n')
    else:
        for product in ('MASTER_BIAS', 'MASTER_FLAT', 'TWILIGHT_CUBE',
                        'TRACE_TABLE', 'WAVECAL_TABLE'):
            entries.append(os.path.join(self.ESO_calibration_dir,
                                        product + '.fits')
                           + ' ' + product + '\n')
        if self.dark:
            entries.append(os.path.join(self.ESO_calibration_dir,
                                        'MASTER_DARK.fits')
                           + ' MASTER_DARK\n')

    # The same geometry table file is referenced for all three modes.
    if self.mode in ('WFM-AO', 'WFM-NOAO', 'NFM-AO'):
        entries.append(os.path.join(self.static_calibration_dir,
                                    'geometry_table_wfm.fits')
                       + ' GEOMETRY_TABLE\n')
    entries.append(os.path.join(self.static_calibration_dir,
                                'badpix_table.fits')
                   + ' BADPIX_TABLE\n')
    return entries


def _science_pre(self, exp_list_SCI, create_sof, esorex_kwargs=None):
    '''
    This module calls *ESORex'* ``muse_scibasic`` for the science exposures
    and for the standard star.

    Args:
        exp_list_SCI : :obj:`list`
            The list of all associated science files including their
            category read from the fits header

        create_sof : :obj:`bool`:
            :obj:`True`: ``sci_basic_object.sof`` is created and populated

            :obj:`False`: ``sci_basic_object.sof`` is not created and needs
            to be provided by the user

            ``sci_basic_std.sof`` is written in both cases.

    Kwargs:
        esorex_kwargs: :obj:`str`
            Additional keywords that should be passed for special
            processing. These should be passed as one string

    Raises:
        AssertionError: the ``sci_basic_std.sof`` built for an exposure
            differs from the one already written for an earlier exposure.
    '''

    print('... Science PREPROCESSING')

    esorex_cmd = '--log-file=sci_basic_object.log --log-level=debug'\
        + ' muse_scibasic'\
        + ' --nifu=-1'\
        + ' --resample'\
        + ' --saveimage=true'\
        + ' --skyreject=' + self.skyreject\
        + ' --skylines=' + self.skylines \
        + ' --merge'
    sof = ' sci_basic_object.sof'

    esorex_cmd_std = '--log-file=sci_basic_std.log --log-level=debug'\
        + ' muse_scibasic'\
        + ' --nifu=-1'\
        + ' --resample'\
        + ' --saveimage=true'\
        + ' --skyreject=15.,15.,1'\
        + ' --merge'
    sof_std = ' sci_basic_std.sof'

    std_dir = os.path.join(self.working_dir, 'std')
    sci_basic_std_sof = os.path.join(std_dir, 'sci_basic_std.sof')
    sci_basic_std_sof_temp = os.path.join(std_dir, 'sci_basic_std_temp.sof')

    if os.path.exists(sci_basic_std_sof):
        os.remove(sci_basic_std_sof)

    # Identical for every exposure and for both .sof files.
    calibration_entries = _scibasic_calibration_entries(self)

    for exposure_ID, sci_list in enumerate(exp_list_SCI):
        print(' >>> processing exposure: '
              + str(exposure_ID + 1) + '/' + str(len(exp_list_SCI)))
        print(' >>> processing: ' + sci_list)
        print(' ')

        raw_data_list = ascii.read(sci_list, format='no_header')
        exposure_dir = sci_list[:-9]

        # Reset per exposure so no date leaks in from the previous one.
        MJDobject = None
        MJDstd = None
        illum_mjds = []
        illum_rows = []
        for idx, row in enumerate(raw_data_list):
            if row[1] == 'OBJECT' or row[1] == 'SKY':
                with fits.open(row[0]) as hdul:
                    MJDobject = hdul[0].header['MJD-OBS']
            if row[1] == 'STD':
                with fits.open(row[0]) as hdul:
                    MJDstd = hdul[0].header['MJD-OBS']
            if row[1] == 'ILLUM':
                with fits.open(row[0]) as hdul:
                    illum_mjds.append(hdul[0].header['MJD-OBS'])
                illum_rows.append(idx)

        # Row index of the ILLUM closest in time to the object / standard;
        # None when there is no ILLUM or no matching exposure.
        choosen_illum_object = None
        choosen_illum_std = None
        if not illum_mjds:
            print("WARNING: No ILLUM exposure was given !!!")
        else:
            MJDsillum = np.array(illum_mjds)
            if MJDobject is not None:
                choosen_illum_object = illum_rows[
                    int(np.argmin(np.abs(MJDsillum - MJDobject)))]
            if MJDstd is not None:
                choosen_illum_std = illum_rows[
                    int(np.argmin(np.abs(MJDsillum - MJDstd)))]

        with open(sci_basic_std_sof_temp, 'w') as f_std:
            for row in raw_data_list:
                if row[1] == 'STD':
                    f_std.write(row[0] + ' ' + row[1] + '\n')
            # ``is not None`` because row index 0 is a valid ILLUM position.
            if choosen_illum_std is not None:
                illum_row = raw_data_list[choosen_illum_std]
                f_std.write(illum_row[0] + ' ' + illum_row[1] + '\n')
            f_std.writelines(calibration_entries)

        if create_sof:
            sci_basic_obj_sof = os.path.join(exposure_dir,
                                             'sci_basic_object.sof')
            if os.path.exists(sci_basic_obj_sof):
                os.remove(sci_basic_obj_sof)

            with open(sci_basic_obj_sof, 'w') as f_object:
                for row in raw_data_list:
                    if row[1] == 'OBJECT' or row[1] == 'SKY':
                        f_object.write(row[0] + ' ' + row[1] + '\n')
                if choosen_illum_object is not None:
                    illum_row = raw_data_list[choosen_illum_object]
                    f_object.write(illum_row[0] + ' ' + illum_row[1] + '\n')
                f_object.writelines(calibration_entries)

        # NOTE(review): nesting was reconstructed from a whitespace-collapsed
        # listing; the science recipe is run for every exposure, whether or
        # not the .sof was created here - confirm.
        if not self.debug:
            if self.reduce_sci:
                _call_esorex(self, exposure_dir, esorex_cmd, sof,
                             esorex_kwargs=esorex_kwargs)

        if os.path.isfile(sci_basic_std_sof):
            # Raised explicitly so the check survives ``python -O``.
            if not filecmp.cmp(sci_basic_std_sof, sci_basic_std_sof_temp):
                raise AssertionError(
                    'CAUTION DIFFERENT STD STARS FOR VARIOUS FIELDS: PLEASE CHECK')
            os.remove(sci_basic_std_sof_temp)
        else:
            os.rename(sci_basic_std_sof_temp, sci_basic_std_sof)
            # NOTE(review): confirm that the standard star is meant to be
            # reduced only when its .sof is first created.
            if not self.debug:
                if self.reduce_std:
                    _call_esorex(self, std_dir, esorex_cmd_std, sof_std,
                                 esorex_kwargs=esorex_kwargs)
def _std_flux(self, exp_list_SCI, create_sof, esorex_kwargs=None):
    '''
    This module calls *ESORex'* ``muse_standard``

    Args:
        exp_list_SCI : :obj:`list`
            The list of all associated science files including their
            category read from the fits header (not used by this step)

        create_sof : :obj:`bool`:
            :obj:`True`: ``std_flux.sof`` is created and populated

            :obj:`False`: ``std_flux.sof`` is not created and needs to be
            provided by the user

    Kwargs:
        esorex_kwargs: :obj:`str`
            Additional keywords that should be passed for special
            processing. These should be passed as one string
    '''

    print('... FLUX CALIBRATION')

    esorex_cmd = ' --log-file=std_flux.log --log-level=debug'\
        + ' muse_standard'\
        + ' --filter=white'
    sof = ' std_flux.sof'

    std_dir = os.path.join(self.working_dir, 'std')
    PIXTABLE_STD_list = _get_filelist(self, std_dir, 'PIXTABLE_STD*.fits')

    if create_sof:
        std_flux_sof = os.path.join(std_dir, 'std_flux.sof')
        if os.path.exists(std_flux_sof):
            os.remove(std_flux_sof)

        # The context manager closes the file even if a write fails.
        with open(std_flux_sof, 'w') as f:
            for pixtable in PIXTABLE_STD_list:
                f.write(os.path.join(std_dir, pixtable) + ' PIXTABLE_STD\n')
            f.write(os.path.join(self.static_calibration_dir,
                                 'extinct_table.fits') + ' EXTINCT_TABLE\n')
            f.write(os.path.join(self.static_calibration_dir,
                                 'std_flux_table.fits') + ' STD_FLUX_TABLE\n')
            f.write(os.path.join(self.static_calibration_dir,
                                 'filter_list.fits') + ' FILTER_LIST\n')

    # NOTE(review): nesting was reconstructed from a whitespace-collapsed
    # listing; the recipe is run whether or not the .sof was created here.
    if not self.debug:
        _call_esorex(self, std_dir, esorex_cmd, sof,
                     esorex_kwargs=esorex_kwargs)
def _sky(self, exp_list_SCI, create_sof, esorex_kwargs=None):
    '''
    This module calls *ESORex'* ``muse_create_sky``

    Args:
        exp_list_SCI : :obj:`list`
            The list of all associated science files including their
            category read from the fits header

        create_sof : :obj:`bool`:
            :obj:`True`: ``sky.sof`` is created and populated

            :obj:`False`: ``sky.sof`` is not created and needs to be
            provided by the user

    Kwargs:
        esorex_kwargs: :obj:`str`
            Additional keywords that should be passed for special
            processing. These should be passed as one string
    '''

    print('... SKY CREATION')

    # Characters [-8:-5] of each list name carry the exposure type tag.
    sky = np.array([exposure[-8:-5] == 'SKY' for exposure in exp_list_SCI],
                   dtype=bool)
    sci = np.array([exposure[-8:-5] == 'SCI' for exposure in exp_list_SCI],
                   dtype=bool)

    all_exposures = np.array(exp_list_SCI)
    # Dedicated sky exposures are used only in 'auto' mode and only if at
    # least one exists; otherwise the sky is built from every exposure.
    use_sky_fields = bool(self.skyfield == 'auto' and sky.any())
    if use_sky_fields:
        exp_list_SCI_sky = all_exposures[sky]
        pixtable_pattern = 'PIXTABLE_SKY*.fits'
    else:
        exp_list_SCI_sky = all_exposures
        pixtable_pattern = 'PIXTABLE_OBJECT*.fits'

    esorex_cmd = '--log-file=sky.log --log-level=debug'\
        + ' muse_create_sky'\
        + ' --fraction=' + str(self.skyfraction)\
        + ' --ignore=' + str(self.skyignore)
    sof = ' sky.sof'

    for exposure_ID, exposure_list in enumerate(exp_list_SCI_sky):
        print(' >>> processing exposure: '
              + str(exposure_ID + 1) + '/' + str(len(exp_list_SCI_sky)))
        print(' >>> processing: ' + exposure_list)
        print(' ')

        exposure_dir = exposure_list[:-9] + '/'
        PIXTABLE_SKY_list = _get_filelist(self, exposure_dir,
                                          pixtable_pattern)

        if create_sof:
            sky_sof = os.path.join(exposure_dir, 'sky.sof')
            if os.path.exists(sky_sof):
                os.remove(sky_sof)

            # The context manager closes the file even if a write fails.
            with open(sky_sof, 'w') as f:
                for pixtable in PIXTABLE_SKY_list:
                    f.write(os.path.join(exposure_dir, pixtable)
                            + ' PIXTABLE_SKY\n')
                if not self.using_ESO_calibration:
                    f.write(os.path.join(self.calibration_dir, 'SCIENCE',
                                         'LSF_PROFILE.fits')
                            + ' LSF_PROFILE\n')
                else:
                    f.write(os.path.join(self.ESO_calibration_dir,
                                         'LSF_PROFILE.fits')
                            + ' LSF_PROFILE\n')
                f.write(os.path.join(self.working_dir, 'std',
                                     'STD_RESPONSE_0001.fits')
                        + ' STD_RESPONSE\n')
                f.write(os.path.join(self.working_dir, 'std',
                                     'STD_TELLURIC_0001.fits')
                        + ' STD_TELLURIC\n')
                f.write(os.path.join(self.static_calibration_dir,
                                     'extinct_table.fits')
                        + ' EXTINCT_TABLE\n')
                f.write(os.path.join(self.static_calibration_dir,
                                     'sky_lines.fits') + ' SKY_LINES\n')

        # Both original branches issued this exact command.
        if not self.debug:
            _call_esorex(self, exposure_dir, esorex_cmd, sof,
                         esorex_kwargs=esorex_kwargs)

    if use_sky_fields:
        # Copy the products of the sky exposure closest in time into each
        # science exposure folder.
        skydate = np.ones_like(exp_list_SCI_sky, dtype=float)
        for idx, exps in enumerate(exp_list_SCI_sky):
            with fits.open(os.path.join(exps[:-9],
                                        'PIXTABLE_SKY_0001-01.fits')) as hdul:
                skydate[idx] = hdul[0].header['MJD-OBS']

        for exps in all_exposures[sci]:
            with fits.open(os.path.join(exps[:-9],
                                        'PIXTABLE_OBJECT_0001-01.fits')) as hdul:
                scidate = hdul[0].header['MJD-OBS']
            ind = np.argmin(abs(skydate - scidate))
            flist = glob.glob(os.path.join(exp_list_SCI_sky[ind][:-9],
                                           'SKY_*.fits'))
            for sky_product in flist:
                shutil.copy(sky_product, os.path.join(exps[:-9], '.'))
def _modified_sky(self, exp_list_SCI, create_sof, esorex_kwargs=None):
    '''
    This module calls *ESORex'* ``muse_create_sky`` with the modified
    continuum and line subtraction as described in `Zeidler et al. 2019`_.

    For every exposure the recipe is run twice: once to obtain
    ``SKY_CONTINUUM.fits`` and once with a sky continuum supplied as input.
    Afterwards ``SKY_LINES.fits`` is reduced to the ``O2`` and ``OH``
    entries.

    Args:
        exp_list_SCI : :obj:`list`
            The list of all associated science files including their
            category read from the fits header

        create_sof : :obj:`bool`:
            :obj:`True`: ``sky.sof`` is created and populated

            :obj:`False`: ``sky.sof`` is not created and needs to be
            provided by the user

    Kwargs:
        esorex_kwargs: :obj:`str`
            Additional keywords that should be passed for special
            processing. These should be passed as one string
    '''

    print('... SKY CREATION MODIFIED')

    sof = 'sky.sof'

    # Characters [-8:-5] of each list name carry the exposure type tag.
    sky = np.array([exposure[-8:-5] == 'SKY' for exposure in exp_list_SCI],
                   dtype=bool)
    sci = np.array([exposure[-8:-5] == 'SCI' for exposure in exp_list_SCI],
                   dtype=bool)

    all_exposures = np.array(exp_list_SCI)
    use_sky_fields = bool(self.skyfield == 'auto' and sky.any())
    if use_sky_fields:
        exp_list_SCI_sky = all_exposures[sky]
        pixtable_pattern = 'PIXTABLE_SKY*.fits'
    else:
        exp_list_SCI_sky = all_exposures
        pixtable_pattern = 'PIXTABLE_OBJECT*.fits'

    esorex_cmd = '--log-file=sky.log --log-level=debug'\
        + ' muse_create_sky'\
        + ' --fraction=' + str(self.skyfraction)\
        + ' --ignore=' + str(self.skyignore)
    # NOTE(review): the original second-pass call without dedicated sky
    # fields passed the command with a leading blank; kept as is because
    # _call_esorex is not visible here.
    esorex_cmd_second = esorex_cmd if use_sky_fields else ' ' + esorex_cmd

    def write_sky_sof(exposure_dir, pixtables, continuum=None):
        # Write sky.sof; ``continuum`` is the optional SKY_CONTINUUM file.
        sky_sof = os.path.join(exposure_dir, 'sky.sof')
        if os.path.exists(sky_sof):
            os.remove(sky_sof)
        with open(sky_sof, 'w') as f:
            for pixtable in pixtables:
                f.write(os.path.join(exposure_dir, pixtable)
                        + ' PIXTABLE_SKY\n')
            if not self.using_ESO_calibration:
                f.write(os.path.join(self.calibration_dir, 'SCIENCE',
                                     'LSF_PROFILE.fits') + ' LSF_PROFILE\n')
            else:
                f.write(os.path.join(self.ESO_calibration_dir,
                                     'LSF_PROFILE.fits') + ' LSF_PROFILE\n')
            f.write(os.path.join(self.working_dir, 'std',
                                 'STD_RESPONSE_0001.fits')
                    + ' STD_RESPONSE\n')
            f.write(os.path.join(self.working_dir, 'std',
                                 'STD_TELLURIC_0001.fits')
                    + ' STD_TELLURIC\n')
            if continuum is not None:
                f.write(os.path.join(exposure_dir, continuum)
                        + ' SKY_CONTINUUM\n')
            f.write(os.path.join(self.static_calibration_dir,
                                 'extinct_table.fits') + ' EXTINCT_TABLE\n')
            f.write(os.path.join(self.static_calibration_dir,
                                 'sky_lines.fits') + ' SKY_LINES\n')

    for exposure_ID, exposure_list in enumerate(exp_list_SCI_sky):
        print(' >>> processing exposure: ' + str(exposure_ID + 1)
              + '/' + str(len(exp_list_SCI_sky)))
        print(' >>> processing: ' + exposure_list)
        print(' ')

        exposure_dir = exposure_list[:-9]
        PIXTABLE_SKY_list = _get_filelist(self, exposure_dir,
                                          pixtable_pattern)

        # First pass: plain sky creation, produces SKY_CONTINUUM.fits.
        if create_sof:
            write_sky_sof(exposure_dir, PIXTABLE_SKY_list)
        if not self.debug:
            _call_esorex(self, exposure_dir, esorex_cmd, sof,
                         esorex_kwargs=esorex_kwargs)

        # Back up the continuum and write a copy whose second column is
        # zero. The working directory is always reset to rootpath, even if
        # a FITS operation fails.
        os.chdir(exposure_dir)
        try:
            print('create temporary backup of the SKY_CONTINUUM.fits ==> SKY_CONTINUUM_temp.fits')
            shutil.copy('SKY_CONTINUUM.fits', 'SKY_CONTINUUM_temp.fits')
            with fits.open('SKY_CONTINUUM.fits', checksum=True) as sky_cont_hdu:
                sky_cont = sky_cont_hdu[1].data
                for i in range(len(sky_cont)):
                    sky_cont[i][1] = 0.
                sky_cont_hdu[1].data = sky_cont
                sky_cont_hdu.writeto('SKY_CONTINUUM_zero.fits',
                                     overwrite=True, checksum=True)
        finally:
            os.chdir(self.rootpath)

        # Second pass: the continuum is supplied as an input.
        if create_sof:
            if self.modified_continuum:
                continuum_file = 'SKY_CONTINUUM_zero.fits'
            else:
                continuum_file = 'SKY_CONTINUUM.fits'
            write_sky_sof(exposure_dir, PIXTABLE_SKY_list,
                          continuum=continuum_file)
        if not self.debug:
            _call_esorex(self, exposure_dir, esorex_cmd_second, sof,
                         esorex_kwargs=esorex_kwargs)

        os.chdir(exposure_dir)
        try:
            if self.modified_continuum:
                print('SKY_CONTINUUM_temp.fits ==> SKY_CONTINUUM.fits')
                shutil.copy('SKY_CONTINUUM_temp.fits', 'SKY_CONTINUUM.fits')
                os.remove('SKY_CONTINUUM_temp.fits')

            # NOTE(review): nesting was reconstructed from a
            # whitespace-collapsed listing; the line filter is applied
            # regardless of modified_continuum - confirm.
            with fits.open('SKY_LINES.fits', checksum=True) as hdu:
                data = hdu[1].data
                lines_to_keep = [i for i, sky_line in enumerate(data)
                                 if sky_line[0][:2] in ('O2', 'OH')]
                hdu[1].data = data[lines_to_keep]
                hdu.writeto('SKY_LINES.fits', overwrite=True, checksum=True)
        finally:
            os.chdir(self.rootpath)

    if use_sky_fields:
        # Copy the products of the sky exposure closest in time into each
        # science exposure folder.
        skydate = np.ones_like(exp_list_SCI_sky, dtype=float)
        for idx, exps in enumerate(exp_list_SCI_sky):
            with fits.open(os.path.join(exps[:-9],
                                        'PIXTABLE_SKY_0001-01.fits')) as hdul:
                skydate[idx] = hdul[0].header['MJD-OBS']

        for exps in all_exposures[sci]:
            with fits.open(os.path.join(exps[:-9],
                                        'PIXTABLE_OBJECT_0001-01.fits')) as hdul:
                scidate = hdul[0].header['MJD-OBS']
            ind = np.argmin(abs(skydate - scidate))
            flist = glob.glob(os.path.join(exp_list_SCI_sky[ind][:-9],
                                           'SKY_*.fits'))
            for sky_product in flist:
                shutil.copy(sky_product, os.path.join(exps[:-9], '.'))
def _scipost(self, exp_list_SCI, create_sof, OB, esorex_kwargs=None):
    '''
    This module calls *ESORex'* ``muse_scipost``

    For every science exposure of every unique pointing the ``scipost.sof``
    is (optionally) written into the exposure folder and ``muse_scipost`` is
    executed in that folder.

    Args:
        exp_list_SCI : :obj:`list`
            The list of all associated science files including their
            category read from the fits header

        create_sof : :obj:`bool`:
            :obj:`True`: ``scipost.sof`` is created and populated

            :obj:`False`: ``scipost.sof`` is not created and needs to be
            provided by the user

        OB : :obj:`str`:
            The specific ``OB`` to be reduced. It is not used by this step
            and only kept for a uniform call signature.

    Kwargs:
        esorex_kwargs: :obj:`str`
            Additional keywords that should be passed for special
            processing. These should be passed as one string
    '''

    print('... SCIENCE POST-PROCESSING')

    sof = 'scipost.sof'

    # Only the science exposure lists (file names ending in 'SCI.list') are
    # relevant for this step.
    sci_lists = [exposure for exposure in exp_list_SCI
                 if exposure[-8:-5] == 'SCI']

    # Pointing prefixes in order of first appearance. Exact matching is used
    # on purpose: a substring search on a concatenated string can drop a
    # pointing whose prefix happens to be contained in another one.
    unique_pointings = list(dict.fromkeys(
        exposure[:-16] for exposure in sci_lists))

    n_pointings = len(unique_pointings)
    for pointing_num, sec in enumerate(unique_pointings, start=1):

        print(' ')
        print(' >>> processing pointing: '
              + str(pointing_num) + '/' + str(n_pointings))
        print(' ')

        exp_list = glob.glob(sec + '*SCI.list')
        n_exposures = len(exp_list)

        for exp_num, exposure in enumerate(exp_list, start=1):

            print(' ')
            print(' >>> processing exposure: '
                  + str(exp_num) + '/' + str(n_exposures))
            print(' ')

            # The exposure folder is the list file name without '_SCI.list'.
            exposure_dir = exposure[:-9]
            PIXTABLE_OBJECT_list = _get_filelist(self, exposure_dir,
                                                 'PIXTABLE_OBJECT*.fits')

            if create_sof:
                scipost_sof = os.path.join(exposure_dir, 'scipost.sof')
                if os.path.exists(scipost_sof):
                    os.remove(scipost_sof)

                # The context manager closes the file even if a write fails.
                with open(scipost_sof, 'w') as f:
                    for pixtable in PIXTABLE_OBJECT_list:
                        f.write(os.path.join(exposure_dir, pixtable)
                                + ' PIXTABLE_OBJECT\n')

                    if self.using_ESO_calibration:
                        f.write(os.path.join(self.ESO_calibration_dir,
                                             'LSF_PROFILE.fits')
                                + ' LSF_PROFILE\n')
                        f.write(os.path.join(self.ESO_calibration_dir,
                                             'ASTROMETRY_WCS.fits')
                                + ' ASTROMETRY_WCS\n')
                    else:
                        f.write(os.path.join(self.calibration_dir, 'SCIENCE',
                                             'LSF_PROFILE.fits')
                                + ' LSF_PROFILE\n')
                        f.write(os.path.join(self.calibration_dir,
                                             'ASTROMETRY_WCS_0001.fits')
                                + ' ASTROMETRY_WCS\n')

                    f.write(os.path.join(self.working_dir, 'std',
                                         'STD_RESPONSE_0001.fits')
                            + ' STD_RESPONSE\n')
                    f.write(os.path.join(self.working_dir, 'std',
                                         'STD_TELLURIC_0001.fits')
                            + ' STD_TELLURIC\n')

                    if self.skysub:
                        f.write(os.path.join(exposure_dir, 'SKY_LINES.fits')
                                + ' SKY_LINES\n')
                        if self.modified_continuum:
                            f.write(os.path.join(exposure_dir,
                                                 'SKY_CONTINUUM_zero.fits')
                                    + ' SKY_CONTINUUM\n')
                        else:
                            f.write(os.path.join(exposure_dir,
                                                 'SKY_CONTINUUM.fits')
                                    + ' SKY_CONTINUUM\n')

                    if self.autocalib != 'none':
                        f.write(os.path.join(exposure_dir, 'SKY_MASK.fits')
                                + ' SKY_MASK\n')
                    if self.autocalib == 'user':
                        f.write(os.path.join(exposure_dir,
                                             'AUTOCAL_FACTORS.fits')
                                + ' AUTOCAL_FACTORS\n')

                    f.write(os.path.join(self.static_calibration_dir,
                                         'extinct_table.fits')
                            + ' EXTINCT_TABLE\n')
                    f.write(os.path.join(self.static_calibration_dir,
                                         'filter_list.fits')
                            + ' FILTER_LIST\n')
                    if self.raman:
                        f.write(os.path.join(self.static_calibration_dir,
                                             'raman_lines.fits')
                                + ' RAMAN_LINES\n')

            if self.skysub:
                print('with sky subtraction...')
            else:
                print('without sky subtraction...')

            # In debug mode only the folders and ``.sof`` files are prepared.
            if not self.debug:
                _call_esorex(self, exposure_dir,
                             '--log-file=scipost.log --log-level=debug'
                             + ' muse_scipost'
                             + ' --save=cube,skymodel,individual,raman,autocal'
                             + ' --skymethod=' + self.skymethod
                             + ' --autocalib=' + str(self.autocalib).lower()
                             + ' --filter=white',
                             sof, esorex_kwargs=esorex_kwargs)
def _dither_collect(self, exp_list_SCI, OB):
    '''
    This module collects the individual dither exposures for one OB to be
    combined in the steps: :mod:`MUSEreduce._exp_align` and
    :mod:`MUSEreduce._exp_combine`

    The products ``DATACUBE_FINAL.fits``, ``IMAGE_FOV_0001.fits`` and
    ``PIXTABLE_REDUCED_0001.fits`` of every exposure are copied into the
    combining folder. The copies are renamed to
    ``<PRODUCT>_<OB>_<exposure key>_<NN>.fits``, where ``NN`` is a running
    number for exposures that share the same key.

    Args:
        exp_list_SCI : :obj:`list`
            The list of all associated science files including their
            category read from the fits header

        OB : :obj:`str`:
            The specific ``OB`` to be reduced. It becomes part of the names
            of the copied files.
    '''

    print('... COLLECT DITHER POSTITIONS')

    # Only the science exposure lists (file names ending in 'SCI.list') are
    # relevant for this step.
    sci_lists = [exposure for exposure in exp_list_SCI
                 if exposure[-8:-5] == 'SCI']

    print(' ')
    print(' >>> Copying files:')
    print(' ')

    n_user = len(self.user_list)

    unique_pointings = []
    if n_user == 0:
        # Pointing prefixes in order of first appearance. Exact matching
        # avoids the false positives of a substring search.
        unique_pointings = list(dict.fromkeys(
            exposure[:-16] for exposure in sci_lists))
    if n_user == 1:
        if self.user_list[0] == "all":
            unique_pointings = [os.path.join(self.working_dir,
                                             sci_lists[0][:-16])]
    if n_user > 1:
        unique_pointings = [os.path.join(self.working_dir,
                                         self.user_list[0][:18])]

    # First pass: register the shared pointing ID (when several OBs are
    # combined), make sure the combining folder exists and remove stale
    # '*FOV_0001*' files from it.
    for sec in unique_pointings:
        if self.dithering_multiple_OBs:
            if self.multOB_exp_counter == 0:
                self.multOB_unique_pointings_ID = sec[-18:]
            combining_exposure_dir = os.path.join(
                self.combining_OBs_dir, self.multOB_unique_pointings_ID)
        else:
            combining_exposure_dir = sec

        os.makedirs(combining_exposure_dir, exist_ok=True)

        for stale_file in glob.glob(os.path.join(combining_exposure_dir,
                                                 '*FOV_0001*')):
            os.remove(stale_file)

    # Second pass: copy the products of every exposure.
    products = (('DATACUBE_FINAL.fits', 'DATACUBE_FINAL_'),
                ('IMAGE_FOV_0001.fits', 'IMAGE_FOV_'),
                ('PIXTABLE_REDUCED_0001.fits', 'PIXTABLE_REDUCED_'))

    for sec in unique_pointings:
        if self.dithering_multiple_OBs:
            combining_exposure_dir = os.path.join(
                self.combining_OBs_dir, self.multOB_unique_pointings_ID)
        else:
            combining_exposure_dir = sec

        # A pointing only exists for an empty user list, for ['all'], or
        # for more than one user entry, so one of these branches applies.
        if n_user == 0:
            exp_list = glob.glob(sec + '*SCI.list')
        elif n_user == 1:
            exp_list = sci_lists
        else:
            exp_list = [os.path.join(self.working_dir,
                                     user_list_element + '_SCI.list')
                        for user_list_element in self.user_list]

        # Running number of the exposures sharing the same key, counted in
        # list order (0 for the first occurrence, 1 for the second, ...).
        n_seen = {}
        for exposure in exp_list:
            exposure_key = exposure[-20:-12]
            ident = n_seen.get(exposure_key, 0)
            n_seen[exposure_key] = ident + 1

            tail = OB + '_' + exposure_key + '_'\
                + str(ident).rjust(2, '0') + '.fits'

            for product, prefix in products:
                origin_file = os.path.join(exposure[:-9], product)
                destination_file = os.path.join(combining_exposure_dir,
                                                prefix + tail)
                print(origin_file + ' ==> ' + destination_file)
                shutil.copy(origin_file, destination_file)
def _exp_align(self, exp_list_SCI, create_sof, OB, esorex_kwargs=None):
    '''
    This module calls *ESORex'* ``muse_exp_align``

    For every pointing the ``IMAGE_FOV_*.fits`` files found in the combining
    folder are written into ``exp_align.sof`` (optionally) and
    ``muse_exp_align`` is executed in that folder.

    Args:
        exp_list_SCI : :obj:`list`
            The list of all associated science files including their
            category read from the fits header

        create_sof : :obj:`bool`:
            :obj:`True`: ``exp_align.sof`` is created and populated

            :obj:`False`: ``exp_align.sof`` is not created and needs to be
            provided by the user

        OB : :obj:`str`:
            The specific ``OB`` to be reduced. It is not used by this step
            and only kept for a uniform call signature.

    Kwargs:
        esorex_kwargs: :obj:`str`
            Additional keywords that should be passed for special
            processing. These should be passed as one string
    '''

    print('... CUBE ALIGNMENT')

    esorex_cmd = ('--log-file=exp_align.log --log-level=debug'
                  + ' muse_exp_align'
                  + ' --srcmin=' + str(self.config['exp_align']['srcmin'])
                  + ' --srcmax=' + str(self.config['exp_align']['srcmax']))
    sof = ' exp_align.sof'

    # Only the science exposure lists (file names ending in 'SCI.list') are
    # relevant for this step.
    sci_lists = [exposure for exposure in exp_list_SCI
                 if exposure[-8:-5] == 'SCI']

    n_user = len(self.user_list)

    unique_pointings = []
    if n_user == 0:
        # Pointing prefixes in order of first appearance. Exact matching
        # avoids the false positives of a substring search.
        unique_pointings = list(dict.fromkeys(
            exposure[:-16] for exposure in sci_lists))
    if n_user == 1:
        if self.user_list[0] == "all":
            unique_pointings = [os.path.join(self.working_dir,
                                             sci_lists[0][:-16])]
    if n_user > 1:
        unique_pointings = [os.path.join(self.working_dir,
                                         self.user_list[0][:18])]

    # Register the pointing ID shared by all OBs before any pointing is
    # processed. As before, the last pointing defines the ID when this is
    # the first exposure set of a multi-OB combination.
    if self.dithering_multiple_OBs and len(unique_pointings) > 0:
        if self.multOB_exp_counter == 0:
            self.multOB_unique_pointings_ID = unique_pointings[-1][-18:]

    n_pointings = len(unique_pointings)
    for pointing_num, sec in enumerate(unique_pointings, start=1):

        print(' ')
        print(' >>> processing pointing: ' + str(pointing_num)
              + '/' + str(n_pointings))
        print(' ')

        if self.dithering_multiple_OBs:
            unique_pointings_ID = self.multOB_unique_pointings_ID
            print(unique_pointings_ID)
            combining_exposure_dir = os.path.join(self.combining_OBs_dir,
                                                  unique_pointings_ID)
        else:
            combining_exposure_dir = sec

        exp_list = _get_filelist(self, combining_exposure_dir,
                                 'IMAGE_FOV_*.fits')

        if create_sof:
            sof_file = os.path.join(combining_exposure_dir, 'exp_align.sof')
            if os.path.exists(sof_file):
                os.remove(sof_file)

            # The context manager closes the file even if a write fails.
            with open(sof_file, 'w') as f:
                for image in exp_list:
                    f.write(os.path.join(combining_exposure_dir, image)
                            + ' IMAGE_FOV\n')

        # In debug mode only the folders and ``.sof`` files are prepared.
        if not self.debug:
            _call_esorex(self, combining_exposure_dir, esorex_cmd, sof,
                         esorex_kwargs=esorex_kwargs)
def _exp_combine(self, exp_list_SCI, create_sof, esorex_kwargs=None):
    '''
    This module calls *ESORex'* ``muse_exp_combine``

    For every pointing the ``PIXTABLE_REDUCED_*.fits`` files found in the
    combining folder are written into ``exp_combine.sof`` (optionally),
    together with ``OFFSET_LIST.fits`` and the filter list, and
    ``muse_exp_combine`` is executed in that folder.

    Args:
        exp_list_SCI : :obj:`list`
            The list of all associated science files including their
            category read from the fits header

        create_sof : :obj:`bool`:
            :obj:`True`: ``exp_combine.sof`` is created and populated

            :obj:`False`: ``exp_combine.sof`` is not created and needs to be
            provided by the user

    Kwargs:
        esorex_kwargs: :obj:`str`
            Additional keywords that should be passed for special
            processing. These should be passed as one string
    '''

    print('... EXPOSURE COMBINATION')

    esorex_cmd = ('--log-file=exp_combine.log --log-level=debug'
                  + ' muse_exp_combine'
                  + ' --filter=white'
                  + ' --save=cube'
                  + ' --crsigma=5.'
                  + ' --weight=' + str(self.weight))
    sof = ' exp_combine.sof'

    # Only the science exposure lists (file names ending in 'SCI.list') are
    # relevant for this step.
    sci_lists = [exposure for exposure in exp_list_SCI
                 if exposure[-8:-5] == 'SCI']

    n_user = len(self.user_list)

    unique_pointings = []
    if n_user == 0:
        # Pointing prefixes in order of first appearance. Exact matching
        # avoids the false positives of a substring search.
        unique_pointings = list(dict.fromkeys(
            exposure[:-16] for exposure in sci_lists))
    if n_user == 1:
        if self.user_list[0] == "all":
            unique_pointings = [os.path.join(self.working_dir,
                                             sci_lists[0][:-16])]
    if n_user > 1:
        unique_pointings = [os.path.join(self.working_dir,
                                         self.user_list[0][:18])]

    # One pass per pointing: announce it, then combine its exposures.
    n_pointings = len(unique_pointings)
    for pointing_num, sec in enumerate(unique_pointings, start=1):

        print(' ')
        print(' >>> processing pointing: ' + str(pointing_num)
              + '/' + str(n_pointings))
        print(' ')

        if self.dithering_multiple_OBs:
            # The first exposure set of a multi-OB combination defines the
            # ID of the shared combining folder.
            if self.multOB_exp_counter == 0:
                self.multOB_unique_pointings_ID = sec[-18:]
            combining_exposure_dir = os.path.join(
                self.combining_OBs_dir, self.multOB_unique_pointings_ID)
        else:
            combining_exposure_dir = sec

        pixtable_list = _get_filelist(self, combining_exposure_dir,
                                      'PIXTABLE_REDUCED_*.fits')

        if create_sof:
            sof_file = os.path.join(combining_exposure_dir,
                                    'exp_combine.sof')
            if os.path.exists(sof_file):
                os.remove(sof_file)

            # The context manager closes the file even if a write fails.
            with open(sof_file, 'w') as f:
                for pixtable in pixtable_list:
                    f.write(os.path.join(combining_exposure_dir, pixtable)
                            + ' PIXTABLE_REDUCED\n')
                f.write(os.path.join(combining_exposure_dir,
                                     'OFFSET_LIST.fits')
                        + ' OFFSET_LIST\n')
                f.write(os.path.join(self.static_calibration_dir,
                                     'filter_list.fits')
                        + ' FILTER_LIST\n')

        # In debug mode only the folders and ``.sof`` files are prepared.
        if not self.debug:
            _call_esorex(self, combining_exposure_dir, esorex_cmd, sof,
                         esorex_kwargs=esorex_kwargs)
def _cleanup(self, exp_list_SCI, include_std=False, include_combined=False,
             include_calibrations=False):
    '''
    This module cleans up intermediate products to save disk space

    The folders of all science exposures are always removed. The optional
    flags extend the clean-up to further folders. Folders that do not exist
    are skipped.

    Args:
        exp_list_SCI : :obj:`list`
            The list of all associated science files including their
            category read from the fits header

    Kwargs:
        include_std: :obj:`bool`
            If :obj:`True` the standard star folder ``std`` is also removed

        include_combined: :obj:`bool`
            If :obj:`True` the pointing folders are also removed

        include_calibrations: :obj:`bool`
            If :obj:`True` the folders ``ESO_calibrations``,
            ``calibrations`` and ``static_calibration_files`` in the working
            directory are also removed
    '''

    def _remove_folder(folder):
        # Announce the folder and delete it, tolerating a missing folder.
        print("Removing: ", folder)
        if os.path.exists(folder):
            shutil.rmtree(folder)

    print('... CLEANING UP THE FOLDERS')

    # Only the science exposure lists (file names ending in 'SCI.list') are
    # relevant for this step.
    sci_lists = [exposure for exposure in exp_list_SCI
                 if exposure[-8:-5] == 'SCI']

    if include_combined:
        n_user = len(self.user_list)

        unique_pointings = []
        if n_user == 0:
            # Pointing prefixes in order of first appearance. Exact matching
            # avoids the false positives of a substring search.
            unique_pointings = list(dict.fromkeys(
                exposure[:-16] for exposure in sci_lists))
        if n_user == 1:
            # Without any science exposure there is no pointing to derive.
            if self.user_list[0] == "all" and sci_lists:
                unique_pointings = [os.path.join(self.working_dir,
                                                 sci_lists[0][:-16])]
        if n_user > 1:
            unique_pointings = [os.path.join(self.working_dir,
                                             self.user_list[0][:18])]

        for unique_pointing in unique_pointings:
            _remove_folder(unique_pointing)

    if include_std:
        _remove_folder(os.path.join(self.working_dir, 'std'))

    if include_calibrations:
        for folder_name in ('ESO_calibrations', 'calibrations',
                            'static_calibration_files'):
            _remove_folder(os.path.join(self.working_dir, folder_name))

    # The exposure folder is the list file name without '_SCI.list'.
    for exp in sci_lists:
        _remove_folder(exp[:-9])