import os
import sys
import json
import numpy as np
import pandas as pd
from copy import deepcopy
from astropy.io import fits
from pandeia.engine.instrument_factory import InstrumentFactory
from pandeia.engine.perform_calculation import perform_calculation
from . import create_input as create
from .compute_noise import ExtractSpec
import astropy.units as u
import pickle
from pandeia.engine.calc_utils import build_default_calc, build_default_source
#constant parameters.. consider putting these into json file
#max groups in integration
max_ngroup = 65536.0
#minimum number of integrations
min_nint_trans = 1
#refdata directory
default_refdata_directory = os.environ.get("pandeia_refdata")
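#the pandeia_refdata environment variable must point at a local copy of the
#Pandeia reference data, e.g. `export pandeia_refdata=/path/to/pandeia_data`
#(illustrative path). If it is unset this evaluates to None and the QE file
#lookups in remove_QY below will fail for NIRISS and NIRSpec.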
def compute_full_sim(dictinput,verbose=False):
"""Top level function to set up exoplanet obs. for JW
Function to set up explanet observations for JWST only and
compute simulated spectrum. It uses STScI's Pandeia to compute
instrument throughputs and WebbPSF to compute PSFs.
Parameters
----------
dictinput : dict
dictionary containing instrument parameters and exoplanet specific
parameters. {"pandeia_input":dict1, "pandexo_input":dict1}
verbose : bool
(Optional) prints out check points throughout code
Returns
-------
dict
large dictionary with 1d and 2d simulations, timing info, instrument info, and warnings
Examples
--------
>>> from pandexo.engine.jwst import compute_full_sim
>>> from pandexo.engine.justplotit import jwst_1d_spec
>>> a = compute_full_sim({"pandeia_input": pandeiadict, "pandexo_input":exodict})
>>> jwst_1d_spec(a)
.. image:: 1d_spec.png
Notes
-----
It is much easier to run simulations through either **run_online** or **justdoit**. **justdoit** contains functions to create input dictionaries and **run_online** contains web forms to create input dictionaries.
See Also
--------
pandexo.engine.justdoit.run_pandexo : Recommended interface for running PandExo
pandexo.engine.run_online : Allows running functions through online interface
"""
pandeia_input = dictinput['pandeia_input']
pandexo_input = dictinput['pandexo_input']
#define the calculation we'll be doing
if pandexo_input['planet']['w_unit'] == 'sec':
calculation = 'phase_spec'
else:
calculation = pandexo_input['calculation'].lower()
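#e.g. setting pandexo_input['calculation'] = 'fml' selects the first-minus-last
#noise model used by the online tool; 'slope method', '2d extract', and 'phase_spec'
#are the other branches handled further down ('phase_spec' is chosen automatically
#when the planet w_unit is 'sec').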
#which instrument
instrument = pandeia_input['configuration']['instrument']['instrument']
conf = pandeia_input['configuration']
#if optimize is in the ngroups section, this will throw an error
#so create temp conf with 2 groups
if 'optimize' in str(conf['detector']['ngroup']):
conf_temp = deepcopy(conf)
conf_temp['detector']['ngroup'] = 2
else:
conf_temp = conf
i = InstrumentFactory(config=conf_temp)
#detector parameters
det_pars = i.read_detector_pars()
fullwell = det_pars['fullwell']
rn = det_pars['rn']
mingroups = det_pars['mingroups']
#exposure parameters
exp_pars = i.the_detector.exposure_spec
tframe = exp_pars.tframe
nframe = exp_pars.nframe
nskip = exp_pars.nsample_skip
sat_unit = pandexo_input['observation']['sat_unit']
if sat_unit =='%':
sat_level = pandexo_input['observation']['sat_level']/100.0*fullwell
elif sat_unit =='e':
sat_level = pandexo_input['observation']['sat_level']
else:
raise Exception("Saturation Level Needs Units: % fullwell or Electrons ")
#parameters needed from exo_input
mag = pandexo_input['star']['mag']
noccultations = pandexo_input['observation']['noccultations']
R = pandexo_input['observation']['R']
noise_floor = pandexo_input['observation']['noise_floor']
#get stellar spectrum and in transit spec
star_spec = create.outTrans(pandexo_input['star'])
#get rstar if user calling from grid
both_spec = create.bothTrans(star_spec, pandexo_input['planet'], star=pandexo_input['star'])
out_spectrum = np.array([both_spec['wave'], both_spec['flux_out_trans']])
#get transit duration from phase curve or from input
if calculation == 'phase_spec':
transit_duration = max(both_spec['time']) - min(both_spec['time'])
else:
#convert to seconds and strip the astropy units
transit_duration = float((pandexo_input['planet']['transit_duration']*u.Unit(pandexo_input['planet']['td_unit'])).to(u.second).value)
#amount of exposure time out-of-occultation, as a fraction of in-occ time
try:
expfact_out = pandexo_input['observation']['fraction']
print("WARNING: key input fraction has been replaced with new 'baseline option'. See notebook example")
pandexo_input['observation']['baseline'] = pandexo_input['observation']['fraction']
pandexo_input['observation']['baseline_unit'] ='frac'
except KeyError:
if pandexo_input['observation']['baseline_unit'] =='frac':
expfact_out = pandexo_input['observation']['baseline']
elif pandexo_input['observation']['baseline_unit'] =='total':
expfact_out = transit_duration/( pandexo_input['observation']['baseline'] - transit_duration)
elif pandexo_input['observation']['baseline_unit'] =='total_hrs':
expfact_out = transit_duration/( pandexo_input['observation']['baseline']*3600.0 - transit_duration)
else:
raise Exception("Wrong units for baseine: either 'frac' or 'total' or 'total_hrs' accepted")
#add to pandeia input
pandeia_input['scene'][0]['spectrum']['sed']['spectrum'] = out_spectrum
if isinstance(pandeia_input["configuration"]["detector"]["ngroup"], (float,int)):
m = {"ngroup":int(pandeia_input["configuration"]["detector"]["ngroup"]), "tframe":tframe,
"nframe":nframe,"mingroups":mingroups,"nskip":nskip}
else:
#run pandeia once to determine max exposure time per int and get exposure params
if verbose: print("Optimization Reqested: Computing Duty Cycle")
m = {"maxexptime_per_int":compute_maxexptime_per_int(pandeia_input, sat_level) ,
"tframe":tframe,"nframe":nframe,"mingroups":mingroups,"nskip":nskip}
if verbose: print("Finished Duty Cycle Calc")
#calculate all timing info
timing, flags = compute_timing(m,transit_duration,expfact_out,noccultations)
#Simulate out trans and in transit
if verbose: print("Starting Out of Transit Simulation")
out = perform_out(pandeia_input, pandexo_input,timing, both_spec)
#extract extraction area before dict conversion
extraction_area = out.extraction_area
out = out.as_dict()
out.pop('3d')
if verbose: print("End out of Transit")
#Remove effects of Quantum Yield from shot noise
out = remove_QY(out, instrument)
#this is somewhat redundant: the in-transit result is computed from the out-of-transit one.
#keep perform_in, but note its effective inputs are (out, timing, both_spec)
if verbose: print("Starting In Transit Simulation")
inn = perform_in(pandeia_input, pandexo_input,timing, both_spec, out, calculation)
if verbose: print("End In Transit")
#compute warning flags for timing info
warnings = add_warnings(out, timing, sat_level/fullwell, flags, instrument)
compNoise = ExtractSpec(inn, out, rn, extraction_area, timing)
#slope method is pandeia's pure noise calculation (taken from SNR)
#contains correlated noise, RN, dark current, sky,
#uses MULTIACCUM formula so we deviated from this.
#could eventually come back to this if Pandeia adopts First-Last formula
if calculation == 'slope method':
#Extract relevant info from pandeia output (1d curves and wavelength)
#extracted flux in units of electron/s
w = out['1d']['extracted_flux'][0]
result = compNoise.run_slope_method()
#derives noise from 2d postage stamps. Doing this results in a higher
#1d flux rate than Pandeia gets from its own extraction.
#this should be used to benchmark Pandeia's 1d extraction
elif calculation == '2d extract':
w = out['1d']['extracted_flux'][0]
result = compNoise.run_2d_extract()
#this is the noise calculation that PandExo uses online. It derives
#its own calculation of read noise and does not use the MULTIACCUM
#noise formula
elif calculation == 'fml':
w = out['1d']['extracted_flux'][0]
result = compNoise.run_f_minus_l()
elif calculation == 'phase_spec':
result = compNoise.run_phase_spec()
w = result['time']
else:
raise Exception('Calculation method not found: '+str(calculation))
varin = result['var_in_1d']
varout = result['var_out_1d']
extracted_flux_out = result['photon_out_1d']
extracted_flux_inn = result['photon_in_1d']
#bin the data according to user input
if R is not None:
wbin = bin_wave_to_R(w, R)
photon_out_bin = uniform_tophat_sum(wbin, w,extracted_flux_out)
photon_in_bin = uniform_tophat_sum(wbin,w, extracted_flux_inn)
var_in_bin = uniform_tophat_sum(wbin, w,varin)
var_out_bin = uniform_tophat_sum(wbin,w, varout)
wbin = wbin[photon_out_bin > 0 ]
photon_in_bin = photon_in_bin[photon_out_bin > 0 ]
var_in_bin = var_in_bin[photon_out_bin > 0 ]
var_out_bin = var_out_bin[photon_out_bin > 0 ]
photon_out_bin = photon_out_bin[photon_out_bin>0]
else:
wbin = w
photon_out_bin = extracted_flux_out
wbin = wbin[photon_out_bin>0]
photon_in_bin = extracted_flux_inn
photon_in_bin = photon_in_bin[photon_out_bin>0]
var_in_bin = varin
var_in_bin = var_in_bin[photon_out_bin>0]
var_out_bin = varout
var_out_bin = var_out_bin[photon_out_bin>0]
photon_out_bin = photon_out_bin[photon_out_bin>0]
if calculation == 'phase_spec':
to = (timing["APT: Num Groups per Integration"]-1)*tframe
ti = (timing["APT: Num Groups per Integration"]-1)*tframe
else:
#otherwise propagate errors, accounting for the different
#on-source times in and out of transit
to = result['on_source_out']
ti = result['on_source_in']
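#sketch of the error propagation evaluated on the next line: the depth is
#D = 1 - (Nin/ti)/(Nout/to), so standard propagation gives
#var(D) = (to/(ti*Nout))^2 * var_in + (Nin*to/(ti*Nout^2))^2 * var_out
#with Nin, Nout the binned in/out-of-transit counts and ti, to the on-source times.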
var_tot = (to/ti/photon_out_bin)**2.0 * var_in_bin + (photon_in_bin*to/ti/photon_out_bin**2.0)**2.0 * var_out_bin
error_spec = np.sqrt(var_tot)
#factor in occultations to noise
nocc = timing['Number of Transits']
error_spec = error_spec / np.sqrt(nocc)
#Add in user specified noise floor
error_spec_nfloor = add_noise_floor(noise_floor, wbin, error_spec)
#add in random noise for the simulated spectrum
np.random.seed()
rand_noise= error_spec_nfloor*(np.random.randn(len(wbin)))
raw_spec = (photon_out_bin/to-photon_in_bin/ti)/(photon_out_bin/to)
sim_spec = raw_spec + rand_noise
#if secondary transit (eclipse), multiply spectra by -1
if pandexo_input['planet']['f_unit'] == 'fp/f*':
sim_spec = -1.0*sim_spec
raw_spec = -1.0*raw_spec
#package processed data
finalspec = {'wave':wbin,
'spectrum': raw_spec,
'spectrum_w_rand':sim_spec,
'error_w_floor':error_spec_nfloor}
rawstuff = {
'electrons_out':photon_out_bin*nocc,
'electrons_in':photon_in_bin*nocc,
'var_in':var_in_bin*nocc,
'var_out':var_out_bin*nocc,
'e_rate_out':photon_out_bin/to,
'e_rate_in':photon_in_bin/ti,
'wave':wbin,
'error_no_floor':error_spec,
'rn[out,in]':result['rn[out,in]'],
'bkg[out,in]':result['bkg[out,in]']
}
result_dict = as_dict(out,both_spec ,finalspec,
timing, mag, sat_level, warnings,
pandexo_input['planet']['f_unit'], rawstuff,calculation)
return result_dict
def compute_maxexptime_per_int(pandeia_input, sat_level):
"""Computes optimal maximum exposure time per integration
Function to simulate 2d jwst image with 2 groups, 1 integration, 1 exposure
and return the maximum time
for one integration before saturation occurs. If saturation has
already occured, returns maxexptime_per_int as np.nan. This then
tells Pandexo to set min number of groups (ngroups =2). This avoids
error if saturation occurs. This routine assumes that min ngroups is 2.
Parameters
----------
pandeia_input : dict
pandeia dictionary input
sat_level : int or float
user defined saturation level in units of electrons
Returns
-------
float
Maximum exposure time per integration before specified saturation level
Examples
--------
>>> max_time = compute_maxexptime_per_int(pandeia_input, 50000.0)
>>> print(max_time)
12.0
"""
#run once to get 2d rate image
pandeia_input['configuration']['detector']['ngroup'] = 2
pandeia_input['configuration']['detector']['nint'] = 1
pandeia_input['configuration']['detector']['nexp'] = 1
report = perform_calculation(pandeia_input, dict_report=False)
report_dict = report.as_dict()
# count rate on the detector in e-/second/pixel
#det = report_dict['2d']['detector']
det = report.signal.rate_plus_bg_list[0]['fp_pix']
timeinfo = report_dict['information']['exposure_specification']
#totaltime = timeinfo['tgroup']*timeinfo['ngroup']*timeinfo['nint']
maxdetvalue = np.max(det)
#maximum time before saturation per integration
#based on user specified saturation level
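#worked example (illustrative numbers): for sat_level = 58,000 electrons and a
#brightest pixel accumulating 2,000 e-/s, maxexptime_per_int = 58000/2000 = 29 s.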
try:
maxexptime_per_int = sat_level/maxdetvalue
except:
maxexptime_per_int = np.nan
return maxexptime_per_int
def compute_timing(m,transit_duration,expfact_out,noccultations):
"""Computes all timing info for observation
Computes all JWST-specific timing info for the observation. Some pertinent
JWST terminology:
JWST terminology:
- frame: The result of sequentially clocking and digitizing all pixels in a rectangular area of an SCA. **Full-frame readout** means to digitize all pixels in an SCA, including reference pixels. **Frame** also applies to the result of clocking and digitizing a subarray on an SCA.
- group: One or more consecutively read frames. There are no intervening resets. Frames may be averaged to form a group, but for exoplanets the readout scheme is always 1 frame = 1 group.
- integration: The end result of resetting the detector and then non-destructively sampling it one or more times over a finite period of time before resetting the detector again. This is a unit of data for which signal is proportional to intensity, and it consists of one or more GROUPS.
- exposure: The end result of one or more INTEGRATIONS over a finite period of time. EXPOSURE defines the contents of a single FITS file.
Parameters
----------
m : dict
Dictionary output from **compute_maxexptime_per_int**
transit_duration : float or int
transit duration in seconds
expfact_out : float or int
fraction of time spent in transit versus out of transit
noccultations : int
number of transits
Returns
-------
timing : dict
All timing info
warningflag : dict
Warning flags
Examples
--------
>>> timing, flags = compute_timing(m, 2*60.0*60.0, 1.0, 1.0)
>>> print((list(timing.keys())))
['Transit Duration', 'Seconds per Frame', 'Time/Integration incl reset (sec)',
'APT: Num Groups per Integration', 'Num Integrations Out of Transit',
'Num Integrations In Transit', 'APT: Num Integrations per Occultation',
'Observing Efficiency (%)', 'Transit+Baseline, no overhead (hrs)', 'Number of Transits']
"""
tframe = m['tframe']
nframe = m['nframe']
nskip = m['nskip']
mingroups = m['mingroups']
overhead_per_int = tframe #overhead time added per integration
try:
#are we starting with an exposure time?
maxexptime_per_int = m['maxexptime_per_int']
except KeyError:
#or a predefined number of groups specified by the user
ngroups_per_int = m['ngroup']
flag_default = "All good"
flag_high = "All good"
if 'maxexptime_per_int' in locals():
#First, if maxexptime_per_int has been defined (from above), compute ngroups_per_int.
#The number of frames in one integration is the maximum time before saturation
#divided by the time it takes for one frame. Note this does not include
#reset frames
nframes_per_int = np.floor(maxexptime_per_int/tframe)
#for exoplanets nframe = 1 and nskip = 0, so ngroups_per_int
#and nframes_per_int are always the same
ngroups_per_int = np.floor(nframes_per_int/(nframe + nskip))
#put restriction on number of groups
#there is a hard limit to the maximum number of groups.
#if you exceed that limit, set it to the maximum value instead.
#also set another check for saturation
if ngroups_per_int > max_ngroup:
ngroups_per_int = max_ngroup
flag_high = "Groups/int > max num of allowed groups"
if (ngroups_per_int < mingroups) | np.isnan(ngroups_per_int):
ngroups_per_int = mingroups
nframes_per_int = mingroups
flag_default = "NGROUPS<"+str(mingroups)+"SET TO NGROUPS="+str(mingroups)
elif 'ngroups_per_int' in locals():
#if instead ngroups_per_int was defined directly by the user, set nframes_per_int from it
nframes_per_int = ngroups_per_int*(nframe+nskip)
#if that didn't work, fall back to the minimum number of groups
else:
#if neither was defined, just set ngroups and nframes to the minimum
#for the sake of not returning an error
ngroups_per_int = mingroups
nframes_per_int = mingroups
flag_default = "Something went wrong. SET TO NGROUPS="+str(mingroups)
#the integration time is related to the number of groups and the time of each
#group
exptime_per_int = ngroups_per_int*tframe
#clock time includes the reset frame
clocktime_per_int = (ngroups_per_int+1.0)*tframe
#observing efficiency (i.e. what percentage of total time is spent on source)
eff = (ngroups_per_int - 1.0)/(ngroups_per_int + 1.0)
#this says "per occultation" but it is really just the in-transit integrations; see below
# transit duration / ((ngroups + reset)*frame time)
nint_per_occultation = transit_duration/((ngroups_per_int+1.0)*tframe)
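#worked example (illustrative numbers): with ngroups_per_int = 10 the efficiency
#above is 9/11 ~ 82%, and with tframe = 1 s a 2 hr transit gives
#7200/(11*1) ~ 654.5, i.e. 655 integrations in transit after the ceil below.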
#figure out how many integrations are in transit and how many are out of transit
nint_in = np.ceil(nint_per_occultation)
nint_out = np.ceil(nint_in/expfact_out)
#you would never want a single integration in transit.
#here we assume that for very dim targets you would want at least
#min_nint_trans integrations in transit
if nint_in < min_nint_trans:
ngroups_per_int = np.floor(ngroups_per_int/min_nint_trans)
exptime_per_int = (ngroups_per_int)*tframe
clocktime_per_int = ngroups_per_int*tframe
eff = (ngroups_per_int - 1.0)/(ngroups_per_int + 1.0)
nint_per_occultation = transit_duration/((ngroups_per_int+1.0)*tframe)
nint_in = np.ceil(nint_per_occultation)
nint_out = np.ceil(nint_in/expfact_out)
if nint_out < min_nint_trans:
nint_out = min_nint_trans
timing = {
"Transit Duration" : (transit_duration)/60.0/60.0,
"Seconds per Frame" : tframe,
"Time/Integration incl reset (sec)":clocktime_per_int,
"APT: Num Groups per Integration" :int(ngroups_per_int),
"Num Integrations Out of Transit":int(nint_out),
"Num Integrations In Transit":int(nint_in),
"APT: Num Integrations per Occultation":int(nint_out+nint_in),
"Observing Efficiency (%)": eff*100.0,
"Transit+Baseline, no overhead (hrs)": (nint_out+nint_in)*clocktime_per_int/60.0/60.0,
"Number of Transits": noccultations
}
return timing, {'flag_default':flag_default,'flag_high':flag_high}
def remove_QY(pandeia_dict, instrument):
"""Removes Quantum Yield from Pandeia Fluxes. Place Holder.
Parameters
----------
pandeia_dict : dict
pandeia output dictionary
instrument : str
instrument running
Returns
-------
dict
the same dictionary with extracted_flux = extracted_flux/QY
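Examples
--------
A minimal usage sketch (hypothetical variables), where `out` is the dictionary
form of a Pandeia report:
>>> out = remove_QY(out, 'niriss')
For 'nircam' and 'miri' the dictionary is returned unchanged.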
"""
if instrument == 'niriss':
try:
qy = fits.open(os.path.join(default_refdata_directory,'jwst', instrument,'qe' ,'jwst_niriss_h2rg_qe_20221003172003.fits'))
except:
raise Exception('PANDEIA REFERENCE DATA NEEDS TO BE UPDATED')
x_grid = pandeia_dict['1d']['extracted_flux'][0]
qy_on_grid = np.interp(x_grid, qy[1].data['WAVELENGTH'], qy[1].data['CONVERSION'])
elif instrument == 'nirspec':
try:
qy = fits.open(os.path.join(default_refdata_directory,'jwst', instrument,'qe' ,'jwst_nirspec_qe_20160902193401.fits'))
except:
raise Exception('PANDEIA REFERENCE DATA NEEDS TO BE UPDATED')
x_grid = pandeia_dict['1d']['extracted_flux'][0]
qy_on_grid = np.interp(x_grid, qy[1].data['WAVELENGTH'], qy[1].data['CONVERSION'])
else:
#nircam and miri currently have no qy effects
qy_on_grid = 1.0
pandeia_dict['1d']['extracted_flux'][1] = pandeia_dict['1d']['extracted_flux'][1]/qy_on_grid
return pandeia_dict
def add_warnings(pand_dict, timing, sat_level, flags,instrument):
"""Add warnings for front end
Adds the necessary warning flags for a JWST observation, usually associated with
too few or too many groups or with saturation. Alerts the user if the saturation
level is higher than 80 percent of full well, and additionally if the number of
groups is fewer than 3. These warnings are currently fairly arbitrary and will be
updated as better JWST recommendations are made.
Parameters
----------
pand_dict : dict
output from pandeia run
timing : dict
output from **compute_timing**
sat_level : int or float
user specified saturation level as a fraction of full well (0 to 1)
flags : dict
warning flags taken from output of **compute_timing**
instrument : str
Only allowable strings are: "nirspec", "niriss", "nircam", "miri"
Returns
-------
dict
all warnings
Notes
-----
These warnings are just suggestions and are not yet requirements.
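Examples
--------
A hypothetical call, with `out` a Pandeia report dict, `timing` and `flags`
the outputs of **compute_timing**, and a target saturation of 80% of full well:
>>> warnings = add_warnings(out, timing, 0.80, flags, 'nirspec')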
"""
ngroups_per_int = timing['APT: Num Groups per Integration']
#check for saturation
try:
flag_nonl = pand_dict['warnings']['partial_saturated']
except:
flag_nonl = "All good"
try:
flag_sat = pand_dict['warnings']['full_saturated']
except:
flag_sat = "All good"
#check for too small number of groups
flag_low = "All good"
flag_perc = "All good"
if (sat_level > .80) & (ngroups_per_int <3):
flag_low = "% full well>80% & only " + str(ngroups_per_int) + " groups"
if (sat_level > .80):
flag_perc = "% full well>80%"
warnings = {
"Group Number Too Low?" : flag_low,
"Group Number Too High?": flags["flag_high"],
"Non linear?" : flag_nonl,
"Saturated?" : flag_sat,
"% full well high?": flag_perc,
"Num Groups Reset?": flags["flag_default"]
}
return warnings
def add_noise_floor(noise_floor, wave_bin, error_spec):
"""Add in noise floor
This adds in a user specified noise floor. It does not add the noise floor in
quadrature; instead it sets error[error < noise_floor] = noise_floor. If a
wavelength dependent noise floor is given and the wavelength ranges do not fully
overlap, the out-of-range bins are held at the nearest edge value of the noise floor.
Parameters
----------
noise_floor : str, int, or float
file with two column [wavelength, noise(ppm)] or single number with constant noise floor in ppm
wave_bin : array of float
final binned wavelength grid from simulation
error_spec : array of float
final computed error on the planet spectrum in units of rp^2/r*^2 or fp/f*
Returns
-------
array of float
error_spec-- new error
Examples
--------
>>> import numpy as np
>>> wave = np.linspace(1,2.7,10)
>>> error = np.zeros(10)+1e-6
>>> newerror = add_noise_floor(20, wave, error)
>>> print(newerror)
[ 2.00000000e-05 2.00000000e-05 2.00000000e-05 2.00000000e-05
2.00000000e-05 2.00000000e-05 2.00000000e-05 2.00000000e-05
2.00000000e-05 2.00000000e-05]
"""
#add user specified noise floor
if isinstance(noise_floor, (int, float)):
error_spec[error_spec<noise_floor*1e-6] = noise_floor*1e-6
elif isinstance(noise_floor, str):
read_noise = np.genfromtxt(noise_floor, dtype=(float, float), names='w, n')
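#the noise floor file is assumed to be plain text with two whitespace-separated
#columns, e.g. (hypothetical values):
#   1.0   30.0
#   3.0   50.0
#column 1 = wavelength (same units as wave_bin), column 2 = noise floor in ppm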
w_overlap = (wave_bin>=min(read_noise['w'])) & (wave_bin<=max(read_noise['w']))
wnoise = wave_bin[w_overlap]
noise = np.zeros(len(wave_bin))
noise[w_overlap] = np.interp(wnoise , read_noise['w'], read_noise['n'])
noise[(wave_bin>max(read_noise['w']))] = read_noise['n'][read_noise['w'] == max(read_noise['w'])]
noise[(wave_bin<min(read_noise['w']))] = read_noise['n'][read_noise['w'] == min(read_noise['w'])]
error_spec[error_spec<noise*1e-6] = noise[error_spec<noise*1e-6]*1e-6
else:
raise ValueError('Noise Floor added was not integer or file')
return error_spec
def bin_wave_to_R(w, R):
"""Creates new wavelength axis at specified resolution
Parameters
----------
w : list of float or numpy array of float
Wavelength axis to be rebinned
R : float or int
Resolution to bin axis to
Returns
-------
list of float
New wavelength axis at specified resolution
Examples
--------
>>> newwave = bin_wave_to_R(np.linspace(1,2,1000), 10)
>>> print((len(newwave)))
11
"""
wave = []
tracker = min(w)
i = 1
ind= 0
firsttime = True
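#walk along the native wavelength grid, growing each bin from w[ind] to w[i]
#until its effective resolution w[i]/(w[i]-w[ind]) drops below the requested R,
#then record a new wavelength point and start the next bin from there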
while(tracker<max(w)):
if i <len(w)-1:
dlambda = w[i]-w[ind]
newR = w[i]/dlambda
if (newR < R) & (firsttime):
tracker = w[ind]
wave += [tracker]
ind += 1
i += 1
firsttime = True
elif newR < R:
tracker = w[ind]+dlambda/2.0
wave +=[tracker]
ind = (np.abs(w-tracker)).argmin()
i = ind+1
firsttime = True
else:
firsttime = False
i+=1
else:
tracker = max(w)
wave += [tracker]
return np.array(wave)
def target_acq(instrument, both_spec, warning):
"""Contains functionality to compute optimal TA strategy
Takes the PandExo-normalized flux from **create_input** and checks for saturation
or for SNR below the minimum target acquisition requirement. Then adds warnings,
2d displays, and target acquisition info to the final output dict.
Parameters
----------
instrument : str
possible options are niriss, nirspec, miri and nircam
both_spec : dict
output dictionary from **create_input**
warning : dict
output dictionary from **add_warnings**
Returns
-------
dict
target acquisition summary: 'sn' (signal to noise), 'ngroup', and the 2d 'saturation' image
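Examples
--------
A hypothetical call, assuming `both_spec` comes from **create_input.bothTrans**
and `warnings` from **add_warnings**:
>>> ta = target_acq('nirspec', both_spec, warnings)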
"""
out_spectrum = np.array([both_spec['wave'], both_spec['flux_out_trans']])
#this automatically builds a default calculation
#reasonable defaults were obtained for each instrument, so all that should be needed
#here is to swap out the instrument name ('niriss', 'nirspec', 'miri', or 'nircam')
c = build_default_calc(telescope='jwst', instrument=instrument, mode='target_acq', method='taphot')
c['scene'][0]['spectrum']['sed'] = {'sed_type':'input','spectrum':out_spectrum}
c['scene'][0]['spectrum']['normalization']['type'] = 'none'
rphot = perform_calculation(c, dict_report=True)
#check warnings (pandeia doesn't return values for these warnings, so try will fail if all good)
try:
warning['TA Saturated?'] = rphot['warnings']['saturated']
except KeyError:
warning['TA Saturated?'] = 'All good'
try:
warning['TA SNR Threshold'] = rphot['warnings']['ta_snr_threshold']
except KeyError:
warning['TA SNR Threshold'] = 'All good'
#build TA dict
ta = {'sn':rphot['scalar']['sn'],
'ngroup': rphot['input']['configuration']['detector']['ngroup'],
'saturation':rphot['2d']['saturation']}
#return the TA summary so it can be attached to the final output dict
return ta
def as_dict(out, both_spec ,binned, timing, mag, sat_level, warnings, punit, unbinned,calculation):
"""Format dictionary for output data
Takes all output from jwst run and converts it to simple dictionary
Parameters
----------
out : dict
output dictionary from **perform_out**
both_spec : dict
output dictionary from **createInput.bothTrans**
binned : dict
binned final spectrum dictionary assembled in **compute_full_sim**
timing : dict
dictionary from **compute_timing**
mag : float or int
magnitude of system
sat_level : float or int
saturation level in electrons
warnings : dict
warning dictionary from **add_warnings**
punit : "fp/f*" or "rp^2/r*^2"
unit of supplied spectra options are: only options are fp/f* or rp^2/r*^2
unbinned : dict
unbinned raw data dictionary assembled in **compute_full_sim**
calculation : str
noise calculation type
Returns
-------
dict
compressed dictionary
"""
#for emission spectrum
p=1.0
if punit == 'fp/f*': p = -1.0
timing_div = pd.DataFrame.from_dict(timing, orient='index')
timing_div.columns = ['Value']
timing_div = timing_div.to_html()
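#pandas' to_html() opens the table with a default tag such as
#'<table border="1" class="dataframe">' (36 characters in the pandas version this
#was written against); the [36:] slice below swaps it for a Bootstrap-styled tag.
#The same trick is used for the warnings and input tables further down.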
timing_div = '<table class="table table-striped"> \n' + timing_div[36:len(timing_div)]
timing_div = timing_div.encode()
warnings_div = pd.DataFrame.from_dict(warnings, orient='index')
warnings_div.columns = ['Value']
warnings_div = warnings_div.to_html()
warnings_div = '<table class="table table-striped"> \n' + warnings_div[36:len(warnings_div)]
warnings_div = warnings_div.encode()
input_dict = {
"Target Mag": mag ,
"Saturation Level (electons)": sat_level,
"Instrument": out['input']['configuration']['instrument']['instrument'],
"Mode": out['input']['configuration']['instrument']['mode'],
"Aperture": out['input']['configuration']['instrument']['aperture'],
"Disperser": out['input']['configuration']['instrument']['disperser'],
"Subarray": out['input']['configuration']['detector']['subarray'],
"Readmode": out['input']['configuration']['detector']['readmode'],
"Filter": out['input']['configuration']['instrument']['filter'],
"Primary/Secondary": punit
}
input_div = pd.DataFrame.from_dict(input_dict, orient='index')
input_div.columns = ['Value']
input_div = input_div.to_html()
input_div = '<table class="table table-striped"> \n' + input_div[36:len(input_div)]
input_div = input_div.encode()
#add calc type to input dict (doing it here so it doesn't show up on the webpage)
input_dict["Calculation Type"]= calculation
final_dict = {
'OriginalInput': {'model_spec':both_spec['model_spec'],
'model_wave' : both_spec['model_wave'],
'star_spec': both_spec['flux_out_trans']},
'RawData': unbinned,
'FinalSpectrum': binned,
#pandeia out-of-transit output
'PandeiaOutTrans': out,
#all timing info
'timing': timing,
'warning':warnings,
'input':input_dict,
#divs for html rendering
'timing_div':timing_div,
'input_div':input_div,
'warnings_div':warnings_div,
}
return final_dict