"""This module contains classes handling different treatments of ozone.
The ozone models can either be used in RCE simulations, or it may be of
interest to run the :py:class:`Cariolle` or :py:class:`Simotrostra` models with
a fixed atmospheric temperature profile.

**In an RCE simulation**

Create an instance of an ozone class, *e.g.* :py:class:`OzoneHeight`,
and use it in an RCE simulation:

    >>> import konrad
    >>> ozone_fixed_with_height = konrad.ozone.OzoneHeight()
    >>> rce = konrad.RCE(atmosphere=..., ozone=ozone_fixed_with_height)

**Run an ozone model**

Create an ozone model, *e.g.* :py:class:`Cariolle`, and run the ozone model for
a fixed temperature profile:

    >>> import konrad
    >>> ozone_model = konrad.ozone.Cariolle(w=...)
    >>> atmosphere = konrad.atmosphere.Atmosphere(plev=...)
    >>> atmosphere['T'][0] = ...  # set the temperature profile
    >>> atmosphere['O3'][0] = ...  # set the initial ozone profile
    >>> for iteration in range(0, ...):
    >>>     ozone_model(atmosphere=atmosphere, timestep=...)
    >>> final_ozone_profile = atmosphere['O3'][-1]

Note that to use :py:class:`Cariolle` and :py:class:`Simotrostra`, the
simotrostra package needs to be installed. It can be found here:


import os
import abc
import logging
import numpy as np
import numbers
from netCDF4 import Dataset
from scipy.interpolate import interp1d

from konrad import constants
from konrad.component import Component

logger = logging.getLogger(__name__)

[docs]class Ozone(Component, metaclass=abc.ABCMeta): """Base class to define abstract methods for ozone treatments."""
[docs] def __init__(self): """ Parameters: initial_ozone (ndarray): initial ozone vmr profile """ self["initial_ozone"] = (("plev",), None)
@abc.abstractmethod def __call__(self, atmosphere, convection, timestep, zenith): """Updates the ozone profile within the atmosphere class. Parameters: atmosphere (konrad.atmosphere): atmosphere model containing ozone concentration profile, height, temperature, pressure and half pressure levels at the current timestep convection (konrad.convection): convection model containing information about the convective top timestep (float): timestep of run [days] zenith (float): solar zenith angle, angle of the Sun to the vertical [degrees] """
[docs]class OzonePressure(Ozone): """Ozone fixed with pressure, no adjustment needed.""" def __call__(self, **kwargs): return
[docs]class OzoneHeight(Ozone): """Ozone fixed with height."""
[docs] def __init__(self): self._f = None
def __call__(self, atmosphere, **kwargs): if self._f is None: self._f = interp1d( atmosphere["z"][0, :], atmosphere["O3"], fill_value="extrapolate", ) atmosphere["O3"] = (("time", "plev"), self._f(atmosphere["z"][0, :]))
[docs]class OzoneNormedPressure(Ozone): """Ozone shifts with the normalisation level."""
[docs] def __init__(self, norm_level=None, coupling="convective_top"): """ Parameters: norm_level (float): pressure for the normalisation normally chosen as the convective top pressure at the start of the simulation [Pa] coupling (str): Mechanism to determine the normalisation level: * "convective_top": Coupled to the convective top. * "cold_point": Coupled to the cold point tropopause. """ self.norm_level = norm_level self._f = None self.coupling = coupling
[docs] def get_norm_level(self, atmosphere, convection): if self.coupling == "convective_top": # TODO: what if there is no convective top return convection.get("convective_top_plev")[0] elif self.coupling == "cold_point": return atmosphere.get_cold_point_plev(interpolate=True) else: raise ValueError("Invalid coupling mechanism.")
def __call__(self, atmosphere, convection, **kwargs): if self.norm_level is None: self.norm_level = self.get_norm_level(atmosphere, convection) if self._f is None: self._f = interp1d( atmosphere["plev"] / self.norm_level, atmosphere["O3"][0, :], fill_value="extrapolate", ) norm_new = self.get_norm_level(atmosphere, convection) atmosphere["O3"] = ( ("time", "plev"), self._f(atmosphere["plev"] / norm_new).clip(min=0).reshape(1, -1), )
[docs]class OzoneRedistributed(Ozone): """Ozone redistribution following method by Hardiman et al. (2019)"""
[docs] def __init__(self): self._f = None
[docs] def get_tropopause_index(self, atmosphere): """Get thermal and ozone tropopause indexes""" z = atmosphere["z"][0, :] / 1000 T = atmosphere["T"][0, :] lapse_rate_K_per_km = np.gradient(T, z) mask1 = np.where(lapse_rate_K_per_km > -2)[0] thermal_tropopause = False i = 0 while not thermal_tropopause: index = mask1[i] limit_test = np.where(z - z[index] >= 2)[0][0] region = lapse_rate_K_per_km[index : limit_test + 1] if np.all(region >= -2): thermal_tropopause = index i += 1 ozone_tropopause = np.where(z - z[thermal_tropopause] > -1)[0][0] ozone_tropopause_sub = np.where(z - z[ozone_tropopause] > -2)[0][0] ozone_tropopause_super = np.where(z - z[thermal_tropopause] > 2)[0][0] return [ thermal_tropopause, ozone_tropopause_sub, ozone_tropopause, ozone_tropopause_super, ]
[docs] def redistribute_ozone(self, atmosphere): """Redistribute ozone according to Hardiman""" from typhon.physics import density as rhopT import copy tropopause = self.get_tropopause_index(atmosphere) ozone = copy.deepcopy(atmosphere["O3"][0, :]) ozone_val = 1.32e-7 * (28.9644 / 47.9982) ozone_n = ozone[:] if ozone[tropopause[2]] > ozone_val: mask1 = ozone - ozone_val > 0 mask2 = np.where(ozone)[0] <= tropopause[2] mask1 = np.logical_and(mask1, mask2) ozone_n[mask1] = ozone_val elif ozone[tropopause[2]] < ozone_val: plevs = atmosphere["plev"][tropopause[1] : tropopause[2] + 1] logO30 = np.log(ozone[tropopause[1]]) logO31 = np.log(ozone_val) m = (logO31 - logO30) / (plevs[-1] - plevs[0]) O3 = np.exp((m * (plevs - plevs[0])) + logO30) ozone_n[tropopause[1] : tropopause[2] + 1] = O3 plevs = atmosphere["plev"][tropopause[2] + 1 : tropopause[3] + 1] logO30 = np.log(ozone_val) logO31 = np.log(ozone[tropopause[3]]) m = (logO31 - logO30) / (plevs[-1] - plevs[0]) O3 = np.exp((m * (plevs - plevs[0])) + logO30) ozone_n[tropopause[2] + 1 : tropopause[3] + 1] = O3 rho = rhopT(atmosphere["plev"], atmosphere["T"][0, :]) dp = np.hstack( ( np.array([atmosphere["plev"][0] - atmosphere["phlev"][0]]), np.diff(atmosphere["plev"]), ) ) odiff = ozone_n[:] - ozone[:] total_odiff = -rho * odiff * dp total_odiff = np.sum(total_odiff[0 : tropopause[0] + 1]) total_ozone_strato = -rho * ozone[:] * dp total_ozone_strato = np.sum(total_ozone_strato[tropopause[0] + 1 :]) factor_ozone_strato = 1 + (total_odiff / total_ozone_strato) ozone_n[tropopause[0] :] *= factor_ozone_strato return ozone_n
def __call__(self, atmosphere, **kwargs): ozone_new = self.redistribute_ozone(atmosphere) atmosphere["O3"] = (("time", "plev"), ozone_new.reshape(1, -1))
[docs]class Cariolle(Ozone): """Implementation of the Cariolle ozone scheme for the tropics."""
[docs] def __init__(self, w=0, is_coupled_upwelling=False): """ Parameters: w (ndarray / int / float): upwelling velocity [mm / s] is_coupled_upwelling (bool): whether to couple to upwelling model """ super().__init__() if not is_coupled_upwelling: self.w = w * 86.4 # in m / day else: self.w = None self._is_coupled_upwelling = is_coupled_upwelling
[docs] def ozone_transport(self, o3, z, upwelling): """Rate of change of ozone is calculated based on the ozone gradient and an upwelling velocity. Parameters: o3 (ndarray): ozone concentration [ppv] z (ndarray): height [m] upwelling (konrad.upwelling): upwelling model Returns: ndarray: change in ozone concentration [ppv / day] """ if self.w == 0: # no transport return np.zeros(len(z)) if self._is_coupled_upwelling: # take value from upwelling class w_array = upwelling._w elif isinstance(self.w, np.ndarray): # use input array w_array = self.w elif isinstance(self.w, numbers.Number): # w is a single value # transform to an array to apply transport everywhere w = self.w numlevels = len(z) w_factor = np.ones(numlevels) w_array = w * w_factor do3dz = np.gradient(o3, z) return -w_array * do3dz
[docs] def get_params(self, p): cariolle_data = Dataset( os.path.join(os.path.dirname(__file__), "data/") ) p_data = cariolle_data["p"][:] alist = [] for param_num in range(1, 8): a = cariolle_data[f"A{param_num}"][:] alist.append(interp1d(p_data, a, fill_value="extrapolate")(p)) return alist
[docs] def density_of_molecules(self, p, T): """ Parameters: p (ndarray): pressure levels [Pa] T (ndarray): temperature values [K] Returns: ndarray: density of molecules [number of molecules / m3] """ return (constants.avogadro * p) / (constants.molar_gas_constant_dry_air * T)
[docs] def overhead_molecules(self, gas, p, z, T): """ Parameters: gas (ndarray): gas concentration [ppv] corresponding to levels p p (ndarray): pressure levels [Pa] z (ndarray): height values [m] T (ndarray): temperature values [K] Returns: ndarray: number of molecules per m2 in overhead column """ # molecules / m2 of air in each layer molecules_air = self.density_of_molecules(p, T) * np.gradient(z) # overhead column in molecules / m2 col = np.flipud(np.cumsum(np.flipud(gas * molecules_air))) return col
def __call__(self, atmosphere, timestep, upwelling, **kwargs): T = atmosphere["T"][0, :] p = atmosphere["plev"] # [Pa] o3 = atmosphere["O3"][0, :] # moles of ozone / moles of air z = atmosphere["z"][0, :] # m o3col = self.overhead_molecules(o3, p, z, T) * 10 ** -4 # in molecules / cm2 A1, A2, A3, A4, A5, A6, A7 = self.get_params(p) # A7 is in molecules / cm2 # tendency of ozone volume mixing ratio per second do3dt = A1 + A2 * (o3 - A3) + A4 * (T - A5) + A6 * (o3col - A7) # transport term transport_ox = self.ozone_transport(o3, z, upwelling) atmosphere["O3"] = ( ("time", "plev"), (o3 + (do3dt * 24 * 60 ** 2 + transport_ox) * timestep).reshape(1, -1), )
[docs]class Simotrostra(Cariolle): """Wrapper for Ed Charlesworth's simple chemistry scheme."""
[docs] def __init__(self, w=0, is_coupled_upwelling=False): """ Parameters: w (ndarray / int / float): upwelling velocity [mm / s] is_coupled_upwelling (bool): whether to couple to upwelling model """ super().__init__(w=w, is_coupled_upwelling=is_coupled_upwelling) try: from simotrostra import Simotrostra except ModuleNotFoundError: raise ModuleNotFoundError( "No module named 'simotrostra'. You need to install it in order" " to use this ozone class.\n" "It can be found here:\n" "" ) self._ozone = Simotrostra()
[docs] def simotrostra_profile(self, o3, atmosphere, timestep, zenith, upwelling): """ Parameters: o3 (ndarray): ozone profile atmosphere (konrad.atmosphere) timestep (float): timestep of run [days] zenith (float): solar zenith angle, angle of the Sun to the vertical [degrees] upwelling (konrad.upwelling): upwelling model Returns: ndarray: new ozone profile list of ndarrays: source and sink terms """ z = atmosphere["z"][-1, :] p, phlev = atmosphere["plev"], atmosphere["phlev"] T = atmosphere["T"][-1, :] source, sink_ox, sink_nox, sink_hox = self._ozone.tendencies( z, p, phlev, T, o3, zenith ) transport_ox = self.ozone_transport(o3, z, upwelling) do3dt = source - sink_ox - sink_nox + transport_ox - sink_hox o3_new = o3 + do3dt * timestep # prevent concentrations getting too low - set tropospheric value o3_new.clip(min=4e-8, out=o3_new) return o3_new, [source, sink_ox, sink_nox, transport_ox, sink_hox]
[docs] def store_sink_terms(self, sink_terms): """ Parameters: sink_terms (list of ndarrays): source and sink terms [ppv / day] """ source, sink_ox, sink_nox, transport_ox, sink_hox = sink_terms for term, tendency in [ ("ozone_source", source), ("ozone_sink_ox", sink_ox), ("ozone_sink_nox", sink_nox), ("ozone_transport", transport_ox), ("ozone_sink_hox", sink_hox), ]: if term in self.data_vars: self.set(term, tendency) else: self.create_variable(term, tendency)
def __call__(self, atmosphere, timestep, upwelling, zenith, **kwargs): o3 = atmosphere["O3"][-1, :] o3_new, sink_terms = self.simotrostra_profile( o3, atmosphere, timestep, zenith, upwelling ) atmosphere["O3"] = (("time", "plev"), o3_new.reshape(1, -1)) self.store_sink_terms(sink_terms)