Source code for pyxel.models.photon_collection.ariel_airs

# Copyright or © or Copr. Thibault Pichon, CEA Paris-Saclay (2023)
#
# thibault.pichon@cea.fr
#
# This file is part of the Pyxel general simulator framework.
#
# This software is governed by the CeCILL  license under French law and
# abiding by the rules of distribution of free software.  You can  use,
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info".
#
# As a counterpart to the access to the source code and  rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty  and the software's author,  the holder of the
# economic rights,  and the successive licensors  have only  limited
# liability.
#
# In this respect, the user's attention is drawn to the risks associated
# with loading,  using,  modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean  that it is complicated to manipulate,  and  that  also
# therefore means  that it is reserved for developers  and  experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and,  more generally, to use and operate it in the
# same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.


"""Pyxel photon generator models for ARIEL AIRS."""

import logging
from pathlib import Path

import astropy.io.ascii
import numpy as np
import pandas as pd
from astropy import units as u
from astropy.io import fits
from astropy.units import Quantity
from scipy.integrate import cumulative_trapezoid

from pyxel.detectors import Detector
from pyxel.util import resolve_with_working_directory


def read_star_flux_from_file(
    filename: str,
) -> tuple[np.ndarray, np.ndarray]:
    """Read star flux file.

    # TODO: Read the unit from the text file.

    Parameters
    ----------
    filename: type string,
        Name of the target file. Different extension can be considered.

    Returns
    -------
    wavelength: type array 1D
        Wavelength associated with the flux of the star.
    flux: type array 1D
        Flux of the target considered, in  ph/s/m2/µm.
    """
    extension = Path(filename).suffix
    if extension == ".txt":
        wavelength, flux = np.loadtxt(resolve_with_working_directory(filename)).T
        # set appropriate unit here um
        wavelength = wavelength * u.micron
        # set appropriate units
        flux = flux * u.photon / u.s / u.m / u.m / u.micron

    elif extension == ".ecsv":
        data = astropy.io.ascii.read(filename)
        flux = data["flux"].data * data["flux"].unit
        wavelength = data["wavelength"].data * data["wavelength"].unit

    elif extension == ".dat":
        """ExoNoodle file"""
        df_topex = pd.read_csv(filename, header=11)
        wavelength, flux = (
            np.zeros(len(df_topex["wavelength flux "])),
            np.zeros(len(df_topex["wavelength flux "])),
        )
        for i in range(len(df_topex["wavelength flux "])):
            wavelength[i] = float(df_topex["wavelength flux "][i].split("        ")[0])
            conv = 1.51 * 1e3 * 1e4 / wavelength[i]
            flux[i] = (
                float(df_topex["wavelength flux "][i].split("        ")[1])
                * conv
                * 1e-6
            )  # Attention aux unités
        wavelength = wavelength * u.micron
        flux = flux * u.photon / u.s / u.m / u.m / u.micron
    else:
        logging.debug("ERROR while converting, extension not readable")

    return wavelength, flux


def convert_flux(
    wavelength: np.ndarray,
    flux: Quantity,
    telescope_diameter_m1: float,
    telescope_diameter_m2: float,
) -> np.ndarray:
    """Convert the flux of the target in ph/s/µm.

    Parameters
    ----------
    wavelength: 1D array
        Wavelength sampling of the considered target.
    flux: 1D array
        Flux of the target considered in ph/s/m2/µm.
    telescope_diameter_m1: float
        Diameter of the M1 mirror of the TA in m.
    telescope_diameter_m2: float
        Diameter of the M2 mirror of the TA in m.

    Returns
    -------
    conv_flux: 1D array
        Flux of the target considered in ph/s/µm.
    """

    logging.debug("Incident photon flux is being converted into ph/s/um.")
    # use of astropy code
    flux.to(
        u.photon / u.m**2 / u.micron / u.s,
        equivalencies=u.spectral_density(wavelength),
    )

    diameter_m2 = (
        telescope_diameter_m2 * u.meter
    )  # TODO: define a class to describe the optic of the telescope ?
    diameter_m1 = telescope_diameter_m1 * u.meter
    collecting_area = np.pi * diameter_m1 * diameter_m2 / 4
    conv_flux = np.copy(flux) * collecting_area

    return conv_flux


