Source code for pyatomdb.spectrum


"""
The modules is separated by the type of spectrum you wish to model. For
now, the CIE, NEI and PShock models are implemented.

Roughly speaking, you load a [Modelname]Session, and you can then
obtain:

   linelist :
     list of lines in a certain wavelength interval)

   line_emissivity :
     the emissivity of a specific line as a function of temperature,
     line intensity and more.

   spectrum :
     the emissivity in photons cm^3 bin^-1 s^-1. This requires a
     response be set first

"""

# Adam Foster
#
# Version 0.1, July 17th 2015
# First Release
# Adam Foster

try:
  import astropy.io.fits as pyfits
except ImportError:
  import pyfits

import numpy, os, hashlib, pickle, math, re
# other pyatomdb modules
from . import atomic, util, const, atomdb, apec

import time, wget
import warnings
import bz2
#from scipy.integrate import quad

def __make_spectrum(bins, index, linefile="$ATOMDB/apec_line.fits",\
                  cocofile="$ATOMDB/apec_coco.fits",\
                  binunits='keV', broadening=False, broadenunits='keV', \
                  elements=False, abund=False, dummyfirst=False,\
                  dolines = True, docont=True, dopseudo=True):

  """
  DEPRECATED
  make_spectrum is the most generic "make me a spectrum" routine.

  It returns the emissivity in counts cm^3 s^-1 bin^-1.

  Parameters
  ----------
  bins : array(float)
       The bin edges for the spectrum to be calculated on, in \
       units of keV or Angstroms. Must be monotonically\
       increasing. Spectrum will return len(bins)-1 values.
  index : int
       The index to plot the spectrum from. note that the AtomDB files\
       the emission starts in hdu number 2. So for the first block, you\
       set index=2
  linefile : str
       The file containing all the line emission. Defaults to \
       "$ATOMDB/apec_line.fits"
  cocofile : str
       The file containing all the continuum emission. Defaults to \
       "$ATOMDB/apec_coco.fits"
  binunits : {'keV','A'}
       The energy units for bins. "keV" or "A". Default keV.
  broadening : float
       Line broadening to be applied
  broadenunits : {'keV','A'}
       Units of line broadening "keV" or "A". Default keV.
  elements : iterable of int
       Elements to include, listed by atomic number. if not set, include all.
  abund : iterable of float, length same as elements.
       If set, and array of length (elements) with the abundances of each\
       element relative to the Andres and Grevesse values. Otherwise, assumed to\
       be 1.0 for all elements
  dummyfirst : bool
       If true, add a "0" to the beginning of the return array so it is of the
       same length as bins (can be useful for plotting results)
  dolines : bool
       Calculate line emission (default True)
  docont : bool
       Calculate Continuum emission (default True)
  dopseudo : bool
       Calculate PseudoContinuum (weak line) emission (default True)

  Returns
  -------
  array of floats
      Emissivity in counts cm^3 s^-1 bin^-1.

  """
#  History
#  -------
#  Version 0.1 - initial release
#    Adam Foster July 17th 2015
#
#  Version 0.2
#    Added dummyfirst keyword
#    Adam Foster July 21st 2015
#
#  Version 0.3
#    Fixed bug in angstrom spectrum generation
#    Adam Foster February 28th 2018
#
  warnings.warn("make_spectrum is a deprecated function and will be removed. Use CIESession.return_spectrum", DeprecationWarning, stacklevel=3)


  # set up the bins
  if (sum((bins[1:]-bins[:-1])<0) > 0):
    print("*** ERROR: bins must be monotonically increasing. Exiting ***")
    return -1

  if binunits.lower()=='kev':
    ebins = bins*1.0
    flipspectrum=False
  elif binunits.lower() in ['a', 'angstrom', 'angstroms']:
    ebins = const.HC_IN_KEV_A/bins[::-1]
    flipspectrum=True
  else:
    print("*** ERROR: unknown binning unit %s, Must be keV or A. Exiting ***"%\
          (binunits))





  if util.keyword_check(linefile):
    # ok, we should do something with this
    # if it is a string, look for the file name
    if isinstance(linefile, str):
      lfile = os.path.expandvars(linefile)
      if not os.path.isfile(lfile):
        print("*** ERROR: no such file %s. Exiting ***" %(lfile))
        return -1
      ldat = pyfits.open(lfile)
    elif isinstance(linefile, pyfits.hdu.hdulist.HDUList):
      # no need to do anything, file is already open
      ldat = linefile
    else:
      print("Unknown data type for linefile. Please pass a string or an HDUList")
      return -1

  if util.keyword_check(cocofile):
    if isinstance(cocofile, str):
      cfile = os.path.expandvars(cocofile)
      if not os.path.isfile(cfile):
        print("*** ERROR: no such file %s. Exiting ***" %(cfile))
        return -1
      cdat = pyfits.open(cfile)
    elif isinstance(cocofile, pyfits.hdu.hdulist.HDUList):
      # no need to do anything, file is already open
      cdat = cocofile
    else:
      print("Unknown data type for cocofile. Please pass a string or an HDUList")
      return







#  lfile = os.path.expandvars(linefile)
#  cfile = os.path.expandvars(cocofile)
#  if not os.path.isfile(lfile):
#    print "*** ERROR: no such file %s. Exiting ***" %(lfile)
#    return -1
#  if not os.path.isfile(cfile):
#    print "*** ERROR: no such file %s. Exiting ***" %(cfile)
#    return -1

  # open the files
#  ldat = pyfits.open(lfile)
#  cdat = pyfits.open(cfile)

  # get the index
  #if ((index < 2) | (index > len(ldat))):
  #  print "*** ERRROR: Index must be in range %i to %i"%(2, len(ldat)-1)
  #  return -1

  lldat = ldat[index].data
  ccdat = cdat[index].data


  if not util.keyword_check(elements):
    Zl = util.unique(lldat['element'])
    Zc = util.unique(ccdat['Z'])
    Zlist = util.unique(numpy.append(Zl,Zc))

  else:
    Zlist = elements

  if not util.keyword_check(abund):
    abund= numpy.ones(len(Zlist))

  lspectrum = numpy.zeros(len(bins)-1, dtype=float)
  cspectrum = numpy.zeros(len(bins)-1, dtype=float)

  if dolines:
    for iZ, Z in enumerate(Zlist):
      # ADD  LINES
      lspectrum += __add_lines(Z, abund[iZ], lldat, ebins, broadening=broadening, broadenunits=broadenunits)

  if docont | dopseudo:
    for iZ, Z in enumerate(Zlist):
    # ADD  CONTINUUM
      cspectrum += __make_ion_index_continuum(ebins, Z, cocofile=ccdat,\
                                           binunits='keV', no_coco=not(docont),\
                                           no_pseudo=not(dopseudo))*abund[iZ]

  # broaden the continuum if required:
  if broadening:
    cspectrum = __broaden_continuum(ebins, cspectrum, binunits = 'keV', \
                      broadening=broadening,\
                      broadenunits=broadenunits)

  # now flip results back around if angstroms
  if flipspectrum:
    cspectrum = cspectrum[::-1]
    lspectrum = lspectrum[::-1]



  if dummyfirst:
    return numpy.append([0],   cspectrum+lspectrum)
  else:
    return cspectrum+lspectrum



def __make_ion_spectrum(bins, index, Z,z1, linefile="$ATOMDB/apec_nei_line.fits",\
                  cocofile="$ATOMDB/apec_nei_comp.fits",\
                  binunits='keV', broadening=False, broadenunits='keV', \
                  abund=False, dummyfirst=False, nei = True,\
                  dolines = True, docont=True, dopseudo=True):

  """
  DEPRECATED
  make_spectrum is the most generic "make me a spectrum" routine.

  It returns the emissivity in counts cm^3 s^-1 bin^-1.

  Parameters
  ----------
  bins : array(float)
    The bin edges for the spectrum to be calculated on, in \
    units of keV or Angstroms. Must be monotonically\
    increasing. Spectrum will return len(bins)-1 values.
  index : int
    The index to plot the spectrum from. note that the AtomDB files\
    the emission starts in hdu number 2. So for the first block, you\
    set index=2
  Z : int
    Element of spectrum (e.g. 6 for carbon)
  z1 : int
    Ion charge +1 for the spectrum (e.g. 3 for C III)
  linefile : str or HDUList
    The file containing all the line emission. Defaults to \
    "$ATOMDB/apec_line.fits". Can also pass in the opened file, \
    i.e. "linefile = pyatomdb.pyfits.open('apec_nei_line.fits')"
  cocofile : str or HDUList
    The file containing all the continuum emission. Defaults to \
    "$ATOMDB/apec_coco.fits". Can also pass in the opened file, \
    i.e. "cocofile = pyatomdb.pyfits.open('apec_nei_comp.fits')"
  binunits : {'keV','A'}
    The energy units for bins. "keV" or "A". Default keV.
  broadening : float
    Line broadening to be applied
  broadenunits : {'keV','A'}
    Units of line broadening "keV" or "A". Default keV.
  elements : iterable of int
    Elements to include, listed by atomic number. if not set, include all.
  abund : iterable of float, length same as elements.
    If set, and array of length (elements) with the abundances of each\
    element relative to the Andres and Grevesse values. Otherwise, assumed to\
    be 1.0 for all elements
  dummyfirst : bool
    If true, add a "0" to the beginning of the return array so it is of the
    same length as bins (can be useful for plotting results)
  nei : bool
    If set, return the spectrum from the driving ion being Z, rmJ. If not set,
    return the spectrum for the collisional ionization equilibrium *BUT*\ 
    note that the continuum will be wrong, as it is provided for each element
    as a whole.
  dolines : bool
    Calculate line emission (default True)
  docont : bool
    Calculate Continuum emission (default True)
  dopseudo : bool
    Calculate PseudoContinuum (weak line) emission (default True)


  Returns
  -------
  array of floats
      Emissivity in counts cm^3 s^-1 bin^-1.

  """
#  History
#  -------
#  Version 0.1 - initial release
#    Adam Foster July 17th 2015
#
#  Version 0.2
#    Added dummyfirst keyword
#    Adam Foster July 21st 2015
#
  warnings.warn("make_ion_spectrum is a deprecated function and will be removed. Use NEISession.return_spectrum", DeprecationWarning, stacklevel=3)


  # set up the bins
  if (sum((bins[1:]-bins[:-1])<0) > 0):
    print("*** ERROR: bins must be monotonically increasing. Exiting ***")
    return -1

  if binunits.lower()=='kev':
    ebins = bins*1.0
  elif binunits.lower() in ['a', 'angstrom', 'angstroms']:
    ebins = const.HC_IN_KEV_A/bins[::-1]
  else:
    print("*** ERROR: unknown binning unit %s, Must be keV or A. Exiting ***"%\
          (binunits))

  # check the files exist
  # first, check if the line file is set
  if util.keyword_check(linefile):
    # ok, we should do something with this
    # if it is a string, look for the file name
    if isinstance(linefile, str):
      if ((linefile == "$ATOMDB/apec_nei_line.fits") & (nei==False)):
        linefile = "$ATOMDB/apec_line.fits"
      lfile = os.path.expandvars(linefile)
      if not os.path.isfile(lfile):
        print("*** ERROR: no such file %s. Exiting ***" %(lfile))
        return -1
      ldat = pyfits.open(lfile)
    elif isinstance(linefile, pyfits.hdu.hdulist.HDUList):
      # no need to do anything, file is already open
      ldat = linefile
    else:
      print("Unknown data type for linefile. Please pass a string or an HDUList")
      return -1

  if util.keyword_check(cocofile):
    if isinstance(cocofile, str):
      if ((cocofile == "$ATOMDB/apec_nei_comp.fits") & (nei==False)):
        cocofile = "$ATOMDB/apec_coco.fits"
      cfile = os.path.expandvars(cocofile)
      if not os.path.isfile(cfile):
        print("*** ERROR: no such file %s. Exiting ***" %(cfile))
        return -1
      cdat = pyfits.open(cfile)
    elif isinstance(cocofile, pyfits.hdu.hdulist.HDUList):
      # no need to do anything, file is already open
      cdat = cocofile
    else:
      print("Unknown data type for cocofile. Please pass a string or an HDUList")
      return




  # get the index
  #if ((index < 2) | (index > len(ldat))):
  #  print "*** ERRROR: Index must be in range %i to %i"%(2, len(ldat)-1)
  #  return -1

  lldat = ldat[index].data
  ccdat = cdat[index].data

  if not abund:
    abund= 1.0

  lspectrum = numpy.zeros(len(bins)-1, dtype=float)
  cspectrum = numpy.zeros(len(bins)-1, dtype=float)
  pspectrum = numpy.zeros(len(bins)-1, dtype=float)

  if dolines:
    # ADD  LINES
    if nei:
      lspectrum += __add_lines(Z, abund, lldat, ebins, broadening=broadening,\
                             broadenunits=broadenunits,z1_drv=z1)
    else:
      lspectrum += __add_lines(Z, abund, lldat, ebins,broadening, broadenunits,z1=z1)

  if docont:
    # ADD  LINES
    if nei:
      cspectrum += __make_ion_index_continuum(ebins, Z, cocofile=ccdat,\
                                         ion = z1, binunits=binunits,\
                                         no_pseudo=True)*abund
    else:
      cspectrum += __make_ion_index_continuum(ebins, Z, cocofile=ccdat,\
                                         ion = 0, binunits=binunits,\
                                         no_pseudo=True)*abund


  if dopseudo:
    # ADD  LINES
    if nei:
      pspectrum += __make_ion_index_continuum(ebins, Z, cocofile=ccdat,\
                                         ion = z1, binunits=binunits, \
                                         no_coco=True)*abund
    else:
      pspectrum += __make_ion_index_continuum(ebins, Z, cocofile=ccdat,\
                                         ion = 0, binunits=binunits, \
                                         no_coco=True)*abund



  # broaden the continuum if required:
  if broadening:

    cspectrum = __broaden_continuum(ebins, cspectrum, binunits = binunits, \
                      broadening=broadening,\
                      broadenunits=broadenunits)
    pspectrum = __broaden_continuum(ebins, pspectrum, binunits = binunits, \
                      broadening=broadening,\
                      broadenunits=broadenunits)
  if dummyfirst:
    return numpy.append([0],   cspectrum+lspectrum+pspectrum)
  else:
    return cspectrum+lspectrum+pspectrum


#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
def __add_lines(Z, abund, lldat, ebins, z1=False, z1_drv=False, \
              broadening=False, broadenunits='A'):
  """
  DEPRECATED

  Add lines to spectrum, applying gaussian broadening.

  Add the lines in list lldat, with atomic number Z, to a spectrum delineated
  by ebins (these are the edges, in keV). Apply broadening to the spectrum
  if broadening != False, with units of broadenunits (so can do constant
  wavelength or energy broadening)

  Parameters
  ----------
  Z : int
    Element of interest (e.g. 6 for carbon)
  abund : float
    Abundance of element, relative to AG89 data.
  lldat : dtype linelist
    The linelist to add. Usually the hdu from the apec_line.fits \
    file, often with some filters pre-applied.
  ebins : array of floats
    Energy bins. Will return spectrum with nbins-1 data points.
  z1 : int
    Ion charge +1 of ion to return
  z1_drv : int
    Driving Ion charge +1 of ion to return
  broadening : float
    Apply spectral broadening if > 0. Units of A of keV
  broadenunits : {'A' , 'keV'}
    The units of broadening, Angstroms or keV


  Returns
  -------
  array of float
    broadened emissivity spectrum, in photons cm^3 s^-1 bin^-1. Array has \
    len(ebins)-1 values.
  """
#
#  History
#  -------
#  Version 0.1 - initial release
#    Adam Foster July 17th 2015
#


  warnings.warn("add_lines is a deprecated function and will be removed", DeprecationWarning, stacklevel=3)
  lammax = (const.HC_IN_KEV_A/ebins[0])
  lammin = (const.HC_IN_KEV_A/ebins[-1])
  if broadenunits.lower() in ['a','angstrom','angstroms']:
    bunits = 'a'
  elif broadenunits.lower() =='kev':
    bunits = 'kev'
  else:
    print("Error: unknown broadening unit %s, Must be keV or A. Exiting ***"%\
          (broadenunits))
    return -1
  if broadening:
    if bunits == 'a':
      lammax += broadening
      lammin -= broadening
    else:
      lammax += lammax**2 * broadening/const.HC_IN_KEV_A
      lammin -= lammin**2 * broadening/const.HC_IN_KEV_A

  l = lldat[(lldat['Element']==Z) &\
            (lldat['Lambda'] <= lammax) &\
            (lldat['Lambda'] >= lammin)]

  if z1:
    l = l[l['Ion'] ==z1]
  if z1_drv:
    l = l[l['Ion_drv'] ==z1_drv]

  spectrum = numpy.zeros(len(ebins)-1, dtype=float)

  if broadening:
    if  bunits == 'a':
      for ll in l:
        spectrum+=atomdb._addline2(ebins, const.HC_IN_KEV_A/ll['Lambda'], \
                 ll['Epsilon']* abund,\
                 broadening*const.HC_IN_KEV_A/(ll['lambda']**2))
    else:
      for ll in l:
        spectrum+=atomdb._addline2(ebins, const.HC_IN_KEV_A/ll['Lambda'], \
                 ll['Epsilon']* abund,\
                 broadening)
  else:
    for ll in l:
      spectrum[numpy.argmax(\
               numpy.where(ebins <= const.HC_IN_KEV_A/ll['Lambda'])[0])]+=\
               ll['Epsilon']* abund

  return spectrum
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------


def __get_index(te, filename='$ATOMDB/apec_line.fits', \
              teunits='keV', logscale=False):
  """
  Finds HDU with kT closest ro desired kT in given line or coco file.

  Opens the line or coco file, and looks for the header unit
  with temperature closest to te. Use result as index input to make_spectrum

  Parameters
  ----------
  te : float
    Temperature in keV or K
  teunits : {'keV' , 'K'}
    Units of te (kev or K, default keV)
  logscale : bool
    Search on a log scale for nearest temperature if set.
  filename : str or hdulist
    line or continuum file, already opened or filename.

  Returns
  -------
  int
    Index in HDU file with nearest temperature to te.
  """

#  History
#  -------
#  Version 0.1 - initial release
#    Adam Foster July 17th 2015
#
#
#  Version 0.2 - fixed bug so teunits = l works properly
#    Adam Foster Jan 26th 2016

  if teunits.lower() == 'kev':
    teval = te
  elif teunits.lower() == 'k':
    teval = te*const.KBOLTZ
  else:
    print("*** ERROR: unknown temeprature unit %s. Must be keV or K. Exiting ***"%\
          (teunits))
  if type(filename) == pyfits.hdu.hdulist.HDUList:
    a = filename[1].data
  elif type(filename) == pyfits.hdu.table.BinTableHDU:
    a = filename.data
  elif type(filename)==type('somestring'):
    a = pyfits.open(os.path.expandvars(filename))[1].data


  if logscale:
    i = numpy.argmin(numpy.abs(numpy.log(a['kT'])-numpy.log(teval)))
  else:
    i = numpy.argmin(numpy.abs(a['kT']-teval))
  # need to increase the HDU by 2.
  return i+2



#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
def __list_lines(specrange, lldat=False, index=False, linefile=False,\
              units='angstroms', Te=False, teunit='K', minepsilon=1e-20):
  """
  DEPRECATED
  Gets list of the lines in a given spectral range

  Note that the output from this can be passed directly to print_lines


  Parameters
  ----------
  specrange : [float,float]
    spectral range [min,max] to return lines on
  lldat : see notes
    line data
  index : int
    index in lldat, see notes
  linefile : see notes
    line data file, see notes
  units : {'A' , 'keV'}
    units of specrange (default A)
  Te : float
    electron temperature (used if index not set to select appropriate
    data HDU from line file)
  teunit : {'K','keV','eV'}
    units of Te
  minepsilon : float
    minimum epsilon for lines to be returned, in ph cm^3 s^-1

  Notes
  -----
  The actual line list can be defined in one of several ways:

  specrange = [10,100]

  1. lldat as an actual list of lines::

       a = pyfits.open('apec_line.fits')
       llist = a[30].data
       l = list_lines(specrange, lldat=llist)

  2. lldat as a numpy array of lines::

       a = pyfits.open('apec_line.fits')
       llist = numpy.array(a[30].data)
       l = list_lines(specrange, lldat=llist)

  3. lldat is a BinTableHDU from pyfits::

       a = pyfits.open('apec_line.fits')
       llist = numpy.array(a[30])
       l = list_lines(specrange, lldat=llist)

  4. lldat is a HDUList from pyfits. In this case index must also be set::

       a = pyfits.open('apec_line.fits')
       index = 30
       l = list_lines(specrange, lldat=a, index=index)

  5. lldat NOT set, linefile contains apec_line.fits file location, index
     identifies the HDU::

       linefile = 'mydir/apec_v2.0.2_line.fits'
       index = 30
       l = list_lines(specrange, linefile=linefile, index=index)

  6. lldat NOT set & linefile NOT set, linefile is set to
     $ATOMDB/apec_line.fits. index identifies the HDU::

       index = 30
       l = list_lines(specrange, index=index)

  Returns
  -------
  linelist : dtype=([('Lambda', '>f4'), \
           ('Lambda_Err', '>f4'), \
           ('Epsilon', '>f4'), \
           ('Epsilon_Err', '>f4'), \
           ('Element', '>i4'), \
           ('Ion', '>i4'), \
           ('UpperLev', '>i4'), \
           ('LowerLev', '>i4')])
     A line list filtered by the various elements.
  """

#  History
#  -------
#  Version 0.1 - initial release
#    Adam Foster July 17th 2015
#
#  Version 0.2 - bugfix:
#    corrected lldat test to be if not false, instead of if true.
#    Adam Foster September 16th 2015
#

  # check the units

  warnings.warn("list_lines is a deprecated function and will be removed. Use CIESession.return_linelist", DeprecationWarning, stacklevel=3)


  if units.lower()=='kev':
    specrange = [const.HC_IN_KEV_A/specrange[1], const.HC_IN_KEV_A/specrange[0]]
  elif units.lower() in ['a', 'angstrom', 'angstroms']:
    specrange = specrange
  else:
    print("*** ERROR: unknown unit %s, Must be keV or A. Exiting ***"%\
          (units))

  # open the line file if only specified by a name
  if lldat==False:
    # use default file if not specified
    if linefile==False:
      linefile=os.path.expandvars("$ATOMDB/apec_line.fits")
    # open file
    lldat = pyfits.open(linefile)

  # convert temperature units

  if Te != False:
    if teunit.lower() == 'kev':
      kT = Te*1.0
    elif teunit.lower() == 'ev':
      kT = Te/1000.0
    elif teunit.lower() == 'k':
      kT = Te*const.KBOLTZ
    else:
      print("*** ERROR: unknown teunit %s, Must be keV or K. Exiting ***"%\
          (teunit))

  # if the temperature is specified, get the index
  if index != False:

    if Te != False:
      print("Warning: both index and Te specified. Using index")
    else:
      # everything is fine!
      pass

  else:
    if Te != False:
      # open the file and get the index of nearest block
      index =  __get_index(kT, filename=lldat, \
              teunits='keV', logscale=True)
    else:
      if not type(lldat) in [pyfits.fitsrec.FITS_rec, numpy.ndarray]:
        print("Error: did not specify index or Te")
        return False
    # index is specified, so we'll use it.
    pass




  if lldat != False:
    #options here:
    # (1) This is a line list, i.e. the ldata[index].data from a file,
    #     either in original pyfits format or a numpy array
    #
    # (2) This is an hdu from a file
    #
    # (3) This is a _line.fits file, and requires an index to make sense of it

    if type(lldat) == pyfits.hdu.hdulist.HDUList:
      if not(index):
        print("*** ERROR. lldat provided as HDUList, but no index specified.")
        print(" Exiting")
      llist = numpy.array(lldat[index].data)
    elif type(lldat) == pyfits.hdu.table.BinTableHDU:
      llist = numpy.array(lldat.data)
    elif type(lldat) in [pyfits.fitsrec.FITS_rec, numpy.ndarray]:
      llist = numpy.array(lldat)
    else:
      print("ERROR: unkonwn llist type!")
  else:
    # there should now be no way to get here. commenting out this section
    print("ERROR: I SHOULD NEVER BE HERE")
    pass
    # no line data supplied.
    #if linefile==False:
      #linefile = os.path.expandvars('$ATOMDB/apec_line.fits')
    #if not os.path.isfile(linefile):
      #print "*** ERROR. No linefile supplied but $ATOMDB/apec_line.fits is"
      #print " not a file. Exiting"
    #else:

      #if not index:
        #print "*** ERROR. No index specified for line list. Exiting"
      #else:
        #lfile = pyfits.open(os.path.expandvars(linefile))
        #llist= numpy.array(lfile[index].data)

  # at this point, we have data. Filter by specrange, minepsilon
  llist = llist[(llist['Lambda']>= specrange[0]) &\
                (llist['Lambda']<= specrange[1]) &\
                (llist['Epsilon']>= minepsilon)]



  return llist


#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
def __list_nei_lines(specrange, Te, tau, Te_init=False,  lldat=False, linefile=False,\
              units='angstroms', teunit='K', minepsilon=1e-20, \
              datacache=False):
  """
  DEPRECATED
  Gets list of the lines in a given spectral range for a given NEI plasma

  For speed purposes, this takes the nearest temperature tabulated in the
  linefile, and applies the exact ionization balance as calculated to this.
  This is not perfect, but should be good enough.

  Note that the output from this can be passed directly to print_lines


  Parameters
  ----------
  specrange : [float,float]
    spectral range [min,max] to return lines on
  Te : float
    electron temperature
  tau : float
    electron density * time (cm^-3 s)
  Te_init : float
    initial ionization balance temperature
  lldat : see notes
    line data
  linefile : see notes
    line data file, see notes
  units : {'A' , 'keV'}
    units of specrange (default A)
  teunit : {'K' , 'keV'}
    units of temperatures (default K)
  minepsilon : float
    minimum emissivity (ph cm^3 s^{-1}) for inclusion in linelist

  Notes
  -----
  The actual line list can be defined in one of several ways:

  specrange = [10,100]

  1. lldat as an actual list of lines::

       a = pyfits.open('apec_nei_line.fits')
       llist = a[30].data
       l = list_nei_lines(specrange, lldat=llist)

  2. lldat as a numpy array of lines::

       a = pyfits.open('apec_nei_line.fits')
       llist = numpy.array(a[30].data)
       l = list_nei_lines(specrange, lldat=llist)

  3. lldat is a BinTableHDU from pyfits::

       a = pyfits.open('apec_nei_line.fits')
       llist = numpy.array(a[30])
       l = list_nei_lines(specrange, lldat=llist)

  4. lldat is a HDUList from pyfits. In this case index must also be set::

       a = pyfits.open('apec_nei_line.fits')
       index = 30
       l = list_nei_lines(specrange, lldat=a, index=index)

  5. lldat NOT set, linefile contains apec_line.fits file location, index
     identifies the HDU::

       linefile = 'mydir/apec_v3.0.2_nei_line.fits'
       index = 30
       l = list_nei_lines(specrange, linefile=linefile, index=index)

  6. lldat NOT set & linefile NOT set, linefile is set to
     $ATOMDB/apec_line.fits. index identifies the HDU::

       index = 30
       l = list_nei_lines(specrange, Te, tau)

  Returns
  -------
  linelist : dtype=([('Lambda', '>f4'), \
           ('Lambda_Err', '>f4'), \
           ('Epsilon', '>f4'), \
           ('Epsilon_Err', '>f4'), \
           ('Element', '>i4'), \
           ('Elem_drv', '>i4'), \
           ('Ion', '>i4'), \
           ('Ion_drv', '>i4'), \
           ('UpperLev', '>i4'), \
           ('LowerLev', '>i4')])
     A line list filtered by the various elements.
  """

#  History
#  -------
#  Version 0.1 - initial release
#    Adam Foster November 02nd 2015
#

  # check the units
  warnings.warn("list_nei_lines is a deprecated function and will be removed. Use NEISession.return_linelist", DeprecationWarning, stacklevel=3)

  if units.lower()=='kev':
    specrange = [const.HC_IN_KEV_A/specrange[1], const.HC_IN_KEV_A/specrange[0]]
  elif units.lower() in ['a', 'angstrom', 'angstroms']:
    specrange = specrange
  else:
    print("*** ERROR: unknown unit %s, Must be keV or A. Exiting ***"%\
          (units))

  # convert Te into keV

  if teunit.lower() == 'kev':
    kT = Te*1.0
  elif teunit.lower() == 'ev':
    kT = Te/1000.0
  elif teunit.lower() == 'k':
    kT = Te*const.KBOLTZ
  else:
    print("*** ERROR: unknown teunit %s, Must be keV or K. Exiting ***"%\
          (teunit))


  if Te_init != False:
    if teunit.lower() == 'kev':
      kT_init = Te_init*1.0
    elif teunit.lower() == 'ev':
      kT_init = Te_init/1000.0
    elif teunit.lower() == 'k':
      kT_init = Te_init*const.KBOLTZ
    else:
      print("*** ERROR: unknown teunit %s, Must be keV or K. Exiting ***"%\
          (teunit))
  else:
  # Te_init was not set:
    kT_init = 1e4*const.KBOLTZ



  # sort out the line file...

  if lldat != False:
    #options here:
    # (1) This is a line list, i.e. the ldata[index].data from a file,
    #     either in original pyfits format or a numpy array
    #
    # (2) This is an hdu from a file
    #
    # (3) This is a _line.fits file, and requires an index to make sense of it

    if type(lldat) == pyfits.hdu.hdulist.HDUList:
      # go get the index
      te_index = __get_index(kT, filename=lldat, \
              teunits='keV', logscale=True)
      llist = numpy.array(lldat[te_index].data)
    elif type(lldat) == pyfits.hdu.table.BinTableHDU:
      # no need to get index
      llist = numpy.array(lldat.data)
    elif type(lldat) in [pyfits.fitsrec.FITS_rec, numpy.ndarray]:
      llist = numpy.array(lldat)
  else:
    # no line data supplied.
    if linefile==False:
      linefile = os.path.expandvars('$ATOMDB/apec_nei_line.fits')
    if not os.path.isfile(linefile):
      print("*** ERROR. Linefile %s is "%(linefile), end=' ')
      print(" not a file. Exiting")
    else:
      lldat = pyfits.open(os.path.expandvars(linefile))
      te_index = __get_index(kT, filename=lldat, \
              teunits='keV', logscale=True)
      llist= numpy.array(lldat[te_index].data)

  # get filtered line list

  llist = llist[(llist['Lambda']>= specrange[0]) &\
                (llist['Lambda']<= specrange[1]) &\
                (llist['Epsilon'] >= minepsilon)]

  # get the index

  # get list of all the elements present
  Zlist = util.unique(llist['Element'])
  # Calculate the ionization balance.
  ionbal ={}
  for Z in Zlist:
    ionbal[Z] = apec.return_ionbal(Z, kT, tau=tau, Te_init = kT_init,\
                                    teunit='keV', datacache=datacache)
  # multiply everything by the appropriate ionization fraction
  if 'Elem_drv' in llist.dtype.names:

    for il in llist:
      il['Epsilon'] *= ionbal[il['Elem_drv']][il['Ion_drv']-1]
  else:
    for il in llist:
      il['Epsilon'] *= ionbal[il['Element_drv']][il['Ion_drv']-1]

  # filter again based on new epsilon values
  llist=llist[llist['Epsilon']>minepsilon]
  # at this point, we have data
  return llist
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------

def __print_lines(llist, specunits = 'A', do_cfg=False):
  """
  DEPRECATED
  Prints lines in a linelist to screen

  This routine is very primitive as things stand. Plenty of room for refinement.

  Parameters
  ----------
  llist: dtype(linelist)
    list of lines to print. Typically returned by list_lines.
  specunits: {'A' , 'keV'}
    units to list the line positions by (A or keV, default A)
  do_cfg: bool
    Show full configuration information for each level

  Returns
  -------
  Nothing, though prints data to standard out.
  """

