#!/usr/bin/env python
__version__ = '0.2.2'
__revision__ = '20260210'
import sys
import os
import glob
import shutil
import numpy as np
from astropy.io import ascii
from astropy.io import fits
from astropy.table import Table
from astropy.table import Column
from astropy import units as u
from astropy.coordinates import SkyCoord
from astropy.wcs import WCS
from astropy.stats import sigma_clip
from spectral_cube import SpectralCube
import montage_wrapper as montage
''' internal modules'''
from MUSEpack.utils import ABtoVega
from MUSEpack.utils import _any_bit_in_number
[docs]
def wcs_cor(input_fits, offset_input, path=None, offset_path=None,
output_file=None, output_path=None, out_frame=None, in_frame=None,
correct_flux=False, spec_folder='stars', spec_path=None,
hst_filter=['ACS', 'F814W'], AB_zpt=None, Vega_zpt=None,
correctiontype='shift', save_output=True, debug=False, spec_snr=5.0,
dqs = [2,4,8]):
'''
Args:
inputfits : :obj:`str`
The fully reduced datacube, whose WCS has to be corrected
offset_input : :obj:`str`
The prm file produced by Pampelmuse
or the OFFSET_LIST.fits file from the ``exp_align`` routine
Kwargs:
path : :obj:`str` (optional, Default: current directory)
I/O path
offset_path : :obj:`str` (optional, default: current directory)
I/O path of prm file
output_file : :obj:`str` (optional, default: input file name +_cor)
outputfile name
output_path : :obj:`str` (optional, default: current directory)
output path. If not provided the I/O folder is identical
output_frame : :obj:`str` (optional, default: input frame)
coordinate frame of the output cube in case one want to
change. Is always set to default input frame if corrections
are not based on a .prm file
in_frame : :obj:`str` (optional, default: input frame)
coordinate frame of the output cube in case it cannot be
determined from the header information or it has to be manually
changed
correct_flux : :obj:`bool` (optional, default: :obj:`False`)
If set :obj:`True` the fluxes of the data cube will be corrected
to match the input catalog to correct for calibration offsets.
If the input file is a prm-file:
This step is only recommended if the input fluxes can be trusted.
CUBEFIT and GETSPECTRA have to executed again using the corrected
data cube to correct the prm file and to extract the corrected
spectra.
If the input file is a ESO OFFSET-LIST.fits file:
The fluxes will be scaled based on the 'FLUX_SCALE' entries.
save_output : :obj:`bool` (optional, default: :obj:`True`)
If set :obj:`False` the corrected cube will not be saved. This can
be used to just obtain the correction factors
spec_folder : :obj:`str` (optional, default: ``stars``)
The folder name, in which the the extracted stellar spectra are
stored. This is only needed if correct_flux=:obj:`True` and the
corrections are based on a .prm file
spec_path : :obj:`str` (optional, default: current directory)
I/O path of the ``spec_folder``. This keyword is only need if
the corrections are based on a .prm file
hst_filter : :obj:`list' (optional, default: ['ACS', 'F814W'])
The HST filter used when convolving the spectra to measure the
fluxes. It is used with STSYNPHOT to obtain the correct zeropoint
conversion.
AB_zpt : :obj:`float` (optional, default: None)
AB zeropoint for any filter that is not part of SYNPHOT. If provided
the Vega_zpt must be provided as well. hst_filter will be overwritten
Vega_zpt : :obj:`float` (optional, default: None)
Vega zeropoint for any filter that is not part of SYNPHOT. If provided
the AB_zpt must be provided as well. hst_filter will be overwritten
correctiontype : :obj:`str` (optional, default: ``shift``)
the type of distortion correction
``full``: the full 2D CD matrix, only possible if based on a
.prm file
``shift``: shift in XY only.
debug : :obj:`bool` (optional, default: :obj:`False`)
If set :obj:`True` tadditional information will be printed
spec_snr : :obj:`float` (optional, default: 5.0)
the S/R limit when a spectra is used to calculate the flux correction.
This limit might need to be lowered for low S/R obsverations
dqs : :obj:`list` (optional, default: [2,4,8])
list of DQ flags where source as dicarted.
'''
if path == None:
path = os.getcwd()
if offset_path == None:
offset_path = os.getcwd()
if spec_path == None:
spec_path = os.getcwd()
cube = fits.open(path + '/' + input_fits + '.fits')
offset = fits.open(offset_path + '/' + offset_input)
print('processing observation: ' + path + '/' + input_fits + '.fits')
if len(offset) == 6:
print('using prm file: ' + offset_path + '/' + offset_input)
offset_type = 'prm'
if len(offset) == 2:
print('using OFFSET file: ' + offset_path + '/' + offset_input)
offset_type = 'eso'
spec_folder = None
spec_path = None,
correctiontype = 'shift'
output_frame = None
in_frame = None
assert len(offset) != 6 or len(offset) != 2,\
'This offset file is currently not supported: Please check'
assert (len(cube) != 3 or len(cube) != 1),\
'fits file has currently unsupported extensions: Please check'
if offset_type == 'prm':
if len(cube) == 3:
prihdr = cube[0].header
sechdr = cube[1].header
assert prihdr['INSTRUME'].strip() == 'MUSE',\
' This is not a MUSE cube. Please check'
if not in_frame:
if 'RADESYS' in prihdr:
in_frame = prihdr['RADESYS'].lower()
elif 'RADECSYS' in prihdr:
in_frame = prihdr['RADECSYS'].lower()
print('MUSE cube detected')
if len(cube) != 3:
print('No MUSE cube header info assumed to be in one extension')
prihdr = cube[0].header
sechdr = cube[0].header
if 'RADESYS' in prihdr:
in_frame = prihdr['RADESYS'].lower()
elif 'RADECSYS' in prihdr:
in_frame = prihdr['RADECSYS'].lower()
if correct_flux:
print('Flux correction currently only supported'\
+ ' for MUSE cubes')
assert in_frame is not None, 'No WCS frame provided'
print(' Input WCS frame: ', in_frame)
if out_frame is None:
print('Output WCS frame: ', in_frame)
else:
print('Output WCS frame: ', out_frame)
print('')
A = np.nanmedian(offset[4].data[0][1])
B = np.nanmedian(offset[4].data[1][1])
C = np.nanmedian(offset[4].data[2][1])
D = np.nanmedian(offset[4].data[3][1])
x0 = np.nanmedian(offset[4].data[4][1])
y0 = np.nanmedian(offset[4].data[5][1])
CD = np.array([[A, C], [B, D]])
r = np.array([[x0, 0.], [0., y0]])
#### coord sys change
if out_frame != None:
ref_ra = sechdr['CRVAL1']
ref_dec = sechdr['CRVAL2']
ref_coord = SkyCoord(ra=ref_ra * u.degree,\
dec=ref_dec * u.degree, frame=in_frame)
trans_ref_coord = ref_coord.transform_to(out_frame)
sechdr['CRVAL1'] = trans_ref_coord.ra.value
sechdr['CRVAL2'] = trans_ref_coord.dec.value
if 'RADESYS' in sechdr:
sechdr.set('RADESYS', out_frame.upper())
elif 'RADECSYS' in sechdr:
sechdr.set('RADECSYS', out_frame.upper())
ref_xy = np.array([sechdr['CRPIX1'], sechdr['CRPIX2']])
ref_xy_new = (np.dot(r, np.ones(ref_xy.T.shape))\
+ np.dot(CD, ref_xy.T)).swapaxes(-1, -0)
ref_shift_x = ref_xy_new[0] + 1.
ref_shift_y = ref_xy_new[1] + 1.
sechdr['CRPIX1'] = ref_shift_x
sechdr['CRPIX2'] = ref_shift_y
if correctiontype == 'full':
sechdr['CD1_1'] = (-1) * A * 0.2 / 3600.
sechdr['CD1_2'] = C * 0.2 / 3600.
sechdr['CD2_1'] = (-1) * B * 0.2 / 3600.
sechdr['CD2_2'] = D * 0.2 / 3600.
#### flux correction
if correct_flux and len(cube) == 3:
print('using spec path: ' + spec_path)
if AB_zpt != None and Vega_zpt != None:
print('AB zeropoint and Vega zeropoint provided. hst_filter will'
' be overwritten')
aboffset = Vega_zpt - AB_zpt
elif AB_zpt != None and Vega_zpt == None:
print('both AB zeropoint and Vega zeropoint must be provided.')
sys.exit()
elif AB_zpt == None and Vega_zpt != None:
print('both AB zeropoint and Vega zeropoint must be provided.')
sys.exit()
else:
print('HST zeropoints used with STSYNPHOT')
aboffset = ABtoVega(hst_filter[0], hst_filter[1])
speclist = glob.glob(spec_path + '/' + spec_folder + '/specid*')
flux_cor_tab = Table()
cat_mag_list = []
del_mag_list = []
qltflag_list = []
snrspec_list = []
starid_list = []
for i, temp_sp in enumerate(speclist):
spec_hdu = fits.open(temp_sp)
spec_head = spec_hdu[0].header
qltflag = spec_head['HIERARCH SPECTRUM QLTFLAG']
snrspec = spec_head['HIERARCH SPECTRUM FITSNR']
if (_any_bit_in_number(dqs, qltflag) & (snrspec > spec_snr)):
del_mag_list = np.append(del_mag_list,\
-(spec_head['HIERARCH SPECTRUM MAG DELTA'] + 50. + aboffset))
cat_mag_list = np.append(cat_mag_list, spec_head['HIERARCH STAR MAG'])
qltflag_list = np.append(qltflag_list, qltflag)
snrspec_list = np.append(snrspec_list, snrspec)
starid_list = np.append(starid_list, spec_head['HIERARCH STAR ID'])
muse_mag_list = np.array(cat_mag_list) - np.array(del_mag_list)
if len (del_mag_list) == 0:
print()
sys.exit('No sources were slected. Please adjust SNR or make sure any sources are in the Cube'.upper())
flux_cor_tab = Table([starid_list, muse_mag_list, cat_mag_list, del_mag_list, snrspec_list, qltflag_list],
names=('spec_id', 'muse_mag', 'cat_mag', 'dmag', 'spec_snr', 'DQ'))
flux_cor_tab['spec_id'].info.format = '5.0f'
flux_cor_tab['muse_mag'].info.format = '5.2f'
flux_cor_tab['cat_mag'].info.format = '5.2f'
flux_cor_tab['dmag'].info.format = '5.2f'
flux_cor_tab['spec_snr'].info.format = '5.1f'
flux_cor_tab['DQ'].info.format = '3.0f'
clippend_del_mag = sigma_clip(del_mag_list, sigma=2, cenfunc = np.ma.median)
fmultipl = 10 ** ((-1) * 0.4 * np.ma.median(clippend_del_mag))
flux_cor_tab.add_column(np.ma.getmask(clippend_del_mag), name='masked')
flux_cor_tab.sort('spec_id')
flux_cor_tab.write(os.path.join(output_path, 'flux_cor_info.tab'), format='ascii.rst', overwrite=True)
if debug == True:
print(flux_cor_tab)
print()
print('The magnitude difference:')
print('cat - MUSE [mag]: ',\
'{:.2f}'.format(np.ma.median(clippend_del_mag)))
print('sigma cat - MUSE [mag]: ',\
'{:.2f}'.format(np.ma.std(clippend_del_mag)))
print('The flux multiplicator [f_catalog / f_MUSE]: ',\
'{:.2f}'.format(fmultipl))
print('number of sources: ',\
'{:.0f}'.format(clippend_del_mag.count()))
with open(os.path.join(output_path, 'flux_cor_info.tab'),"a+") as f:
f.write("\n")
f.write('cat - MUSE [mag]: ' + '{:.2f}'.format(np.ma.median(clippend_del_mag)) + '\n')
f.write('sigma cat - MUSE [mag]: ' + '{:.2f}'.format(np.ma.std(clippend_del_mag)) + '\n')
f.write('The flux multiplicator [f_catalog / f_MUSE]: ' + '{:.2f}'.format(fmultipl) + '\n')
f.write('N sources: ' + '{:.0f}'.format(clippend_del_mag.count()))
cube['DATA'].data *= fmultipl
cube['STAT'].data *= fmultipl ** 2
if save_output == True:
if output_file == None:
offset[0].header['HIERARCH PAMPELMUSE global prefix'] = offset_input[:-9] + '_cor'
offset.writeto(offset_path + '/' + offset_input[:-9] + '_cor.prm.fits', overwrite=True)
else:
shutil.copyfile(offset_path + '/' + offset_input,\
offset_path + '/' + output_file)
if offset_type == 'eso':
obs = cube[0].header['MJD-OBS']
indobs = np.where(obs == offset[1].data['MJD_OBS'])
dRA = offset[1].data['RA_OFFSET'][indobs]
dDEC = offset[1].data['DEC_OFFSET'][indobs]
fscale = offset[1].data['FLUX_SCALE'][indobs]
print('The RA offset is [arcsec]: ',\
'{:.4f}'.format(dRA[0] * 3600.))
print('The Dec offset is [arcsec]: ',\
'{:.4f}'.format(dDEC[0] * 3600.))
print('The flux scale is: ',\
'{:.4f}'.format(fscale[0]))
cube[1].header['CRVAL1'] -= dRA[0]
cube[1].header['CRVAL2'] -= dDEC[0]
if correct_flux and not np.isnan(fscale[0]):
cube[1].data *= fscale[0]
if save_output==True:
if output_path == None:
if output_file == None:
cube.writeto(path + '/' + input_fits + '_cor.fits', overwrite=True)
else:
cube.writeto(path + '/' + output_file + '.fits', overwrite=True)
else:
if output_file == None:
cube.writeto(os.path.join(output_path, input_fits + '_cor.fits'), overwrite=True)
else:
cube.writeto(os.path.join(output_path, output_file ), overwrite=True)
[docs]
def pampelmuse_cat(ra, dec, mag, filter, idx=None, path=None,
sat=0., mag_sat=None, ifs_sat=None, mag_limit=None,
regsize=0.5):
'''
This modules uses input parameters to create a catalog that is compatible
with pampelmuse
Args:
ra : :obj:`float`
RA coordinates of the stars
dec : :obj:`float`
Dec coordinates of the stars
mag : :obj:`float`
magnitudes of the stars
filter : :obj:`str`
filter used for the magnitudes
Kwargs:
idx : :obj:`float` (optional, default : counting up from 1)
catalog index of the stars
sat : :obj`float` (optional, default: 0)
value assigned to saturated sources in the catalog
mag_sat : :obj:`float` (optional, default: :obj:`None`)
magnitude that replaces saturated sources in the catalog
path : :obj:`str` (optional, default: current directory)
path of the output file
mag_limit : :obj:`float` (optional, default: :obj:`None`)
the magnitude at which the output catalog should be truncated
regsize : :obj:`float` (optional, default: :float:0.5)
the size of the regions in arcsec
'''
if not path:
path = os.getcwd()
if not idx.all():
id = np.arange(len(ra)) + 1
else:
id = idx
if mag_limit:
mag_lim_id = np.where(mag <= mag_limit)
id = id[mag_lim_id]
ra = ra[mag_lim_id]
dec = dec[mag_lim_id]
mag = mag[mag_lim_id]
if mag_sat:
sat_source = np.where(mag == sat)
if ifs_sat:
ifs_sat_id = id[np.where((mag < ifs_sat) & (mag != sat))]
f_ifs_sat_id = open(path + '/ifs_sat_id.list', 'w')
f_ifs_sat_id.write('[')
for i in ifs_sat_id:
f_ifs_sat_id.write(str(i) + ',')
f_ifs_sat_id.write(']')
f_ifs_sat_id.close()
tab = Table([id, ra, dec, mag], names=('id', 'ra', 'dec',\
str(filter).lower()), dtype=('i4', 'f8', 'f8', 'f8'))
if mag_sat != None:
tab[str(filter).lower()][sat_source] = mag_sat
tab.write(path + '/' + str(filter).upper(),\
format='ascii.basic', delimiter=',', overwrite=True)
#write ds9 regions file
regf = open(path + '/' + str(filter).upper() + '.reg', 'w')
regf.write('global color=green dashlist=8 3 width=2 font="helvetica 10 normal roman" select=1 highlite=1 dash=0 fixed=0 edit=1 move=0 delete=1 include=1 source=1\n')
regf.write("fk5\n")
for rai, deci in zip(tab['ra'], tab['dec']):
regf.write("circle(" + str(rai) + ", " + str(deci) + ', '
+ '{:.1f}'.format(regsize) + '")\n')
regf.close()
[docs]
def linemaps(input_fits, path=None, elements=None, wavelengths=None):
'''
This module is intended to create linemaps of specified lines/elements
Args:
input_fits : :obj:`str`
The fully reduced datacube Pampelmuse has been run on
Kwargs:
path : :obj:`str` (optional, default: current directory)
I/O path
element : obj:`list` (optional)
list of elements the linemaps shall be produced
wavelength : :obj:`list` (optional)
list of wavelength for givene elements, optional,
must be given if `elements` is given
'''
if path == None:
path = os.getcwd()
#predefined elements and their wavelength
if elements == None:
wavelengths = [6562.80, 6716.47, 5006.84, 6583.41]
elements = ['Ha', 'SII_6716', 'OIII_5007', 'NII_6583']
if elements != None and wavelengths.any() == None:
print('Error: element but no wavelength given')
sys.exit()
if os.path.exists(path + 'temp/') == False:
os.mkdir(path + 'temp/')
cube = SpectralCube.read(path + '/' + input_fits, hdu=1, format='fits')
for wavelength, element in zip(wavelengths, elements):
slab = cube.spectral_slab((wavelength - 3) * u.AA,\
(wavelength + 3) * u.AA).sum(axis=0)
slab.hdu.writeto(path + '/' + element + '_' +input_fits, overwrite=True)
shutil.rmtree(path + 'temp/')
[docs]
def mosaics(input_list, name, path=None):
'''
This module is intended to create mosaics of specified lines/elements.
linemaps should have been created beforehand using the ``linemaps`` module
Args:
input_list : :obj:`list`
The list of specific linemaps to be used to mosaic
name : :obj:`str`
Name of the created mosaic
Kwargs:
path: :obj:`str` (optional, default: current directory)
I/O path
'''
if path == None:
path = os.getcwd()
if os.path.exists(path + '/temp/') == False:
os.mkdir(path + '/temp/')
if os.path.exists(path + '/mosaics/') == False:
os.mkdir(path + '/mosaics/')
for idx, f in enumerate(input_list):
shutil.copy(f, path + '/temp/' + str(idx) + '.fits')
montage.mosaic(path + '/temp/', path + '/mosaic_temp/',\
background_match=True, exact_size=True, cleanup=True)
shutil.copy(path + '/mosaic_temp/mosaic_area.fits',\
path + '/mosaics/exp_' + name + '.fits')
shutil.copy(path + '/mosaic_temp/mosaic.fits',\
path + '/mosaics/' + name + '.fits')
shutil.rmtree(path + '/mosaic_temp/')
shutil.rmtree(path + '/temp/')