#!/usr/bin/env python
__version__ = '0.2.1'
__revision__ = '20250606'
import sys
import os
import shutil
import warnings
import pyspeckit
import numpy as np
from astropy.io import fits
from astropy.io import ascii
from astropy.table import Table
from astropy.table import Column
from astropy import units as u
from astropy import constants as const
from astropy import log
from astropy.stats import median_absolute_deviation as MAD
import pandas as pd
import time
import logging
import stsynphot as stsyn
import synphot as syn
from specutils import Spectrum1D
from scipy.ndimage.filters import gaussian_filter
from scipy.signal import find_peaks
from scipy.special import wofz
from itertools import combinations as inter_comb
[docs]
def initial_guesses(self, lines, blends=None, linestrength=100.,\
llimits=[2. * (-1), 2.]):
'''
Creates the initial guesses for the line fitter
Args:
lines : :func:`numpy.array`
central wavelengths of the spectral lines
Kwargs:
linestrength : :obj:`float` (default: 100)
initial guess for the line strength
blends : :obj:`str` (optional)
A file containing the a list of blended lines in the
format: ** List is coming soon**
return:
guesses : :obj:`list`
lists containing the guesses
limits : :obj:`list`
lists containing the limits
limited : :obj:`list`
lists containing the limited
'''
if self.linetype == 'absorption':
linestrength = linestrength * (-1)
guesses = np.zeros(4 * len(lines))
limits = []
limited = []
for idx, line in enumerate(lines):
if self.rv_sys != 0.:
line = lambda_rv_shift(self, line)
guesses[4 * idx + 0] = linestrength
guesses[4 * idx + 1] = line
guesses[4 * idx + 2] = self.dispersion
guesses[4 * idx + 3] = self.dispersion
limits.append((0, 0)) #Amplitude
if self.linetype == 'absorption':
limited.append((False, True)) #Amplitude
if self.linetype == 'emission':
limited.append((True, False)) #Amplitude
if self.linetype == 'both':
limited.append((False, False)) #Amplitude
limits.append((line + llimits[0], line + llimits[1]))
limited.append((True, True)) #Center
limits.append((0, 0)) #width (gausssigma)
limited.append((True, False)) #width
limits.append((0, 0)) #width (lorentzsigma)
limited.append((True, False)) #width
if blends:
blendlist = ascii.read(blends)
for blendidx in range(len(blendlist)):
# lprime = blendlist[blendidx]['lprime']
# lsec = blendlist[blendidx]['lsec']
if self.rv_sys == 0.:
lprime = blendlist[blendidx]['lprime']
lsec = blendlist[blendidx]['lsec']
else:
lprime = lambda_rv_shift(self,
blendlist[blendidx]['lprime'])
lsec = lambda_rv_shift(self,
blendlist[blendidx]['lsec'])
primeidx = np.where(lprime == guesses)[0]
secidx = np.where(lsec == guesses)[0]
if len(primeidx) == 1:
self.logger.info('Blend detected: '\
+ str(blendlist[blendidx]['prime'])\
+ ' and ' + str(blendlist[blendidx]['sec']) + ' with ratio '\
+ str(blendlist[blendidx]['ratio']))
guesses[secidx - 1] = blendlist[blendidx]['ratio']\
* guesses[primeidx - 1]
if lprime < lsec and (lprime + llimits[1]) > lsec + llimits[0]:
limits[secidx[0]]\
= (limits[primeidx[0]][1] + 0.01, limits[secidx[0]][1])
if limits[primeidx[0]][1] + 0.01 >= lsec:
limits[secidx[0]] = (lsec - 0.5, limits[secidx[0]][1])
if lprime > lsec and (lprime + llimits[0]) < lsec + llimits[1]:
limits[secidx[0]] = (limits[secidx[0]][0],\
limits[primeidx[0]][0] - 0.01)
if limits[primeidx[0]][0] - 0.01 <= lsec:
limits[secidx[0]] = (limits[secidx[0]][0], lsec + 0.5)
return guesses, limits, limited
[docs]
def update_parinfo(self, guesses, llimits, line_idx, blends,
parinfo, autoadjust, fwhm_block):
r"""
Updates the parinfo file, created by pyspeckit.
Args:
guesses : :func:`numpy.array`
The initial guesses for the the radial velocity fit guesses in
the form [RV,sepctral_dispersion]
llimits : :obj:`list`
the limits for the wavelength fit as set in ``ppxf``
line_idx : :obj:`str`
Name of the primary line
blends : :obj:`ascii`-file or :obj:`None`
A file with primary lines that contain blends to provide a maximum
amplitude ratio of the primary and the blend to prevent that the
blend becomes the dominant line in the fit.
parinfo: :obj:`dict`
the parinfo file created by pyspeckit, which contains the fitted
parameters for all input lines
autoadjust : :obj:`bool`
:obj:`True`: the wavelength limits ``llimit`` will be adjusted to
the fit of the previous iteration. All other wavelength range are
adjusted accordingly taking into account the proper velocity
corrected shift :math:`\Delta \lambda/\lambda`. This is especially
important to detect hyper-velocity stars.
:obj:`False`: no adjustment to the limits done
fwhm_block : :obj:`bool:obj:`
:obj:`True`: The minimum fwhm of the voigt profiles of the fitted
lines is the instrument's dispersion
:obj:`False`: The minimum fwhm of the voigt profiles of the fitted
lines is zero
"""
if self.rv_sys == 0.:
lprime = self.cat.loc[line_idx, 'l_lab']
primeidx = np.where(lprime == guesses)[0]
else:
lprime = lambda_rv_shift(self, self.cat.loc[line_idx, 'l_lab'])
primeidx = np.where(lprime == guesses)[0]
if autoadjust:
lshift_in = parinfo[int(primeidx[0])]['value']\
- guesses[int(primeidx[0])]
for adj_idx in range(int(len(guesses) / 4.)):
lshift = _lambda_shift(lshift_in, guesses[int(primeidx[0])],\
guesses[int(4 * adj_idx + 1)])
parinfo[int(4 * adj_idx + 1)]['limits']\
= (guesses[int(4 * adj_idx + 1)] + llimits[0]\
+ lshift, guesses[int(4 * adj_idx + 1)]\
+ llimits[1] + lshift)
parinfo[int(4 * adj_idx + 1)]['value']\
= guesses[int(4 * adj_idx + 1)] + lshift
#### check that fwhm is always > dispersion:
if fwhm_block:
for paridx in range(int(len(guesses) / 4.)):
fwhm_g, fwhm_l, fwhm_v\
= voigt_FWHM(np.array([parinfo[int(4 * paridx + 2)]['value']],\
dtype=float), np.array([parinfo[int(4 * paridx + 3)]['value']],\
dtype=float))
if fwhm_v[0] < self.dispersion:
target_fwhm_g = self.dispersion - fwhm_l[0]
s = target_fwhm_g / 2. / np.sqrt(2. * np.log(2.))
parinfo[int(4 * paridx + 2)]['limits']\
= (s, parinfo[int(4 * paridx + 2)]['limits'][1])
parinfo[int(4 * paridx + 2)]['value'] = s
if blends:
blendlist = ascii.read(blends)
for blendidx in range(len(blendlist)):
blendration = blendlist[blendidx]['ratio']
lsec = blendlist[blendidx]['lsec']
secidx = np.where(lsec == guesses)[0]
if len(secidx) == 1:
prime_profile = voigt_funct(self.spec_lambda_highres,\
parinfo[int(primeidx[0])]['value'],\
parinfo[int(primeidx[0] - 1)]['value'],\
parinfo[int(primeidx[0] + 1)]['value'],\
parinfo[int(primeidx[0] + 2)]['value'])
sec_profile = voigt_funct(self.spec_lambda_highres,\
parinfo[int(secidx[0])]['value'],\
parinfo[int(secidx[0] - 1)]['value'],\
parinfo[int(secidx[0] + 1)]['value'],\
parinfo[int(secidx[0] + 2)]['value'])
ind_max_prime = np.argmax(np.abs(prime_profile))
ind_max_sec = np.argmax(np.abs(sec_profile))
if np.max(np.abs(sec_profile)) >= \
blendration * np.max(np.abs(prime_profile)):
# if parinfo[int(secidx[0] - 1)]['value'] >= \
# blendration * parinfo[int(primeidx[0] - 1)]['value']:
parinfo[int(secidx[0] - 1)]['limited'] = (True, True)
parinfo[int(secidx[0] - 1)]['limits'] \
= (blendration * parinfo[int(primeidx[0] - 1)]['value'], \
parinfo[int(secidx[0] - 1)]['limits'][1])
parinfo[int(secidx[0] - 1)]['value']\
= 0.99 * blendration * prime_profile[ind_max_prime]
# = 0.99 * blendration\
# * parinfo[int(primeidx[0] - 1)]['value']
if parinfo[int(primeidx[0] - 1)]['value'] == 0.:
parinfo[int(secidx[0] - 1)]['limits'] \
= (0.01 * (-1), \
parinfo[int(secidx[0] - 1)]['limits'][1])
parinfo[int(secidx[0] - 1)]['value']\
= blendration * (0.01) * (-1.)
return parinfo
def plotcolor(n):
colarray = np.array(['Lime', 'Blue', 'Magenta', 'Olive', 'Maroon',
'indigo', 'orange', 'Cyan', 'Yellow', 'Silver'])
colselect = colarray[n % len(colarray)]
return colselect
def baseline_extract(self, specfit, poly_order):
coeff = np.zeros(poly_order + 1)
power = np.arange(poly_order + 1)
for order in range(poly_order + 1):
coeff[order] = specfit.header['BLCOEF' + str('{:02d}'.format(order))]
baseline = np.poly1d(coeff / (self.specbinsize ** power[::-1]))
return baseline
def voigt_FWHM(sigma_g, gamma_l):
#calculating the FWHM of the voigt, gauss, and lorentz profile
c_0 = 2.0056
c_1 = 1.0593
fwhm_g = 2. * np.sqrt(2. * np.log(2.)) * sigma_g
fwhm_l = 2. * gamma_l
phi = fwhm_l / fwhm_g
fwhm_v = np.zeros_like(sigma_g, dtype=float)
for i in range(len(fwhm_g)):
if fwhm_g[i] == 0:
fwhm_v[i] = fwhm_l[i]
else:
fwhm_v[i] = fwhm_g[i] * (1. - c_0 * c_1 + np.sqrt(phi[i] ** 2\
+ 2. * c_1 * phi[i] + (c_0 * c_1) ** 2))
return fwhm_g, fwhm_l, fwhm_v
def voigt_funct(x_array, x_cen, amplitude, sigma, gamma):
if sigma > 0 and gamma > 0:
z = ((x_array - x_cen) + 1j * gamma) / (sigma * np.sqrt(2.))
y_arr = amplitude * np.real(wofz(z))
if sigma == 0:
y_arr = _lorentzian(x_array, x_cen, amplitude, gamma)
if gamma == 0:
y_arr = _gaussian(x_array, x_cen, amplitude, sigma)
return y_arr
def spec_res_downgrade(l_in, spec_in, l_out):
templ_spec = syn.SourceSpectrum(syn.Empirical1D, points=l_in, lookup_table=spec_in, keep_neg=True)
white_filter = syn.SpectralElement(syn.models.Empirical1D, points=templ_spec.waveset,
lookup_table=np.ones(len(templ_spec.waveset)), keep_neg=True)
obs_convolved = syn.Observation(templ_spec, white_filter, force='taper', binset=l_out)
convolved_spec = obs_convolved.as_spectrum(binned=True)
#
# convolved_spec = Spectrum1D(spectral_axis=obs_convolved.waveset,
# flux=syn.units.convert_flux(obs_convolved.waveset,
# fluxes=obs_convolved(obs_convolved.waveset)))
return convolved_spec(convolved_spec.waveset).value
def line_clipping(self, x, line_significants, sigma=3):
mask = np.zeros_like(x, dtype=int)
ind_sig = np.where((self.cat.loc[:,\
'significance'].values.astype(np.float64) < line_significants)\
| (self.cat.loc[:, 'used'].values == 'f'))
mask[ind_sig] = 1
x_red1 = np.delete(x, np.where(mask == 1))
if len(x_red1) >= 3:
if len(x_red1) == 3:
median = np.median(x_red1)
mad = MAD(x_red1)
ind = np.where((x < median - sigma * mad) | (x > median\
+ sigma * mad))[0]
if len(x) > 3:
gr = np.array(list(inter_comb(x_red1, len(x_red1) - 2)))
std_gr = [np.std(it) for it in gr]
x_red = gr[np.argmin(std_gr)]
median = np.median(x_red)
mad = MAD(x_red)
ind = np.where((x < median - sigma * mad) | (x > median\
+ sigma * mad))[0]
mask[ind] = 1
x_masked = np.ma.masked_array(x, mask=[mask])
return x_masked
def clipped_MAD(x):
if len(x) <= 3:
mad = MAD(x)
if len(x) > 3:
x = np.delete(x, [np.argmin(x), np.argmax(x)])
mad = MAD(x)
return mad
def continuum_deviation(self, l_in, f_in, baseline, contorder):
if self.linetype == 'absorption':
peaks = find_peaks(f_in * (-1), width=2)[0]
else:
peaks = find_peaks(f_in, width=2)[0]
mask = np.zeros_like(f_in)
for p in peaks:
mask[p - 3:p + 4] = 1
remove = ~np.ma.array(l_in, mask=mask).mask
l_masked = l_in[remove]
f_masked = f_in[remove]
baseline_masked = baseline[remove]
masked_fit = np.polyfit(l_masked, f_masked, contorder)
masked_fitfunc = np.poly1d(masked_fit)
cont_dev = MAD(f_masked / np.median(f_masked)
- baseline_masked / np.median(baseline_masked))
return cont_dev
def ABtoVega(instrument, bandpass):
bp = stsyn.band(str(instrument) + ',wfc1,' + str(bandpass) + ',mjd#57754')
obs = syn.Observation(stsyn.Vega, bp, binset=bp.binset)
photflam = bp.unit_response(stsyn.conf.area)
photplam = bp.pivot()
zp_vega = (-1) * obs.effstim(flux_unit='obmag', area=stsyn.conf.area).value
zp_ab = (-2.5 * np.log10(photflam.value) - 21.1
- 5 * np.log10(photplam.value) + 18.6921)
zp_st = -2.5 * np.log10(photflam.value) - 21.1
difference = zp_vega - zp_ab
return difference
def lambda_rv_shift(self, lam):
lambda_new = lam + self.rv_sys * lam / const.c.to('km/s').value
if self.rv_sys == 0:
lambda_new = lam
return lambda_new
def _lambda_shift(dlin, lin, lout):
'''
This functions shifts the wavelength limits if the central wavelength
is shifted. This assures that lambda never runs out of bounds
'''
dlout = dlin * lout / lin
return dlout
def _gaussian(x, mu, A, sigma):
prefact = 1. / np.sqrt(2. * np.pi * sigma ** 2)
exponetial = np.exp((-1.) * ((x - mu) ** 2) / (2. * sigma ** 2))
returnfunction = A * exponetial
return returnfunction
def _lorentzian(x, mu, A, gamma):
returnfunction = A * gamma / np.pi / ((x - mu) ** 2 + gamma ** 2)
return returnfunction
def _any_bit_in_number(arr, num):
for elem in arr:
if elem & num: # Check if any bit is shared
return False
return True