#  History
#  -------
#  Version 0.1 - initial release
#    Adam Foster July 17th 2015
#

  warnings.warn("print_lines is a deprecated function and will be removed.", DeprecationWarning, stacklevel=3)

  if specunits.lower()=='kev':
    specunits = 'keV'
    llist['Lambda']=const.HC_IN_KEV_A/llist['Lambda']
  elif specunits.lower() in ['a', 'angstrom', 'angstroms']:
    specunits = 'A'
  else:
    print("*** ERROR: unknown unit %s, Must be keV or A. Exiting ***"%\
          (specunits))


  # now print the header lines

  if specunits == 'keV':
    if 'Ion_drv' in llist.dtype.names:
      s= "%-10s %-10s %-10s %-10s %-10s %-10s %-10s" %\
         ('Energy','Epsilon','Element','Ion','Ion_drv','UpperLev','LowerLev')
    else:
      s= "%-10s %-10s %-10s %-10s %-10s %-10s" %\
         ('Energy','Epsilon','Element','Ion','UpperLev','LowerLev')
    print(s)

    if 'Ion_drv' in llist.dtype.names:
      s= "%-10s %-10s %-10s %-10s %-10s %-10s %-10s" %\
         ('keV','ph cm3 s-1','','','','','')
    else:
      s= "%-10s %-10s %-10s %-10s %-10s %-10s" %\
         ('keV','ph cm3 s-1','','','','')
    print(s)

  else:
    if 'Ion_drv' in llist.dtype.names:
      s= "%-10s %-10s %-10s %-10s %-10s %-10s %-10s" %\
         ('Lambda','Epsilon','Element','Ion','Ion_drv','UpperLev','LowerLev')
    else:
      s= "%-10s %-10s %-10s %-10s %-10s %-10s" %\
         ('Lambda','Epsilon','Element','Ion','UpperLev','LowerLev')
    print(s)

    if 'Ion_drv' in llist.dtype.names:
      s= "%-10s %-10s %-10s %-10s %-10s %-10s %-10s" %\
         ('A','ph cm3 s-1','','','','','')
    else:
      s= "%-10s %-10s %-10s %-10s %-10s %-10s" %\
         ('A','ph cm3 s-1','','','','')
    print(s)

  # now print the data
  d={}
  if 'Ion_drv' in llist.dtype.names:

    for il in llist:
      s = "%.4e %.4e %-10i %-10i %-10i %-10i %-10i" %\
         (il['Lambda'],\
          il['Epsilon'],\
          il['Element'],\
          il['Ion'],\
          il['Ion_drv'],\
          il['UpperLev'],\
          il['LowerLev'])

      # get the configuration info
      if do_cfg:
        lvdat = atomdb.get_data(il['Element'],il['Ion'],'LV',\
                                         datacache=d)

        s+= " %40s %3f %2i %2i"%(lvdat[1].data['ELEC_CONFIG'][il['UpperLev']-1],\
                                 lvdat[1].data['S_QUAN'][il['UpperLev']-1],\
                                 lvdat[1].data['L_QUAN'][il['UpperLev']-1],\
                                 lvdat[1].data['LEV_DEG'][il['UpperLev']-1])

        s+= " -> %40s %3f %2i %2i"%(lvdat[1].data['ELEC_CONFIG'][il['LowerLev']-1],\
                                 lvdat[1].data['S_QUAN'][il['LowerLev']-1],\
                                 lvdat[1].data['L_QUAN'][il['LowerLev']-1],\
                                 lvdat[1].data['LEV_DEG'][il['LowerLev']-1])
      else:
        pass
      print(s)
  else:
    for il in llist:
      s = "%.4e %.4e %-10i %-10i %-10i %-10i" %\
         (il['Lambda'],\
          il['Epsilon'],\
          il['Element'],\
          il['Ion'],\
          il['UpperLev'],\
          il['LowerLev'])
      if do_cfg:
        lvdat = atomdb.get_data(il['Element'],il['Ion'],'LV',\
                                         datacache=d)

        s+= " %40s %3f %2i %2i"%(lvdat[1].data['ELEC_CONFIG'][il['UpperLev']-1],\
                                 lvdat[1].data['S_QUAN'][il['UpperLev']-1],\
                                 lvdat[1].data['L_QUAN'][il['UpperLev']-1],\
                                 lvdat[1].data['LEV_DEG'][il['UpperLev']-1])

        s+= " -> %40s %3f %2i %2i"%(lvdat[1].data['ELEC_CONFIG'][il['LowerLev']-1],\
                                 lvdat[1].data['S_QUAN'][il['LowerLev']-1],\
                                 lvdat[1].data['L_QUAN'][il['LowerLev']-1],\
                                 lvdat[1].data['LEV_DEG'][il['LowerLev']-1])
      else:
        pass
      print(s)






#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------

def __make_ion_index_continuum(bins,  element, \
                             index = False,\
                             cocofile='$ATOMDB/apec_coco.fits',\
                             binunits = 'keV', \
                             fluxunits='ph', no_coco=False, no_pseudo=False, \
                             ion=0,\
                             broadening=False,\
                             broadenunits='keV'):
  """
  DEPRECATED
  Creates the continuum for a given ion.

  Parameters
  ----------
  bins : array(float)
    The bin edges for the spectrum to be calculated on, in units of keV \
    or Angstroms. Must be monotonically increasing. Spectrum will return \
    len(bins)-1 values.
  element : int
    Atomic number of element to make spectrum of (e.g. 6 for carbon)
  binunits : {'keV' , 'A'}
    The energy units for bins. "keV" or "A". Default keV.
  fluxunits : {'ph' , 'erg'}
    Whether to return the emissivity in photons ('ph') or ergs ('erg').\
    Defaults to  photons
  no_coco : bool
    If true, do not include the compressed continuum
  no_pseudo : bool
    If true, do not include the pseudo continuum (weak lines)
  ion : int
    Ion to calculate, e.g. 4 for C IV. By default, 0 (whole element).
  index : int
    The index to generate the spectrum from. Note that the AtomDB files
    the emission starts in hdu number 2. So for the first block, you
    set index=2. Only required if cocofile is a filename or an HDULIST
  cocofile : HDUList, HDU or str
    The continuum file, either already open (HDULIST) or filename.
    alternatively, provide the HDU itself, and then do not need
    to define the index
  broadening: float
    Broaden the continuum by gaussians of this width (if False,
    no broadening is applied)
  broadenunits: {'keV' , 'A'}
    Units for broadening (kev or A)

  Returns
  -------
  array(float)
    len(bins)-1 array of continuum emission, in units of \
    photons cm^3 s^-1 bin^-1 (fluxunits = 'ph') or
    ergs cm^3 s^-1 bin^-1 (fluxunits = 'erg')

  """
#  History
#  -------
#  Version 0.1 - initial release
#    Adam Foster July 17th 2015

  warnings.warn("make_ion_index_continuum is a deprecated function and will be removed. Use NEISession.return_spectrum", DeprecationWarning, stacklevel=3)

  if binunits.lower() in ['kev']:
    angstrom = False
  elif binunits.lower() in ['a','angstrom','angstroms']:
    angstrom = True
  else:
    print("*** ERROR: unknown units %s for continuum spectrum. Exiting" %\
          (binunits))

  if fluxunits.lower() in ['ph', 'photon','photons', 'p']:
    ergs = False
  elif fluxunits.lower() in ['erg','ergs']:
    ergs = True
  else:
    print("*** ERROR: unknown units %s for continuum flux. Exiting" %\
          (fluxunits))

    return -1

  if angstrom:
    # need to work in keV for this bit
    bins = const.HC_IN_KEV_A/bins[::-1]

  # open the data files

  if type(cocofile) == pyfits.hdu.hdulist.HDUList:
    cdat = cocofile[index].data
  elif type(cocofile) == pyfits.hdu.table.BinTableHDU:
    cdat = cocofile.data
  elif type(cocofile) == pyfits.fitsrec.FITS_rec:
    cdat = cocofile
  elif type(cocofile) == type('somestring'):
    cdat = pyfits.open(os.path.expandvars(cocofile))[index].data
  else:
    print("*** ERROR: unable to parse cocofile = %s" %repr(cocofile))
    return -1


  nbins = len(bins)-1
  spectrum = numpy.zeros(nbins, dtype=float)
  Z = element
  rmj = ion

  d = cdat[(cdat['Z']==Z) & (cdat['rmJ'] == rmj)]
  if len(d)==0:
    return spectrum
  else:
    d = d[0]
  if not(no_coco):
    spectrum += _expand_E_grid(bins,\
                                d['N_cont'],\
                                d['E_cont'],\
                                d['Continuum'])

  if not(no_pseudo):
    spectrum += _expand_E_grid(bins,\
                                d['N_Pseudo'],\
                                d['E_Pseudo'],\
                                d['Pseudo'])


  if (ergs):
    spectrum *= const.ERGperKEV*0.5*(bins[:-1]+bins[1:])


  if (angstrom):
    spectrum = spectrum[::-1]  # reverse spectrum to match angstroms

  return spectrum

#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
def _expand_E_grid(eedges, n,Econt_in_full, cont_in_full):

  """
  Code to expand the compressed continuum onto a series of bins.

  Parameters
  ----------
  eedges : float(array)
    The bin edges for the spectrum to be calculated on, in units of keV
  n : int
    The number of good data points in the continuum array
  Econt_in_full: float(array)
    The compressed continuum energies
  cont_in_full: float(array)
    The compressed continuum emissivities

  Returns
  -------
  float(array)
    len(bins)-1 array of continuum emission, in units of \
    photons cm^3 s^-1 bin^-1
  """

#  History
#  -------
#  Version 0.1 - initial release
#    Adam Foster July 17th 2015

  import scipy.integrate
  cont_in = cont_in_full[:n]
  Econt_in = Econt_in_full[:n]


  # ok. So. Sort this.
  E_all = numpy.append(Econt_in, eedges)

  cont_tmp = numpy.interp(eedges, Econt_in, cont_in)
  C_all = numpy.append(cont_in, cont_tmp)

  iord = numpy.argsort(E_all, kind="mergesort")

  # order the arrays
  E_all = E_all[iord]
  C_all = C_all[iord]

  ihi = numpy.where(iord>=n)[0]
  cum_cont = scipy.integrate.cumulative_trapezoid(C_all, E_all, initial=0)
  C_out = numpy.zeros(len(eedges))
#  for i in range(len(eedges)):
#    arg  = numpy.argwhere(iord==n+i)[0][0]
#    C_out[i] = cum_cont[ihi[i]]

  C_out = cum_cont[ihi]

  cont = C_out[1:]-C_out[:-1]
  return cont



#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
def __broaden_continuum(bins, spectrum, binunits = 'keV', \
                      broadening=False,\
                      broadenunits='keV'):
  """
  DEPRECATED

  Apply a broadening to the continuum

  Parameters
  ----------
  bins : array(float)
    The bin edges for the spectrum to be calculated on, in units of \
    keV or Angstroms. Must be monotonically increasing. Spectrum \
    will return len(bins)-1 values.
  spectrum : array(float)
    The emissivities in each bin in the unbroadened spectrum
  binunits : {'keV' , 'A'}
    The energy units for bins. "keV" or "A". Default keV.
  broadening : float
    Broaden the continuum by gaussians of this width (if False,\
    no broadening is applied)
  broadenunits : {'keV' , 'A'}
    Units for broadening (kev or A)

  Returns
  -------
  array(float)
    spectrum broadened by gaussians of width broadening
  """

#  History
#  -------
#  Version 0.1 - initial release
#    Adam Foster July 17th 2015

  # convert to energy grid
  warnings.warn("broaden_continuum is a deprecated function and will be removed", DeprecationWarning, stacklevel=3)

  if binunits.lower() in ['kev']:
    angstrom = False
  elif binunits.lower() in ['a','angstrom','angstroms']:
    angstrom = True
  else:
    print("*** ERROR: unknown units %s for continuum spectrum. Exiting" %\
          (binunits))

  if angstrom:
    bins = const.HC_IN_KEV_A/bins[::-1]

  # broadening
  if broadening:
    if broadenunits.lower() in ['a','angstrom','angstroms']:
      bunits = 'a'
    elif broadenunits.lower() in ['kev']:
      bunits = 'kev'
    else:
      print("*** ERROR: unknown units %s for continuum broadening. Exiting" %\
            (broadenunits))
      return -1
    # do the broadening
    spec = numpy.zeros(len(spectrum))
    emid = (bins[1:]+bins[:-1])/2
    if broadenunits == 'a':
      # convert to keV
      broadenvec = emid**2 *broadening/const.HC_IN_KEV_A
    else:
      broadenvec = numpy.zeros(len(emid))
      broadenvec[:] = broadening
    for i in range(len(spec)):

      spec += atomdb.addline2(bins, emid[i], \
                 spectrum[i],\
                 broadenvec[i])
    spectrum=spec
  if angstrom:
    spectrum=spectrum[::-1]
  return spectrum

def _get_response_ebins(rmf):
  """
  Get the energy bins from the rmf file

  Parameters
  ----------
  rmf : string or pyfits.hdu.hdulist.HDUList
    The filename of the rmf or the opened rmf file

  Returns
  -------
  specbins_in : array(float)
    input energy bins used. nbins+1 length, with the last item being the final bin
    This is the array on which the input spectrum should be calculated
  specbins_out : array(float)
    output energy bins used. nbins+1 length, with the last item being the final bin
    This is the array on which the output spectrum will be returned. Often (but not
    always) the same as specbins_in
  """
  #
  # Update 2016-05-25
  #
  # Changed return to be the MATRIX, not EBOUNDS
  # In many instruments these are the same grids, but not all.


  if type(rmf)==str:
    rmfdat = pyfits.open(rmf)
  elif type(rmf) == pyfits.hdu.hdulist.HDUList:
    rmfdat = rmf
  else:
    print("ERROR: unknown rmf type, %s"%(repr(type(rmf))))
    return
  try:
    k=rmfdat.index_of('MATRIX')
    matrixname = 'MATRIX'
  except KeyError:
    try:
      k=rmfdat.index_of('SPECRESP MATRIX')
      matrixname = 'SPECRESP MATRIX'
    except KeyError:
      print("Cannot find index for matrix in this data")
      raise


  specbins_in = rmfdat[matrixname].data['ENERG_LO']
  specbins_in = numpy.append(specbins_in, rmfdat[matrixname].data['ENERG_HI'][-1])


  specbins_out = rmfdat['EBOUNDS'].data['E_MIN']
  specbins_out = numpy.append(specbins_out , rmfdat['EBOUNDS'].data['E_MAX'][-1])


  return specbins_in, specbins_out


def __get_effective_area(rmf, arf=False):
  """
  Get the effective area of a response file

  Parameters
  ----------
  rmf : string or pyfits.hdu.hdulist.HDUList
    The filename of the rmf or the opened rmf file
  arf : string or pyfits.hdu.hdulist.HDUList
    The filename of the arf or the opened arf file
  Returns
  -------
  array(float)
    energy grid (keV) for returned response
  array(float)
    effective area for the returned response
  """
#
# Update 2016-05-25
#
# Changed to return the energy grid and the spectrum, as apparently in some
# instruments these are not the same as the input energy grid.

  if arf:
    if type(arf)==str:
      arfdat = pyfits.open(arf)
    elif type(arf) == pyfits.hdu.hdulist.HDUList:
      arfdat = arf
    else:
      print("ERROR: unknown arf type, %s"%(repr(type(arf))))
      return
    arfarea = arfdat['SPECRESP'].data['SPECRESP']
  else:
    arfarea = 1.0


  if type(rmf)==str:
    rmfdat = pyfits.open(rmf)
  elif type(rmf) == pyfits.hdu.hdulist.HDUList:
    rmfdat = rmf
  else:
    print("ERROR: unknown rmf type, %s"%(repr(type(rmf))))
    return

  ebins_in, ebins_out = _get_response_ebins(rmf)

  area = numpy.zeros(len(ebins_in)-1, dtype=float)


  try:
    k=rmfdat.index_of('MATRIX')
    matrixname = 'MATRIX'
  except KeyError:
    try:
      k=rmfdat.index_of('SPECRESP MATRIX')
      matrixname = 'SPECRESP MATRIX'
    except KeyError:
      print("Cannot find index for matrix in this data")
      raise

  matname = 'MATRIX'
  if not matname in rmfdat[matrixname].data.names:
    matname ='SPECRESP MATRIX'
    if not matname in rmfdat[matrixname].data.names:
      print("Error: Cannot find Matrix in rmf data")
      return
  for ibin, i in enumerate(rmfdat[matrixname].data):
    area[ibin] = sum(i[matname])

  area *= arfarea
  return ebins_in, area

class _Gaussian_CDF():
  """
  For fast interpolation, pre-calculate the CDF and interpolate it when
  broadening lines

  Parameters
  ----------
  None

  Examples
  --------

  Create a CDF instance:

  >>> s=_Gaussian_CDF()

  Broaden a line on ebins grid, with centroid and width.

  >>> cdf = a.broaden(centroid, width, ebins)

  Convert to flux in each bin

  >>> flux= cdf[1:]-cdf[:-1]


  """
  def __init__(self):
    from scipy.stats import norm
    self.x = numpy.linspace(-6,6,2400)
    self.cdf = norm.cdf(self.x)
    self.broadentype='Gaussian'

  def broaden(self, centroid, width, ebins):
    """
    Broaden a line, return CDF

    Parameters
    ----------
    centroid : float
      The line energy (keV)
    width : float
      The sigma of the normal distribution, in keV
    ebins : array(float)
      Energy grid to return CDF on

    Returns
    -------
    CDF : array(float)
      cumulative flux distribution of linen at each bin edge.
    """

    # move the energy grid
    etmp = (ebins-centroid)/width



    # interpolate to get the appropriate CDF values
    ret=numpy.interp(etmp, self.x, self.cdf)


    return ret


  def broaden_factor(self, centroid, width, ebins):


    etmp = (ebins-centroid)/width

    return etmp




class _Lorentzian_CDF():
  """
  For fast interpolation, pre-calculate the CDF and interpolate it when
  broadening lines

  Parameters
  ----------
  None

  Examples
  --------

  Create a CDF instance:

  >>> s=_Gaussian_CDF()

  Broaden a line on ebins grid, with centroid and width.

  >>> cdf = a.broaden(centroid, width, ebins)

  Convert to flux in each bin

  >>> flux= cdf[1:]-cdf[:-1]


  """



  def __init__(self):
    from scipy.stats import cauchy
    self.x = numpy.linspace(-12,12,4800)
    self.cdf = cauchy.cdf(self.x)
    self.broadentype='Lorentzian'

  def broaden(self, centroid, width, ebins):
    """
    Broaden a line, return CDF

    Parameters
    ----------
    centroid : float
      The line energy (keV)
    width : float
      The sigma of the normal distribution, in keV
    ebins : array(float)
      Energy grid to return CDF on

    Returns
    -------
    CDF : array(float)
      cumulative flux distribution of linen at each bin edge.
    """

    # move the energy grid
    etmp = (ebins-centroid)/width

    # interpolate to get the appropriate CDF values
    ret=numpy.interp(etmp, self.x, self.cdf)

    return ret



class _Voigt_CDF():
  """
  For fast interpolation, pre-calculate the CDF and interpolate it when
  broadening lines

  Parameters
  ----------
  None

  Examples
  --------

  Create a CDF instance:

  >>> s=_Gaussian_CDF()

  Broaden a line on ebins grid, with centroid and width.

  >>> cdf = a.broaden(centroid, width, ebins)

  Convert to flux in each bin

  >>> flux= cdf[1:]-cdf[:-1]


  """



  def __init__(self, sigma, gamma):
    from scipy.special import voigt_profile
    self.x = numpy.linspace(-12,12,2400)
    self.broadentype='Voigt'



  def __recalc(self, sigma, gamma):

    if ((self.sigma==sigma) & \
        (self.gamma == gamma)): pass



    edges = (self.x[1:]+self.x[:-1])/2
    edges = numpy.append( edges[0]-(edges[1]-edges[0]), \
                          edges, \
                          edges[-1] + (edges[-1]-edges[-2]))


    dx = edges[1:]-edges[:-1]

    pdfmid =  voigt_profile(self.x, sigma, gamma)*dx

    self.cdf = numpy.cumsum(pdfmid)


  def broaden(self, centroid, ebins, sigma, gamma, test_recalc=False):
    """
    Broaden a line, return CDF

    Parameters
    ----------
    centroid : float
      The line energy (keV)
    width : float
      The sigma of the normal distribution, in keV
    ebins : array(float)
      Energy grid to return CDF on

    Returns
    -------
    CDF : array(float)
      cumulative flux distribution of linen at each bin edge.
    """

    # move the energy grid
    etmp = (ebins-centroid)

    # If required, recalculate the parameters
    if test_recalc:
      self.__recalc(sigma, gamma)

    # interpolate to get the appropriate CDF values
    ret=numpy.interp(etmp, self.x, self.cdf)

    return ret