def compute_bandwidth(
    psf_wavelength: Quantity,
) -> tuple[Quantity, Quantity]:
    """Compute the bandwidth for non-even distributed values.

    First we put the poles, each pole is at the center of the previous wave and the next wave.
    We add the first pole and the last pole using symmetry. We get nw+1 poles

    Parameters
    ----------
    psf_wavelength : Quantity
       PSF object.

    Returns
    -------
    bandwidth : quantity array
        Bandwidth. Usually in microns.
    all_poles : quantity array
        Pole wavelengths. Dimension: nw
    """
    poles = (psf_wavelength[1:] + psf_wavelength[:-1]) / 2
    first_pole = psf_wavelength[0] - (psf_wavelength[1] - psf_wavelength[0]) / 2
    last_pole = psf_wavelength[-1] + (psf_wavelength[-1] - psf_wavelength[-2]) / 2
    all_poles = np.concatenate(([first_pole], poles, [last_pole]))
    bandwidth = all_poles[1:] - all_poles[:-1]

    return bandwidth, all_poles


# TODO: Add units
def integrate_flux(
    wavelength: np.ndarray,
    flux: np.ndarray,
    psf_wavelength: Quantity,
) -> np.ndarray:
    """Integrate flux on each bin around the psf.

    The trick is to integrate first, and interpolate after (and not vice-versa).

    Parameters
    ----------
    wavelength : quantity array
        Wavelength. Unit: usually micron. Dimension: small_n.
    flux : quantity array
        Flux. Unit: ph/s/m2/micron. Dimension: big_n.
    psf_wavelength : quantity array
        Point Spread Function per wavelength.

    Returns
    -------
    flux : quantity array
        Flux. UNit: photon/s. Dimension: nw.
    """

    logging.debug("Integrate flux on each bin around the psf...")

    # Set the parameters of the function
    _, all_poles = compute_bandwidth(psf_wavelength)

    # Cumulative count
    cum_sum = cumulative_trapezoid(y=flux, x=wavelength, initial=0.0)

    # self.wavelength has to quantity: value and units
    # interpolate over psf wavelength
    cum_sum_interp = np.interp(all_poles, wavelength, cum_sum)

    # Compute flux over psf spectral bin
    flux_int = cum_sum_interp[1:] - cum_sum_interp[:-1]

    # Update flux matrix
    flux_int = flux_int  # * flux.unit * wavelength.unit
    flux = np.copy(flux_int)

    return flux


# def multiply_by_transmission(psf, transmission_dict: Mapping[str, Any]) -> None:
#     """The goal of this function is to take into account the flux of the incident star"""
#     for t in transmission_dict.keys():
#         if "M" in t:
#             f = interpolate.interp1d(
#                 transmission_dict[t]["wavelength"],
#                 transmission_dict[t]["reflectivity_eol"],
#             )
#         else:
#             f = interpolate.interp1d(
#                 transmission_dict[t]["wavelength"],
#                 transmission_dict[t]["transmission_eol"],
#             )
#         flux = np.copy(flux) * f(psf.psf_wavelength)
#


def read_psf_from_fits_file(
    filename: str,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """
    Read psf files depending on simulation and instrument parameters.

    Parameters
    ----------
    filename : str
        Name of the .fits file.

    Returns
    -------
    psf_datacube : ndarray
        3D array, PSF for each wavelength saved as array, one for each wavelength
        waves (1D array) wavelength. Unit: um.
    psf_wavelength : ndarray
        1D array, wavelengths. Unit: um.
    line_pos_psf : ndarray
        1D array, x position of the PSF at each wavelength.
    col_pos_psf : ndarray
        1D array, y position of the PSF at each wavelength.
    """
    # Open fits
    with fits.open(resolve_with_working_directory(filename)) as hdu:
        psf_datacube = hdu[0].data
        table = hdu[1].data

    # Position of the PSF on AIRS window along line
    line_psf_pos = (table["x_centers"]).astype(int)

    # Position of the PSF on AIRS window along col
    col_psf_pos = (table["y_centers"]).astype(int)

    # Wavelength
    psf_wavelength = table["waves"] * u.micron

    logging.debug(
        "PSF Datacube %r %r %r",
        psf_datacube.shape,
        psf_datacube.min(),
        psf_datacube.max(),
    )
    logging.debug(
        "PSF Wavelength %r %r %r",
        psf_wavelength.shape,
        psf_wavelength.min(),
        psf_wavelength.max(),
    )

    return psf_datacube, psf_wavelength, line_psf_pos, col_psf_pos


# TODO: Add units
def project_psfs(
    psf_datacube_3d: np.ndarray,
    line_psf_pos_1d: np.ndarray,
    col_psf_pos,
    flux: np.ndarray,
    row: int,
    col: int,
    expand_factor: int,
) -> tuple[np.ndarray, np.ndarray]:
    """Project each psf on a (n_line_final * self.zoom, n_col_final * self.zoom) pixel image.

    n_line_final, n_col_final = corresponds to window size. It varies with the channel.

    Parameters
    ----------
    psf_datacube_3d : numpy array
        Dimension (nw, n_line, n_col).
    line_psf_pos_1d : numpy array
        Line position of the center of the PSF along AIRS window.
    col_psf_pos : numpy array
        Column position of the center of PSF along AIRS window.
    flux : Quantity
        Unit electron/s dimension big_n.
    row :

    col :

    expand_factor :

    Returns
    -------
    result : Quantity, Quantity
        Dimension ny, nx, spectral image in e-/s. Shape = detector shape.

    """
    nw, n_line, n_col = psf_datacube_3d.shape  # Extract shape of the PSF
    half_size_col, half_size_line = n_col // 2, n_line // 2  # Half size of the PSF

    # photon_incident = np.zeros((detector.geometry.row * expend_factor, detector.geometry.col * expend_factor))
    # photoelectron_generated = np.zeros((detector.geometry.row * expend_factor, detector.geometry.col * expend_factor))
    photon_incident = np.zeros((row * expand_factor, col * expand_factor))
    photoelectron_generated = np.zeros((row * expand_factor, col * expand_factor))

    col_win_middle, line_win_middle = (
        int(col_psf_pos.mean()),
        int(line_psf_pos_1d.mean()),
    )

    for i in np.arange(nw):  # Loop over the wavelength
        # Resize along line dimension
        line1 = (
            line_psf_pos_1d[i]
            - line_win_middle
            + row * expand_factor // 2
            - half_size_line
        )
        line2 = (
            line_psf_pos_1d[i]
            - line_win_middle
            + row * expand_factor // 2
            + half_size_line
        )
        # Resize along col dimension
        col1 = (
            col_psf_pos[i] - col_win_middle + col * expand_factor // 2 - half_size_col
        )
        col2 = (
            col_psf_pos[i] - col_win_middle + col * expand_factor // 2 + half_size_col
        )
        # Derive the amount of photon incident on the detector
        photon_incident[line1:line2, col1:col2] = (
            photon_incident[line1:line2, col1:col2] + psf_datacube_3d[i, :, :] * flux[i]
        )

        # TODO: Here take into account the QE map of the detector, and its dependence with wavelength
        # QE of the detector has to be sampled with the same resolution as the PSF is sampled
        qe = 0.65
        # qe_detector[i]
        photoelectron_generated[line1:line2, col1:col2] = (
            photon_incident[line1:line2, col1:col2].copy() * qe
        )

    # This is the amount of photons incident on the detector
    photon_incident = photon_incident  # * flux.unit

    # This is the amount of photons incident on the detector
    photoelectron_generated = photoelectron_generated * u.electron

    return (rebin_2d(photon_incident, expand_factor)), (
        rebin_2d(photoelectron_generated, expand_factor)
    )


def rebin_2d(
    data: np.ndarray,
    expand_factor: int,
) -> np.ndarray:
    """Rebin as idl.

    Each pixel of the returned image is the sum of zy by zx pixels of the input image.

    Based on:       Rene Gastaud, 13 January 2016
    https://codedump.io/share/P3pB13TPwDI3/1/resize-with-averaging-or-rebin-a-numpy-2d-array
    2017-11-17 : RG and Alan O'Brien  compatibility with python 3 bug452

    Parameters
    ----------
    data : ndarray
        Data with 2 dimensions (image): ny, nx.
    expand_factor : tuple
        Expansion factor is a tuple of 2 integers: zy, zx.

    Returns
    -------
    result : ndarray
        Shrunk in dimension ny/zy, nx/zx.


    Example
    -------
    a = np.arange(48).reshape((6,8))
               rebin2d( a, [2,2])

    """

    # In case asymmetrical zoom is used
    zoom = [expand_factor, expand_factor]

    final_shape = (
        int(data.shape[0] // zoom[0]),
        zoom[0],
        int(data.shape[1] // zoom[1]),
        zoom[1],
    )
    logging.debug("final_shape %r", final_shape)
    result = data.reshape(final_shape).sum(3).sum(1)

    return result


# TODO: Add units
[docs] def wavelength_dependence_airs( detector: Detector, psf_filename: str, target_filename: str, telescope_diameter_m1: float, telescope_diameter_m2: float, expand_factor: int, time_scale: float = 1.0, ) -> None: """Generate the photon over the array according to a specific dispersion pattern (ARIEL-AIRS). Parameters ---------- detector : Detector Pyxel Detector object. psf_filename : string The location and the filename where the PSFs are located. target_filename : string The location and the filename of the target file used in the simulation. telescope_diameter_m1 : float Diameter of the M1 mirror of the TA in m. telescope_diameter_m2 : float Diameter of the M2 mirror of the TA in m. expand_factor : int Expansion factor used. time_scale : float Time scale in seconds. """ if not isinstance(expand_factor, int): raise TypeError("Expecting a 'int' type for 'expand_factor'.") if expand_factor <= 0: raise ValueError("Expecting a positive value for 'expand_factor'.") # Extract information from the PSF psf_datacube, psf_wavelength, line_psf_pos, col_psf_pos = read_psf_from_fits_file( filename=psf_filename ) # plt.figure() # plt.pcolormesh(psf_datacube[10, :, :]) # plt.title(psf_wavelength[10]) # Read flux from the fits file target_wavelength, target_flux = read_star_flux_from_file(filename=target_filename) # convert the flux by multiplying by the area of the detector # telescope_diameter_m1, telescope_diameter_m2 = ( # 1.1, # 0.7, # ) # in m, TODO to be replaced by Telescope Class ? target_conv_flux = convert_flux( wavelength=target_wavelength, flux=target_flux, telescope_diameter_m1=telescope_diameter_m1, telescope_diameter_m2=telescope_diameter_m2, ) # Integrate the flux over the PSF spectral bin integrated_flux = integrate_flux( wavelength=target_wavelength, flux=target_conv_flux, psf_wavelength=psf_wavelength, ) # The flux is now sample similarly to the PSF # The Flux can be multiplied here by the optical elements # multiply_by_transmission(psf, transmission_dict) # #TODO add class to take into account the transmission of the instrument # Project the PSF onto the Focal Plane, to get the detector image # row, col = 130, 64 # Could be replaced # Expend factor used: expand_factor = 18 _, photo_electron_generated = project_psfs( psf_datacube_3d=psf_datacube, line_psf_pos_1d=line_psf_pos, col_psf_pos=col_psf_pos, flux=integrated_flux, row=detector.geometry.row, col=detector.geometry.col, expand_factor=expand_factor, ) # Add the result to the photon array structure time_step = 1.0 # ? photon_array = photo_electron_generated * (time_step / time_scale) # assert photon_array.unit == "electron" detector.photon += np.array(photon_array)