[docs] class CIESession(): """ Load and generate a collisional ionization equilibrium spectrum Parameters ---------- linefile : string or HDUList, optional The line emissivity data file (either name or already open) cocofile : string or HDUList, optional The continuum emissivity data file (either name or already open) elements : iterable of int Elements to include, listed by atomic number. if not set, include all. abundset : string The abundance set to use. Default AG89. Attributes ---------- fart noti : dict datacache : dict Any Atomdb FITS files which have to be opened are stored here spectra : CIESpectra Object storing the actual spectral data elements : iterable of int Elements to include, listed by atomic number. if not set, include all. default_abundset : string The abundance set used for the original emissivity file calculation abundset : string The abundance set to be used for the returned spectrum abundsetvector : array_like(float) The relative abundance between default_abundset and abundset for each element response_set : bool Have we loaded a response (or set a dummy response) dolines : bool Calculate line emission (default True) docont : bool Calculate Continuum emission (default True) dopseudo : bool Calculate PseudoContinuum (weak line) emission (default True) broaden_limit : float Apply broadening to lines with epsilon > this value (ph cm3 s-1) thermal_broadening : bool Apply thermal broadening to lines (default = False) velocity_broadening : float Apply velocity broadening with this velocity (km/s). If <=0, do not apply. Examples -------- Create a session instance: >>> s=CIESession() Set up the responses, in this case a dummy response from 0.1 to 10 keV, with area 1cm^2 in each bin >>> ebins = numpy.linspace(0.1,10,1000) >>> s.set_response(ebins, raw=True) (Alternatively, for a real response file, s.set_response(rmffile, arf=arffile) Turn on thermal broadening >>> s.set_broadening(True) Will thermally broaden lines with emissivity > 1.000000e-20 ph cm3 s-1 Return spectrum at 1.0keV >>> spec = s.return_spectrum(1.0) spec is in photons cm^5 s^-1 bin^-1; ebins are the bin edges (so spec is 1 element shorter than ebins) Turn off continuum and pseudocontinuum (by default these are on) >>> s.dopseudo = False >>> s.docont = False >>> s.doeebrems = False >>> \# s.dolines = False would turn off lines >>> spec = s.return_spectrum(1.0) spec is in photons cm^5 s^-1 bin^-1; ebins are the bin edges (so spec is 1 element shorter than ebins) """ def __init__(self, linefile="$ATOMDB/apec_line.fits",\ cocofile="$ATOMDB/apec_coco.fits",\ elements=False,\ abundset='AG89'): """ Initialization routine. Can set the line and continuum files here Parameters ----- linefile : str or HDUList The filename of the line emissivity data, or the file opened with pyfits. cocofile : str or HDUList The filename of the continuum emissivity data, or the file opened with pyfits. elements : iterable of int Elements to include, listed by atomic number. if not set, include all. abundset : string The abundance set to use. Defaults to AG89. See atomdb.set_abundance for list of options. Returns ------- None """ self.SessionType='CIE' self._session_initialise1(linefile, cocofile, elements, abundset) # a hold for the spectra self.spectra=_CIESpectrum(self.linedata, self.cocodata) self._session_initialise2() def _session_initialise1(self, linefile, cocofile, elements, abundset): """ This routine does all the initialization which is the same for all the different session classes, separates them out from the instance specific ones Parameters ---------- linefile : str or HDUList The filename of the line emissivity data, or the file opened with pyfits. cocofile : str or HDUList The filename of the continuum emissivity data, or the file opened with pyfits. elements : iterable of int Elements to include, listed by atomic number. if not set, include all. abundset : string The abundance set to use. Defaults to AG89. See atomdb.set_abundance for list of options. Returns ------- None """ self.datacache={} # Open up the APEC files self._set_apec_files(linefile, cocofile) # if elements are specified, use them. Otherwise, use Z=1-30 if util.keyword_check(elements): self.elements = elements else: if self.SessionType=='CIE': self.elements=list(range(1,const.MAXZ_CIE+1)) elif self.SessionType in ['NEI','PShock','Kappa']: self.elements=list(range(1,const.MAXZ_NEI+1)) # Set both the current and the default abundances to those that # the apec data was calculated on self.abundset=self.linedata[0].header['SABUND_SOURCE'] self.default_abundset=self.linedata[0].header['SABUND_SOURCE'] self.abundsetvector = numpy.zeros(const.MAXZ_CIE+1) for Z in self.elements: self.abundsetvector[Z] = 1.0 # but if another vector was already specified, use this instead if util.keyword_check(abundset): self.set_abundset(abundset) self.abund = numpy.zeros(const.MAXZ_CIE+1) for Z in self.elements: self.abund[Z]=1.0 # Set a range of parameters which can be overwritten later self.response_set = False # have we loaded a response file? self.dolines=True # Include lines in spectrum self.docont=True # Include continuum in spectrum self.dopseudo=True # Include pseudo continuum in spectrum self.do_eebrems = True # Do electron-electron bremsstrahlung # no response set yet self.rmffile=False self.arffile=False self.raw_response=False # verbosity self.verbose = False def _session_initialise2(self): """ This routine does the remaining initialization which is the same for all the different session classes, separates them out from the instance specific ones, after the Spectrum class has been generated Parameters ---------- None Return ------ None """ self.set_broadening(False, broaden_limit=1e-20) self.cdf = _Gaussian_CDF() # def ionfraction(self, T, Z, teunit='K'): # # settings=apec.parse_par_file(os.path.expandvars('$ATOMDB/apec.par')) # ionfrac = atomdb._get_precalc_ionfrac(os.path.expandvars(settings['IonBalanceTable']), Z , T) # # return ionfrac
[docs] def set_broadening(self, thermal_broadening, broaden_limit=False, \ velocity_broadening=0.0, \ velocity_broadening_units='km/s',\ thermal_broaden_temperature=None,\ teunit='keV'): """ Turn on or off thermal broadening, and the emissivity limit for thost lines Parameters ---------- thermal_broadening : bool If true, turn on broadening. If False, turn it off. broaden_limit : float The emissivity limit for lines to be broadened. If False, this value will not be updated. velocity_broadening : float velocity broadening to apply. If <=0, not applied velocity_broadening_units : string Units of velocity_broadening. 'km/s' is default and only value so far. thermal_broaden_temperature : float If not None, use this temperature for all line broadening instead of plasma temperature (keV) Notes ----- Updates attributes thermal_broadening, broaden_limit, velocity_broadening, velocity_broadening_units """ self.thermal_broadening = thermal_broadening if broaden_limit != False: self.broaden_limit = broaden_limit if self.verbose: if self.thermal_broadening==True: print("Will thermally broaden lines with emissivity > %e ph cm3 s-1"%(self.broaden_limit)) else: print("Will not thermally broaden lines") self.velocity_broadening=velocity_broadening allowed_velocity_broadening_units= ['km/s'] if not velocity_broadening_units.lower() in ['km/s']: raise util.UnitsError("Error: velocity broadening units of %s is not in allowed set %s."%\ (velocity_broadening_units, repr(allowed_velocity_broadening_units))) return self.velocity_broadening_units=velocity_broadening_units self.spectra.thermal_broadening = self.thermal_broadening self.spectra.broaden_limit = self.broaden_limit self.spectra.velocity_broadening=self.velocity_broadening self.spectra.velocity_broadening_units=self.velocity_broadening_units if thermal_broaden_temperature is not None: T = util.convert_temp(thermal_broaden_temperature, teunit, 'K') self.thermal_broaden_temperature=T self.spectra.thermal_broaden_temperature=T else: self.thermal_broaden_temperature=None self.spectra.thermal_broaden_temperature=None
[docs] def set_response(self, rmf, arf=False, raw=False, sparse=False): """ Set the response. rmf, arf can either be the filenames or the opened files (latter is faster if called repeatedly) Extended Summary ---------------- Amends the following items: self.rmffile : string The rmf file name self.rmf : string The response matrix self.arffile : string The arf file name self.arf : string The arf data self.specbins : array(float) The spectral bins on which to calculate the spectrum (keV or A) self.specbin_units : string ['A','keV'] Units of specbins self.ebins : array(float) The spectral bins on which to calculate the spectrum (keV). self.ebins_out : array(float) The spectral bins on which to return the spectrum (keV). Can be different from specbins depending on the spectrum self.response_set : bool A response has been loaded self.response_type : string 'raw', 'standard' or 'sparse' depending on the type implemented self.specbins_set : bool The spectral bins are set self.ebins_checksum : string The md5checksum of the specbins Parameters ---------- rmf: string or HDUlist or array The response matrix file or energy bins (see raw k/w) arf: string or HDUlist The ancillary response file raw : bool If true, the rmf variable contains the energy bin edges (keV) for all the bins, and each bin has a perfect response. This is effectively a dummy response sparse : bool If true, the rmf is stored as a sparse matrix and solved using sparse matrix algebra. Useful for large RMFs (e.g. XRISM). Tests show accuracy to 1 part in 10^{15}. Ignored if raw==True Returns ------- none """ if raw==True: if sparse: warnings.warn("Sparse matrix requested with raw response. Ignoring.") # make a diagonal perfect response self.specbins = rmf self.ebins_out = rmf self.specbin_units='keV' self.aeff = numpy.ones(len(rmf)-1) self.response_set = True self.specbins_set = True self.arf = False self.ebins_checksum =hashlib.md5(self.specbins).hexdigest() self.raw_response=True self.response_type = 'raw' else: if util.keyword_check(arf): if type(arf)==str: self.arffile = arf self.arfdat = pyfits.open(arf) elif type(arf) == pyfits.hdu.hdulist.HDUList: self.arfdat = arf self.arffile = arf.filename() else: print("ERROR: unknown arf type, %s"%(repr(type(arf)))) return self.arf = numpy.array(self.arfdat['SPECRESP'].data['SPECRESP']) else: self.arf=1.0 self.raw_response=False if type(rmf)==str: self.rmffile = rmf self.rmf = pyfits.open(rmf) elif type(rmf) == pyfits.hdu.hdulist.HDUList: self.rmf = rmf self.rmffile = rmf.filename() else: print("ERROR: unknown rmf type, %s"%(repr(type(rmf)))) return # get the rmf matrix # alternate where we do matrix generation? # these are the *output* energy bins ebins = self.rmf['EBOUNDS'].data['E_MIN'] if ebins[-1] > ebins[0]: ebins = numpy.append(ebins, self.rmf['EBOUNDS'].data['E_MAX'][-1]) else: ebins = numpy.append(self.rmf['EBOUNDS'].data['E_MAX'][0],ebins) # find the name of the rmf matrix HDU try: k=self.rmf.index_of('MATRIX') matrixname = 'MATRIX' except KeyError: try: k=self.rmf.index_of('SPECRESP MATRIX') matrixname = 'SPECRESP MATRIX' except KeyError: print("Cannot find index for matrix in this data") raise # bugfix: not all missions index from 0 (or 1). # Use chanoffset to correct for this. if not sparse: chanoffset = self.rmf['EBOUNDS'].data['CHANNEL'][0] # ERROR CHECK FOR LEM RMF Files # # Somehow these files are indexed from both 1 and zero. # Check will be to look at the minimum and maximum channels # requested, if they are > channoffset, issue a warning, but continue # # Hopefully check can be removed someday... # ARF 2024-04-15 <- date added for posterity to see when "someday" occurs... min_bin = numpy.zeros(len(self.rmf[matrixname].data), dtype=int) max_bin = numpy.zeros(len(self.rmf[matrixname].data), dtype=int) for i in range(len(self.rmf[matrixname].data)): min_bin[i] = numpy.min(self.rmf[matrixname].data['F_CHAN'][i]) max_bin[i] = numpy.max(self.rmf[matrixname].data['F_CHAN'][i]+self.rmf[matrixname].data['N_CHAN'][i])-1 min_bin = min_bin.min() max_bin = max_bin.max() if max_bin > max(self.rmf['EBOUNDS'].data['CHANNEL']): if ((min_bin > 0) & (chanoffset==0)): # RAISE A WARNING warnings.warn("Using an rmf file with inconsistent E_BOUNDS and F_CHAN indexing. Continuing, but check your RMF file.", RuntimeWarning) # hack is to set chanoffset back to zero chanoffset = min_bin # END LEM Error Check fix self.rmfmatrix = numpy.zeros([len(self.rmf[matrixname].data),len(self.rmf['EBOUNDS'].data)]) for ibin, i in enumerate(self.rmf[matrixname].data): lobound = 0 fchan = i['F_CHAN']*1 nchan = i['N_CHAN']*1 if numpy.isscalar(fchan): fchan = numpy.array([fchan]) fchan -= chanoffset if numpy.isscalar(nchan): nchan = numpy.array([nchan]) for j in range(len(fchan)): ilo = fchan[j] if ilo < 0: continue ihi = fchan[j] + nchan[j] self.rmfmatrix[ibin,ilo:ihi]=i['MATRIX'][lobound:lobound+nchan[j]] lobound = lobound+nchan[j] self.specbins, self.ebins_out = _get_response_ebins(self.rmf) if self.ebins_out[-1] < self.ebins_out[0]: # need to reverse things self.ebins_out=self.ebins_out[::-1] self.rmfmatrix = self.rmfmatrix[:,::-1] self.specbin_units='keV' self.aeff = self.rmfmatrix.sum(1) if util.keyword_check(self.arf): self.aeff *=self.arf self.response_set = True self.specbins_set = True self.response_type = 'standard' self.ebins_checksum =hashlib.md5(self.specbins).hexdigest() else: # sparse matrix time! from scipy.sparse import bsr_array data=[] row=[] col=[] chanoffset = self.rmf['EBOUNDS'].data['CHANNEL'][0] for ibin, i in enumerate(self.rmf[matrixname].data): lobound = 0 for ngrp in range(i['N_GRP']): fchan = i['F_CHAN']*1 nchan = i['N_CHAN']*1 if numpy.isscalar(fchan): fchan = numpy.array([fchan]) fchan -= chanoffset if numpy.isscalar(nchan): nchan = numpy.array([nchan]) for j in range(len(fchan)): ilo = fchan[j] if ilo < 0: continue ihi = fchan[j] + nchan[j] data.extend(i['MATRIX'][lobound:lobound+nchan[j]]) row.extend(range(ilo,ihi)) col.extend([ibin]*nchan[j]) lobound = lobound+nchan[j] self.specbins, self.ebins_out = _get_response_ebins(self.rmf) data = numpy.array(data) row = numpy.array(row) col = numpy.array(col) if self.ebins_out[-1] < self.ebins_out[0]: # need to reverse things self.ebins_out=self.ebins_out[::-1] col = len(self.ebins_out)-col-1 self.rmfmatrix = bsr_array((data, (row, col)), shape=(len(self.specbins)-1, len(self.ebins_out)-1),\ dtype=data.dtype) self.specbin_units='keV' self.aeff = self.rmfmatrix.sum(1) if util.keyword_check(self.arf): self.aeff *=self.arf self.response_set = True self.specbins_set = True self.response_type = 'sparse' self.ebins_checksum =hashlib.md5(self.specbins).hexdigest() # this is now a check for 0 minimums if self.specbins_set: if self.specbins[0] <=0: warnings.warn('Response minimum energy is 0 keV, setting to small finite value (%e keV)'%(self.specbins[1]*1e-6)) self.specbins[0] = self.specbins[1]*1e-6
[docs] def return_spectrum(self, te, teunit='keV', nearest=False,\ get_nearest_t=False, log_interp=True): """ Get the spectrum at an exact temperature. Interpolates between 2 neighbouring spectra Finds HDU with kT closest to desired kT in given line or coco file. Opens the line or coco file, and looks for the header unit with temperature closest to te. Use result as index input to make_spectrum Parameters ---------- te : float Temperature in keV or K teunit : {'keV' , 'K'} Units of te (kev or K, default keV) nearest : bool If set, return the spectrum from the nearest tabulated temperature in the file, without interpolation get_nearest_t : bool If set, and `nearest` set, return the nearest tabulated temperature as well as the spectrum. log_interp : bool Perform linear interpolation on a logT/logEpsilon grid, instead of linear. Returns ------- spectrum : array(float) The spectrum in photons cm^5 s^-1 bin^-1, with the response, or photons cm^3 s^-1 bin^-1 if raw is set. nearest_T : float, optional If `get_nearest_t` is set, return the actual temperature this corresponds to. Units are same as `teunit` """ # Check that there is a response set if not self.response_set: raise util.ReadyError("Response not yet set: use set_response to set.") # make element and abundance lists el_list = self.elements ab = {} for Z in el_list: ab[Z] = self.abund[Z]*self.abundsetvector[Z] self.spectra.ebins = self.specbins self.spectra.ebins_checksum=hashlib.md5(self.spectra.ebins).hexdigest() s= self.spectra.return_spectrum(te, teunit=teunit, nearest=nearest,\ elements = el_list, abundance=ab, \ broaden_object = self.cdf, \ log_interp=log_interp, dolines=self.dolines,\ dopseudo=self.dopseudo, docont=self.docont, \ do_eebrems = self.do_eebrems) ss = self._apply_response(s) return ss
# def _return_test_spectrum(self, spectrum_in): # """ # Get the spectrum at an exact temperature. # Interpolates between 2 neighbouring spectra # Finds HDU with kT closest to desired kT in given line or coco file. # Opens the line or coco file, and looks for the header unit # with temperature closest to te. Use result as index input to make_spectrum # Parameters # ---------- # te : float # Temperature in keV or K # teunit : {'keV' , 'K'} # Units of te (kev or K, default keV) # raw : bool # If set, return the spectrum without response applied. Default False. # nearest : bool # If set, return the spectrum from the nearest tabulated temperature # in the file, without interpolation # get_nearest_t : bool # If set, and `nearest` set, return the nearest tabulated temperature # as well as the spectrum. # Returns # ------- # spectrum : array(float) # The spectrum in photons cm^5 s^-1 bin^-1, with the response, or # photons cm^3 s^-1 bin^-1 if raw is set. # nearest_T : float, optional # If `nearest` is set, return the actual temperature this corresponds to. # Units are same as `teunit` # """ # # Check that there is a response set # if not self.response_set: # raise util.ReadyError("Response not yet set: use set_response to set.") # # make element and abundance lists # ss = self.apply_response(spectrum_in) # return ss def _apply_response(self, spectrum): """ Apply a response to a spectrum Parameters ---------- spectrum : array(float) The spectrum, in counts/bin/second, to have the response applied to. Must be binned on the same grid as the rmf. Returns ------- array(float) spectrum folded through the response """ # if the response is raw, no need to matrix multiply (diagonal response, effectively) if self.response_type=='raw': return spectrum elif self.response_type=='standard': # arfdat = self.arf ret = spectrum*self.arf try: ret = numpy.matmul(ret,self.rmfmatrix) except ValueError: try: ret = numpy.matmul(ret,self.rmfmatrix.transpose()) except ValueError: if ret == 0: ret = numpy.zeros(len(self.ebins_out)-1) return ret elif self.response_type=='sparse': #arfdat = self.arf ret = spectrum*self.arf ret = (self.rmfmatrix*ret).sum(1) return(ret) else: raise util.OptionError('Unknown response type %s'%(self.response_type)) def _set_apec_files(self, linefile, cocofile): """ Set the apec line and coco files, and load up their data Parameters ---------- linefile : str or HDUList The filename of the line emissivity data, or the opened file. cocofile : str or HDUList The filename of the continuum emissivity data, or the opened file. Returns ------- None Notes ----- Updates self.linefile, self.linedata, self.cocofile and self.cocodata """ if isinstance(linefile, str): lfile = os.path.expandvars(linefile) if not os.path.isfile(lfile): print("*** ERROR: no such file %s. Exiting ***" %(lfile)) return -1 self.linedata = pyfits.open(lfile) self.linefile = lfile elif isinstance(linefile, pyfits.hdu.hdulist.HDUList): # no need to do anything, file is already open self.linedata=linefile self.linefile=linefile.filename() else: print("Unknown data type for linefile. Please pass a string or an HDUList") if isinstance(cocofile, str): cfile = os.path.expandvars(cocofile) if not os.path.isfile(cfile): print("*** ERROR: no such file %s. Exiting ***" %(cfile)) return -1 self.cocodata=pyfits.open(cfile) self.cocofile=cfile elif isinstance(cocofile, pyfits.hdu.hdulist.HDUList): # no need to do anything, file is already open self.cocodata=cocofile self.cocofile=cocofile.filename() else: print("Unknown data type for cocofile. Please pass a string or an HDUList")
[docs] def set_eebrems(self, do_eebrems): """ Set whether to do the electron-electron bremsstrahlung in the spectrum Parameters ---------- do_eebrems : bool If True, include the electron-electron bremsstrahlung calculation If False, don't (this is default, and matches XSPEC behaviour) """ a = bool(do_eebrems) self.do_eebrems = a
[docs] def set_abund(self, elements, abund): """ Set the elemental abundance, relative to the abundset. Defaults to 1.0 for everything Parameters ---------- elements : int or array_like(int) The elements to change the abundance of abund : float or array_like(float) The new abundances. If only 1 value, set all `elements` to this abundance Otherwise, should be of same length as elements. Returns ------- None Examples -------- Set the abundance of iron to 0.5 >>> myspec.set_abund(26, 0.5) Set the abundance of iron and nickel to 0.1 and 0.2 respectively >>> myspec.set_abund([26, 28], [0.1,0.2]) Set the abundance of oxygen, neon, magnesium and iron to 0.1 >>> myspec.set_abund([8,10,12,26],0.1) """ abundvec, aisvec = util.make_vec(abund) elementvec, eisvec = util.make_vec(elements) if (aisvec): if len(abundvec)!= len(elementvec): print("abundance vector and element vector must have same number"+\ " of elements") print('ab', abundvec) print('el',elementvec) else: self.abund[elementvec] = abundvec elif (eisvec): # set all these eleemtns to the same abundance for el in elementvec: self.abund[el]=abund else: self.abund[elements]=abund
[docs] def set_abundset(self, abundstring=None): """ Set the abundance set. Parameters ---------- abundstring : string The abundance string (e.g. "AG89", "uniform". Case insensitive. See atomdb.get_abundance for list of possible abundances Returns ------- none updates self.abundset and self.abundsetvector. """ if abundstring==None: # Get abundance data file abunddata = atomdb.get_data(False, False, 'abund',datacache=self.datacache) # find the possible strings print("Possible abundance sets:") for name in abunddata[1].data.field('Source'): print(" %s"%(name)) return # read in the abundance the raw data was calculated on old = atomdb.get_abundance(abundset=self.default_abundset,\ datacache=self.datacache) # read in the new abundance new = atomdb.get_abundance(abundset=abundstring,\ datacache=self.datacache) # divide the 2, store the replacement ratio to self.abundsetvector for Z in range(1,const.MAXZ_CIE+1): try: self.abundsetvector[Z]=new[Z]/old[Z] except ZeroDivisionError: self.abundsetvector[Z] = 0.0 except IndexError: pass # update the current abundance string to represent your input self.abundset=abundstring
#self.recalc()
[docs] def return_line_emissivity(self, Te, Z, z1, up, lo, \ specunit='A', teunit='keV', \ apply_aeff=False, apply_abund=True,\ log_interp = True): """ Get line emissivity as function of Te. Parameters ---------- Te : float or array(float) Temperature(s) in keV or K Z : int nuclear charge of element z1 : int ion charge +1 of ion up : int upper level for transition lo : int lower level for transition specunit : {'Angstrom','keV'} Units for wavelength or energy (a returned value) teunit : {'keV' , 'K'} Units of Te (kev or K, default keV) apply_aeff : bool If true, apply the effective area to the line emissivity in the linelist to modify their intensities. apply_abund : bool If true, apply the abundance set in the session to the result. log_interp : bool Perform linear interpolation on a logT/logEpsilon grid (default), or linear. Otherwise linear Returns ------- ret : dict Dictionary containing: Te, tau, teunit : as input wavelength : line wavelength (A) energy : line energy (keV) epsilon : emissivity in ph cm^3 s-1 (or ph cm^5 s^-1 if apply_aeff=True). First index is temperature, second is tau. """ Tevec, Teisvec = util.make_vec(Te) kTlist = util.convert_temp(Tevec, teunit, 'keV') if apply_abund: ab = self.abund[Z]*self.abundsetvector[Z] else: ab = 1.0 eps = numpy.zeros(len(Tevec)) ret={} ret['wavelength']=None for ikT, kT in enumerate(kTlist): e, lam = self.spectra.return_line_emissivity(kT, Z, z1,\ up, lo,\ specunit='A',\ teunit='keV',\ abundance=ab) eps[ikT] = e if lam != False: ret['wavelength'] = lam * 1.0 #else: # ret['wavelength'] = None ret['Te'] = Te ret['teunit'] = teunit if ret['wavelength'] != None: ret['energy'] = const.HC_IN_KEV_A/ret['wavelength'] else: ret['energy'] = None if apply_aeff == True: e = ret['energy'] ibin = numpy.where(self.specbins<e)[0][-1] eps = eps*self.aeff[ibin] # now correct for vectors if not Teisvec: eps = eps[0] ret['epsilon'] = eps return ret
[docs] def return_linelist(self, Te, specrange, specunit='A', \ teunit='keV', apply_aeff=False, nearest=False,\ apply_binwidth=False): """ Get the list of line emissivities vs wavelengths Parameters ---------- Te : float Temperature in keV or K specrange : [float, float] Minimum and maximum values for interval in which to search specunit : {'Angstrom','keV'} Units for specrange teunit : {'keV' , 'K'} Units of te (kev or K, default keV) apply_aeff : bool If true, apply the effective area to the lines in the linelist to modify their intensities. nearest : bool Return spectrum at nearest tabulated temperature, without interpolation apply_binwidth : bool Divide the line emissivity by the width of the bin they occupy to give emissivity per angstrom or per keV. Returns ------- linelist : array(dtype) The list of lines with lambda (A), energy (keV), epsilon (ph cm3 s-1),\ epsilon_aeff (ph cm5 s-1) ion (string) and upper & lower levels. """ kT = util.convert_temp(Te, teunit, 'keV') # make element and abundance lists el_list = self.elements ab = {} for Z in el_list: ab[Z] = self.abund[Z]*self.abundsetvector[Z] s= self.spectra.return_linelist(kT, specrange=specrange, teunit='keV',\ specunit=specunit, elements=el_list,\ abundance = ab, nearest=nearest) # do the response thing #resp = s.response() if apply_aeff == True: epsilon_aeff = self._apply_linelist_aeff(s, specunit, apply_binwidth) s['Epsilon'] = epsilon_aeff return(s)
def _apply_linelist_aeff(self, linelist, specunit, apply_binwidth): """ Apply effective area to the linelist, return in 'Epsilon_Err'. Parameters ---------- linelist : array List of lines specunit : str A or keV apply_binwidth : bool If true, return emissivity per angstrom or per keV. If false, total. Returns ------- emiss_aeff : array(float) Emissivity \* Aeff """ if specunit.lower()=='kev': binwidth = self.ebins_out[1:]-self.ebins_out[:-1] factor = numpy.zeros(len(linelist), dtype=float) for i, ss in enumerate(linelist): e = const.HC_IN_KEV_A/ss['Lambda'] if e>self.specbins[-1]: factor[i] = 0.0 elif e<self.specbins[0]: factor[i] = 0.0 else: ibin = numpy.where(self.specbins<e)[0][-1] factor[i]=self.aeff[ibin] if apply_binwidth: factor[i] /= binwidth[ibin] emiss_aeff = linelist['Epsilon']*factor elif specunit.lower()=='a': wvbins=12.398425/self.ebins_out[::-1] binwidth = wvbins[1:]-wvbins[:-1] factor = numpy.zeros(len(linelist), dtype=float) for i, ss in enumerate(linelist): e = ss['Lambda'] if e>wvbins[-1]: factor[i] = 0.0 elif e<wvbins[0]: factor[i] = 0.0 else: ibin = numpy.where(wvbins<e)[0][-1] factor[i]=self.aeff[::-1][ibin] if apply_binwidth: factor[i] /= binwidth[ibin] emiss_aeff = linelist['Epsilon']*factor return emiss_aeff def _adjust_line(self, change, Z=0, z1=0, z1_drv=0, upper=0,lower=0, quantity="Epsilon", method="Replace", trackchanges=False): """ Change the emissivity or wavelength of a line. Integer parameters set to 0 mean "all". Note this all happens in memory and does not edit the underlying files. Parameters ---------- change : float or str If float, set the new value to this. If string Z : int Element z1 : int Ion z1_drv : int Driving ion upper : int Upper level lower : int Lower level quantity : string Change "Epsilon" or "Lambda" - emissivity or wavelength - by change method : string "Replace": replace existing value with change "Multiply" : multiply existing value with change "Divide" : divide existing value by change "Add" : add change to existing value "Subtract" : subtract change from existing Returns ------- None """ meth = method.lower() if Z==0: Zlist = self.elements else: Zlist = [Z] for Zt in Zlist: if z1==0: z1list = range(1,z1+2) else: z1list=[z1] for z1t in z1list: if z1_drv==0: if self.SessionType=='CIE': z1_drvlist=[0] else: z1_drvlist=range(1,z1+2) else: if self.SessionType=='CIE': z1_drvlist=[0] else: z1_drvlist=[z1_drv] for z1_drvt in z1_drvlist: # go through each temperature, see if there is line data for this # ion. If so, change it according to uppper, lower for ikT in range(len(self.spectra.kTlist)): # check if there is any data for this element try: ldat = self.spectra.spectra[ikT][Zt].lines.lines except KeyError: # element or temperature data doesn't exist continue tochange = numpy.ones(len(ldat), dtype=bool) if upper!=0: tochange[ldat['UpperLev'] != upper] = False if lower != 0: tochange[ldat['LowerLev'] != lower] = False if trackchanges: tochange[self.spectra.spectra[ikT][Zt].lines.changed==True] = False if sum(tochange) > 0: if meth=='replace': ldat[quantity][tochange] = change if meth=='add': ldat[quantity][tochange] += change if meth=='subtract': ldat[quantity][tochange] -= change if meth=='divide': ldat[quantity][tochange] /= change if meth=='multiply': ldat[quantity][tochange] *= change self.spectra.spectra[ikT][Zt].lines.changed[tochange] = True return def _adjust_line_lambda(self, change, Z, z1, upper,lower, quantity="Epsilon", method="Replace", trackchanges=False): """ Change the emissivity or wavelength of a line. Integer parameters set to 0 mean "all". Note this all happens in memory and does not edit the underlying files. Parameters ---------- change : float or str If float, set the new value to this. If string Z : int Element z1 : int Ion upper : int Upper level lower : int Lower level quantity : string Change "Epsilon" or "Lambda" - emissivity or wavelength - by change method : string "Replace": replace existing value with change "Multiply" : multiply existing value with change "Divide" : divide existing value by change "Add" : add change to existing value "Subtract" : subtract change from existing Returns ------- None """ meth = method.lower() # see if this line is already listed try: value=self.spectra.fixwavelength[Z][z1][upper][lower] except AttributeError: self.spectra.fixwavelength={} if not Z in self.spectra.fixwavelength.keys(): self.spectra.fixwavelength[Z]={} if not z1 in self.spectra.fixwavelength[Z].keys(): self.spectra.fixwavelength[Z][z1]={} if not upper in self.spectra.fixwavelength[Z][z1].keys(): self.spectra.fixwavelength[Z][z1][upper]={} if not lower in self.spectra.fixwavelength[Z][z1][upper].keys(): self.spectra.fixwavelength[Z][z1][upper][lower]=quantity if Z==0: Zlist = self.elements else: Zlist = [Z] for Zt in Zlist: if z1==0: z1list = range(1,z1+2) else: z1list=[z1] for z1t in z1list: if z1_drv==0: if self.SessionType=='CIE': z1_drvlist=[0] else: z1_drvlist=range(1,z1+2) else: if self.SessionType=='CIE': z1_drvlist=[0] else: z1_drvlist=[z1_drv] for z1_drvt in z1_drvlist: # go through each temperature, see if there is line data for this # ion. If so, change it according to uppper, lower for ikT in range(len(self.spectra.kTlist)): # check if there is any data for this element try: ldat = self.spectra.spectra[ikT][Zt].lines.lines except KeyError: # element or temperature data doesn't exist continue tochange = numpy.ones(len(ldat), dtype=bool) if upper!=0: tochange[ldat['UpperLev'] != upper] = False if lower != 0: tochange[ldat['LowerLev'] != lower] = False if trackchanges: tochange[self.spectra.spectra[ikT][Zt].lines.changed==True] = False if sum(tochange) > 0: if meth=='replace': ldat[quantity][tochange] = change if meth=='add': ldat[quantity][tochange] += change if meth=='subtract': ldat[quantity][tochange] -= change if meth=='divide': ldat[quantity][tochange] /= change if meth=='multiply': ldat[quantity][tochange] *= change self.spectra.spectra[ikT][Zt].lines.changed[tochange] = True return
[docs] def format_linelist(self, linelist): """ Print out the line list in a nice format, with all the transitions described in more detail Parameters ---------- linelist : numpy.array(dtype=linelist) The linelist returned from "return_linelist" Returns ------- str : the formatted linelist """ s="" header1 = "" header2 = "" header1 += 'Z z1 Ion ' header2 += '-- -- ----------' if 'ion_drv' in linelist.dtype.names: header1 += " z1drv" header2 += " -----" header1 += " Wave (A) En (keV) Epsilon UpLev LoLev" header2 += " ---------- --------- ---------- ----- -----" s += header1+'\n' s += header2+'\n' for l in linelist: s += "%2i %2i %10s"% (l['Element'], l['Ion'], atomic.spectroscopic_name(l['Element'], l['Ion'])) if 'Ion_drv' in l.dtype.names: s+= " %2i "%(l['Ion_drv']) s += " %10f %10f %10e %5i -> %5i ;" % ( l['Lambda'], const.HC_IN_KEV_A/l['Lambda'], l['Epsilon'], l['UpperLev'], l['LowerLev']) d = atomdb.get_data(l['Element'], l['Ion'], 'LV', datacache=self.datacache) ll = d[1].data[l['LowerLev']-1] try: ul = d[1].data[l['UpperLev']-1] ll = d[1].data[l['LowerLev']-1] s+= atomdb.format_level(ul) s+= " -> " s+= atomdb.format_level(ll) except: # ul = d[1].data[l['UpperLev']-1] ll = d[1].data[l['LowerLev']-1] if l['UpperLev'] >=10000: s+= " DR Satellite Line" s+="\n" return(s)
[docs] class CIESession_RS(CIESession): """ Load and generate a collisional ionization equilibrium spectrum Parameters ---------- linefile : string or HDUList, optional The line emissivity data file (either name or already open) cocofile : string or HDUList, optional The continuum emissivity data file (either name or already open) elements : iterable of int Elements to include, listed by atomic number. if not set, include all. abundset : string The abundance set to use. Default AG89. Attributes ---------- datacache : dict Any Atomdb FITS files which have to be opened are stored here spectra : CIESpectra Object storing the actual spectral data elements : iterable of int Elements to include, listed by atomic number. if not set, include all. default_abundset : string The abundance set used for the original emissivity file calculation abundset : string The abundance set to be used for the returned spectrum abundsetvector : array_like(float) The relative abundance between default_abundset and abundset for each element response_set : bool Have we loaded a response (or set a dummy response) dolines : bool Calculate line emission (default True) docont : bool Calculate Continuum emission (default True) dopseudo : bool Calculate PseudoContinuum (weak line) emission (default True) broaden_limit : float Apply broadening to lines with epsilon > this value (ph cm3 s-1) thermal_broadening : bool Apply thermal broadening to lines (default = False) velocity_broadening : float Apply velocity broadening with this velocity (km/s). If <=0, do not apply. Examples -------- >> import pyatomdb,atomdb Create a session instance: >>> cie_rs = pyatomdb.spectrum.CIESession_RS() Set up the responses, in this case a dummy response from 0.1 to 10 keV, with area 1cm^2 in each bin >>> ebins = numpy.linspace(0.1,10,1000) >>> cie_rs.set_response(ebins, raw=True) (Alternatively, for a real response file, s.set_response(rmffile, arf=arffile) Turn on thermal broadening >>> cie_rs.set_broadening(True) Will thermally broaden lines with emissivity > 1.000000e-20 ph cm3 s-1 Specify abundance >>>Abund= atomdb.get_abundance(abundfile=False, \ abundset='AG89', element=[-1], datacache=False, settings=False, show=False) Return spectrum at 1.0keV and density = 1cm^-3 >>> spec = cie_rs.return_spectrum(1.0, 1.0, Ab=Abund) spec is in photons cm^5 s^-1 bin^-1; ebins are the bin edges (so spec is 1 element shorter than ebins) """ #print("File osc.fits downloaded successfully.") #else: #print("File osc.fits exists.") def __init__(self, oscfile="$ATOMDB/apec_osc.fits",\ cocofile="$ATOMDB/apec_coco.fits",\ elements=False,\ abundset='AG89'): """ Initialization routine. Can set the line and continuum files here Parameters ----- linefile : str or HDUList The filename of the line emissivity data, or the file opened with pyfits. cocofile : str or HDUList The filename of the continuum emissivity data, or the file opened with pyfits. elements : iterable of int Elements to include, listed by atomic number. if not set, include all. abundset : string The abundance set to use. Defaults to AG89. See atomdb.set_abundance for list of options. Returns ------- None """ # file_oscfile = "$ATOMDB/osc.fits" f_osc = os.path.expandvars(oscfile) # atomdb_path = os.path.expandvars("$ATOMDB") import bz2 if not os.path.exists(f_osc): curversion = open(os.path.expandvars('$ATOMDB/VERSION'),'r').read()[:-1] fname = f_osc.split('/')[-1] f = fname.split('_') fname = f[0]+'_v'+curversion+'_'+f[1] url = const.FTPPATH+'/releases/'+fname+'.bz2' wget.download(url, out=os.path.expandvars("$ATOMDB")) with bz2.open(os.path.expandvars("$ATOMDB/"+fname+".bz2"), "rb") as f: content = f.read() o = open(os.path.expandvars("$ATOMDB/"+fname),'wb') o.write(content) o.close() os.symlink(os.path.expandvars("$ATOMDB/"+fname), os.path.expandvars("$ATOMDB/apec_osc.fits")) linefile=oscfile self.SessionType='CIE' self._session_initialise1(linefile, cocofile, elements, abundset) # a hold for the spectra self.spectra=_CIESpectrum_RS(self.linedata, self.cocodata) self._session_initialise2()
[docs] def return_spectrum(self, te, N_e, Ab, teunit='keV', nearest=False,\ get_nearest_t=False, log_interp=True): """ Get the spectrum at an exact temperature. Interpolates between 2 neighbouring spectra Finds HDU with kT closest to desired kT in given line or coco file. Opens the line or coco file, and looks for the header unit with temperature closest to te. Use result as index input to make_spectrum Parameters ---------- te : float Temperature in keV or K teunit : {'keV' , 'K'} Units of te (kev or K, default keV) nearest : bool If set, return the spectrum from the nearest tabulated temperature in the file, without interpolation get_nearest_t : bool If set, and `nearest` set, return the nearest tabulated temperature as well as the spectrum. log_interp : bool Perform linear interpolation on a logT/logEpsilon grid, instead of linear. dolines : bool Calculate line emission (default True) docont : bool Calculate Continuum emission (default True) dopseudo : bool Calculate PseudoContinuum (weak line) emission (default True) Returns ------- spectrum : array(float) The spectrum in photons cm^5 s^-1 bin^-1, with the response, or photons cm^3 s^-1 bin^-1 if raw is set. nearest_T : float, optional If `get_nearest_t` is set, return the actual temperature this corresponds to. Units are same as `teunit` """ # Check that there is a response set if not self.response_set: raise util.ReadyError("Response not yet set: use set_response to set.") # make element and abundance lists el_list = self.elements ab = {} for Z in el_list: ab[Z] = self.abund[Z]*self.abundsetvector[Z] self.spectra.ebins = self.specbins self.spectra.ebins_checksum=hashlib.md5(self.spectra.ebins).hexdigest() s= self.spectra.return_spectrum(te, N_e, Ab, teunit=teunit, nearest=nearest,\ elements = el_list, abundance=ab, \ broaden_object = self.cdf, \ log_interp=log_interp, dolines=self.dolines,\ dopseudo=self.dopseudo, docont=self.docont, \ do_eebrems = self.do_eebrems) ss = self._apply_response(s) return ss
# def _return_test_spectrum(self, spectrum_in): # """ # Get the spectrum at an exact temperature. # Interpolates between 2 neighbouring spectra # Finds HDU with kT closest to desired kT in given line or coco file. # Opens the line or coco file, and looks for the header unit # with temperature closest to te. Use result as index input to make_spectrum # Parameters # ---------- # te : float # Temperature in keV or K # teunit : {'keV' , 'K'} # Units of te (kev or K, default keV) # raw : bool # If set, return the spectrum without response applied. Default False. # nearest : bool # If set, return the spectrum from the nearest tabulated temperature # in the file, without interpolation # get_nearest_t : bool # If set, and `nearest` set, return the nearest tabulated temperature # as well as the spectrum. # Returns # ------- # spectrum : array(float) # The spectrum in photons cm^5 s^-1 bin^-1, with the response, or # photons cm^3 s^-1 bin^-1 if raw is set. # nearest_T : float, optional # If `nearest` is set, return the actual temperature this corresponds to. # Units are same as `teunit` # """ # # Check that there is a response set # if not self.response_set: # raise util.ReadyError("Response not yet set: use set_response to set.") # # make element and abundance lists # ss = self.apply_response(spectrum_in) # return ss
[docs] def return_line_emissivity(self, Te, Z, z1, up, lo, \ specunit='A', teunit='keV', \ apply_aeff=False, apply_abund=True,\ log_interp = True): """ Get line emissivity as function of Te. Parameters ---------- Te : float or array(float) Temperature(s) in keV or K Z : int nuclear charge of element z1 : int ion charge +1 of ion up : int upper level for transition lo : int lower level for transition specunit : {'Angstrom','keV'} Units for wavelength or energy (a returned value) teunit : {'keV' , 'K'} Units of Te (kev or K, default keV) apply_aeff : bool If true, apply the effective area to the line emissivity in the linelist to modify their intensities. apply_abund : bool If true, apply the abundance set in the session to the result. log_interp : bool Perform linear interpolation on a logT/logEpsilon grid (default), or linear. Otherwise linear Returns ------- ret : dict Dictionary containing: Te, tau, teunit : as input wavelength : line wavelength (A) energy : line energy (keV) epsilon : emissivity in ph cm^3 s-1 (or ph cm^5 s^-1 if apply_aeff=True). First index is temperature, second is tau. """ Tevec, Teisvec = util.make_vec(Te) kTlist = util.convert_temp(Tevec, teunit, 'keV') if apply_abund: ab = self.abund[Z]*self.abundsetvector[Z] else: ab = 1.0 eps = numpy.zeros(len(Tevec)) ret={} ret['wavelength']=None for ikT, kT in enumerate(kTlist): e, lam = self.spectra.return_line_emissivity(kT, Z, z1,\ up, lo,\ specunit='A',\ teunit='keV',\ abundance=ab) eps[ikT] = e if lam != False: ret['wavelength'] = lam * 1.0 #else: # ret['wavelength'] = None ret['Te'] = Te ret['teunit'] = teunit if ret['wavelength'] != None: ret['energy'] = const.HC_IN_KEV_A/ret['wavelength'] else: ret['energy'] = None if apply_aeff == True: e = ret['energy'] ibin = numpy.where(self.specbins<e)[0][-1] eps = eps*self.aeff[ibin] # now correct for vectors if not Teisvec: eps = eps[0] ret['epsilon'] = eps return ret
[docs] def return_linelist(self, Te, specrange, specunit='A', \ teunit='keV', apply_aeff=False, nearest=False,\ apply_binwidth=False): """ Get the list of line emissivities vs wavelengths Parameters ---------- Te : float Temperature in keV or K specrange : [float, float] Minimum and maximum values for interval in which to search specunit : {'Angstrom','keV'} Units for specrange teunit : {'keV' , 'K'} Units of te (kev or K, default keV) apply_aeff : bool If true, apply the effective area to the lines in the linelist to modify their intensities. nearest : bool Return spectrum at nearest tabulated temperature, without interpolation apply_binwidth : bool Divide the line emissivity by the width of the bin they occupy to give emissivity per angstrom or per keV. Returns ------- linelist : array(dtype) The list of lines with lambda (A), energy (keV), epsilon (ph cm3 s-1),\ epsilon_aeff (ph cm5 s-1) ion (string) and upper & lower levels. """ kT = util.convert_temp(Te, teunit, 'keV') # make element and abundance lists el_list = self.elements ab = {} for Z in el_list: ab[Z] = self.abund[Z]*self.abundsetvector[Z] s= self.spectra.return_linelist(kT, specrange=specrange, teunit='keV',\ specunit=specunit, elements=el_list,\ abundance = ab, nearest=nearest) # do the response thing #resp = s.response() if apply_aeff == True: epsilon_aeff = self._apply_linelist_aeff(s, specunit, apply_binwidth) s['Epsilon'] = epsilon_aeff return(s)
def _apply_linelist_aeff(self, linelist, specunit, apply_binwidth): """ Apply effective area to the linelist, return in 'Epsilon_Err'. Parameters ---------- linelist : array List of lines specunit : str A or keV apply_binwidth : bool If true, return emissivity per angstrom or per keV. If false, total. Returns ------- emiss_aeff : array(float) Emissivity \* Aeff """ if specunit.lower()=='kev': binwidth = self.ebins_out[1:]-self.ebins_out[:-1] factor = numpy.zeros(len(linelist), dtype=float) for i, ss in enumerate(linelist): e = const.HC_IN_KEV_A/ss['Lambda'] if e>self.specbins[-1]: factor[i] = 0.0 elif e<self.specbins[0]: factor[i] = 0.0 else: ibin = numpy.where(self.specbins<e)[0][-1] factor[i]=self.aeff[ibin] if apply_binwidth: factor[i] /= binwidth[ibin] emiss_aeff = linelist['Epsilon']*factor elif specunit.lower()=='a': wvbins=12.398425/self.ebins_out[::-1] binwidth = wvbins[1:]-wvbins[:-1] factor = numpy.zeros(len(linelist), dtype=float) for i, ss in enumerate(linelist): e = ss['Lambda'] if e>wvbins[-1]: factor[i] = 0.0 elif e<wvbins[0]: factor[i] = 0.0 else: ibin = numpy.where(wvbins<e)[0][-1] factor[i]=self.aeff[::-1][ibin] if apply_binwidth: factor[i] /= binwidth[ibin] emiss_aeff = linelist['Epsilon']*factor return emiss_aeff def _adjust_line(self, change, Z=0, z1=0, z1_drv=0, upper=0,lower=0, quantity="Epsilon", method="Replace", trackchanges=False): """ Change the emissivity or wavelength of a line. Integer parameters set to 0 mean "all". Note this all happens in memory and does not edit the underlying files. Parameters ---------- change : float or str If float, set the new value to this. If string Z : int Element z1 : int Ion z1_drv : int Driving ion upper : int Upper level lower : int Lower level quantity : string Change "Epsilon" or "Lambda" - emissivity or wavelength - by change method : string "Replace": replace existing value with change "Multiply" : multiply existing value with change "Divide" : divide existing value by change "Add" : add change to existing value "Subtract" : subtract change from existing Returns ------- None """ meth = method.lower() if Z==0: Zlist = self.elements else: Zlist = [Z] for Zt in Zlist: if z1==0: z1list = range(1,z1+2) else: z1list=[z1] for z1t in z1list: if z1_drv==0: if self.SessionType=='CIE': z1_drvlist=[0] else: z1_drvlist=range(1,z1+2) else: if self.SessionType=='CIE': z1_drvlist=[0] else: z1_drvlist=[z1_drv] for z1_drvt in z1_drvlist: # go through each temperature, see if there is line data for this # ion. If so, change it according to uppper, lower for ikT in range(len(self.spectra.kTlist)): # check if there is any data for this element try: ldat = self.spectra.spectra[ikT][Zt].lines.lines except KeyError: # element or temperature data doesn't exist continue tochange = numpy.ones(len(ldat), dtype=bool) if upper!=0: tochange[ldat['UpperLev'] != upper] = False if lower != 0: tochange[ldat['LowerLev'] != lower] = False if trackchanges: tochange[self.spectra.spectra[ikT][Zt].lines.changed==True] = False if sum(tochange) > 0: if meth=='replace': ldat[quantity][tochange] = change if meth=='add': ldat[quantity][tochange] += change if meth=='subtract': ldat[quantity][tochange] -= change if meth=='divide': ldat[quantity][tochange] /= change if meth=='multiply': ldat[quantity][tochange] *= change self.spectra.spectra[ikT][Zt].lines.changed[tochange] = True return def _adjust_line_lambda(self, change, Z, z1, upper,lower, quantity="Epsilon", method="Replace", trackchanges=False): """ Change the emissivity or wavelength of a line. Integer parameters set to 0 mean "all". Note this all happens in memory and does not edit the underlying files. Parameters ---------- change : float or str If float, set the new value to this. If string Z : int Element z1 : int Ion upper : int Upper level lower : int Lower level quantity : string Change "Epsilon" or "Lambda" - emissivity or wavelength - by change method : string "Replace": replace existing value with change "Multiply" : multiply existing value with change "Divide" : divide existing value by change "Add" : add change to existing value "Subtract" : subtract change from existing Returns ------- None """ meth = method.lower() # see if this line is already listed try: value=self.spectra.fixwavelength[Z][z1][upper][lower] except AttributeError: self.spectra.fixwavelength={} if not Z in self.spectra.fixwavelength.keys(): self.spectra.fixwavelength[Z]={} if not z1 in self.spectra.fixwavelength[Z].keys(): self.spectra.fixwavelength[Z][z1]={} if not upper in self.spectra.fixwavelength[Z][z1].keys(): self.spectra.fixwavelength[Z][z1][upper]={} if not lower in self.spectra.fixwavelength[Z][z1][upper].keys(): self.spectra.fixwavelength[Z][z1][upper][lower]=quantity if Z==0: Zlist = self.elements else: Zlist = [Z] for Zt in Zlist: if z1==0: z1list = range(1,z1+2) else: z1list=[z1] for z1t in z1list: if z1_drv==0: if self.SessionType=='CIE': z1_drvlist=[0] else: z1_drvlist=range(1,z1+2) else: if self.SessionType=='CIE': z1_drvlist=[0] else: z1_drvlist=[z1_drv] for z1_drvt in z1_drvlist: # go through each temperature, see if there is line data for this # ion. If so, change it according to uppper, lower for ikT in range(len(self.spectra.kTlist)): # check if there is any data for this element try: ldat = self.spectra.spectra[ikT][Zt].lines.lines except KeyError: # element or temperature data doesn't exist continue tochange = numpy.ones(len(ldat), dtype=bool) if upper!=0: tochange[ldat['UpperLev'] != upper] = False if lower != 0: tochange[ldat['LowerLev'] != lower] = False if trackchanges: tochange[self.spectra.spectra[ikT][Zt].lines.changed==True] = False if sum(tochange) > 0: if meth=='replace': ldat[quantity][tochange] = change if meth=='add': ldat[quantity][tochange] += change if meth=='subtract': ldat[quantity][tochange] -= change if meth=='divide': ldat[quantity][tochange] /= change if meth=='multiply': ldat[quantity][tochange] *= change self.spectra.spectra[ikT][Zt].lines.changed[tochange] = True return
class _CIESpectrum(): """ A class holding the emissivity data for CIE emission, and returning spectra Parameters ---------- linefile : string or HDUList, optional The line emissivity data file (either name or already open) cocofile : string or HDUList, optional The continuum emissivity data file (either name or already open) Attributes ---------- SessionType : string "CIE" spectra : dict of _ElementSpectrum a dictionary containing the emissivity data for each HDU, subdivided by element (spectra[12][18] is an _ElementSpectrum object containing the argon data for the 12th HDU) kTlist : array The temperatures for each emissivity HDU, in keV logkTlist : array log of kTlist """ def __init__(self, linedata, cocodata): """ Initializes the code. Populates the line and emissivity data in all temperature HDUs. Parameters ---------- linefile : HDUList The line emissivity data cocofile : HDUList The continuum emissivity data """ self.datacache = {} self.SessionType = 'CIE' picklefname = os.path.expandvars('$ATOMDB/spectra_%s_%s.pkl'%\ (linedata[0].header['CHECKSUM'],\ cocodata[0].header['CHECKSUM'])) havepicklefile = False if os.path.isfile(picklefname): havepicklefile = True if havepicklefile: try: self.spectra = pickle.load(open(picklefname,'rb')) self.kTlist = self.spectra['kTlist'] except AttributeError: havepicklefile=False print("pre-stored data in %s is out of date. This can be caused by updates to the data "%(picklefname)+ "or, more likely, changes to pyatomdb. Regenerating...") # delete the old file if os.path.isfile(picklefname): os.remove(picklefname) if not havepicklefile: self.spectra={} self.kTlist = numpy.array(linedata[1].data['kT'].data) self.spectra['kTlist']=numpy.array(linedata[1].data['kT'].data) for ihdu in range(len(self.kTlist)): self.spectra[ihdu]={} self.spectra[ihdu]['kT'] = self.kTlist[ihdu] ldat = numpy.array(linedata[ihdu+2].data.data) # revision to do with variable length continuum arrays cdat = cocodata[ihdu+2].data Zarr = numpy.zeros([len(ldat), const.MAXZ_NEI+1], dtype=bool) Zarr[numpy.arange(len(ldat), dtype=int), ldat['Element']]=True for Z in range(1,const.MAXZ_NEI+1): if not Z in self.spectra[ihdu].keys(): self.spectra[ihdu][Z] = {} icdat = numpy.where((cdat['Z']==Z) & (cdat['rmJ']==0))[0] if len(icdat)==0: ccdat = [False] else: ccdat=[cdat[icdat[0]]] # this format is very tempremental and space inefficient. Make into a numpy array ccdat = numpy.zeros(1, dtype = apec.generate_datatypes('continuum', \ npseudo=cdat[icdat[0]]['N_Pseudo'], \ ncontinuum=cdat[icdat[0]]['N_Cont'])) ccdat['Z'] = cdat[icdat[0]]['Z'] ccdat['rmJ'] = cdat[icdat[0]]['rmJ'] ccdat['N_Cont'] = cdat[icdat[0]]['N_Cont'] ccdat['N_Pseudo'] = cdat[icdat[0]]['N_Pseudo'] ccdat['E_Pseudo'] = cdat[icdat[0]]['E_Pseudo'][:ccdat['N_Pseudo'][0]] ccdat['Pseudo'] = cdat[icdat[0]]['Pseudo'][:ccdat['N_Pseudo'][0]] ccdat['E_Cont'] = cdat[icdat[0]]['E_Cont'][:ccdat['N_Cont'][0]] ccdat['Continuum'] = cdat[icdat[0]]['Continuum'][:ccdat['N_Cont'][0]] self.spectra[ihdu][Z]=_ElementSpectrum(ldat[Zarr[:,Z]],\ ccdat[0], Z) pickle.dump(self.spectra, open(picklefname,'wb')) self.logkTlist=numpy.log(self.kTlist) def get_nearest_Tindex(self, Te, teunit='keV', nearest=False,\ log_interp=True): """ Return the nearest temperature index in the emissivity file, or, alternatively, the array of fractions to sum Parameters ---------- Te : float Temperature (keV by default) teunit : string Units of kT nearest : bool If true, return only nearest. Otherwise, return nearest 2 and fractions log_interp : bool Perform linear interpolation on a logT/logEpsilon grid (default), or linear. Otherwise linear Returns ------- ikT : list[int] Index of temperature in HDU file (from 0, not 2) f : list[float] fractional weight to apply to each ikT. Should sum to 1. """ kT = util.convert_temp(Te, teunit, 'keV') # find the nearest temperature if kT < self.kTlist[0]: print("kT = %f is below minimum range of %f. Returning lowest kT spectrum available"%\ (kT, self.kTlist[0])) ikT = [0] f=[1.0] elif kT > self.kTlist[-1]: print("kT = %f is above maximum range of %f. Returning highest kT spectrum available"%\ (kT, self.kTlist[-1])) ikT = [len(self.kTlist)-1] f=[1.0] else: if log_interp: if nearest: ikT = [numpy.argmin(numpy.abs(self.logkTlist - numpy.log(kT)))] f=[1.0] else: ikT = numpy.where(self.kTlist < kT)[0][-1] ikT = [ikT, ikT+1] f = 1- (numpy.log(kT)-self.logkTlist[ikT[0]])/\ (self.logkTlist[ikT[1]]-self.logkTlist[ikT[0]]) f = [f,1-f] else: if nearest: ikT = [numpy.argmin(numpy.abs(self.kTlist - kT))] f=[1.0] else: ikT = numpy.where(self.kTlist < kT)[0][-1] ikT = [ikT, ikT+1] f = 1- (kT-self.kTlist[ikT[0]])/\ (self.kTlist[ikT[1]]-self.kTlist[ikT[0]]) f = [f,1-f] return ikT, f #----------------------------------------------------------------------- def return_spectrum(self, Te, teunit='keV', nearest = False,\ elements=False, abundance=False, log_interp=True,\ broaden_object=False,\ dolines=True, docont=True, dopseudo=True, \ do_eebrems = True): """ Return the spectrum of the element on the energy bins in self.session.specbins Parameters ---------- Te : float Electron temperature (default, keV) teunit : string Units of kT (keV by default, K also allowed) nearest : bool If True, return spectrum for the nearest temperature index. If False, use the weighted average of the (log of) the 2 nearest indexes. default is False. elements : iterable of int Elements to include, listed by atomic number. if not set, include all. abundance : dict(float) The abundances of each element, e.g. abund[6]=1.1 means multiply carbon abundance by 1.1. log_interp : bool Perform linear interpolation on a logT/logEpsilon grid (default), or linear. broaden_object : class Object with routine "broaden" which applies line broadening. Usually a Gaussian. dolines : bool Calculate line emission (default True) docont : bool Calculate Continuum emission (default True) dopseudo : bool Calculate PseudoContinuum (weak line) emission (default True) do_eebrems : bool Calculate electron-electron bremsstrahlung emission (default True) Returns ------- spec : array(float) The element's emissivity spectrum, in photons cm^3 s^-1 bin^-1 """ # get kT in keV kT = util.convert_temp(Te, teunit, 'keV') ikT, f = self.get_nearest_Tindex(kT, teunit='keV', nearest=nearest, log_interp=log_interp) # check the params: if elements is False: elements=range(1,const.MAXZ_CIE+1) if abundance == False: abundance = {} for Z in elements: abundance[Z] = 1.0 s = 0.0 # electron-electron bremsstrahlung electron counter if do_eebrems: nel=0.0 rawabund = atomdb.get_abundance(datacache=self.datacache) # get the line broadening temperature. Typically this is kT unless # overridden using XXXSession.set_broadening if self.thermal_broaden_temperature is not None: Tb= util.convert_temp(self.thermal_broaden_temperature, 'K', 'keV') else: Tb=kT for Z in elements: abund = abundance[Z] if abund > 0: epslimit = self.broaden_limit/abund # go caclulate the spectrum, with broadening as assigned. ss=0.0 if len(ikT) == 1: ss = self.spectra[ikT[0]][Z].return_spectrum(self.ebins,\ Tb,\ ebins_checksum = self.ebins_checksum,\ thermal_broadening = self.thermal_broadening,\ broaden_limit = epslimit,\ velocity_broadening = self.velocity_broadening,\ broaden_object=broaden_object,\ dolines=dolines,\ docont=docont,\ dopseudo=dopseudo) *\ abund else: ss1 = self.spectra[ikT[0]][Z].return_spectrum(self.ebins,\ Tb,\ ebins_checksum = self.ebins_checksum,\ thermal_broadening = self.thermal_broadening,\ broaden_limit = epslimit,\ velocity_broadening = self.velocity_broadening,\ broaden_object=broaden_object,\ dolines=dolines,\ docont=docont,\ dopseudo=dopseudo) *\ abund ss2 = self.spectra[ikT[1]][Z].return_spectrum(self.ebins,\ Tb,\ ebins_checksum = self.ebins_checksum,\ thermal_broadening = self.thermal_broadening,\ broaden_limit = epslimit,\ velocity_broadening = self.velocity_broadening,\ broaden_object=broaden_object,\ dolines=dolines,\ docont=docont,\ dopseudo=dopseudo) *\ abund ss = self._merge_spectra_temperatures(f,ss1,ss2,log_interp) s+=ss if do_eebrems: ionpop=apec.return_ionbal(Z, kT, datacache=self.datacache, teunit='keV') Zabundance = rawabund[Z]*abund tmp = sum(ionpop*numpy.arange(Z+1))*Zabundance nel +=tmp if do_eebrems: eespec = calc_ee_brems_spec(self.ebins, kT, nel) # Here we divide the eebrems by nel/nH so that the resulting # spectrum simply needs to # be multiplied by nenH try: nH =rawabund[1]*abundance[1] except KeyError: print("Warning: as this plasma has no hydrogen in it, assuming nH=1 for electron-electron bremstrahlung renorm") nH=1.0 s+= eespec/(nel/nH) return s def return_line_emissivity(self, Te, Z, z1, up, lo, specunit='A', teunit='keV', abundance=1.0, log_interp = True): """ Return the emissivity of a line at temperature Te. Parameters ---------- Te : float Temperature in keV or K Z : int nuclear charge of element z1 : int ion charge +1 of ion up : int upper level for transition lo : int lower level for transition specunit : {'Angstrom','keV'} Units for wavelength or energy (a returned value) teunit : {'keV' , 'K'} Units of Telist (kev or K, default keV) abundance : float The abundances of the element, e.g. 1.1 means multiply carbon abundance by 1.1. log_interp : bool Perform linear interpolation on a logT/logEpsilon grid (default), or linear. Returns ------- eps : float Emissivity in photons cm^3 s^-1 lam : float Wavelength (A) or Energy (keV) of line, depending on specunit """ kT = util.convert_temp(Te, teunit, 'keV') ikT, f = self.get_nearest_Tindex(kT, \ teunit='keV', \ nearest=False, \ log_interp=log_interp) #ikT has the 2 nearest temperature indexes # f has the fraction for each # find lines which match eps_in = numpy.zeros(len(ikT)) eps = 0.0 lam = 0.0 for i in range(len(ikT)): iikT =ikT[i] llist = self.spectra[iikT][Z].return_linematch(Z,z1,up,lo) for line in llist: # add emissivity eps_in[i] += line['Epsilon'] lam = line['Lambda'] if log_interp: eps_out = 0.0 for i in range(len(ikT)): eps_out += f[i]*numpy.log(eps_in[i]+const.MINEPSOFFSET) eps += numpy.exp(eps_out-const.MINEPSOFFSET)*abundance else: eps_out = 0.0 for i in range(len(ikT)): eps_out += f[i]*eps_in[i] eps += eps_out*abundance if specunit == 'keV': lam = const.HC_IN_KEV_A/lam return eps, lam def _merge_spectra_temperatures(self, f, spec1, spec2, log_interp): """ Merge spectra 1 and 2, using fractions f, using log or non-log scaling as determined by loginterp Paramters --------- f : [float, float] The weight to apply to spec1 and spec 2. Should sum to 1. spec1 : array(float) The 1st spectrum to merge spec2 : array(float) The 2nd spectrum to merge log_interp : bool Perform linear interpolation on a logT/logEpsilon grid (default), or linear. Returns ------- spec : array(float) The merged spectrum """ if log_interp: out = numpy.log(spec1+const.MINEPSOFFSET) * f[0]+\ numpy.log(spec2+const.MINEPSOFFSET) * f[1] spec = numpy.exp(out)-const.MINEPSOFFSET else: spec = spec1*f[0] + spec2*f[1] spec[spec<0]=0.0 return spec def _merge_linelist_duplicates(self, llist, by_ion_drv=False): """ Look through the linelists and search for duplicates. Duplicated lines are summed. Parameters ---------- llist : array(linelist) The list of lines to check by_ion_drv : bool If true, keep lines which are the same but have different ion_drv separate. Otherwise, merge them. Returns ------- llist_out : array(linelist) The lines after filtering for duplicates. """ # sort the data if by_ion_drv: llist =numpy.sort(llist, order=['Ion','Ion_drv','UpperLev','LowerLev']) else: llist =numpy.sort(llist, order=['Ion','UpperLev','LowerLev']) # merge keep = numpy.ones(len(llist), dtype=bool) # find each level where the next is for the same transition if by_ion_drv: j = numpy.where((llist[1:]['Ion']==llist[:-1]['Ion']) &\ (llist[1:]['Ion_drv']==llist[:-1]['Ion_drv']) &\ (llist[1:]['UpperLev']==llist[:-1]['UpperLev']) &\ (llist[1:]['LowerLev']==llist[:-1]['LowerLev']))[0] else: j = numpy.where((llist[1:]['Ion']==llist[:-1]['Ion']) &\ (llist[1:]['UpperLev']==llist[:-1]['UpperLev']) &\ (llist[1:]['LowerLev']==llist[:-1]['LowerLev']))[0] for jj in j: # add the emissivity to the second of the 2 llist['Epsilon'][jj+1] += llist['Epsilon'][jj] keep[jj]=False # remove all the non-keepers llist_out = llist[keep] # fix the emissivities if log scaled return llist_out def _merge_linelists_temperatures(self, f, llist1, llist2, log_interp, by_ion_drv=False): """ Merge linelists 1 and 2, using fractions f, using log or non-log scaling as determined by loginterp. Note that only lines which appear in both temperatures will be returned. Parameters --------- f : [float, float] The weight to apply to spec1 and spec 2. Should sum to 1. llist1 : array(float) The 1st linelist to merge llist2 : array(float) The 2nd linelist to merge log_interp : bool Perform linear interpolation on a logT/logEpsilon grid (default), or linear. by_ion_drv : bool If true, keep lines which are the same but have different ion_drv separate. Otherwise, merge them. Returns ------- llist : array(float) The merged linelist """ # merge any repeated lines in together. llist1 = self._merge_linelist_duplicates(llist1, by_ion_drv=by_ion_drv) llist2 = self._merge_linelist_duplicates(llist2, by_ion_drv=by_ion_drv) # weight the emissivities and combine into one list llist = numpy.append(llist1, llist2) if log_interp: llist[:len(llist1)]['Epsilon'] = numpy.log(llist[:len(llist1)]['Epsilon'])*f[0] llist[len(llist1):]['Epsilon'] = numpy.log(llist[len(llist1):]['Epsilon'])*f[1] else: llist[:len(llist1)]['Epsilon'] = llist[:len(llist1)]['Epsilon']*f[0] llist[len(llist1):]['Epsilon'] = llist[len(llist1):]['Epsilon']*f[1] # sort the data if by_ion_drv: llist =numpy.sort(llist, order=['Ion','Ion_drv', 'UpperLev','LowerLev']) else: llist =numpy.sort(llist, order=['Ion','UpperLev','LowerLev']) # for denoting which levels to keep keep = numpy.zeros(len(llist), dtype=bool) # find each level where the next is for the same transition if by_ion_drv: j = numpy.where((llist[1:]['Ion']==llist[:-1]['Ion']) &\ (llist[1:]['Ion_drv']==llist[:-1]['Ion_drv']) &\ (llist[1:]['UpperLev']==llist[:-1]['UpperLev']) &\ (llist[1:]['LowerLev']==llist[:-1]['LowerLev']))[0] else: j = numpy.where((llist[1:]['Ion']==llist[:-1]['Ion']) &\ (llist[1:]['UpperLev']==llist[:-1]['UpperLev']) &\ (llist[1:]['LowerLev']==llist[:-1]['LowerLev']))[0] for jj in j: # add the emissivity to the second of the 2 llist['Epsilon'][jj+1] += llist['Epsilon'][jj] # check if we are at the end of the array. If so, make this a good level keep[jj+1]=True # remove all the non-keepers llist = llist[keep] # fix the emissivities if log scaled if log_interp: llist['Epsilon'] = numpy.exp(llist['Epsilon']) return llist def return_linelist(self, Te, teunit='keV', nearest = False,\ specrange=False, specunit='A', elements=False, abundance=False,\ log_interp=True): """ Return the linelist of the element Parameters ---------- Te : float Electron temperature (default, keV) teunit : string Units of kT (keV by default, K also allowed) nearest : bool If True, return spectrum for the nearest temperature index. If False, use the weighted average of the (log of) the 2 nearest indexes. default is False. specrange : [float, float] Minimum and maximum values for interval in which to search specunit : {'Angstrom','keV'} Units for specrange (default A) elements : iterable of int Elements to include, listed by atomic number. if not set, include all. abundance : dict(float) The abundances of each element, e.g. abund[6]=1.1 means multiply carbon abundance by 1.1. log_interp : bool Perform linear interpolation on a logT/logEpsilon grid (default), or linear. Returns ------- linelist : numpy.array(dtype=linelist_cie_spectrum) The list of lines at temperature Te. """ # get kT in keV kT = util.convert_temp(Te, teunit, 'keV') ikT, f = self.get_nearest_Tindex(kT, teunit='keV', nearest=nearest) # check the params: if elements is False: elements=range(1,const.MAXZ_CIE+1) if abundance == False: abundance = {} for Z in elements: abundance[Z] = 1.0 linelist = numpy.zeros(0, dtype = apec.generate_datatypes('linelist_cie_spectrum')) for Z in elements: abund = abundance[Z] if abund > 0: if len(ikT) > 1: llist1 = self.spectra[ikT[0]][Z].return_linelist(specrange,\ specunit=specunit) llist2 = self.spectra[ikT[1]][Z].return_linelist(specrange,\ specunit=specunit) elemlinelist = self._merge_linelists_temperatures(f, llist1, llist2, log_interp) else: elemlinelist = self.spectra[ikT[0]][Z].return_linelist(specrange,\ specunit=specunit) # fix abundance elemlinelist['Epsilon'] *= abund # append this element's line list onto the total line list if len(linelist)==0: linelist=elemlinelist else: linelist = numpy.append(linelist, elemlinelist) return linelist class _CIESpectrum_RS(_CIESpectrum): """ A class holding the emissivity data for CIE emission, and returning spectra Parameters ---------- linefile : string or HDUList, optional The line emissivity data file (either name or already open) cocofile : string or HDUList, optional The continuum emissivity data file (either name or already open) Attributes ---------- SessionType : string "CIE" spectra : dict of _ElementSpectrum a dictionary containing the emissivity data for each HDU, subdivided by element (spectra[12][18] is an _ElementSpectrum object containing the argon data for the 12th HDU) kTlist : array The temperatures for each emissivity HDU, in keV logkTlist : array log of kTlist """ def __init__(self, linedata, cocodata): """ Initializes the code. Populates the line and emissivity data in all temperature HDUs. Parameters ---------- linefile : HDUList The line emissivity data cocofile : HDUList The continuum emissivity data """ self.datacache = {} self.SessionType = 'CIE' picklefname = os.path.expandvars('$ATOMDB/spectra_RS_%s_%s.pkl'%\ (linedata[0].header['CHECKSUM'],\ cocodata[0].header['CHECKSUM'])) self.spectra={} self.kTlist = numpy.array(linedata[1].data['kT'].data) self.spectra['kTlist']=numpy.array(linedata[1].data['kT'].data) for ihdu in range(len(self.kTlist)): self.spectra[ihdu]={} self.spectra[ihdu]['kT'] = self.kTlist[ihdu] ldat = numpy.array(linedata[ihdu+2].data.data) cdat = cocodata[ihdu+2].data Zarr = numpy.zeros([len(ldat), const.MAXZ_CIE+1], dtype=bool) Zarr[numpy.arange(len(ldat), dtype=int), ldat['Element']]=True # for Z in range(1,const.MAXZ_CIE+1): # ccdat = cdat[(cdat['Z']==Z) & (cdat['rmJ']==0)] # if len(ccdat)==1: # c = ccdat[0] # else: # c = False # self.spectra[ihdu][Z]=_ElementSpectrum_RS(ldat[Zarr[:,Z]],\ #c, \ #Z) for Z in range(1,const.MAXZ_NEI+1): if not Z in self.spectra[ihdu].keys(): self.spectra[ihdu][Z] = {} icdat = numpy.where((cdat['Z']==Z) & (cdat['rmJ']==0))[0] if len(icdat)==0: ccdat = [False] else: ccdat=[cdat[icdat[0]]] # this format is very tempremental and space inefficient. Make into a numpy array ccdat = numpy.zeros(1, dtype = apec.generate_datatypes('continuum', \ npseudo=cdat[icdat[0]]['N_Pseudo'], \ ncontinuum=cdat[icdat[0]]['N_Cont'])) ccdat['Z'] = cdat[icdat[0]]['Z'] ccdat['rmJ'] = cdat[icdat[0]]['rmJ'] ccdat['N_Cont'] = cdat[icdat[0]]['N_Cont'] ccdat['N_Pseudo'] = cdat[icdat[0]]['N_Pseudo'] ccdat['E_Pseudo'] = cdat[icdat[0]]['E_Pseudo'][:ccdat['N_Pseudo'][0]] ccdat['Pseudo'] = cdat[icdat[0]]['Pseudo'][:ccdat['N_Pseudo'][0]] ccdat['E_Cont'] = cdat[icdat[0]]['E_Cont'][:ccdat['N_Cont'][0]] ccdat['Continuum'] = cdat[icdat[0]]['Continuum'][:ccdat['N_Cont'][0]] self.spectra[ihdu][Z]=_ElementSpectrum_RS(ldat[Zarr[:,Z]],\ ccdat[0], Z) pickle.dump(self.spectra, open(picklefname,'wb')) self.logkTlist=numpy.log(self.kTlist) def return_spectrum(self, Te, N_e, Ab, teunit='keV', nearest = False,\ elements=False, abundance=False, log_interp=True,\ broaden_object=False,\ dolines=True, docont=True, dopseudo=True, \ do_eebrems = False): """ Return the spectrum of the element on the energy bins in self.session.specbins Parameters ---------- Te : float Electron temperature (default, keV) teunit : string Units of kT (keV by default, K also allowed) nearest : bool If True, return spectrum for the nearest temperature index. If False, use the weighted average of the (log of) the 2 nearest indexes. default is False. elements : iterable of int Elements to include, listed by atomic number. if not set, include all. abundance : dict(float) The abundances of each element, e.g. abund[6]=1.1 means multiply carbon abundance by 1.1. log_interp : bool Perform linear interpolation on a logT/logEpsilon grid (default), or linear. broaden_object : class Object with routine "broaden" which applies line broadening. Usually a Gaussian. dolines : bool Calculate line emission (default True) docont : bool Calculate Continuum emission (default True) dopseudo : bool Calculate PseudoContinuum (weak line) emission (default True) do_eebrems : bool Calculate electron-electron bremsstrahlung emission (default False) Returns ------- spec : array(float) The element's emissivity spectrum, in photons cm^3 s^-1 bin^-1 """ # get kT in keV kT = util.convert_temp(Te, teunit, 'keV') ikT, f = self.get_nearest_Tindex(kT, teunit='keV', nearest=nearest, log_interp=log_interp) # check the params: if elements is False: elements=range(1,const.MAXZ_CIE+1) if abundance == False: abundance = {} for Z in elements: abundance[Z] = 1.0 s = 0.0 # electron-electron bremsstrahlung electron counter if do_eebrems: nel=0.0 rawabund = atomdb.get_abundance(datacache=self.datacache) for Z in elements: abund = abundance[Z] if abund > 0: epslimit = self.broaden_limit/abund # go caclulate the spectrum, with broadening as assigned. ss=0.0 if len(ikT) == 1: ss = self.spectra[ikT[0]][Z].return_spectrum(self.ebins,\ kT, N_e, Ab, \ ebins_checksum = self.ebins_checksum,\ thermal_broadening = self.thermal_broadening,\ broaden_limit = epslimit,\ velocity_broadening = self.velocity_broadening,\ broaden_object=broaden_object,\ dolines=dolines,\ docont=docont,\ dopseudo=dopseudo) *\ abund else: ss1 = self.spectra[ikT[0]][Z].return_spectrum(self.ebins,\ kT, N_e, Ab, \ ebins_checksum = self.ebins_checksum,\ thermal_broadening = self.thermal_broadening,\ broaden_limit = epslimit,\ velocity_broadening = self.velocity_broadening,\ broaden_object=broaden_object,\ dolines=dolines,\ docont=docont,\ dopseudo=dopseudo) *\ abund ss2 = self.spectra[ikT[1]][Z].return_spectrum(self.ebins,\ kT, N_e, Ab,\ ebins_checksum = self.ebins_checksum,\ thermal_broadening = self.thermal_broadening,\ broaden_limit = epslimit,\ velocity_broadening = self.velocity_broadening,\ broaden_object=broaden_object,\ dolines=dolines,\ docont=docont,\ dopseudo=dopseudo) *\ abund ss = self._merge_spectra_temperatures(f,ss1,ss2,log_interp) s+=ss if do_eebrems: ionpop=apec.return_ionbal(Z, kT, datacache=self.datacache, teunit='keV') Zabundance = rawabund[Z]*abund tmp = sum(ionpop*numpy.arange(Z+1))*Zabundance nel +=tmp if do_eebrems: eespec = calc_ee_brems_spec(self.ebins, kT, nel) s+= eespec return s class _ElementSpectrum(): """ A class holding the emissivity data for an element in one HDU Parameters ---------- linedata : array(linedatatype) array of line wavelengths and emissivities, from AtomDB files. Should already be filtered to only be from one element. cocodata : array(cocodatatype) array of continuum wavelengths and emissivities, from AtomDB files Should already be filtered to only be from one element. Z : int The atomic number of the element z1_drv : int The charge + 1 for the ion. 0 = whole element. Attributes ---------- lines : _LineData A _LineData object containing all the line information continuum : _ContinuumData A _ContinuumData object containing all the contrinuum information """ def __init__(self, linedata, cocodata, Z, z1_drv=0): """ Initialization Parameters ---------- linedata : hdu the line data passed in cocodata : hdu the continuum data passed in Z : int the atomic number of the element z1_drv : int the ion charge of the driving ion (0 for whole element) Modifies -------- self.lines : _LineData The line emission HDU data for this element/ion self.continuum : _ContinuumData The continuum emission HDU data for this element/ion """ # intialize if z1_drv != 0: tmp = linedata[(linedata['Element'] == Z) &\ (linedata['Ion_drv'] == z1_drv)] self.lines = _LineData(tmp) self.continuum = _ContinuumData(cocodata) else: self.lines = _LineData(linedata) self.continuum = _ContinuumData(cocodata) def return_spectrum(self, eedges, Te, ebins_checksum=False,\ thermal_broadening=False,\ broaden_limit=False,\ velocity_broadening=0.0,\ teunit = 'keV',\ broaden_object=False,\ dolines = True,\ docont = True,\ dopseudo = True): """ Calculate the spectrum Parameters ---------- eedges : array bin edges (keV) Te : float temperature (keV by defult) ebins_checksum : string the md5 checksum of eedges thermal_broadening : bool true to apply thermal broadening broaden_limit : float only broaden lines stronger than this. velocity_broadening : float velocity broadening to apply, km/s. Set <=0 for none (default) teunit : string Temperature unit (K, keV, eV) broaden_object : class Object with routine "broaden" which applies line broadening. Usually a Gaussian. dolines : bool Calculate line emission (default True) docont : bool Calculate Continuum emission (default True) dopseudo : bool Calculate PseudoContinuum (weak line) emission (default True) Returns ------- spectrum : array(float) The spectrum in ph cm^3 s^-1 bin^-1 """ T = util.convert_temp(Te, teunit, 'K') if ebins_checksum == False: # check the parent # if self.parentElementSpectrum != False: # ebins_checksum = self.parentElementSpectrum.ebins_checksum # check again, in case there was no parent # if ebins_checksum == False: # generate the checksum ebins_checksum = hashlib.md5(eedges).hexdigest() self.ebins_checksum = ebins_checksum self.T = T spec = numpy.zeros(len(eedges)-1) if dolines: spec+=self.lines.return_spec(eedges, T, ebins_checksum=ebins_checksum,\ thermal_broadening=thermal_broadening,\ broaden_limit=broaden_limit,\ velocity_broadening=velocity_broadening,\ broaden_object=broaden_object) if dopseudo+docont > 0: spec+=self.continuum.return_spec(eedges, ebins_checksum = ebins_checksum,\ dopseudo=dopseudo, docont=docont) self.spectrum = spec return self.spectrum def return_linelist(self, specrange, specunit='A'): """ Return the list of lines in specrange Parameters ---------- specrange : array spectral range to look for lines in specunit : string units of spectral range (A or keV) Returns ------- linelist : array list of lines and epsilons """ wave = util.convert_spec(specrange, specunit, 'A') llist = self.lines.lines[(self.lines.lines['Lambda']>=wave[0]) &\ (self.lines.lines['Lambda']<=wave[1])] return llist def return_linematch(self, Z, z1, up, lo, z1_drv=0): """ Return the line(s) which match the transition Parameters ---------- Z : int nuclear charge z1 : int ion charge + 1 up : int upper level of transition lo : int lower level of transition z1_drv : int if provided, also filter on driving ion charge (NEI only) Returns ------- linelist : array list of lines and epsilons """ llist = self.lines.lines[(self.lines.lines['Element']==Z) &\ (self.lines.lines['Ion']==z1) &\ (self.lines.lines['UpperLev']==up) &\ (self.lines.lines['LowerLev']==lo)] if z1_drv != 0: llist = llist[llist['Ion_drv']==z1_drv] return llist # def apply_response(self): # """ # Apply a response to a spectrum # Parameters # ---------- # spectrum : array(float) # The spectrum, in counts/bin/second, to have the response applied to. Must be # binned on the same grid as the rmf. # rmf : string or pyfits.hdu.hdulist.HDUList # The filename of the rmf or the opened rmf file # arf : string or pyfits.hdu.hdulist.HDUList # The filename of the arf or the opened arf file # Returns # ------- # array(float) # energy grid (keV) for returned spectrum # array(float) # spectrum folded through the response # """ # arfdat = self.session.arf # if arfdat: # #arfdat = arf # res = self.spectrum * arfdat['SPECRESP'].data['SPECRESP'] # else: # res = self.spectrum*1.0 # ret = numpy.matmul(res,self.session.rmfmatrix) # self.spectrum_withresp=ret class _ElementSpectrum_RS(_ElementSpectrum): """ A class holding the emissivity data for an element in one HDU Parameters ---------- linedata : array(linedatatype) array of line wavelengths and emissivities, from AtomDB files. Should already be filtered to only be from one element. cocodata : array(cocodatatype) array of continuum wavelengths and emissivities, from AtomDB files Should already be filtered to only be from one element. Z : int The atomic number of the element z1_drv : int The charge + 1 for the ion. 0 = whole element. Attributes ---------- lines : _LineData A _LineData object containing all the line information continuum : _ContinuumData A _ContinuumData object containing all the contrinuum information """ def __init__(self, linedata, cocodata, Z, z1_drv=0): """ Initialization Parameters ---------- linedata : hdu the line data passed in cocodata : hdu the continuum data passed in Z : int the atomic number of the element z1_drv : int the ion charge of the driving ion (0 for whole element) Modifies -------- self.lines : _LineData The line emission HDU data for this element/ion self.continuum : _ContinuumData The continuum emission HDU data for this element/ion """ # intialize if z1_drv != 0: tmp = linedata[(linedata['Element'] == Z) &\ (linedata['Ion_drv'] == z1_drv)] self.lines = _LineData_RS(tmp) self.continuum = _ContinuumData(cocodata) else: self.lines = _LineData_RS(linedata) self.continuum = _ContinuumData(cocodata) def return_spectrum(self, eedges, Te, N_e, Ab, ebins_checksum=False,\ thermal_broadening=False,\ broaden_limit=False,\ velocity_broadening=0.0,\ teunit = 'keV',\ broaden_object=False,\ dolines = True,\ docont = True,\ dopseudo = True): """ Calculate the spectrum Parameters ---------- eedges : array bin edges (keV) Te : float temperature (keV by defult) ebins_checksum : string the md5 checksum of eedges thermal_broadening : bool true to apply thermal broadening broaden_limit : float only broaden lines stronger than this. velocity_broadening : float velocity broadening to apply, km/s. Set <=0 for none (default) teunit : string Temperature unit (K, keV, eV) broaden_object : class Object with routine "broaden" which applies line broadening. Usually a Gaussian. dolines : bool Calculate line emission (default True) docont : bool Calculate Continuum emission (default True) dopseudo : bool Calculate PseudoContinuum (weak line) emission (default True) Returns ------- spectrum : array(float) The spectrum in ph cm^3 s^-1 bin^-1 """ T = util.convert_temp(Te, teunit, 'K') if ebins_checksum == False: # check the parent # if self.parentElementSpectrum != False: # ebins_checksum = self.parentElementSpectrum.ebins_checksum # check again, in case there was no parent # if ebins_checksum == False: # generate the checksum ebins_checksum = hashlib.md5(eedges).hexdigest() self.ebins_checksum = ebins_checksum self.T = T spec = numpy.zeros(len(eedges)-1) if dolines: spec+=self.lines.return_spec(eedges, T, N_e, Ab, ebins_checksum=ebins_checksum,\ thermal_broadening=thermal_broadening,\ broaden_limit=broaden_limit,\ velocity_broadening=velocity_broadening,\ broaden_object=broaden_object) if dopseudo+docont > 0: spec+=self.continuum.return_spec(eedges, ebins_checksum = ebins_checksum,\ dopseudo=dopseudo, docont=docont) self.spectrum = spec return self.spectrum def return_linelist(self, specrange, specunit='A'): """ Return the list of lines in specrange Parameters ---------- specrange : array spectral range to look for lines in specunit : string units of spectral range (A or keV) Returns ------- linelist : array list of lines and epsilons """ wave = util.convert_spec(specrange, specunit, 'A') llist = self.lines.lines[(self.lines.lines['Lambda']>=wave[0]) &\ (self.lines.lines['Lambda']<=wave[1])] return llist def return_linematch(self, Z, z1, up, lo, z1_drv=0): """ Return the line(s) which match the transition Parameters ---------- Z : int nuclear charge z1 : int ion charge + 1 up : int upper level of transition lo : int lower level of transition z1_drv : int if provided, also filter on driving ion charge (NEI only) Returns ------- linelist : array list of lines and epsilons """ llist = self.lines.lines[(self.lines.lines['Element']==Z) &\ (self.lines.lines['Ion']==z1) &\ (self.lines.lines['UpperLev']==up) &\ (self.lines.lines['LowerLev']==lo)] if z1_drv != 0: llist = llist[llist['Ion_drv']==z1_drv] return llist # def apply_response(self): # """ # Apply a response to a spectrum # Parameters # ---------- # spectrum : array(float) # The spectrum, in counts/bin/second, to have the response applied to. Must be # binned on the same grid as the rmf. # rmf : string or pyfits.hdu.hdulist.HDUList # The filename of the rmf or the opened rmf file # arf : string or pyfits.hdu.hdulist.HDUList # The filename of the arf or the opened arf file # Returns # ------- # array(float) # energy grid (keV) for returned spectrum # array(float) # spectrum folded through the response # """ # arfdat = self.session.arf # if arfdat: # #arfdat = arf # res = self.spectrum * arfdat['SPECRESP'].data['SPECRESP'] # else: # res = self.spectrum*1.0 # ret = numpy.matmul(res,self.session.rmfmatrix) # self.spectrum_withresp=ret class _LineData(): """ A class holding the line data for an element in one HDU Parameters ---------- linelist : array(linedatatype) array of line wavelengths and emissivities, from AtomDB files. Attributes ---------- lines : array(linedatatype) List of lines, wavelength and emissivities lineenergies : array(float) list of line energies spectrum_calculated : bool True if spectrum has already been calculated, otherwise false T: float Temperature (K) last spectrum was calculated at (for broadening) v : float Velocity (km/s) last spectrum was calculated at (for broadening) ebins_checksum : string md5sum of the ebins the spectrum was last calculated on. Used to identify if new calculations are required or can just return the previous value. """ def __init__(self, linelist): """ Initialization Parameters ---------- linelist : numpy array list of lines from AtomDB fits files """ self.lines = linelist self.lineenergies = const.HC_IN_KEV_A/self.lines['Lambda'] self.spectrum_calculated = False self.T = 0.0 self.v = 0.0 self.ebins_checksum = False def return_spec(self, eedges, T, ebins_checksum = False,\ thermal_broadening = False, \ velocity_broadening = 0.0, \ broaden_limit = 1e-20,\ broaden_object=False): """ return the line emission spectrum at tempterature T Parameters ---------- eedges : array energy bin edges, keV T : float temperature in Kelvin ebins_checksum : string the md5 checksum of eedges thermal_broadening : bool true to apply thermal broadening velocity_broadening : float velocity broadening to apply, km/s. Set <=0 for none (default) broaden_limit : float only broaden lines stronger than this. broaden_object : class Object with routine "broaden" which applies line broadening. Usually a Gaussian. Returns ------- spectrum : array(float) Emissivity on eedges spectral bins of the lines, in ph cm^3 s^-1 bin^-1 """ if ebins_checksum == False: # generate the checksum ebins_checksum = hashlib.md5(eedges).hexdigest() if velocity_broadening==None: velocity_broadening=0.0 if ((thermal_broadening == False) & \ (velocity_broadening == False)): if ((self.ebins_checksum==ebins_checksum) &\ (self.spectrum_calculated == True)): # no need to do anything. pass else: spec,z = numpy.histogram(self.lineenergies, \ bins=eedges, \ weights = self.lines['Epsilon']) self.spectrum = spec self.spectrum_calculated = True else: #if thermal_broadening == True if ((thermal_broadening == True) &\ (velocity_broadening <= 0)): # cehck to see if we need ot redo this if ((self.ebins_checksum==ebins_checksum) &\ (self.spectrum_calculated == True) &\ (self.T == T) &\ (self.v <=0.0)): pass else: # recalculate! recalc = True if ((thermal_broadening == False) &\ (velocity_broadening > 0)): # cehck to see if we need ot redo this if ((self.ebins_checksum==ebins_checksum) &\ (self.spectrum_calculated == True) &\ (self.T == T) &\ (self.v == velocity_broadening)): pass else: # recalculate! recalc = True if ((thermal_broadening == True) &\ (velocity_broadening > 0)): if ((self.ebins_checksum==ebins_checksum) &\ (self.spectrum_calculated == True) &\ (self.T == T) &\ (self.v == velocity_broadening)): pass else: # recalculate! recalc = True if recalc==True: # ind = strong line indicies # nonind = weak line indicies ind = self.lines['Epsilon']>broaden_limit nonind = ~ind # calculate the widths of the strong lines llist = self.lines[ind] # get a raw dictionary of masses in amu masslist = atomic.Z_to_mass(1,raw=True) if thermal_broadening==False: T=0.0 Tb = 0.0 else: Tb = util.convert_temp(T, 'K','keV')*const.ERG_KEV/(masslist[llist['Element']]*1e3*const.AMUKG) if velocity_broadening <0: velocity_broadening = 0.0 vb=0.0 else: vb = (velocity_broadening * 1e5)**2 wcoeff = numpy.sqrt(Tb+vb) / (const.LIGHTSPEED*1e2) elines = self.lineenergies[ind] width = wcoeff*elines # Filter out lines more than NSIGMALIMIT sigma outside the range NSIGMALIMIT=4 eplu = elines+NSIGMALIMIT*width eneg = elines-NSIGMALIMIT*width emax = max(eedges) emin = min(eedges) # identify all the good lines! igood = numpy.where(((elines >= emin) & (eneg < emax)) |\ ((elines < emin) & (eplu < emin)))[0] spec = numpy.zeros(len(eedges)) # t0 = time.time() for iline in igood: spec += broaden_object.broaden(const.HC_IN_KEV_A/llist['Lambda'][iline],\ width[iline],eedges)*llist['Epsilon'][iline] # t1 = time.time() # print("Broadeninging %i lines, in %f seconds"%(len(igood), t1-t0)) spec = spec[1:]-spec[:-1] # Then add on the weak lines s,z = numpy.histogram(self.lineenergies[nonind], \ bins = eedges,\ weights = self.lines['Epsilon'][nonind]) spec+=s self.spectrum = spec self.spectrum_calculated = True if thermal_broadening: self.T = T else: self.T=0.0 self.v = velocity_broadening return self.spectrum class _LineData_RS(): """ A class holding the line data for an element in one HDU Function of CIESession as it holds set_abund Parameters ---------- linelist : array(linedatatype) array of line wavelengths and emissivities, from AtomDB files. Attributes ---------- lines : array(linedatatype) List of lines, wavelength and emissivities lineenergies : array(float) list of line energies spectrum_calculated : bool True if spectrum has already been calculated, otherwise false T: float Temperature (K) last spectrum was calculated at (for broadening) v : float Velocity (km/s) last spectrum was calculated at (for broadening) ebins_checksum : string md5sum of the ebins the spectrum was last calculated on. Used to identify if new calculations are required or can just return the previous value. """ def __init__(self, linelist): """ Initialization Parameters ---------- linelist : numpy array list of lines from AtomDB fits files """ self.lines = linelist self.line_stock = linelist #self.abund = CIESession.abund self.lineenergies = const.HC_IN_KEV_A/self.lines['Lambda'] self.elements=self.lines['Element'] self.osc = self.lines['Oscil_str'] self.spectrum_calculated = False self.T = 0.0 self.v = 0.0 self.ebins_checksum = False #file_osc = "$ATOMDB/osc.fits" #f_osc = os.path.expandvars(file_osc) #atomdb_path = os.path.expandvars("$ATOMDB") #if not os.path.exists(f_osc): # wget.download('https://hea-www.cfa.harvard.edu/AtomDB/releases/osc.fits', out=atomdb_path) #print("File osc.fits downloaded successfully.") #else: #print("File osc.fits exists.") def integrand(self, x, number): return 1.2*numpy.exp(-x*x-(number*numpy.exp(-x*x))) def return_spec(self, eedges, T, N_e, Ab, ebins_checksum = False,\ thermal_broadening = True, \ velocity_broadening = 0.0, \ broaden_limit = 1e-20,\ broaden_object=False): """ return the line emission spectrum at tempterature T Parameters ---------- eedges : array energy bin edges, keV T : float temperature in Kelvin ebins_checksum : string the md5 checksum of eedges thermal_broadening : bool true to apply thermal broadening velocity_broadening : float velocity broadening to apply, km/s. Set <=0 for none (default) broaden_limit : float only broaden lines stronger than this. broaden_object : class Object with routine "broaden" which applies line broadening. Usually a Gaussian. Returns ------- spectrum : array(float) Emissivity on eedges spectral bins of the lines, in ph cm^3 s^-1 bin^-1 """ settings=apec.parse_par_file(os.path.expandvars('$ATOMDB/apec.par')) #Abund= atomdb.get_abundance(abundfile=False, \ # abundset='Lodd09', element=[-1], datacache=False, settings=False, show=False) Abund = [1.0, 0.09772372209558111, 1.4454397707459272e-11, 1.4125375446227541e-11, 3.9810717055349735e-10, 0.0003630780547701018, 0.00011220184543019652, 0.0008511380382023759, 3.63078054770101e-08, 0.0001230268770812381, 2.1379620895022326e-06, 3.8018939632056124e-05, 2.9512092266663837e-06, 3.5481338923357534e-05, 2.818382931264455e-07, 1.62181009735893e-05, 3.162277660168379e-07, 3.63078054770101e-06, 1.3182567385564074e-07, 2.2908676527677747e-06, 1.2589254117941675e-09, 9.772372209558112e-08, 1e-08, 4.677351412871981e-07, 2.4547089156850283e-07, 4.6773514128719816e-05, 8.317637711026709e-08, 1.778279410038923e-06, 1.62181009735893e-08, 3.981071705534969e-08] #Abund = [1.0, 0.08413951416451965, 1.2589254117941675e-11, 2.39883291901949e-11, 5.011872336272725e-10, 0.00024547089156850334, 7.244359600749905e-05, 0.0005370317963702533, 3.63078054770101e-08, 0.00011220184543019652, 1.9952623149688787e-06, 3.467368504525317e-05, 2.9512092266663837e-06, 3.3113112148259076e-05, 2.8840315031266057e-07, 1.3803842646028839e-05, 3.162277660168379e-07, 3.1622776601683796e-06, 1.3182567385564074e-07, 2.1379620895022326e-06, 1.2589254117941675e-09, 7.94328234724282e-08, 1e-08, 4.365158322401656e-07, 2.3442288153199225e-07, 2.818382931264455e-05, 8.317637711026709e-08, 1.698243652461746e-06, 1.62181009735893e-08, 4.1686938347033556e-08] self.lines['Epsilon']=self.lines['Epsilon'] #print(self.osc) # ''' # #print(self.elements) # if len(self.elements) != 0: # # read the ionization balance table # ionfrac = atomdb._get_precalc_ionfrac(os.path.expandvars(settings['IonBalanceTable']),self.elements[0] , T) # #else: # #ionfrac=apec._calc_elem_ionbal(self.lines['Element'][1], T) # # calculate the ionization balance # #ionftmp = apec.calc_full_ionbal(T, 1e14, Te_init=T, Zlist=[self.elements[1]], extrap=True) # #ionfrac = ionftmp[self.elements[1]] # # # # energy_priyankev = const.HC_IN_KEV_A/self.lines['Lambda'] # # ion=ionfrac[self.lines['Ion']-1] # #print(energy_kev, self.lines['Lambda'],ion) # # u_th_sq= 2*const.BOLTZMAN_CONSTANT*T/(2*self.lines['Element'][0]*(const.PROTON_MASS)*((100*(const.LIGHTSPEED))**2)) # # # u_turb_sq = 2* (((1000*velocity_broadening)/const.LIGHTSPEED)**2) # # # delta_E = 1.60218e-9*energy_kev*math.sqrt(u_th_sq + u_turb_sq ) # # # # tau = 1.15e23*N_e*0.83*Abund[self.lines['Element'][0]-1]*ion*(math.pi**0.5)*(2**0.5)*(const.PLANCK_CONSTANT)*(const.CLASSICAL_ELECTRON_RADIUS)*(const.LIGHTSPEED*100)*self.lines['Oscil_str']/delta_E # # # # # self.lines['Epsilon'] = self.line_stock['Epsilon'] # # for taus in range(len(tau)): # if tau[taus] > 0.5 and tau[taus]<1: # #print(self.lines['Element'][0], self.lines['Lambda'][taus],tau[taus]) # factor = (1-(math.exp(-tau[taus]))) # self.lines['Epsilon'][taus] = self.lines['Epsilon'][taus] * (1-factor) # # # if tau[taus] >= 1: # print(tau[taus], self.lines['Lambda'][taus], self.lines['Element'][taus]) # factor = (1-(math.exp(-tau[taus])))/(tau[taus]) # self.lines['Epsilon'][taus] = self.lines['Epsilon'][taus] * (factor) # # # # ''' #start = time.time() #print(start) if ebins_checksum == False: # generate the checksum ebins_checksum = hashlib.md5(eedges).hexdigest() if velocity_broadening==None: velocity_broadening=0.0 if ((thermal_broadening == False) & \ (velocity_broadening == False)): if ((self.ebins_checksum==ebins_checksum) &\ (self.spectrum_calculated == True)): # no need to do anything. pass else: spec,z = numpy.histogram(self.lineenergies, \ bins=eedges, \ weights = self.lines['Epsilon']) #spec=2*spec #print(self.lines['Epsilon'], self.lines['Epsilon_Err']) self.spectrum = spec self.spectrum_calculated = True else: #if thermal_broadening == True if ((thermal_broadening == True) &\ (velocity_broadening <= 0)): # cehck to see if we need ot redo this if ((self.ebins_checksum==ebins_checksum) &\ (self.spectrum_calculated == True) &\ (self.T == T) &\ (self.v <=0.0)): pass else: # recalculate! recalc = True if ((thermal_broadening == False) &\ (velocity_broadening > 0)): # cehck to see if we need ot redo this if ((self.ebins_checksum==ebins_checksum) &\ (self.spectrum_calculated == True) &\ (self.T == T) &\ (self.v == velocity_broadening)): pass else: # recalculate! recalc = True if ((thermal_broadening == True) &\ (velocity_broadening > 0)): if ((self.ebins_checksum==ebins_checksum) &\ (self.spectrum_calculated == True) &\ (self.T == T) &\ (self.v == velocity_broadening)): pass else: # recalculate! recalc = True if recalc==True: # ind = strong line indicies # nonind = weak line indicies ind = self.lines['Epsilon']>broaden_limit nonind = ~ind # calculate the widths of the strong lines llist = self.lines[ind] # get a raw dictionary of masses in amu masslist = atomic.Z_to_mass(1,raw=True) if thermal_broadening==False: T=0.0 Tb = 0.0 else: Tb = util.convert_temp(T, 'K','keV')*const.ERG_KEV/(masslist[llist['Element']]*1e3*const.AMUKG) if velocity_broadening <0: velocity_broadening = 0.0 vb=0.0 else: vb = (velocity_broadening * 1e5)**2 wcoeff = numpy.sqrt(Tb+vb) / (const.LIGHTSPEED*1e2) elines = self.lineenergies[ind] width = wcoeff*elines #print(wcoeff, elines) # Filter out lines more than NSIGMALIMIT sigma outside the range NSIGMALIMIT=4 eplu = elines+NSIGMALIMIT*width eneg = elines-NSIGMALIMIT*width emax = max(eedges) emin = min(eedges) # identify all the good lines! igood = numpy.where(((elines >= emin) & (eneg < emax)) |\ ((elines < emin) & (eplu < emin)))[0] spec = numpy.zeros(len(eedges)) # t0 = time.time() #print(igood) if len(llist['Element']) != 0: # read the ionization balance table ionfrac = atomdb._get_precalc_ionfrac(os.path.expandvars(settings['IonBalanceTable']),llist['Element'][0] , T) #print(ionfrac) #print(llist['Element']) #ionfrac = atomdb._get_precalc_ionfrac(os.path.expandvars(settings['IonBalanceTable']),llist['Element'] , T) #print(ionfrac) energy_kev = const.HC_IN_KEV_A/llist['Lambda'] #print(energy_kev) #print(energy_kev) ion=ionfrac[llist['Ion']-1]#does this make sense? u_th_sq= 2*const.BOLTZMAN_CONSTANT*T/(2*llist['Element'][0]*(const.PROTON_MASS)*((100*(const.LIGHTSPEED))**2)) u_turb_sq = 2* (((1000*velocity_broadening)/const.LIGHTSPEED)**2) delta_E = 1.60218e-9*energy_kev*math.sqrt(u_th_sq + u_turb_sq ) delta_E_kev= energy_kev*math.sqrt(u_th_sq + u_turb_sq ) if numpy.array(Ab).size==30: # tau = N_e*0.83*Abund[llist['Element'][0]-1]*ion*(math.pi**0.5)*(2**0.5)*Ab[llist['Element'][0]-1]*(const.PLANCK_CONSTANT)*(const.CLASSICAL_ELECTRON_RADIUS)*(const.LIGHTSPEED*100)*llist['Oscil_str']/delta_E tau = N_e*0.83*Abund[llist['Element'][0]-1]*ion*(math.pi**0.5)*Ab[llist['Element'][0]-1]*(const.PLANCK_CONSTANT)*(const.CLASSICAL_ELECTRON_RADIUS)*(const.LIGHTSPEED*100)*llist['Oscil_str']/delta_E elif numpy.array(Ab).size ==14: elem=[1,2,6,7,8,10,12,13,14,16,18,20,26,28] abundan = numpy.ones(30) for i in range(len(elem)): abundan[elem[i]-1] = Ab[i] tau = N_e*0.83*Abund[llist['Element'][0]-1]*ion*(math.pi**0.5)*abundan[llist['Element'][0]-1]*(const.PLANCK_CONSTANT)*(const.CLASSICAL_ELECTRON_RADIUS)*(const.LIGHTSPEED*100)*llist['Oscil_str']/delta_E elif Ab.size==1: tau = N_e*0.83*Abund[llist['Element'][0]-1]*ion*(math.pi**0.5)*Ab*(const.PLANCK_CONSTANT)*(const.CLASSICAL_ELECTRON_RADIUS)*(const.LIGHTSPEED*100)*llist['Oscil_str']/delta_E #print(math.sqrt(u_th_sq + u_turb_sq ),energy_kev) #print(llist[igood]) #zzz=input() #def integrand(x,number): # return 1.2*exp(-x*x-(number*exp(-x*x))) #def integ(number): # return quad(integrand, 0, numpy.inf, args=(number))[0] #file = open('test.dat','w+') for iline in igood: #spec += broaden_object.broaden(const.HC_IN_KEV_A/llist['Lambda'][iline],\ #width[iline],eedges)*llist['Epsilon'][iline] if tau[iline] < 0.1: spec += broaden_object.broaden(const.HC_IN_KEV_A/llist['Lambda'][iline],\ width[iline],eedges)*llist['Epsilon'][iline] #print(tau[iline], llist['Lambda'][iline], Ab[llist['Element'][0]-1], N_e, Abund[llist['Element'][0]-1]) #elif tau[iline] >= 0.5 and tau[iline]<10: elif tau[iline] > 0.1 and tau[iline]<10: factor = (0.00001*(tau[iline]**4))-(0.0008**(tau[iline]**3))+(0.0209**(tau[iline]**2))-(0.2254*tau[iline])+0.9828 #factor = ((1-(math.exp(-tau[iline])))/(tau[iline])) factor1 = 1-factor #print(tau[iline], const.HC_IN_KEV_A/llist['Lambda'][iline], llist['Element'][iline],llist['Ion'][iline]) #print(tau[iline], const.HC_IN_KEV_A/llist['Lambda'][iline], factor) #print(tau[iline], llist['Lambda'][iline], factor) spec += broaden_object.broaden(const.HC_IN_KEV_A/llist['Lambda'][iline],\ width[iline],eedges)*llist['Epsilon'][iline]-(broaden_object.broaden(const.HC_IN_KEV_A/llist['Lambda'][iline],\ delta_E_kev[iline],eedges)*llist['Epsilon'][iline] *factor1) elif tau[iline]>=10 and tau[iline]<23: factor =(-0.000004*(tau[iline]**3))+(0.0005*(tau[iline]**2))-(0.0189*tau[iline])+0.2468 #factor = ((1-(math.exp(-tau[iline])))/(tau[iline])) factor1 = 1-factor #print(tau[iline], llist['Lambda'][iline], llist['Element'][iline],llist['Ion'][iline]) #print(tau[iline], const.HC_IN_KEV_A/llist['Lambda'][iline], factor) #print(tau[iline], llist['Lambda'][iline], factor) #spec += broaden_object.broaden(const.HC_IN_KEV_A/llist['Lambda'][iline],\ # width[iline],eedges)*llist['Epsilon'][iline]-(broaden_object.broaden(const.HC_IN_KEV_A/llist['Lambda'][iline],\ # delta_E_kev[iline],eedges)*llist['Epsilon'][iline] *factor1*(width[iline]/delta_E_kev[iline])) spec += broaden_object.broaden(const.HC_IN_KEV_A/llist['Lambda'][iline],\ width[iline],eedges)*llist['Epsilon'][iline]-(broaden_object.broaden(const.HC_IN_KEV_A/llist['Lambda'][iline],\ delta_E_kev[iline],eedges)*llist['Epsilon'][iline] *factor1) else: factor=0.01 factor1 = 1-factor spec += broaden_object.broaden(const.HC_IN_KEV_A/llist['Lambda'][iline],\ width[iline],eedges)*llist['Epsilon'][iline]-(broaden_object.broaden(const.HC_IN_KEV_A/llist['Lambda'][iline],\ delta_E_kev[iline],eedges)*llist['Epsilon'][iline] *factor1) # t1 = time.time() # print("Broadeninging %i lines, in %f seconds"%(len(igood), t1-t0)) spec = spec[1:]-spec[:-1] #print(spec) # Then add on the weak lines s,z = numpy.histogram(self.lineenergies[nonind], \ bins = eedges,\ weights = self.lines['Epsilon'][nonind]) spec+=s self.spectrum = spec self.spectrum_calculated = True if thermal_broadening: self.T = T else: self.T=0.0 self.v = velocity_broadening return self.spectrum class _ContinuumData(): """ A class holding the continuum data for an element in one HDU Parameters ---------- cocoentry : array(cocodatatype) A single row from the continuum data in an AtomDB file. Attributes ---------- ECont : array(float) The continuum energies (keV) EPseudo : array(float) The pseudocontinuum energies (keV) Cont : array(float) The continuum emissivities (ph cm^3 s^-1 keV^-1) Pseudo : array(float) The pseudocontinuum energies (ph cm^3 s^-1 keV^-1) spectrum_calculated : bool True if spectrum has already been calculated, otherwise false ebins_checksum : string md5sum of the ebins the spectrum was last calculated on. Used to identify if new calculations are required or can just return the previous value. docont : bool Calculate Continuum emission (default True) dopseudo : bool Calculate PseudoContinuum (weak line) emission (default True) """ def __init__(self, cocoentry): """ Initialization Parameters ---------- cocoentry : numrec The data for 1 element/ion """ if type(cocoentry)==bool: if cocoentry == False: cocoentry={} cocoentry['N_Cont']=2 cocoentry['N_Pseudo']=2 cocoentry['E_Cont'] = numpy.array([0.0001,10000]) cocoentry['Continuum'] = numpy.array([0.0,0.0]) cocoentry['E_Pseudo'] = numpy.array([0.0001,10000]) cocoentry['Pseudo'] = numpy.array([0.0,0.0]) nEC = cocoentry['N_Cont'] nEP = cocoentry['N_Pseudo'] self.ECont = cocoentry['E_Cont'][:nEC] self.EPseudo = cocoentry['E_Pseudo'][:nEP] self.Cont = cocoentry['Continuum'][:nEC] self.Pseudo = cocoentry['Pseudo'][:nEP] self.spectrum_calculated = False self.ebins_checksum = False def return_spec(self, eedges, ebins_checksum = False, docont=True, dopseudo=True): # import scipy.integrate # get the checksum for the ebins, if not provided if ebins_checksum == False: # generate the checksum ebins_checksum = hashlib.md5(eedges).hexdigest() # else: if docont: cont = _expand_E_grid(eedges, len(self.ECont), self.ECont, self.Cont) else: cont = 0.0 if dopseudo: pseudo = _expand_E_grid(eedges, len(self.EPseudo), self.EPseudo, self.Pseudo) else: pseudo = 0.0 self.ebins_checksum = ebins_checksum self.spectrum = cont+pseudo return self.spectrum # def apply_response(spectrum, rmf, arf=False): # """ # Apply a response to a spectrum # Parameters # ---------- # spectrum : array(float) # The spectrum, in counts/bin/second, to have the response applied to. Must be # binned on the same grid as the rmf. # rmf : string or pyfits.hdu.hdulist.HDUList # The filename of the rmf or the opened rmf file # arf : string or pyfits.hdu.hdulist.HDUList # The filename of the arf or the opened arf file # Returns # ------- # array(float) # energy grid (keV) for returned spectrum # array(float) # spectrum folded through the response # """ # # # # Update 2016-05-25 # # # # Changed to return the energy grid and the spectrum, as apparently in some # # instruments these are not the same as the input energy grid. # if arf: # if type(arf)==str: # arfdat = pyfits.open(arf) # elif type(arf) == pyfits.hdu.hdulist.HDUList: # arfdat = arf # else: # print("ERROR: unknown arf type, %s"%(repr(type(arf)))) # return # res = spectrum * arfdat['SPECRESP'].data['SPECRESP'] # else: # res = spectrum*1.0 # sret = numpy.matmul(res,matrix) # return ebins, ret, sret
[docs] class NEISession(CIESession): """ Load and generate a Non-equilibrium ionization (NEI) spectrum Parameters ---------- linefile : string or HDUList, optional The line emissivity data file (either name or already open) cocofile : string or HDUList, optional The continuum emissivity data file (either name or already open) elements : iterable of int Elements to include, listed by atomic number. if not set, include all. abundset : string The abundance set to use. Default AG89. Attributes ---------- datacache : dict Any Atomdb FITS files which have to be opened are stored here spectra : NEISpectra Object storing the actual spectral data elements : iterable of int Elements to include, listed by atomic number. if not set, include all. default_abundset : string The abundance set used for the original emissivity file calculation abundset : string The abundance set to be used for the returned spectrum abundsetvector : array_like(float) The relative abundance between default_abundset and abundset for each element response_set : bool Have we loaded a response (or set a dummy response) dolines : bool Calculate line emission (default True) docont : bool Calculate Continuum emission (default True) dopseudo : bool Calculate PseudoContinuum (weak line) emission (default True) broaden_limit : float Apply broadening to lines with epsilon > this value (ph cm3 s-1) thermal_broadening : bool Apply thermal broadening to lines (default = False) velocity_broadening : float Apply velocity broadening with this velocity (km/s). If <=0, do not apply. Examples -------- Create a session instance: >>> s = NEISession() Set up the responses, in this case a dummy response from 0.1 to 10 keV >>> ebins = numpy.linspace(0.1,10,1000) >>> s.set_response(ebins, raw=True) Turn on thermal broadening >>> s.set_broadening(True) Will thermally broaden lines with emissivity > 1.000000e-20 ph cm3 s-1 Return spectrum at 1.0keV, Ne *t = 1e11 cm^-3 s >>> spec = s.return_spectrum(1.0, 1e11) spec is in photons cm^3 s^-1 bin^-1; ebins are the bin edges (so spec is 1 element shorter than ebins) """ def __init__(self, linefile="$ATOMDB/apec_nei_line.fits",\ cocofile="$ATOMDB/apec_nei_comp.fits",\ elements=False, abundset='AG89'): """ Initialization routine. Can set the line and continuum files here Input ----- linefile : str or HDUList The filename of the line emissivity data, or the opened file. cocofile : str or HDUList The filename of the continuum emissivity data, or the opened file. elements : iterable of int Elements to include, listed by atomic number. if not set, include all. abundset : string The abundance set to use. Defaults to AG89. See atomdb.set_abundance for list of options. """ self.SessionType='NEI' self._session_initialise1(linefile, cocofile, elements, abundset) self.spectra=_NEISpectrum(self.linedata, self.cocodata) self._session_initialise2()
[docs] def return_linelist(self,Te, tau, specrange, init_pop='ionizing',specunit='A', \ teunit='keV', apply_aeff=False, by_ion_drv = False,\ nearest=False, apply_binwidth=False,\ log_interp=True, freeze_ion_pop=False): """ Get the list of line emissivities vs wavelengths Parameters ---------- Te : float Temperature in keV or K tau : float ionization timescale, ne * t (cm^-3 s). specrange : [float, float] Minimum and maximum values for interval in which to search specunit : {'Angstrom','keV'} Units for specrange teunit : {'keV' , 'K'} Units of te (kev or K, default keV) apply_aeff : bool If true, apply the effective area to the lines in the linelist to modify their intensities. by_ion_drv : bool If true, keep lines which are the same but have different ion_drv separate. Otherwise, merge them. nearest : bool calculate spectrum at nearest temperature in linelist, no interpolation. Ionization fraction calculation still exact. apply_binwidth : log_interp : bool Perform linear interpolation on a logT/logEpsilon grid (default), or linear. freeze_ion_pop : bool If True, skip the ion population calculation, use init_pop as the final pop instead. Returns ------- linelist : array(linelist) The list of lines with lambda (A), energy (keV), epsilon (ph cm3 s-1),\ epsilon_aeff (ph cm5 s-1) ion (string) and upper & lower levels. """ kT = util.convert_temp(Te, teunit, 'keV') el_list = self.elements ab = {} for Z in el_list: ab[Z] = self.abund[Z]*self.abundsetvector[Z] s= self.spectra.return_linelist(kT, tau, init_pop=init_pop, \ specrange=specrange, teunit='keV',\ specunit=specunit, elements=self.elements,\ abundance = ab, by_ion_drv = by_ion_drv,\ log_interp=log_interp, \ freeze_ion_pop = freeze_ion_pop) # do the response thing #resp = s.response() if apply_aeff == True: epsilon_aeff = self._apply_linelist_aeff(s, specunit, apply_binwidth) s['Epsilon'] = epsilon_aeff return(s)
[docs] def return_line_emissivity(self, Telist, taulist, Z, z1, up, lo, \ specunit='A', teunit='keV', \ apply_aeff=False, apply_abund=True,\ log_interp = True, init_pop='ionizing',\ freeze_ion_pop=False): """ Get line emissivity as function of Te, tau. Assumes ionization from neutral. Parameters ---------- Telist : float or array(float) Temperature(s) in keV or K taulist : float or array(float) ionization timescale(s), ne \* t (cm^-3 s). Z : int nuclear charge of element z1 : int ion charge +1 of ion up : int upper level for transition lo : int lower level for transition specunit : {'Angstrom','keV'} Units for wavelength or energy (a returned value) teunit : {'keV' , 'K'} Units of Telist (kev or K, default keV) apply_aeff : bool If true, apply the effective area to the line emissivity in the linelist to modify their intensities. apply_abund : bool If true, apply the abundance set in the session to the result. log_interp : bool Perform linear interpolation on a logT/logEpsilon grid (default), or linear. init_pop : string or float if 'ionizing' : all ionizing from neutral (so [1,0,0,0...]) if 'recombining': all recombining from ionized (so[...0,0,1]) if dict of arrays : the acutal fractional populations (so init_pop[6] is the array for carbon) if single float : the temperature (same units as Te) freeze_ion_pop : bool If True, skip the ion population calculation, use init_pop as the final pop instead. Returns ------- ret : dict Dictionary containing: Te, tau, teunit: as input wavelength : line wavelength (A) energy : line energy (keV) epsilon : emissivity in ph cm^3 s-1 (or ph cm^5 s^-1 if apply_aeff=True) first index is temperature, second is tau. If Te or Tau was supplied as a scalar, then that index is removed """ Tevec, Teisvec = util.make_vec(Telist) tauvec, tauisvec = util.make_vec(taulist) kTlist = util.convert_temp(Tevec, teunit, 'keV') if apply_abund: ab = self.abund[Z]*self.abundsetvector[Z] else: ab = 1.0 eps = numpy.zeros([len(Tevec), len(tauvec)]) ret={} ret['wavelength'] = None for itau, tau in enumerate(tauvec): for ikT, kT in enumerate(kTlist): e, lam = self.spectra.return_line_emissivity(kT, tau, Z, z1, \ up, lo, \ specunit='A', \ teunit='keV', \ abundance=ab,\ init_pop=init_pop,\ freeze_ion_pop = freeze_ion_pop) eps[ikT, itau] = e if lam != False: ret['wavelength'] = lam * 1.0 else: ret['wavelength'] = None ret['Te'] = Telist ret['tau'] = taulist ret['teunit'] = teunit if ret['wavelength'] != None: ret['energy'] = const.HC_IN_KEV_A/ret['wavelength'] else: ret['energy'] = None if apply_aeff == True: e = ret['energy'] ibin = numpy.where(self.specbins<e)[0][-1] eps = eps*self.aeff[ibin] # now correct for vectors if not tauisvec: eps=eps[:,0] if not Teisvec: eps = eps[0] else: if not Teisvec: eps = eps[0,:] ret['epsilon'] = eps return ret
[docs] def return_spectrum(self, Te, tau, init_pop='ionizing', teunit='keV', nearest=False,\ log_interp=True, freeze_ion_pop=False): """ Get the spectrum at an exact temperature. Interpolates between 2 neighbouring spectra Finds HDU with kT closest to desired kT in given line or coco file. Opens the line or coco file, and looks for the header unit with temperature closest to te. Use result as index input to make_spectrum Parameters ---------- Te : float Temperature in keV or K tau : float ionization timescale, ne \* t (cm^-3 s). init_pop : string or float if 'ionizing' : all ionizing from neutral (so [1,0,0,0...]) if 'recombining': all recombining from ionized (so[...0,0,1]) if dict of arrays : the acutal fractional populations (so init_pop[6] is the array for carbon) if single float : the temperature (same units as Te) teunit : {'keV' , 'K'} Units of te (kev or K, default keV) nearest : bool If set, return the spectrum from the nearest tabulated temperature in the file, without interpolation log_interp : bool Perform linear interpolation on a logT/logEpsilon grid (default), or linear. freeze_ion_pop : bool If True, skip the ion population calculation, use init_pop as the final pop instead. dolines : bool Calculate line emission (default True) docont : bool Calculate Continuum emission (default True) dopseudo : bool Calculate PseudoContinuum (weak line) emission (default True) Returns ------- spectrum : array(float) The spectrum in photons cm^5 s^-1 bin^-1, with the response, or photons cm^3 s^-1 bin^-1 if raw is set. nearest_T : float, optional If `nearest` is set, return the actual temperature this corresponds to. Units are same as `teunit` """ # Check that there is a response set if not self.response_set: raise util.ReadyError("Response not yet set: use set_response to set.") el_list = self.elements ab = {} for Z in el_list: ab[Z] = self.abund[Z]*self.abundsetvector[Z] self.spectra.ebins = self.specbins # self.spectra.dopseudo = self.dopseudo # self.spectra.dolines = self.dolines # self.spectra.docont = self.docont self.spectra.ebins_checksum=hashlib.md5(self.spectra.ebins).hexdigest() s= self.spectra.return_spectrum(Te, tau, init_pop=init_pop, \ teunit=teunit, \ nearest = nearest,elements = el_list, \ abundance=ab, log_interp=True,\ broaden_object=self.cdf, \ freeze_ion_pop = freeze_ion_pop,\ do_eebrems=self.do_eebrems, dolines=self.dolines,\ dopseudo=self.dopseudo, docont=self.docont) ss = self._apply_response(s) self.ionfrac = self.spectra.ionfrac return ss
class _NEISpectrum(_CIESpectrum): """ A class holding the emissivity data for NEI emission, and returning spectra Parameters ---------- linefile : string or HDUList, optional The line emissivity data file (either name or already open) cocofile : string or HDUList, optional The continuum emissivity data file (either name or already open) Attributes ---------- SessionType : string "NEI" spectra : dict of _ElementSpectrum a dictionary containing the emissivity data for each HDU, subdivided by ion (spectra[12][18][13] is an _ElementSpectrum object containing the argon XIII data for the 12th HDU) kTlist : array The temperatures for each emissivity HDU, in keV logkTlist : array log of kTlist """ def __init__(self, linedata, cocodata): """ Initializes the code. Populates the line and emissivity data in all temperature HDUs. Parameters ---------- linedata : HDUList The line emissivity data cocodata : HDUList The continuum emissivity data """ self.datacache={} self.SessionType = 'NEI' picklefname = os.path.expandvars('$ATOMDB/spectra_%s_%s.pkl'%\ (linedata[0].header['CHECKSUM'],\ cocodata[0].header['CHECKSUM'])) havepicklefile = False if os.path.isfile(picklefname): havepicklefile = True if havepicklefile: try: self.spectra = pickle.load(open(picklefname,'rb')) self.kTlist = self.spectra['kTlist'] except AttributeError: havepicklefile=False print("pre-stored data in %s is out of date. This can be caused by updates to the data "%(picklefname)+ "or, more likely, changes to pyatomdb. Regenerating...") # delete the old file if os.path.isfile(picklefname): os.remove(picklefname) if not havepicklefile: self.spectra={} self.kTlist = numpy.array(linedata[1].data['kT'].data) self.spectra['kTlist'] = numpy.array(linedata[1].data['kT'].data) for ihdu in range(len(self.kTlist)): self.spectra[ihdu]={} self.spectra[ihdu]['kT'] = self.kTlist[ihdu] ldat = numpy.array(linedata[ihdu+2].data.data) #cdat = numpy.array(cocodata[ihdu+2].data.data) # revision to do with variable length continuum arrays cdat = cocodata[ihdu+2].data Zarr = numpy.zeros([len(ldat), const.MAXZ_NEI+1], dtype=bool) Zarr[numpy.arange(len(ldat), dtype=int), ldat['Element']]=True for Z in range(1,const.MAXZ_NEI+1): if not Z in self.spectra[ihdu].keys(): self.spectra[ihdu][Z] = {} for z1 in range(1,Z+2): isz1 = (ldat['Ion_drv']==z1) isgood = isz1*Zarr[:,Z] icdat = numpy.where((cdat['Z']==Z) & (cdat['rmJ']==z1))[0] if len(icdat)==0: ccdat = [False] else: ccdat=[cdat[icdat[0]]] # this format is very tempremental and space inefficient. Make into a numpy array ccdat = numpy.zeros(1, dtype = apec.generate_datatypes('continuum', \ npseudo=cdat[icdat[0]]['N_Pseudo'], \ ncontinuum=cdat[icdat[0]]['N_Cont'])) ccdat['Z'] = cdat[icdat[0]]['Z'] ccdat['rmJ'] = cdat[icdat[0]]['rmJ'] ccdat['N_Cont'] = cdat[icdat[0]]['N_Cont'] ccdat['N_Pseudo'] = cdat[icdat[0]]['N_Pseudo'] ccdat['E_Pseudo'] = cdat[icdat[0]]['E_Pseudo'][:ccdat['N_Pseudo'][0]] ccdat['Pseudo'] = cdat[icdat[0]]['Pseudo'][:ccdat['N_Pseudo'][0]] ccdat['E_Cont'] = cdat[icdat[0]]['E_Cont'][:ccdat['N_Cont'][0]] ccdat['Continuum'] = cdat[icdat[0]]['Continuum'][:ccdat['N_Cont'][0]] # for i in ccdat.dtype.names: # ccdat[i]=cdat[icdat[0]][i] self.spectra[ihdu][Z][z1]=_ElementSpectrum(ldat[isgood],\ ccdat[0], Z, z1_drv=z1) pickle.dump(self.spectra, open(picklefname,'wb')) self.logkTlist=numpy.log(self.kTlist) def _calc_ionfrac(self, Te, tau, init_pop='ionizing', teunit='keV', freeze_ion_pop = False,\ elements=False): """ Calculate the ion fractions in an NEI case Parameters ---------- Te : float Electron temperature (default, keV) tau : float ionization timescale, ne \* t (cm^-3 s). init_pop : string or float if 'ionizing' : all ionizing from neutral (so [1,0,0,0...]) if 'recombining': all recombining from ionized (so[...0,0,1]) if dict of arrays : the acutal fractional populations (so init_pop[6] is the array for carbon) if single float : the temperature (same units as Te) teunit : string Units of kT (keV by default, K also allowed) freeze_ion_pop : bool If True, return the initial ionization fraction as the final elements : iterable of int Elements to include, listed by atomic number. if not set, include all. Returns ------- ionfrac : dict of arrays Array of all the ion fractions. """ init_pop_calc={} if elements is False: elements = self.elements # check the format of init_pop if isinstance(init_pop, str): if init_pop.lower() == 'ionizing': for Z in elements: init_pop_calc[Z] = numpy.zeros(Z+1) init_pop_calc[Z][0] = 1.0 elif init_pop.lower() == 'recombining': for Z in elements: init_pop_calc[Z] = numpy.zeros(Z+1) init_pop_calc[Z][-1] = 1.0 else: raise util.OptionError("Error: init_pop is set as a string, must be 'ionizing' or 'recombining'. Currently %s."%\ (init_pop)) elif isinstance(init_pop, float): # this is an initial temperature kT_init = util.convert_temp(init_pop, teunit, 'keV') for Z in elements: init_pop_calc[Z] = apec.return_ionbal(Z, kT_init, \ teunit='keV', \ datacache=self.datacache) elif isinstance(init_pop, dict): for Z in elements: init_pop_calc[Z] = init_pop[Z] else: raise util.OptionError("Error: invalid type for init_pop") ionfrac = {} if freeze_ion_pop: for Z in elements: ionfrac[Z] = init_pop_calc[Z] else: # no calculate the output kT = util.convert_temp(Te, teunit, 'keV') for Z in elements: ionfrac[Z] = apec.return_ionbal(Z, kT, init_pop=init_pop_calc[Z], \ tau=tau, \ teunit='keV', \ datacache=self.datacache) return ionfrac def return_spectrum(self, Te, tau, init_pop='ionizing', teunit='keV', nearest = False, elements=False, abundance=False, log_interp=True, broaden_object=False,\ freeze_ion_pop = False, dolines=True, docont=True, dopseudo=True, \ do_eebrems = True): """ Return the spectrum of the element on the energy bins in self.session.specbins Parameters ---------- Te : float Electron temperature (default, keV) tau : float ionization timescale, ne \* t (cm^-3 s). init_pop : string or float if 'ionizing' : all ionizing from neutral (so [1,0,0,0...]) if 'recombining': all recombining from ionized (so[...0,0,1]) if dict of arrays : the acutal fractional populations (so init_pop[6] is the array for carbon) if single float : the temperature (same units as Te) teunit : string Units of kT (keV by default, K also allowed) nearest : bool If True, return spectrum for the nearest temperature index. If False, use the weighted average of the (log of) the 2 nearest indexes. default is False. elements : iterable of int Elements to include, listed by atomic number. if not set, include all. abundance : dict(float) The abundances of each element, e.g. abund[6]=1.1 means multiply carbon abundance by 1.1. log_interp : bool Perform linear interpolation on a logT/logEpsilon grid (default), or linear. broaden_object : class Object with routine "broaden" which applies line broadening. Usually a Gaussian. freeze_ion_pop : bool If True, skip the ion population calculation, use init_pop as the final pop instead. dolines : bool Calculate line emission (default True) docont : bool Calculate Continuum emission (default True) dopseudo : bool Calculate PseudoContinuum (weak line) emission (default True) do_eebrems : bool Calculate electron-electron bremsstrahlung emission (default True) Returns ------- spec : array(float) The element's emissivity spectrum, in photons cm^3 s^-1 bin^-1 """ # get kT in keV kT = util.convert_temp(Te, teunit, 'keV') ikT, f = self.get_nearest_Tindex(kT, teunit='keV', nearest=nearest, log_interp=log_interp) # check the params: try: if elements is False: elements=range(1,const.MAXZ_NEI+1) except: pass if abundance == False: abundance = {} for Z in elements: abundance[Z] = 1.0 totspec = 0.0 # only do the elements where abundance > 0 el = [] for Z in elements: if abundance[Z]>0.0: el.append(Z) ionfrac_all = self._calc_ionfrac(Te, tau, init_pop=init_pop, teunit=teunit, \ freeze_ion_pop = freeze_ion_pop,\ elements = el) self.ionfrac = ionfrac_all spec = {} if len(ikT)==2: spec[0] = 0.0 spec[1] = 0.0 else: spec[0] = 0.0 for i in range(len(ikT)): # ikTspec = 0.0 for Z in elements: abund = abundance[Z] if abund > 0: ionfrac=ionfrac_all[Z] # elspec = 0.0 for z1 in range(1, Z+2): ionspec = 0.0 if ionfrac[z1-1]>1e-10: # calculate minimum emissivitiy to broaden, accounting for ion # and element abundance. epslimit = self.broaden_limit/(abund*ionfrac[z1-1]) # return a broadened spectrum ionspec = self.spectra[ikT[i]][Z][z1].return_spectrum(self.ebins,\ kT,\ ebins_checksum = self.ebins_checksum,\ thermal_broadening = self.thermal_broadening,\ broaden_limit = epslimit,\ velocity_broadening = self.velocity_broadening,\ broaden_object=broaden_object,\ dopseudo=dopseudo,\ docont=docont,\ dolines=dolines) *\ ionfrac[z1-1] spec[i] += ionspec*abund if len(ikT)==2: totspec = self._merge_spectra_temperatures(f, spec[0], spec[1], log_interp) else: totspec = spec[0] if do_eebrems: nel = 0.0 rawabund = atomdb.get_abundance(datacache=self.datacache) for Z in ionfrac_all.keys(): if abundance[Z] > 0.0: nel += sum(ionfrac_all[Z]*numpy.arange(Z+1))*rawabund[Z]*abundance[Z] eespec = calc_ee_brems_spec(self.ebins, kT, nel) # Here we divide the eebrems by nel/nH so that the resulting # spectrum simply needs to # be multiplied by nenH try: nH =rawabund[1]*abundance[1] except KeyError: print("Warning: as this plasma has no hydrogen in it, assuming nH=1 for electron-electron bremstrahlung renorm") nH=1.0 totspec += eespec/(nel/nH) return totspec def return_line_emissivity(self, Te, tau, Z, z1, up, lo, specunit='A', teunit='keV', abundance=1.0, log_interp = True, init_pop = 'ionizing',\ freeze_ion_pop=False): """ Return the emissivity of a line at kT, tau. Assumes ionization from neutral for now Parameters ---------- Te : float Temperature in keV or K tau : float ionization timescale, ne \* t (cm^-3 s). Z : int nuclear charge of element z1 : int ion charge +1 of ion up : int upper level for transition lo : int lower level for transition specunit : {'Angstrom','keV'} Units for wavelength or energy (a returned value) teunit : {'keV' , 'K'} Units of Telist (kev or K, default keV) abundance : float Abundance to multiply the emissivity by log_interp : bool Perform linear interpolation on a logT/logEpsilon grid (default), or linear. init_pop : string or float if 'ionizing' : all ionizing from neutral (so [1,0,0,0...]) if 'recombining': all recombining from ionized (so[...0,0,1]) if dict of arrays : the acutal fractional populations (so init_pop[6] is the array for carbon) if single float : the temperature (same units as Te) freeze_ion_pop : bool If True, skip the ion population calculation, use init_pop as the final pop instead. Returns ------- Emissivity : float Emissivity in photons cm^3 s^-1 spec : float Wavelength or Energy of line, depending on specunit """ # import collections kT = util.convert_temp(Te, teunit, 'keV') ikT, f = self.get_nearest_Tindex(kT, \ teunit='keV', \ nearest=False, \ log_interp=log_interp) #ikT has the 2 nearest temperature indexes # f has the fraction for each ionfrac_all = self._calc_ionfrac(Te, tau, init_pop=init_pop, teunit=teunit, \ freeze_ion_pop = freeze_ion_pop,\ elements = [Z]) self.ionfrac = ionfrac_all ionfrac = ionfrac_all[Z] eps = 0.0 lam = 0.0 # find lines which match for z1_drv in range(1,Z+2): # ions which don't exist get skipped if ionfrac[z1_drv-1] <= 1e-10: continue eps_in = numpy.zeros(len(ikT)) for i in range(len(ikT)): iikT =ikT[i] llist = self.spectra[iikT][Z][z1_drv].return_linematch(Z,z1,up,lo) for line in llist: # add emissivity eps_in[i] += line['Epsilon'] lam = line['Lambda'] if log_interp: eps_out = 0.0 for i in range(len(ikT)): eps_out += f[i]*numpy.log(eps_in[i]+const.MINEPSOFFSET) eps += numpy.exp(eps_out-const.MINEPSOFFSET)*abundance * ionfrac[z1_drv-1] else: eps_out = 0.0 for i in range(len(ikT)): eps_out += f[i]*eps_in[i] eps += eps_out*abundance * ionfrac[z1_drv-1] if specunit == 'keV': lam = const.HC_IN_KEV_A/lam return eps, lam def return_linelist(self, Te, tau,init_pop='ionizing', teunit='keV', nearest = False, specrange=False, specunit='A', elements=False, abundance=False,\ log_interp=True, freeze_ion_pop=False, by_ion_drv = False): """ Return the lines in a spectral range for a given Te, tau Parameters ---------- Te : float Electron temperature (default, keV) tau : float ionization timescale, ne \* t (cm^-3 s). init_pop : string or float if 'ionizing' : all ionizing from neutral (so [1,0,0,0...]) if 'recombining': all recombining from ionized (so[...0,0,1]) if dict of arrays : the acutal fractional populations (so init_pop[6] is the array for carbon) if single float : the temperature (same units as Te) teunit : string Units of kT (keV by default, K also allowed) nearest : bool If True, return spectrum for the nearest temperature index. If False, use the weighted average of the (log of) the 2 nearest indexes. default is False. specrange : [float, float] Minimum and maximum values for interval in which to search specunit : {'Ansgtrom','keV'} Units for specrange (default A) elements : iterable of int Elements to include, listed by atomic number. if not set, include all. abundance : dict(float) The abundances of each element, e.g. abund[6]=1.1 means multiply carbon abundance by 1.1. log_interp : bool Perform linear interpolation on a logT/logEpsilon grid (default), or linear. freeze_ion_pop : bool If True, skip the ion population calculation, use init_pop as the final pop instead. by_ion_drv : bool If true, keep lines which are the same but have different ion_drv separate. Otherwise, merge them. """ # get kT in keV kT = util.convert_temp(Te, teunit, 'keV') ikT, f = self.get_nearest_Tindex(kT, teunit='keV', nearest=nearest, log_interp=log_interp) # check the params: if elements is False: elements=range(1,const.MAXZ_NEI+1) if abundance == False: abundance = {} for Z in elements: abundance[Z] = 1.0 # totspec = 0.0 # only do the elements where abundance > 0 el = [] for Z in elements: if abundance[Z]>0.0: el.append(Z) ionfrac_all = self._calc_ionfrac(Te, tau, init_pop=init_pop, teunit=teunit, \ freeze_ion_pop = freeze_ion_pop,\ elements = el) self.ionfrac = ionfrac_all if by_ion_drv: linelist = numpy.zeros(0, dtype=apec.generate_datatypes('linelist_nei_spectrum')) else: linelist = numpy.zeros(0, dtype=apec.generate_datatypes('linelist_cie_spectrum')) for Z in elements: abund = abundance[Z] #skip if none of element present if abund > 0: haveelemlist = False #elemllist = numpy.zeros(0,dtype=apec.generate_datatypes('linelist_nei_spectrum')) #cycle through each driving ion ionfrac = ionfrac_all[Z] for z1_drv in range(1, Z+2): # skip if none of driving ion present if ionfrac[z1_drv-1] <1e-10: continue # if > 1 temperature, calculate for each if len(ikT) > 1: # get the linelist llist1 = self.spectra[ikT[0]][Z][z1_drv].return_linelist(specrange,\ specunit=specunit) llist2 = self.spectra[ikT[1]][Z][z1_drv].return_linelist(specrange,\ specunit=specunit) # correct for ion fraction llist1['Epsilon'] *= ionfrac[z1_drv-1] llist2['Epsilon'] *= ionfrac[z1_drv-1] # create linelists for each temperature, or add to them if haveelemlist == False: elemllist1 = llist1 elemllist2 = llist2 haveelemlist = True else: elemllist1 = numpy.append(elemllist1, llist1) elemllist2 = numpy.append(elemllist2, llist2) else: # only 1 temperature # get the linelist llist = self.spectra[ikT[0]][Z][z1_drv].return_linelist(specrange,\ specunit=specunit) # correct for ion fraction llist['Epsilon'] *= ionfrac[z1_drv-1] # create linelists for each temperature, or add to them if haveelemlist == False: elemllist = llist haveelemlist = True else: elemllist = numpy.append(elemllist, llist) # now have a complete ionllist for each ion. Merge them and # remove duplicates if len(ikT) > 1: # merge the two temperatures elemllist = self._merge_linelists_temperatures(f, elemllist1, elemllist2, \ log_interp, \ by_ion_drv=by_ion_drv) else: # merge duplicate lines elemllist = self._merge_linelist_duplicates(elemllist, by_ion_drv=by_ion_drv) # fix abundance elemllist['Epsilon'] *= abund # append this element's line list onto the total line list if by_ion_drv: # appending all the colums, so easy linelist = numpy.append(linelist, elemllist) else: # Not keep the drv columns, so a little trickier linelisttmp = numpy.zeros(len(elemllist), dtype = linelist.dtype) for key in linelist.dtype.names: linelisttmp[key] = elemllist[key] linelist = numpy.append(linelist, linelisttmp) return linelist
[docs] class PShockSession(NEISession): """ Load and generate a Parallel Shock (PShock) spectrum Parameters ---------- linefile : string or HDUList, optional The line emissivity data file (either name or already open) cocofile : string or HDUList, optional The continuum emissivity data file (either name or already open) elements : iterable of int Elements to include, listed by atomic number. if not set, include all. abundset : string The abundance set to use. Default AG89. Attributes ---------- datacache : dict Any Atomdb FITS files which have to be opened are stored here spectra : PShockSpectra Object storing the actual spectral data elements : list(int) Nuclear charge of elements to include. default_abundset : string The abundance set used for the original emissivity file calculation abundset : string The abundance set to be used for the returned spectrum abundsetvector : array_like(float) The relative abundance between default_abundset and abundset for each element response_set : bool Have we loaded a response (or set a dummy response) dolines : bool Calculate line emission (default True) docont : bool Calculate Continuum emission (default True) dopseudo : bool Calculate PseudoContinuum (weak line) emission (default True) broaden_limit : float Apply broadening to lines with epsilon > this value (ph cm3 s-1) thermal_broadening : bool Apply thermal broadening to lines (default = False) velocity_broadening : float Apply velocity broadening with this velocity (km/s). If <=0, do not apply. Examples -------- Create a session instance: >>> s=PShockSession() Set up the responses, in this case a dummy response from 0.1 to 10 keV >>> ebins = numpy.linspace(0.1,10,1000) >>> s.set_response(ebins, raw=True) Turn on thermal broadening >>> s.set_broadening(True) Will thermally broaden lines with emissivity > 1.000000e-20 ph cm3 s-1 Return spectrum at 1.0keV, Tau_u = 1e11, Tau_l = 0.0 (default) >>> spec = s.return_spectrum(1.0, 1e11) Same with Tau_l = 1e9 >>> spec = s.return_spectrum(1.0, 1e11, tau_l = 1e9) spec is in photons cm^3 s^-1 bin^-1; ebins are the bin edges (so spec is 1 element shorter than ebins) """ def __init__(self, linefile="$ATOMDB/apec_nei_line.fits",\ cocofile="$ATOMDB/apec_nei_comp.fits",\ elements=False, abundset='AG89'): """ Initialization routine. Can set the line and continuum files here Input ----- linefile : str or HDUList The filename of the line emissivity data, or the opened file. cocofile : str or HDUList The filename of the continuum emissivity data, or the opened file. elements : iterable of int Elements to include, listed by atomic number. if not set, include all. abundset : string The abundance set to use. Defaults to AG89. See atomdb.set_abundance for list of options. """ self.SessionType='PShock' self._session_initialise1(linefile, cocofile, elements, abundset) self.spectra=_PShockSpectrum(self.linedata, self.cocodata) self._session_initialise2() # self.datacache={} # # Open up the APEC files # self.set_apec_files(linefile, cocofile) # # if elements are specified, use them. Otherwise, use Z=1-30 # if util.keyword_check(elements): # self.elements = elements # else: # self.elements=list(range(1,const.MAXZ_NEI+1)) # # a hold for the spectra # self.spectra=PShockSpectrum(self.linedata, self.cocodata) # # Set both the current and the default abundances to those that # # the apec data was calculated on # self.abundset=self.linedata[0].header['SABUND_SOURCE'] # self.default_abundset=self.linedata[0].header['SABUND_SOURCE'] # self.abundsetvector = numpy.zeros(const.MAXZ_NEI+1) # for Z in self.elements: # self.abundsetvector[Z] = 1.0 # # but if another vector was already specified, use this instead # if util.keyword_check(abundset): # self.set_abundset(abundset) # self.abund = numpy.zeros(const.MAXZ_NEI+1) # for Z in self.elements: # self.abund[Z]=1.0 # # Set a range of parameters which can be overwritten later # self.response_set = False # have we loaded a response file? # self.dolines=True # Include lines in spectrum # self.docont=True # Include continuum in spectrum # self.dopseudo=True # Include pseudo continuum in spectrum # self.set_broadening(False, broaden_limit=1e-20) # self.cdf = _Gaussian_CDF()
[docs] def return_linelist(self,Te, tau_u, specrange, tau_l = 0.0, init_pop='ionizing',specunit='A', \ teunit='keV', apply_aeff=False, by_ion_drv = False,\ nearest=False, apply_binwidth=False): """ Get the list of line emissivities vs wavelengths Parameters ---------- Te : float Temperature in keV or K tau_u : float Upper limit of ionization timescale, ne \* t (cm^-3 s). specrange : [float, float] Minimum and maximum values for interval in which to search tau_l : lower limit of ionization timescale, ne \* t (cm^-3 s) (defaults to 0) init_pop : string or float if 'ionizing' : all ionizing from neutral (so [1,0,0,0...]) if 'recombining': all recombining from ionized (so[...0,0,1]) if dict of arrays : the acutal fractional populations (so init_pop[6] is the array for carbon) if single float : the temperature (same units as Te) specunit : {'Angstrom','keV'} Units for specrange teunit : {'keV' , 'K'} Units of te (kev or K, default keV) apply_aeff : bool If true, apply the effective area to the lines in the linelist to modify their intensities. by_ion_drv : bool If true, keep lines which are the same but have different ion_drv separate. Otherwise, merge them. nearest : apply_binwidth : Returns ------- linelist : array(dtype) The list of lines with lambda (A), energy (keV), epsilon (ph cm3 s-1),\ epsilon_aeff (ph cm5 s-1) ion (string) and upper & lower levels. """ kT = util.convert_temp(Te, teunit, 'keV') el_list = self.elements ab = {} for Z in el_list: ab[Z] = self.abund[Z]*self.abundsetvector[Z] s= self.spectra.return_linelist(kT, tau_u, tau_l=tau_l, init_pop=init_pop, \ specrange=specrange, teunit='keV',\ specunit=specunit, elements=self.elements,\ abundance = ab, by_ion_drv = by_ion_drv) # do the response thing #resp = s.response() if apply_aeff == True: epsilon_aeff = self._apply_linelist_aeff(s, specunit, apply_binwidth) s['Epsilon_Err'] = epsilon_aeff return(s)
[docs] def return_spectrum(self, Te, tau_u, tau_l=0.0, init_pop='ionizing', teunit='keV', nearest=False,\ log_interp=True): """ Get the spectrum at an exact temperature. Interpolates between 2 neighbouring spectra Finds HDU with kT closest to desired kT in given line or coco file. Opens the line or coco file, and looks for the header unit with temperature closest to te. Use result as index input to make_spectrum Parameters ---------- Te : float Temperature in keV or K tau_u : float Upper limit of ionization timescale, ne \* t (cm^-3 s). tau_l : lower limit of ionization timescale, ne \* t (cm^-3 s) (defaults to 0) init_pop : string or float if 'ionizing' : all ionizing from neutral (so [1,0,0,0...]) if 'recombining': all recombining from ionized (so[...0,0,1]) if dict of arrays : the acutal fractional populations (so init_pop[6] is the array for carbon) if single float : the temperature (same units as Te) teunit : {'keV' , 'K'} Units of te (kev or K, default keV) nearest : bool If set, return the spectrum from the nearest tabulated temperature in the file, without interpolation log_interp : bool Perform linear interpolation on a logT/logEpsilon grid (default), or linear. Returns ------- spectrum : array(float) The spectrum in photons cm^5 s^-1 bin^-1, with the response, or photons cm^3 s^-1 bin^-1 if raw is set. """ # Check that there is a response set if not self.response_set: raise util.ReadyError("Response not yet set: use set_response to set.") el_list = self.elements ab = {} for Z in el_list: ab[Z] = self.abund[Z]*self.abundsetvector[Z] self.spectra.ebins = self.specbins self.spectra.ebins_checksum=hashlib.md5(self.spectra.ebins).hexdigest() s= self.spectra.return_spectrum(Te, tau_u, tau_l=tau_l, init_pop=init_pop, \ teunit=teunit, \ nearest = nearest,elements = el_list, \ abundance=ab, log_interp=True,\ broaden_object=self.cdf,\ do_eebrems=self.do_eebrems) ss = self._apply_response(s) return ss
[docs] def return_line_emissivity(self, Telist, tau_ulist, Z, z1, up, lo, \ tau_llist = 0.0, specunit='A', teunit='keV', \ apply_aeff=False, apply_abund=True,\ log_interp = True, init_pop='ionizing'): """ Get line emissivity as function of Te, tau. Assumes ionization from neutral. Parameters ---------- Telist : float or array(float) Temperature(s) in keV or K tau_ulist : float ionization timescale(s), ne \* t (cm^-3 s). Z : int nuclear charge of element z1 : int ion charge +1 of ion up : int upper level for transition lo : int lower level for transition tau_llist : float lower limit of ionization timescale(s), ne \* t (cm^-3 s). specunit : {'Angstrom','keV'} Units for wavelength or energy (a returned value) teunit : {'keV' , 'K'} Units of Telist (kev or K, default keV) apply_aeff : bool If true, apply the effective area to the line emissivity in the linelist to modify their intensities. apply_abund : bool If true, apply the abundance set in the session to the result. log_interp : bool Perform linear interpolation on a logT/logEpsilon grid (default), or linear. Returns ------- ret : dict Dictionary containing: Te, tau, teunit: as input wavelength : line wavelength (A) energy : line energy (keV) epsilon : emissivity in ph cm^3 s-1 (or ph cm^5 s^-1 if apply_aeff=True) first index is temperature, second is tau. """ Tevec, Teisvec = util.make_vec(Telist) tau_uvec, tau_uisvec = util.make_vec(tau_ulist) tau_lvec, tau_lisvec = util.make_vec(tau_llist) kTlist = util.convert_temp(Tevec, teunit, 'keV') if apply_abund: ab = self.abund[Z]*self.abundsetvector[Z] else: ab = 1.0 eps = numpy.zeros([len(Tevec), len(tau_uvec), len(tau_lvec)]) ret={} ret['wavelength'] = None for itau_u, tau_u in enumerate(tau_uvec): for ikT, kT in enumerate(kTlist): for itau_l, tau_l in enumerate(tau_lvec): e, lam = self.spectra.return_line_emissivity(kT, tau_u, Z, z1, \ up, lo, \ tau_l = tau_l,\ specunit='A', \ teunit='keV', \ abundance=ab,\ init_pop=init_pop) eps[ikT, itau_u, itau_l] = e if lam != False: ret['wavelength'] = lam * 1.0 # else: # ret['wavelength'] = None ret['Te'] = Telist ret['tau_u'] = tau_ulist ret['tau_l'] = tau_llist ret['teunit'] = teunit if ret['wavelength'] != None: ret['energy'] = const.HC_IN_KEV_A/ret['wavelength'] else: ret['energy'] = None if apply_aeff == True: e = ret['energy'] ibin = numpy.where(self.specbins<e)[0][-1] eps = eps*self.aeff[ibin] # now correct for vectors if not tau_lisvec: eps = eps.sum(2) if not tau_uisvec: eps = eps.sum(1) if not Teisvec: eps = eps.sum(0) ret['epsilon'] = eps return ret
class _PShockSpectrum(_NEISpectrum): """ A class holding the emissivity data for NEI emission, and returning spectra Parameters ---------- linefile : string or HDUList, optional The line emissivity data file (either name or already open) cocofile : string or HDUList, optional The continuum emissivity data file (either name or already open) Attributes ---------- session : CIESession The parent CIESession SessionType : string "CIE" spectra : dict of _ElementSpectrum a dictionary containing the emissivity data for each HDU, subdivided by element (spectra[12][18] is an _ElementSpectrum object containing the argon data for the 12th HDU) kTlist : array The temperatures for each emissivity HDU, in keV logkTlist : array log of kTlist """ def __init__(self, linedata, cocodata): """ Initializes the code. Populates the line and emissivity data in all temperature HDUs. Parameters ---------- linedata : HDUList The line emissivity data cocodata : HDUList The continuum emissivity data """ self.datacache={} self.SessionType = 'PShock' picklefname = os.path.expandvars('$ATOMDB/spectra_%s_%s.pkl'%\ (linedata[0].header['CHECKSUM'],\ cocodata[0].header['CHECKSUM'])) havepicklefile = False if os.path.isfile(picklefname): havepicklefile = True if havepicklefile: try: self.spectra = pickle.load(open(picklefname,'rb')) self.kTlist = self.spectra['kTlist'] except AttributeError: havepicklefile=False print("pre-stored data in %s is out of date. This can be caused by updates to the data "%(picklefname)+ "or, more likely, changes to pyatomdb. Regenerating...") # delete the old file if os.path.isfile(picklefname): os.remove(picklefname) if not havepicklefile: self.spectra={} self.kTlist = numpy.array(linedata[1].data['kT'].data) self.spectra['kTlist'] = numpy.array(linedata[1].data['kT'].data) for ihdu in range(len(self.kTlist)): self.spectra[ihdu]={} self.spectra[ihdu]['kT'] = self.kTlist[ihdu] ldat = numpy.array(linedata[ihdu+2].data.data) cdat = numpy.array(cocodata[ihdu+2].data.data) Zarr = numpy.zeros([len(ldat), const.MAXZ_NEI+1], dtype=bool) Zarr[numpy.arange(len(ldat), dtype=int), ldat['Element']]=True for Z in range(1,const.MAXZ_NEI+1): if not Z in self.spectra[ihdu].keys(): self.spectra[ihdu][Z] = {} for z1 in range(1,Z+2): isz1 = (ldat['Ion_drv']==z1) isgood = isz1*Zarr[:,Z] ccdat = cdat[(cdat['Z']==Z) & (cdat['rmJ']==z1)] if len(ccdat)==0: ccdat = [False] self.spectra[ihdu][Z][z1]=_ElementSpectrum(ldat[isgood],\ ccdat[0], Z, z1_drv=z1) pickle.dump(self.spectra, open(picklefname,'wb')) self.logkTlist=numpy.log(self.kTlist) def _calc_ionfrac(self, Te, tau_u, tau_l=0.0, init_pop='ionizing', teunit='keV', freeze_ion_pop = False,\ elements=False): """ Calculate the ion fractions in an NEI case Parameters ---------- Te : float Electron temperature (default, keV) tau_u : float Upper limit of ionization timescale, ne \* t (cm^-3 s). tau_l : lower limit of ionization timescale, ne \* t (cm^-3 s) (defaults to 0) init_pop : string or float if 'ionizing' : all ionizing from neutral (so [1,0,0,0...]) if 'recombining': all recombining from ionized (so[...0,0,1]) if dict of arrays : the acutal fractional populations (so init_pop[6] is the array for carbon) if single float : the temperature (same units as Te) teunit : string Units of kT (keV by default, K also allowed) freeze_ion_pop : bool If True, return the initial ionization fraction as the final elements : iterable of int Elements to include, listed by atomic number. if not set, include all. Returns ------- ionfrac : dict of arrays Array of all the ion fractions. """ init_pop_calc={} if elements is False: elements = self.elements # check the format of init_pop if isinstance(init_pop, str): if init_pop.lower() == 'ionizing': for Z in elements: init_pop_calc[Z] = numpy.zeros(Z+1) init_pop_calc[Z][0] = 1.0 elif init_pop.lower() == 'recombining': for Z in elements: init_pop_calc[Z] = numpy.zeros(Z+1) init_pop_calc[Z][-1] = 1.0 else: raise util.OptionError("Error: init_pop is set as a string, must be 'ionizing' or 'recombining'. Currently %s."%\ (init_pop)) elif isinstance(init_pop, float): # this is an initial temperature kT_init = util.convert_temp(init_pop, teunit, 'keV') for Z in elements: init_pop_calc[Z] = apec.return_ionbal(Z, kT_init, \ teunit='keV', \ datacache=self.datacache,\ filename=self.ionbalfile) elif isinstance(init_pop, dict): for Z in elements: init_pop_calc[Z] = init_pop[Z] else: raise util.OptionError("Error: invalid type for init_pop: ", init_pop) ionfrac = {} kT = util.convert_temp(Te, teunit, 'keV') if freeze_ion_pop: for Z in elements: ionfrac[Z] = init_pop_calc[Z] else: ##### if tau_l < 1.0: nzones = 200 taulist = numpy.logspace(numpy.log10(1e8), numpy.log10(tau_u),nzones+1) taulist = numpy.append(0, taulist[:-1]) weight = (taulist[1:]-taulist[:-1])/tau_u taulist = (taulist[1:]+taulist[:-1])/2 elif tau_l == tau_u: nzones=1 weight = numpy.array([1.0]) taulist = numpy.array([tau_u]) else: nzones=40 taulist = numpy.linspace(tau_l, tau_u, nzones+1)[:-1] weight = numpy.zeros(nzones) weight[:] = 0.025 taulist[1:] = 0.5*(taulist[1:]+taulist[:-1]) for Z in elements: # now need to make new ion fractions # cycle through the assorted tau values, to get new ionfrac # tau_l is zero: #now calculate cumulative ionfrac ionfractmp = apec.return_ionbal(Z,kT,init_pop = init_pop_calc[Z],\ tau = taulist,teunit='keV', \ datacache=self.datacache, filename=self.ionbalfile) for i in range(nzones): ionfractmp[i,:]*=weight[i] ionfrac[Z] = numpy.sum(ionfractmp,0) # elspec = 0.0 return ionfrac def return_spectrum(self, Te, tau_u, tau_l=0.0, init_pop='ionizing', teunit='keV', nearest = False, elements=False, abundance=False, log_interp=True, broaden_object=False,\ freeze_ion_pop=False, dolines=True, docont=True, dopseudo=True, \ do_eebrems = True): """ Return the spectrum of the element on the energy bins in self.session.specbins Parameters ---------- Te : float Electron temperature (default, keV) tau_u : float Upper limit of ionization timescale, ne \* t (cm^-3 s). tau_l : lower limit of ionization timescale, ne \* t (cm^-3 s) (defaults to 0) init_pop : string or float if 'ionizing' : all ionizing from neutral (so [1,0,0,0...]) if 'recombining': all recombining from ionized (so[...0,0,1]) if dict of arrays : the acutal fractional populations (so init_pop[6] is the array for carbon) if single float : the temperature (same units as Te) teunit : string Units of kT (keV by default, K also allowed) nearest : bool If True, return spectrum for the nearest temperature index. If False, use the weighted average of the (log of) the 2 nearest indexes. default is False. elements : iterable of int Elements to include, listed by atomic number. if not set, include all. abundance : dict(float) The abundances of each element, e.g. abund[6]=1.1 means multiply carbon abundance by 1.1. log_interp : bool Perform linear interpolation on a logT/logEpsilon grid (default), or linear. broaden_object : class Object with routine "broaden" which applies line broadening. Usually a Gaussian. freeze_ion_pop : bool If True, skip the ion population calculation, use init_pop as the final pop instead. dolines : bool Calculate line emission (default True) docont : bool Calculate Continuum emission (default True) dopseudo : bool Calculate PseudoContinuum (weak line) emission (default True) do_eebrems : bool Calculate electron-electron bremsstrahlung emission (default True) Returns ------- spec : array(float) The element's emissivity spectrum, in photons cm^3 s^-1 bin^-1 """ # get kT in keV kT = util.convert_temp(Te, teunit, 'keV') ikT, f = self.get_nearest_Tindex(kT, teunit='keV', nearest=nearest, log_interp=log_interp) # check the params: if elements is False: elements=range(1,const.MAXZ_NEI+1) if abundance == False: abundance = {} for Z in elements: abundance[Z] = 1.0 totspec = 0.0 ionfrac_all = self._calc_ionfrac(Te, tau_u, tau_l=tau_l, init_pop=init_pop, teunit=teunit, freeze_ion_pop=freeze_ion_pop, elements=elements) self.ionfrac=ionfrac_all for Z in elements: abund = abundance[Z] if abund > 0: # find the initial ionfrac (before anything changes) ionfrac = ionfrac_all[Z] elspec = 0.0 for i in range(len(ikT)): ikTspec = 0.0 for z1 in range(1, Z+2): ionspec = 0.0 if ionfrac[z1-1]>1e-10: # calculate minimum emissivitiy to broaden, accounting for ion # and element abundance. epslimit = self.broaden_limit/(abund*ionfrac[z1-1]) # return a broadened spectrum ionspec = self.spectra[ikT[i]][Z][z1].return_spectrum(self.ebins,\ kT,\ ebins_checksum = self.ebins_checksum,\ thermal_broadening = self.thermal_broadening,\ broaden_limit = epslimit,\ velocity_broadening = self.velocity_broadening,\ broaden_object=broaden_object,\ dopseudo=dopseudo,\ docont=docont,\ dolines=dolines) *\ ionfrac[z1-1] ikTspec += ionspec if len(ikTspec) > 1: # add appropriately if log_interp: elspec += numpy.log(ikTspec+const.MINEPSOFFSET)*f[i] else: elspec += ikTspec*f[i] else: # if just one then no need to interpolate on log scale elspec += ikTspec*f[i] # now we have the spectrum of the whole element. Un-log scale if # required. if log_interp: elspec = numpy.exp(elspec)-const.MINEPSOFFSET # add the element's spectrum to the whole, including abundance totspec += elspec* abund if do_eebrems: nel = 0.0 rawabund = atomdb.get_abundance(datacache=self.datacache) for Z in ionfrac_all.keys(): if abundance[Z] > 0.0: nel += sum(ionfrac_all[Z]*numpy.arange(Z+1))*rawabund[Z]*abundance[Z] eespec = calc_ee_brems_spec(self.ebins, kT, nel) totspec += eespec return totspec def return_line_emissivity(self, Te, tau_u, Z, z1, up, lo, tau_l=0.0, specunit='A', teunit='keV', abundance=1.0, log_interp = True, init_pop = 'ionizing',\ freeze_ion_pop=False): """ Return the emissivity of a line at kT, tau. Assumes ionization from neutral for now Parameters ---------- Te : float Temperature in keV or K tau_u : float ionization timescale, ne \* t (cm^-3 s). Z : int nuclear charge of element z1 : int ion charge +1 of ion up : int upper level for transition lo : int lower level for transition tau_l : lower limit of ionization timescale, ne \* t (cm^-3 s) (defaults to 0) specunit : {'Angstrom','keV'} Units for wavelength or energy (a returned value) teunit : {'keV' , 'K'} Units of Telist (kev or K, default keV) abundance : float Abundance to multiply the emissivity by log_interp : bool Perform linear interpolation on a logT/logEpsilon grid (default), or linear. init_pop : string or float if 'ionizing' : all ionizing from neutral (so [1,0,0,0...]) if 'recombining': all recombining from ionized (so[...0,0,1]) if dict of arrays : the acutal fractional populations (so init_pop[6] is the array for carbon) if single float : the temperature (same units as Te) freeze_ion_pop : bool If True, skip the ion population calculation, use init_pop as the final pop instead. Returns ------- Emissivity : float Emissivity in photons cm^3 s^-1 spec : float Wavelength or Energy of line, depending on specunit """ # import collections kT = util.convert_temp(Te, teunit, 'keV') ikT, f = self.get_nearest_Tindex(kT, \ teunit='keV', \ nearest=False, \ log_interp=log_interp) #ikT has the 2 nearest temperature indexes # f has the fraction for each ionfrac_all = self._calc_ionfrac(Te, tau_u, tau_l=tau_l, init_pop=init_pop, teunit=teunit, \ freeze_ion_pop = freeze_ion_pop,\ elements = [Z]) ionfrac = ionfrac_all[Z] eps = 0.0 lam = 0.0 # find lines which match for z1_drv in range(1,Z+2): # ions which don't exist get skipped if ionfrac[z1_drv-1] <= 1e-10: continue eps_in = numpy.zeros(len(ikT)) for i in range(len(ikT)): iikT =ikT[i] llist = self.spectra[iikT][Z][z1_drv].return_linematch(Z,z1,up,lo) for line in llist: # add emissivity eps_in[i] += line['Epsilon'] lam = line['Lambda'] if log_interp: eps_out = 0.0 for i in range(len(ikT)): eps_out += f[i]*numpy.log(eps_in[i]+const.MINEPSOFFSET) eps += numpy.exp(eps_out-const.MINEPSOFFSET)*abundance * ionfrac[z1_drv-1] else: eps_out = 0.0 for i in range(len(ikT)): eps_out += f[i]*eps_in[i] eps += eps_out*abundance * ionfrac[z1_drv-1] if specunit == 'keV': lam = const.HC_IN_KEV_A/lam return eps, lam def return_linelist(self, Te, tau_u, tau_l=0.0, init_pop='ionizing', teunit='keV', nearest = False, specrange=False, specunit='A', elements=False, abundance=False,\ log_interp=True, freeze_ion_pop=False, by_ion_drv = False): """ Return the lines in a spectral range for a given Te, tau_u Parameters ---------- Te : float Electron temperature (default, keV) tau_u : float Upper limit of ionization timescale, ne \* t (cm^-3 s). tau_l : lower limit of ionization timescale, ne \* t (cm^-3 s) (defaults to 0) init_pop : init_pop : string or float if 'ionizing' : all ionizing from neutral (so [1,0,0,0...]) if 'recombining': all recombining from ionized (so[...0,0,1]) if dict of arrays : the acutal fractional populations (so init_pop[6] is the array for carbon) if single float : the temperature (same units as Te) teunit : string Units of Te (keV by default, K also allowed) nearest : bool If True, return spectrum for the nearest temperature index. If False, use the weighted average of the (log of) the 2 nearest indexes. default is False. specrange : [float, float] Minimum and maximum values for interval in which to search specunit : {'Ansgtrom','keV'} Units for specrange (default A) elements : iterable of int Elements to include, listed by atomic number. if not set, include all. abundance : dict(float) The abundances of each element, e.g. abund[6]=1.1 means multiply carbon abundance by 1.1. log_interp : bool Perform linear interpolation on a logT/logEpsilon grid (default), or linear. freeze_ion_pop : bool If True, skip the ion population calculation, use init_pop as the final pop instead. by_ion_drv : bool If true, keep lines which are the same but have different ion_drv separate. Otherwise, merge them. """ # get kT in keV kT = util.convert_temp(Te, teunit, 'keV') ikT, f = self.get_nearest_Tindex(kT, teunit='keV', nearest=nearest, log_interp=log_interp) # check the params: if elements is False: elements=range(1,const.MAXZ_NEI+1) if abundance == False: abundance = {} for Z in elements: abundance[Z] = 1.0 # totspec = 0.0 # only do the elements where abundance > 0 el = [] for Z in elements: if abundance[Z]>0.0: el.append(Z) ionfrac_all = self._calc_ionfrac(Te, tau_u, tau_l=tau_l, init_pop=init_pop, teunit=teunit, \ freeze_ion_pop = freeze_ion_pop,\ elements = el) if by_ion_drv: linelist = numpy.zeros(0, dtype=apec.generate_datatypes('linelist_nei_spectrum')) else: linelist = numpy.zeros(0, dtype=apec.generate_datatypes('linelist_cie_spectrum')) for Z in elements: abund = abundance[Z] #skip if none of element present if abund > 0: haveelemlist = False #elemllist = numpy.zeros(0,dtype=apec.generate_datatypes('linelist_nei_spectrum')) #cycle through each driving ion ionfrac = ionfrac_all[Z] for z1_drv in range(1, Z+2): # skip if none of driving ion present if ionfrac[z1_drv-1] <1e-10: continue # if > 1 temperature, calculate for each if len(ikT) > 1: # get the linelist llist1 = self.spectra[ikT[0]][Z][z1_drv].return_linelist(specrange,\ specunit=specunit) llist2 = self.spectra[ikT[1]][Z][z1_drv].return_linelist(specrange,\ specunit=specunit) # correct for ion fraction llist1['Epsilon'] *= ionfrac[z1_drv-1] llist2['Epsilon'] *= ionfrac[z1_drv-1] # create linelists for each temperature, or add to them if haveelemlist == False: elemllist1 = llist1 elemllist2 = llist2 haveelemlist = True else: elemllist1 = numpy.append(elemllist1, llist1) elemllist2 = numpy.append(elemllist2, llist2) else: # only 1 temperature # get the linelist llist = self.spectra[ikT[0]][Z][z1_drv].return_linelist(specrange,\ teunit='keV', specunit=specunit) # correct for ion fraction llist['Epsilon'] *= ionfrac[z1_drv-1] # create linelists for each temperature, or add to them if haveelemlist == False: elemllist = llist haveelemlist = True else: elemllist = numpy.append(elemllist, llist) # now have a complete ionllist for each ion. Merge them and # remove duplicates if len(ikT) > 1: # merge the two temperatures elemllist = self._merge_linelists_temperatures(f, elemllist1, elemllist2, \ log_interp, \ by_ion_drv=by_ion_drv) else: # merge duplicate lines elemllist = self._merge_linelist_duplicates(elemllist, by_ion_drv=by_ion_drv) # fix abundance elemllist['Epsilon'] *= abund # append this element's line list onto the total line list if by_ion_drv: # appending all the colums, so easy linelist = numpy.append(linelist, elemllist) else: # Not keep the drv columns, so a little trickier linelisttmp = numpy.zeros(len(elemllist), dtype = linelist.dtype) for key in linelist.dtype.names: linelisttmp[key] = elemllist[key] linelist = numpy.append(linelist, linelisttmp) return linelist
[docs] class KappaSession(NEISession): """ Load and generate a Kappa spectrum Parameters ---------- linefile : string or HDUList, optional The line emissivity data file (either name or already open) cocofile : string or HDUList, optional The continuum emissivity data file (either name or already open) elements : arraylike(int), optional The atomic number of elements to include (default all) abundset : string The abundance set to use. Default AG89. Attributes ---------- datacache : dict Any Atomdb FITS files which have to be opened are stored here spectra : KappaSpectra Object storing the actual spectral data elements : list(int) Nuclear charge of elements to include. default_abundset : string The abundance set used for the original emissivity file calculation abundset : string The abundance set to be used for the returned spectrum abundsetvector : array_like(float) The relative abundance between default_abundset and abundset for each element response_set : bool Have we loaded a response (or set a dummy response) dolines : bool Calculate line emission docont : bool Calculate continuum emission dopseudo : bool Calculate pseudocontinuum emission broaden_limit : float Apply broadening to lines with epsilon > this value (ph cm3 s-1) thermal_broadening : bool Apply thermal broadening to lines (default = False) velocity_broadening : float Apply velocity broadening with this velocity (km/s). If <=0, do not apply. Examples -------- Create a session instance: >>> s=KappaSession() Set up the responses, in this case a dummy response from 0.1 to 10 keV >>> ebins = numpy.linspace(0.1,10,1000) >>> s.set_response(ebins, raw=True) Turn on thermal broadening >>> s.set_broadening(True) Will thermally broaden lines with emissivity > 1.000000e-18 ph cm3 s-1 Return spectrum at 1.0keV with kappa = 3.1 >>> spec = s.return_spectrum(1.0, 3.1) spec is in photons cm^3 s^-1 bin^-1; ebins are the bin edges (so spec is 1 element shorter than ebins) """ def __init__(self, linefile="$ATOMDB/apec_nei_line.fits",\ cocofile="$ATOMDB/apec_nei_comp.fits",\ kappadir = None,\ hsdatafile = "$ATOMDB/hahnsavin.fits",\ ionrecdatafile = "$ATOMDB/kappa_ir.fits",\ elements=[1,2,6,7,8,10,12,13,14,16,18,20,26,28],\ abundset='AG89'): """ Initialization routine. Can set the line and continuum files here Input ----- linefile : str or HDUList The filename of the line emissivity data, or the opened file. cocofile : str or HDUList The filename of the continuum emissivity data, or the opened file. elements : array_like(int) The atomic numbers of the elements to include. Defaults to all (1-30) abundset : string The abundance set to use. Defaults to AG89. See atomdb.set_abundance for list of options. """ self.SessionType='Kappa' self._session_initialise1(linefile, cocofile, elements, abundset) # # define the directories & other data files #if kappadir==None: ## set to the directory where this file sits #self.kappadir = os.path.dirname(os.path.realpath(__file__)) #os.environ['ATOMDBKAPPA'] = self.kappadir #else: # os.environ['ATOMDBKAPPA'] = kappadir self.hsdatafile = os.path.expandvars(hsdatafile) self.ionrecdatafile = os.path.expandvars(ionrecdatafile) # check if the files exist; download if they do not if not os.path.isfile(self.hsdatafile): try: ftpfile = '%s/releases/hahnsavin.fits'%(const.FTPPATH) wget.download(ftpfile+'.bz2', out="%s.bz2"%(self.hsdatafile)) b = open("%s.bz2"%(self.hsdatafile), 'rb') c = b.read() cc=bz2.decompress(c) o = open(self.hsdatafile,'wb') o.write(cc) b.close() o.close() util.record_upload('hahnsavin.fits') except: print("Could not find requested data file %s "%(self.hsdatafile)) if not os.path.isfile(self.ionrecdatafile): # print # try: # ftpfile = '%s/releases/%s'%(const.FTPPATH,self.ionrecdatafile.split('/')[-1]) # wget.download(ftpfile+'.bz2', out="%s.bz2"%(self.ionrecdatafile)) # b = open("%s.bz2"%(self.ionrecdatafile), 'rb') # c = b.read() # cc=bz2.decompress(c) # o = open(self.ionrecdatafile,'wb') # o.write(cc) # b.close() # o.close() # util.record_upload(fname) # # except: print("Could not find requested data file %s."%(self.ionrecdatafile)) print("This is normal is you have not run the model before.") makefile = util.question("Shall we generate it now?", "y", ["y","n"]) if makefile=='y': self.generate_ionrec_file(self.ionrecdatafile) else: return self.spectra=_KappaSpectrum(self.linedata, self.cocodata, \ self.hsdatafile, self.ionrecdatafile, elements = self.elements) self._session_initialise2()
[docs] def set_apec_files(self, linefile="$ATOMDB/apec_nei_line.fits",\ cocofile="$ATOMDB/apec_nei_comp.fits"): """ Set the apec line and coco files, and load up their data Parameters ---------- linefile : str or HDUList The filename of the line emissivity data, or the opened file. cocofile : str or HDUList The filename of the continuum emissivity data, or the opened file. Returns ------- None Notes ----- Updates self.linefile, self.linedata, self.cocofile and self.cocodata """ if util.keyword_check(linefile): if isinstance(linefile, str): lfile = os.path.expandvars(linefile) if not os.path.isfile(lfile): print("*** ERROR: no such file %s. Exiting ***" %(lfile)) return -1 self.linedata = pyfits.open(lfile) self.linefile = lfile elif isinstance(linefile, pyfits.hdu.hdulist.HDUList): # no need to do anything, file is already open self.linedata=linefile self.linefile=linefile.filename() else: print("Unknown data type for linefile. Please pass a string or an HDUList") if util.keyword_check(cocofile): if isinstance(cocofile, str): cfile = os.path.expandvars(cocofile) if not os.path.isfile(cfile): print("*** ERROR: no such file %s. Exiting ***" %(cfile)) return -1 self.cocodata=pyfits.open(cfile) self.cocofile=cfile elif isinstance(cocofile, pyfits.hdu.hdulist.HDUList): # no need to do anything, file is already open self.cocodata=cocofile self.cocofile=cocofile.filename() else: print("Unknown data type for cocofile. Please pass a string or an HDUList")
[docs] def return_linelist(self, Te, kappa, specrange, specunit='A', \ teunit='keV', apply_aeff=False, \ develop=False): """ Get the list of line emissivities vs wavelengths Parameters ---------- Te : float Temperature in keV or K kappa : float Non Maxwellian kappa parameter. Must be > 1.5. specrange : [float, float] Minimum and maximum values for interval in which to search specunit : {'Angstrom','keV'} Units for specrange teunit : {'keV' , 'K'} Units of te (kev or K, default keV) apply_aeff : bool If true, apply the effective area to the lines in the linelist to modify their intensities. Returns ------- linelist : array(dtype) The list of lines with lambda (A), energy (keV), epsilon (ph cm3 s-1),\ epsilon_aeff (ph cm5 s-1) ion (string) and upper & lower levels. """ kT = util.convert_temp(Te, teunit, 'keV') el_list = self.elements ab = {} for Z in el_list: ab[Z] = self.abund[Z]*self.abundsetvector[Z] s= self.spectra.return_linelist(kT, kappa, \ specrange=specrange, teunit='keV',\ specunit=specunit, elements=self.elements,\ abundance = ab, log_interp=True) # do the response thing #resp = s.response() if apply_aeff == True: epsilon_aeff = self._apply_linelist_aeff(s, specunit, apply_binwidth) s['Epsilon_Err'] = epsilon_aeff return(s)
[docs] def return_line_emissivity(self, Telist, kappalist, Z, z1, up, lo, specunit='A', teunit='keV', apply_aeff=False, apply_abund=True,\ log_interp = True): """ Return the emissivity of a line at kT, tau. Assumes ionization from neutral for now Parameters ---------- Telist : float or array(float) Temperature(s) in keV or K kappa : float or array(float) Non Maxwellian kappa parameter. Must be > 1.5. Z : int nuclear charge of element z1 : int ion charge +1 of ion up : int upper level for transition lo : int lower level for transition specunit : {'Angstrom','keV'} Units for wavelength or energy (a returned value) teunit : {'keV' , 'K'} Units of Te (kev or K, default keV) abundance : float Abundance to multiply the emissivity by log_interp : bool Perform linear interpolation on a logT/logEpsilon grid (default), or linear. Returns ------- ret : dict Dictionary containing: Te, kappa, teunit: as input wavelength : line wavelength (A) energy : line energy (keV) epsilon : emissivity in ph cm^3 s-1 (or ph cm^5 s^-1 if apply_aeff=True) first index is temperature, second is tau. """ Tevec, Teisvec = util.make_vec(Telist) kappavec, kappaisvec = util.make_vec(kappalist) kTlist = util.convert_temp(Tevec, teunit, 'keV') eps = numpy.zeros([len(Tevec), len(kappavec)]) ret={} ret['wavelength'] = None if apply_abund: ab = self.abund[Z]*self.abundsetvector[Z] else: ab = 1.0 for ikappa, kappa in enumerate(kappavec): for ikT, kT in enumerate(kTlist): e, lam = self.spectra.return_line_emissivity(kT, kappa, Z, z1, \ up, lo, \ specunit='A', \ teunit='keV', \ abundance=ab) eps[ikT, ikappa] = e if lam != False: ret['wavelength'] = lam * 1.0 else: ret['wavelength'] = None ret['Te'] = Telist ret['kappa'] = kappalist ret['teunit'] = teunit if ret['wavelength'] != None: ret['energy'] = const.HC_IN_KEV_A/ret['wavelength'] else: ret['energy'] = None if apply_aeff == True: e = ret['energy'] ibin = numpy.where(self.specbins<e)[0][-1] eps = eps*self.aeff[ibin] # now correct for vectors if not kappaisvec: eps=eps[:,0] if not Teisvec: eps = eps[0] else: if not Teisvec: eps = eps[0,:] ret['epsilon'] = eps return ret
[docs] def return_spectrum(self, Te, kappa, teunit='keV',\ log_interp=True): """ Get the spectrum at an exact temperature. Interpolates between 2 neighbouring spectra Finds HDU with kT closest to desired kT in given line or coco file. Opens the line or coco file, and looks for the header unit with temperature closest to te. Use result as index input to make_spectrum Parameters ---------- Te : float Temperature in keV or K kappa : float Non Maxwellian kappa parameter. Must be > 1.5. teunit : {'keV' , 'K'} Units of te (kev or K, default keV) log_interp : bool Interpolate between temperature on a log-log scale (default). Otherwise linear Returns ------- spectrum : array(float) The spectrum in photons cm^5 s^-1 bin^-1, with the response, or photons cm^3 s^-1 bin^-1 if raw is set. """ # Check that there is a response set if not self.response_set: raise util.ReadyError("Response not yet set: use set_response to set.") el_list = self.elements ab = {} for Z in el_list: ab[Z] = self.abund[Z]*self.abundsetvector[Z] self.spectra.ebins = self.specbins self.spectra.ebins_checksum=hashlib.md5(self.spectra.ebins).hexdigest() self.spectra.dolines = self.dolines self.spectra.dopseudo = self.dopseudo self.spectra.docont = self.docont s= self.spectra.return_spectrum(Te, kappa, teunit=teunit, elements = el_list, \ abundances=ab, log_interp=True,\ broaden_object=self.cdf) ss = self._apply_response(s) return ss
[docs] def generate_ionrec_file(self, outfilename): """ Generates the input file for the HS ionrec data. This takes a little while to run but hopefully only has to be done occasionally. Parameters ---------- outfilename : string The filename for the output file Returns ------- None """ import datetime print("Please note that this takes a few minutes") dtype=numpy.dtype([('ELEMENT', '>i4'), ('ION_INIT', '>i4'), ('ION_FINAL', '>i4'), \ ('LEVEL_FINAL', '>i4'), ('LEVEL_INIT', '>i4'), ('TR_TYPE', 'U2'),\ ('PAR_TYPE', '>i4'), ('MIN_TEMP', '>f4'), ('MAX_TEMP', '>f4'), \ ('TEMPERATURE', '>f4',(20,)), ('IONREC_PAR', '>f4', (20,))]) outdat = numpy.zeros(20000, dtype=dtype) dtype2=numpy.dtype([('ELEMENT', '>i4'), ('ION', '>i4'), ('IONPOT', '>f4'), \ ('IP_DERE', '>f4')]) outdat2 = numpy.zeros(20000, dtype=dtype2) i = 0 ii =0 for Z in range(1,31): print(" Processing Z=",Z) for z1 in range(1, Z+1): irdat = atomdb.get_data(Z, z1, 'IR') if irdat==False: continue for trtype in ['RR','DR','CI','EA']: j = numpy.where((irdat[1].data['TR_TYPE']==trtype) & \ (irdat[1].data['LEVEL_INIT']==1) & \ (irdat[1].data['LEVEL_FINAL']==1))[0] for jj in j: for col in dtype.names: outdat[col][i] = irdat[1].data[col][jj] i+=1 outdat2['ELEMENT'][ii]=Z outdat2['ION'][ii]=z1 outdat2['IONPOT'][ii]=irdat[1].header['IONPOT'] try: outdat2['IP_DERE'][ii]=irdat[1].header['IP_DERE'] except: pass ii+=1 outdat=outdat[:i] outdat2=outdat2[:ii] # all the data is now assembled. Make an HDU. hdu0 = pyfits.PrimaryHDU() now = datetime.datetime.utcnow() hdu0.header['DATE']=( now.strftime('%d/%m/%y')) hdu0.header['IONREC']=( "Ionization and Recombination Rates") hdu0.header['FILENAME']=( "Generated using pyatomdb.spectrum.KappaSession.generate_ionrec_file") hdu0.header['ORIGIN']=( "ATOMDB v"+ util.get_current_database_version()+', '+os.environ['USER']+", AtomDB project") hdu0.header['HDUCLASS']=( "ATOMIC","Atomic Data") hdu0.header['HDUCLAS1']=( "IONREC","Ionization and Recombination Rate Data") hdu1 = pyfits.BinTableHDU.from_columns(pyfits.ColDefs( [pyfits.Column(name='ELEMENT', format='1J', array=outdat['ELEMENT']), pyfits.Column(name='ION_INIT', format='1J', array=outdat['ION_INIT']), pyfits.Column(name='ION_FINAL', format='1J', array=outdat['ION_FINAL']), pyfits.Column(name='LEVEL_FINAL', format='1J', array=outdat['LEVEL_FINAL']), pyfits.Column(name='LEVEL_INIT', format='1J', array=outdat['LEVEL_INIT']), pyfits.Column(name='TR_TYPE', format='2A', array=outdat['TR_TYPE']), pyfits.Column(name='PAR_TYPE', format='1J', array=outdat['PAR_TYPE']), pyfits.Column(name='MIN_TEMP', format='1E', unit='K', array=outdat['MIN_TEMP']), pyfits.Column(name='MAX_TEMP', format='1E', unit='K', array=outdat['MAX_TEMP']), pyfits.Column(name='TEMPERATURE', format='20E', array=outdat['TEMPERATURE']), pyfits.Column(name='IONREC_PAR', format='20E', array=outdat['IONREC_PAR'])] )) hdu2 = pyfits.BinTableHDU.from_columns(pyfits.ColDefs( [pyfits.Column(name='ELEMENT', format='1J', array=outdat2['ELEMENT']), pyfits.Column(name='ION', format='1J', array=outdat2['ION']), pyfits.Column(name='IONPOT', format='1E', unit='eV', array=outdat2['IONPOT']), pyfits.Column(name='IP_DERE', format='1E', unit='eV', array=outdat2['IP_DERE'])] )) # add some header keywords hdu1.header['EXTNAME'] = ('KAPPA_IR', 'Ionization and Recombination Rates') hdu1.header['HDUCLASS'] = ('ATOMIC', 'Atomic Data') hdu1.header['HDUCLAS1'] = ('IONREC', 'Ionization and Recombination Data') hdu1.header['HDUVERS1'] = ('1.0.0', 'Version of datafile') hdu2.header['EXTNAME'] = ('IONPOT', 'Ionization Potentials') hdu2.header['HDUCLASS'] = ('ATOMIC', 'Atomic Data') hdu2.header['HDUCLAS1'] = ('IONREC', 'Ionization Potentials of Ions') hdu2.header['HDUVERS1'] = ('1.0.0', 'Version of datafile') hdulist = pyfits.HDUList([hdu0,hdu1, hdu2]) outfilename = os.path.expandvars(outfilename) hdulist.writeto(outfilename, checksum=True, overwrite=True) print("Wrote New Ionization/Recombination data to %s"%(outfilename))
[docs] class hs_data(): """ Class to store the read in Hahn & Savin Data. Can then be queried to spit out the relevant rates, etc. """ def __init__(self,hsdatafile): """ Read in the data """ # for now, this is hardwired to load a pickle file. This is not # ideal, will be switched over to proper FITS files when happy # with format, etc. tmp = pyfits.open(hsdatafile) t = {} t['kmin']= tmp['K_MINMAX'].data['kmin'] t['kmax']= tmp['K_MINMAX'].data['kmax'] for i in range(12): t[i] = {} t[i]['a'] = tmp[i+2].data['a'] t[i]['c'] = tmp[i+2].data['c'] self.data=t
[docs] def get_coeffts(self, kappa, T): """ Get the Maxwellian coefficients for the kapps distribution PARAMETERS ---------- kappa : float kappa value for distribution. Must be > 1.5 T : float temperature in K RETURNS ------- Tlist : array(float) temperatures in K kappacoeff : array(float) the coefficients at each temperature in Tlist. """ i = numpy.where((self.data['kmin'] < kappa) &\ (self.data['kmax'] >= kappa))[0][0] Tlist = self.data[i]['a']*T kappacoeff = numpy.zeros(len(Tlist)) for ite in range(len(Tlist)): for ival in range(7): if numpy.isfinite(self.data[i]['c'][ite][ival]): kappacoeff[ite] += self.data[i]['c'][ite][ival]*kappa**(1.0*ival) return Tlist, kappacoeff
class _KappaSpectrum(_NEISpectrum): """ A class holding the emissivity data for NEI emission, and returning spectra Parameters ---------- linefile : string or HDUList, optional The line emissivity data file (either name or already open) cocofile : string or HDUList, optional The continuum emissivity data file (either name or already open) hsdatafile : string Name of FITS file with the H&S coefficient data. ionrecdatafile : string Name of FITS file with the abbreviated ionization and recombination coefficient data. elements : arraylike(int), optional The atomic number of elements to include (default all) Attributes ---------- session : CIESession The parent CIESession SessionType : string "CIE" spectra : dict of ElementSpectra a dictionary containing the emissivity data for each HDU, subdivided by element (spectra[12][18] is an ElementSpectrum object containing the argon data for the 12th HDU) kTlist : array The temperatures for each emissivity HDU, in keV logkTlist : array log of kTlist """ def __init__(self, linedata, cocodata, hsdatafile, ionrecdatafile, elements): """ Initializes the code. Populates the line and emissivity data in all temperature HDUs. Parameters ---------- linedata : The parent CIESession """ self.elements=elements self.datacache={} self.SessionType = 'NEI' picklefname = os.path.expandvars('$ATOMDB/spectra_%s_%s.pkl'%\ (linedata[0].header['CHECKSUM'],\ cocodata[0].header['CHECKSUM'])) havepicklefile = False if os.path.isfile(picklefname): havepicklefile = True if havepicklefile: try: self.spectra = pickle.load(open(picklefname,'rb')) self.kTlist = self.spectra['kTlist'] except AttributeError: havepicklefile=False print("pre-stored data in %s is out of date. This can be caused by updates to the data "%(picklefname)+ "or, more likely, changes to pyatomdb. Regenerating...") else: # delete the old file if os.path.isfile(picklefname): os.remove(picklefname) if not havepicklefile: self.spectra={} self.kTlist = numpy.array(linedata[1].data['kT'].data) self.spectra['kTlist'] = numpy.array(linedata[1].data['kT'].data) for ihdu in range(len(self.kTlist)): self.spectra[ihdu]={} self.spectra[ihdu]['kT'] = self.kTlist[ihdu] ldat = numpy.array(linedata[ihdu+2].data.data) cdat = numpy.array(cocodata[ihdu+2].data.data) Zarr = numpy.zeros([len(ldat), const.MAXZ_NEI+1], dtype=bool) Zarr[numpy.arange(len(ldat), dtype=int), ldat['Element']]=True for Z in range(1,const.MAXZ_NEI+1): if not Z in self.spectra[ihdu].keys(): self.spectra[ihdu][Z] = {} for z1 in range(1,Z+2): isz1 = (ldat['Ion_drv']==z1) isgood = isz1*Zarr[:,Z] ccdat = cdat[(cdat['Z']==Z) & (cdat['rmJ']==z1)] if len(ccdat)==0: ccdat = [False] self.spectra[ihdu][Z][z1]=_ElementSpectrum(ldat[isgood],\ ccdat[0], Z, z1_drv=z1) pickle.dump(self.spectra, open(picklefname,'wb')) self.logkTlist=numpy.log(self.kTlist) # now repeat for hahn savin data self.hsdata = hs_data(hsdatafile) # now repeat for ionization and recombination self.ionrecdata = ir_data(irfile = ionrecdatafile, elements=self.elements) def _calc_ionrec_rate(self, tkappa, ckappa, Z): """ Calculate the ionization and recombination rates for a kappa distribution, by summing maxwellians PARAMETERS ---------- tkappa : array(float) temperatures of each maxwellian component (K) ckappa : array(float) norm of each maxwellian component elements : array(int) atomic number of elements to calculate rates for RETURNS ------- ionrate : dict e.g. ionrate[16] is the ionization rate coefft for sulphur 1 through 17, in cm^3 s-1 recrate : dict e.g. recrate[16] is the recombiation rate coefft for sulphur 1 through 17, in cm^3 s-1 """ ircoeffts = self.ionrecdata.get_ir_rate(tkappa, Z) ionrate = {} recrate = {} ionrecrates = numpy.zeros((Z+1,Z+1), dtype=float) for z in range(1, Z+2): for z1 in range(1,Z+2): #ionrate = numpy.zeros(Z) #recrate = numpy.zeros(Z) ionrecrates[z-1, z1-1]=sum( ircoeffts[z-1,z1-1,:]*ckappa) #ionrecrates[z-1, z1-1]=sum( ircoeffts[z-1,z1-1,:]*ckappa) return ionrecrates # def return_oneT_spectrum(self, Te, Z, z1, epslimit, teunit='keV', log_interp=True,\ # broaden_object=False, ikT=False, f=False): # """ # return a single element, single ion, spectrum, interpolating # appropriately between neighboring temperature bins # """ # T = util.convert_temp(Te, teunit, 'K') # kT = util.convert_temp(Te, teunit, 'keV') # # Recalc fractions if required # if (type(ikT)==bool) | (type(f)==bool): # ikT, f = self.get_nearest_Tindex(kT, teunit='keV', log_interp=log_interp) # # ok, get the spectra # stot=0.0 # for i in range(len(ikT)): # # get the spectrum # sss = self.spectra[ikT[i]][Z][z1].return_spectrum(self.ebins,\ # kT,\ # ebins_checksum = self.ebins_checksum,\ # thermal_broadening = self.thermal_broadening,\ # broaden_limit = epslimit,\ # velocity_broadening = self.velocity_broadening,\ # broaden_object=broaden_object) # # add it appropriately # if log_interp: # stot += numpy.log(sss+const.MINEPSOFFSET)*f[i] # else: # stot +=sss*f[i] # # now handle the sum # stot = numpy.exp(stot)-const.MINEPSOFFSET*len(f) # stot[stot<0] = 0.0 # return stot def return_spectrum(self, Te, kappa, teunit='keV', elements=False, \ abundances=False, log_interp=True, broaden_object=False): """ Return the spectrum of the element on the energy bins in self.session.specbins Parameters ---------- Te : float Electron temperature (default, keV) kappa : float kappa coefficient (>1.5) teunit : string Units of kT (keV by default, K also allowed) FIXME nearest : bool If True, return spectrum for the nearest temperature index. If False, use the weighted average of the (log of) the 2 nearest indexes. default is False. Returns ------- spec : array(float) The element's emissivity spectrum, in photons cm^3 s^-1 bin^-1 """ # get kT in keV T = util.convert_temp(Te, teunit, 'K') kT = util.convert_temp(Te, teunit, 'keV') # find the correct coefficients here tkappa_all, ckappa_all = self.hsdata.get_coeffts(kappa, T) #ionrate, recrate = self.calc_ionrec_rate(tkappa_all, ckappa_all, elements) self._calc_ionbal(tkappa_all, ckappa_all, elements) # filter out of range ones ckappa = ckappa_all[(tkappa_all >= 1e4) & (tkappa_all <= 1e9)] if len(ckappa) < len(tkappa_all): print("Note: only using %i of %i requested Maxwellian components as they are inside the 10^4 to 10^9K range"%(len(ckappa), len(tkappa_all))) tkappa = tkappa_all[(tkappa_all >= 1e4) & (tkappa_all <= 1e9)] # check the params: if elements==False: elements = self.elements # elements=range(1,const.MAXZ_NEI+1) if abundances == False: abundances = {} for Z in elements: abundances[Z] = 1.0 stot = 0.0 for ik, tk in enumerate(tkappa): ikT, f = self.get_nearest_Tindex(tk, teunit='K', log_interp=log_interp) s={} s[0] = 0.0 s[1] = 0.0 for Z in elements: abund = abundances[Z] if abund > 0: # solve the ionization balance # self.ionbal[Z] = apec.solve_ionbal(ionrate[Z], recrate[Z]) ionfrac = self.ionbal[Z] for z1 in range(1, Z+2): if ionfrac[z1-1]>1e-10: # calculate minimum emissivitiy to broaden, accounting for ion # and element abundance. epslimit = self.broaden_limit/(abund*ionfrac[z1-1]) for i, iikT in enumerate(ikT): s[i] += self.spectra[ikT[0]][Z][z1].return_spectrum(self.ebins,\ kT,\ ebins_checksum = self.ebins_checksum,\ thermal_broadening = self.thermal_broadening,\ broaden_limit = epslimit,\ velocity_broadening = self.velocity_broadening,\ broaden_object=broaden_object,\ dolines=self.dolines,\ dopseudo=self.dopseudo,\ docont=self.docont,\ ) *\ ionfrac[z1-1] * abund # merge the spectra smerge = self._merge_spectra_temperatures(f, s[0], s[1], log_interp) stot += smerge*ckappa[ik] return stot def _calc_ionbal(self, tkappa_all, ckappa_all, elements): self.ionbal={} for Z in elements: # solve the ionization balance ionrecrates = self._calc_ionrec_rate(tkappa_all, ckappa_all, Z) self.ionbal[Z] = apec.solve_multi_ionbal(ionrecrates) def return_line_emissivity(self, Te, kappa, Z, z1, up, lo, specunit='A', teunit='keV', abundance=1.0, log_interp = True): """ Return the emissivity of a line at kT, tau. Assumes ionization from neutral for now Parameters ---------- Te : float Temperature in keV or K kappa : float Non Maxwellian kappa parameter. Must be > 1.5. Z : int nuclear charge of element z1 : int ion charge +1 of ion up : int upper level for transition lo : int lower level for transition specunit : {'Angstrom','keV'} Units for wavelength or energy (a returned value) teunit : {'keV' , 'K'} Units of Telist (kev or K, default keV) abundance : float Abundance to multiply the emissivity by log_interp : bool Interpolate between temperature on a log-log scale (default). Otherwise linear Returns ------- Emissivity : float Emissivity in photons cm^3 s^-1 spec : float Wavelength or Energy of line, depending on specunit """ import collections kT = util.convert_temp(Te, teunit, 'keV') T = util.convert_temp(Te, teunit, 'K') # find the correct coefficients here tkappa_all, ckappa_all = self.hsdata.get_coeffts(kappa, T) self._calc_ionbal(tkappa_all, ckappa_all, [Z]) # filter out of range ones ckappa = ckappa_all[(tkappa_all >= 1e4) & (tkappa_all <= 1e9)] if len(ckappa) < len(tkappa_all): print("Note: only using %i of %i requested Maxwellian components as they are inside the 10^4 to 10^9K range"%(len(ckappa), len(tkappa_all))) tkappa = tkappa_all[(tkappa_all >= 1e4) & (tkappa_all <= 1e9)] eps = 0.0 lam = 0.0 ionfrac = self.ionbal[Z] for ik, tk in enumerate(tkappa): ikT, f = self.get_nearest_Tindex(tk, teunit='K', log_interp=log_interp) # find lines which match eps_tmp = 0.0 for z1_drv in range(1,Z+2): # ions which don't exist get skipped if ionfrac[z1_drv-1] <= 1e-10: continue eps_in = numpy.zeros(len(ikT)) for i, iikT in enumerate(ikT): llist = self.spectra[iikT][Z][z1_drv].return_linematch(Z,z1,up,lo) for line in llist: # add emissivity eps_in[i] += line['Epsilon']*ionfrac[z1_drv-1] lam = line['Lambda'] # now merge eps_tmp += eps_in eps_x = 0.0 if log_interp: for i in range(len(ikT)): eps_x += f[i]*numpy.log(eps_tmp[i]+const.MINEPSOFFSET) eps_tmp = (numpy.exp(eps_x)-const.MINEPSOFFSET)*abundance *ckappa[ik] else: for i in range(len(ikT)): eps_x += f[i]*eps_tmp[i] eps_tmp = eps_x*abundance *ckappa[ik] eps += eps_tmp if specunit == 'keV': lam = const.HC_IN_KEV_A/lam return eps, lam def return_linelist(self, Te, kappa,\ teunit='keV', nearest=False, specrange=False,\ specunit='A', elements=False, abundance=False,\ log_interp=True): """ Return the linelist of the element Parameters ---------- Te : float Electron temperature (default, keV) kappa : float Non Maxwellian kappa parameter. Must be > 1.5. teunit : string Units of kT (keV by default, K also allowed) nearest : bool If True, return spectrum for the nearest temperature index. If False, use the weighted average of the (log of) the 2 nearest indexes. default is False. specrange : [float, float] Minimum and maximum values for interval in which to search specunit : {'Ansgtrom','keV'} Units for specrange (default A) elements : iterable of int Elements to include, listed by atomic number. if not set, include all. abundance : dict(float) The abundances of each element, e.g. abund[6]=1.1 means multiply carbon abundance by 1.1. log_interp : bool Interpolate between temperature on a log-log scale (default). Otherwise linear """ # get kT in keV T = util.convert_temp(Te, teunit, 'K') kT = util.convert_temp(Te, teunit, 'keV') # find the correct coefficients here tkappa_all, ckappa_all = self.hsdata.get_coeffts(kappa, T) # filter out of range ones ckappa = ckappa_all[(tkappa_all >= 1e4) & (tkappa_all <= 1e9)] if len(ckappa) < len(tkappa_all): print("Note: only using %i of %i requested Maxwellian components as they are inside the 10^4 to 10^9K range"%(len(ckappa), len(tkappa_all))) tkappa = tkappa_all[(tkappa_all >= 1e4) & (tkappa_all <= 1e9)] # check the params: if elements==False: elements=range(1,const.MAXZ_NEI+1) if abundance == False: abundance = {} for Z in elements: abundance[Z] = 1.0 self._calc_ionbal(tkappa, ckappa, elements) linelist = numpy.zeros(0, dtype=apec.generate_datatypes('linelist_cie_spectrum')) # set up arrays to store by element lines s={} for Z in elements: s[Z] = numpy.zeros(0, dtype=apec.generate_datatypes('linelist_cie_spectrum')) for ik, tk in enumerate(tkappa): ikT, f = self.get_nearest_Tindex(tk, teunit='K', log_interp=log_interp) for Z in elements: abund = abundance[Z] stmp={} stmp['Z'] = {} stmp['Z'][0] = numpy.zeros(0, dtype=apec.generate_datatypes('linelist_cie_spectrum')) stmp['Z'][1] = numpy.zeros(0, dtype=apec.generate_datatypes('linelist_cie_spectrum')) if abund > 0: # solve the ionization balance # self.ionbal[Z] = apec.solve_ionbal(ionrate[Z], recrate[Z]) ionfrac = self.ionbal[Z] for z1_drv in range(1, Z+2): if ionfrac[z1_drv-1]>1e-10: # calculate minimum emissivitiy to broaden, accounting for ion # and element abundance. epslimit = self.broaden_limit/(abund*ionfrac[z1_drv-1]) llist={} for i, iikT in enumerate(ikT): llist[i] = self.spectra[iikT][Z][z1_drv].return_linelist(specrange, specunit=specunit) llist[i]['Epsilon']*= ionfrac[z1_drv-1] * abund tmpl = numpy.zeros(len(llist[i]), dtype=apec.generate_datatypes('linelist_cie_spectrum')) for key in tmpl.dtype.names: tmpl[key] = llist[i][key] stmp['Z'][i] = numpy.append(stmp['Z'][i], tmpl) #merge all the lines of the element at each temperature stmp['Z']['sum']=self._merge_linelists_temperatures(f, stmp['Z'][0], stmp['Z'][1], \ log_interp, \ by_ion_drv=False) # normalize for kappa C fraction stmp['Z']['sum']['Epsilon'] *= ckappa[ik] # append to the overall linelist for the element s[Z]=numpy.append(s[Z], stmp['Z']['sum']) # create a return linelist - all elements, removing duplicates s_out = numpy.zeros(0, dtype=apec.generate_datatypes('linelist_cie_spectrum')) for Z in elements: s_out=numpy.append(s_out, self._merge_linelist_duplicates(s[Z], by_ion_drv=False)) return s_out MIN_IONBAL = 1e-10
[docs] def make_vec(d): """ Create vector version of d, return True or false depending on whether input was vector or not Parameters ---------- d: any scalar or vector The input Returns ------- vecd : array of floats d as a vector (same as input if already an iterable type) isvec : bool True if d was a vector, otherwise False. """ isvec=True try: _ = (e for e in d) vecd=d except TypeError: isvec=False vecd= numpy.array([d]) return vecd,isvec
# some refence constants INTERP_I_UPSILON=900 #/*!< Include both left & right boundaries */ UPSILON_COLL_COEFF = 8.629e-6 # /*!< sqrt{2 pi / kB} hbar^2/m_e^{1.5} */ INTERP_IONREC_RATE_COEFF =300 MAX_CHI = 200.0 #------------------------------------------------------------------------------- #------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
[docs] class ir_data(): #------------------------------------------------------------------------------- def __init__(self, elements=False,\ irfile = 'kappa_ir.fits'): # load up the kappa coefficients self.elements=elements ionrec_data = {} ir = pyfits.open(irfile) for irdat in ir['KAPPA_IR'].data: Z = irdat['ELEMENT'] z1 = irdat['ION_INIT'] if not Z in ionrec_data.keys(): ionrec_data[Z] = {} if not z1 in ionrec_data[Z].keys(): ionrec_data[Z][z1] = {} ionrec_data[Z][z1]['EA']=False ionrec_data[Z][z1]['CI']=False ionrec_data[Z][z1]['RR']=False ionrec_data[Z][z1]['DR']=False # print(irdat) if ionrec_data[Z][z1][irdat['TR_TYPE']]==False: ionrec_data[Z][z1][irdat['TR_TYPE']]={} ionrec_data[Z][z1][irdat['TR_TYPE']][irdat['ION_FINAL']-1]=irdat for ionpot in ir['IONPOT'].data: ionrec_data[ionpot['ELEMENT']][ionpot['ION']]['IONPOT'] = ionpot['IONPOT'] ionrec_data[ionpot['ELEMENT']][ionpot['ION']]['IP_DERE'] = ionpot['IP_DERE'] # print(ionrec_data[12]) # zzz=input() self.ionrecdata=ionrec_data #-------------------------------------------------------------------------------
[docs] def get_ir_rate(self, T, Z): """ Return the (Maxwellian) ionization and recombination rates at temperature T (in K) PARAMETERS ---------- T : float or array(float) Temperature (Kelvin) """ rates={} Tvec , wasTvec = util.make_vec(T) ionrec = numpy.zeros([Z+1,Z+1, len(Tvec)], dtype=float) for z1 in range(1, Z+2): if self.ionrecdata[Z][z1]['CI'] != False: for dd in self.ionrecdata[Z][z1]['CI'].keys(): ionpot = self.ionrecdata[Z][z1]['IONPOT'] d=self.ionrecdata[Z][z1]['CI'][dd] if ((d['PAR_TYPE']>=const.CI_DERE) & (d['PAR_TYPE']<=const.CI_DERE+20)): try: ionpot = self.ionrecdata[Z][z1]['IP_DERE'] except: pass cirate = atomdb._calc_ionrec_ci(d, Tvec, extrap=True, ionpot=ionpot) ionrec[d['ion_final']-1, d['ion_init']-1,:]+=cirate else: pass if self.ionrecdata[Z][z1]['DR'] != False: for dd in self.ionrecdata[Z][z1]['DR'].keys(): d = self.ionrecdata[Z][z1]['DR'][dd] #print('d DR:',d) #print(self.ionrecdata[Z][z1]['DR']) if d != False: drrate = atomdb._calc_ionrec_dr(d, Tvec, extrap=True) ionrec[d['ion_final']-1, d['ion_init']-1,:]+=drrate else: pass if self.ionrecdata[Z][z1]['RR'] != False: for dd in self.ionrecdata[Z][z1]['RR'].keys(): d = self.ionrecdata[Z][z1]['RR'][dd] #print('d RR:',d) if d != False: rrrate = atomdb._calc_ionrec_rr(d, Tvec, extrap=True) ionrec[d['ion_final']-1, d['ion_init']-1,:]+=rrrate else: pass if self.ionrecdata[Z][z1]['EA'] != False: for dd in self.ionrecdata[Z][z1]['EA'].keys(): d = self.ionrecdata[Z][z1]['EA'][dd] #print('d EA:',d) if d != False: earate = atomdb._calc_ionrec_ea(d, Tvec, extrap=True) ionrec[d['ion_final']-1, d['ion_init']-1,:]+=earate else: pass return ionrec
# #------------------------------------------------------------------------------- # def calc_maxwell_rate(par_type,\ # min_temp,\ # max_temp,\ # temperatures,\ # ionrec_par,\ # ionpot, T, Z, degl, degu,\ # force_extrap = True): # """ # Interpolate ionization and recombination rates # PARAMETERS # ---------- # par_type : int # number denoting parameter type. Should be >900 for CI # min_temp : float # minimum temperature in tabulation (K) # max_temp : float # maximum temperature in tabulation (K) # temperatures: array(float) # temperatures ionrec_par is tabulated on (K) # ionrec_par: array(float) # ionization & recombination parameters # ionpot : float # ionization potential (keV) # T : float or array(float) # temperatures to caculate the rates on (K) # Z : int # nuclear charge # degl : int # degeneracy of initial ion ground state # degu : int # degeneracy of final ion ground state # force_extrap : bool # Whether to extrappolate outside of the listed table # RETURNS # ------- # ir : float or array(float) # The ionization or recombination rate coefficient, in cm^3 s^-1 # """ # Tvec, wasTvec = make_vec(T) # # number of useful data points # if (par_type > INTERP_IONREC_RATE_COEFF) &\ # (par_type <= INTERP_IONREC_RATE_COEFF+20): # N_intep = par_type-INTERP_IONREC_RATE_COEFF # # Check if input temperature was a vector. Convert to vector # # extract the data, and convert to doubles to aviod numerical issues # te_in = numpy.double(temperatures[:N_interp]) # ci_in = numpy.double(ionrec_par[:N_interp]) # # interpolate on log-log grid. Add 1e-30 to avoid interpolating log(0) # ir = numpy.exp(interpolate.interp1d(numpy.log(te_in), \ # numpy.log(ci_in+1e-30), \ # kind=1, bounds_error=False,\ # fill_value=numpy.nan)(numpy.log(Tvec)))-1e-30 # # let's deal with the near misses. This is necessary because sometimes # # a value is deemed out of bounds even though it is only </> the min/max # # due to floating point rounding issues. In these cases, set to the nearest # nantest = numpy.isnan(ir) # for i in range(len(nantest)): # if nantest[i]: # if ((Tvec[i] > 0.99*min_temp)&\ # (Tvec[i] <= min_temp)): # ir[i] = ionrec_par[0] # if ((Tvec[i] < 1.01*max_temp)&\ # (Tvec[i] >= max_temp)): # ir[i] = ci_in[-1] # # now look for further out of bounds issues, if extrappolation is desired # if force_extrap: # nantest = numpy.isnan(ir) # for i in range(len(nantest)): # # high T points are extrapolated as T**-(3/2) # if (Tvec[i] > max_temp): # ir[i]= si_in[-1] * (te_in[-1]/Tvec[i])**1.5 # # low T points are set to zero # ir[numpy.isnan(ir)] = 0.0 # if wasTvec== False: # ir=ir[0] # return ir # elif (par_type > INTERP_I_UPSILON) &\ # (par_type <= INTERP_I_UPSILON+20): # N_interp = par_type - INTERP_I_UPSILON # te_in = temperatures[:N_interp] # ci_in = ionrec_par[:N_interp] # chi = dE / (const.KBOLTZ*Tvec) # #it = numpy.where((Tvec>=min(te_in)) &\ # # (Tvec<=max(ti_in)))[0] # # fudge for dubious ionrec_par... # ci_in[ci_in < 0.0] = 0.0 # #if len(it) > 0: # upsilon = interpolate.interp1d(numpy.log(te_in), \ # numpy.log(ci_in+1e-30), \ # bounds_error=False, \ # fill_value=numpy.nan)(numpy.log(Tvec)) # upsilon = numpy.exp(upsilon)-1e-30 # calc_type = const.EI_UPSILON # upsilon[upsilon < 0] = 0.0 # # this is the same as the electron-impact version, but with extra factor of pi # ir = UPSILON_COLL_COEFF * upsilon * \ # numpy.exp(-chi) / (numpy.pi* numpy.sqrt(Tvec)*degl) # ir[chi < MAX_CHI] = 0.0 # # low T points are set to zero # ir[numpy.isnan(ir)] = 0.0 # if wasTvec== False: # ir=ir[0] # return ir # #------------------------------------------------------------------------------- # def calc_ci_rate(self, Z, z1, T): # """ # Calculate the collisional ionization rate # PARAMETERS # ---------- # Z : int # element nuclear charge # z1 : int # ion charge +1. This is for the lowest charge state in the ionization # or recombination process (so 3 for O III-> O IV and O IV -> O III) # T : float or array(floats) # Temperature (Kelvin) to calculate rates at # RETURNS # ------- # ci : float or array(float) # The collisional ionization rate coefficient in cm^3 s^-1 # """ # cidat = self.ionrecdata[Z][z1]['CI'] # if ((cidat['par_type']>const.INTERP_I_UPSILON) & \ # (cidat['par_type']<=const.INTERP_I_UPSILON+20)): # ionpot = self.ionrecdata[Z][z1]['IONPOT'] # # THIS IS A HACK. FIXME # # degl = 1 # degu = 1 # # END HACK # # ci = calc_maxwell_rate(cidat['PAR_TYPE'],\ # cidat['MIN_TEMP'],\ # cidat['MAX_TEMP'],\ # cidat['TEMPERATURE'],\ # cidat['IONREC_PAR'],\ # ionpot/1e3, T, Z, degl, degu) # elif ((cidat['par_type']>const.INTERP_I_UPSILON) & \ # (cidat['par_type']<=const.INTERP_I_UPSILON+20)): # ci = calc_maxwell_rate(cidat['PAR_TYPE'],\ # cidat['MIN_TEMP'],\ # cidat['MAX_TEMP'],\ # cidat['TEMPERATURE'],\ # cidat['IONREC_PAR'],\ # ionpot/1e3, T, Z, degl, degu) # elif ((cidat['par_type']>const.CI_DERE) &\ # (cidat['par_type']<=const.CI_DERE+20)): # ionpot = self.ionrecdata[Z][z1]['IP_DERE'] # Tvec, wasTvec = make_vec(T) # ci = atomdb._calc_ionrec_ci(cidat, Tvec, extrap=True, ionpot=ionpot) # else: # Tvec, wasTvec = make_vec(T) # ci = atomdb._calc_ionrec_ci(cidat,Tvec, extrap=True) # return ci # #------------------------------------------------------------------------------- # def calc_dr_rate(self, Z, z1, T): # """ # Calculate the dielectronic recombination rate # PARAMETERS # ---------- # Z : int # element nuclear charge # z1 : int # ion charge +1. This is for the lowest charge state in the ionization # or recombination process (so 3 for O III-> O IV and O IV -> O III) # T : float or array(floats) # Temperature (Kelvin) to calculate rates at # RETURNS # ------- # dr : float or array(float) # The dielectronic recombinatino rate coefficient in cm^3 s^-1 # """ # drdat = self.ionrecdata[Z][z1]['DR'] # dr = atomdb._calc_ionrec_dr(drdat, T, extrap=True) # return dr # #------------------------------------------------------------------------------- # def calc_rr_rate(self, Z, z1, T): # """ # Calculate the dielectronic recombination rate # PARAMETERS # ---------- # Z : int # element nuclear charge # z1 : int # ion charge +1. This is for the lowest charge state in the ionization # or recombination process (so 3 for O III-> O IV and O IV -> O III) # T : float or array(floats) # Temperature (Kelvin) to calculate rates at # RETURNS # ------- # rr : float or array(float) # The dielectronic recombinatino rate coefficient in cm^3 s^-1 # """ # rrdat = self.ionrecdata[Z][z1]['RR'] # rr = atomdb._calc_ionrec_rr(rrdat, T, extrap=True) # return rr # #------------------------------------------------------------------------------- # def calc_ea_rate(self, Z, z1, T): # """ # Calculate the dielectronic recombination rate # PARAMETERS # ---------- # Z : int # element nuclear charge # z1 : int # ion charge +1. This is for the lowest charge state in the ionization # or recombination process (so 3 for O III-> O IV and O IV -> O III) # T : float or aeaay(floats) # Temperature (Kelvin) to calculate rates at # RETURNS # ------- # ea : float or aeaay(float) # The dielectronic recombinatino rate coefficient in cm^3 s^-1 # """ # eadat = self.ionrecdata[Z][z1]['EA'] # ea = atomdb._calc_ionrec_ea(eadat, T, extrap=True) # return ea #------------------------------------------------------------------------------- #------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
[docs] def calc_ee_brems_spec(ebins, Te, dens, teunit='keV'): """ Calculate the electron-electron bremstrahlung per-bin emissivity Parameters ---------- ebins : array(float) The spectral bin edges in energy order (keV) Te : float Electron temperature (default, keV) dens : float The electron density (cm^-3) teunit : string Units of Te (keV by default, K also allowed) Returns ------- eebrems : array(float) electron-electron bremsstrahlung in ph cm^3 s^-1 bin^-1 """ # convert temperature to keV if required kT = util.convert_temp(Te, teunit, 'keV') # get the emission at each bin edge eespec = apec.calc_ee_brems(ebins, kT, dens) # average over bin width & centroid #eecent = (eespec[1:]+eespec[:-1])/2 #binwidth = ebins[1:]-ebins[:-1] ee = (ebins[1:]-ebins[:-1]) * (eespec[1:]+eespec[:-1])/2 return ee
#### LEGACY CODE BEYOND THIS POINT def __get_nei_line_emissivity(Z, z1, up, lo): """ DEPRECATED Return the line emissivity for a single line, separated out by the ion driving it PARAMETERS ---------- Z : int nuclear charge z1 : int ion charge +1 up : int the upper level lo : int the lower level RETURNS ------- emiss : dict a dictionary containing an array, one for each ion_drv, with the emissivity in it. E.g. emiss[6] is an nTe element array, with the emissivity due to z1=6 as a fn of temperature Also emiss['Te'] is the tempearture in keV """ warnings.warn("get_nei_line_emissivity is a deprecated function and will be removed. Use NEISession.return_line_emissivity instead", DeprecationWarning, stacklevel=3) ldata= pyfits.open(os.path.expandvars("$ATOMDB/apec_nei_line.fits")) emiss={} datacache = {} emiss['Te'] = ldata[1].data['kT'] for i in range(len(emiss['Te'])): j = ldata[i+2].data j2 = j[ (j['Element']==Z) &\ (j['Ion']==z1) &\ (j['UpperLev']==up) &\ (j['LowerLev']==lo)] if len(j2)>0: ionbal = apec.return_ionbal(Z, emiss['Te'][i], \ teunit='keV', \ datacache=datacache) for jj in j2: if not jj['ion_drv'] in emiss.keys(): emiss[jj['Ion_drv']] = numpy.zeros(len(emiss['Te'])) emiss[jj['Ion_drv']][i] = jj['Epsilon'] * ionbal[jj['ion_drv']-1] return emiss