"""This module contains classes handling different treatments of ozone.
The ozone models can either be used in RCE simulations, or it may be of
interest to run the :py:class:`Cariolle` or :py:class:`Simotrostra` models with
a fixed atmospheric temperature profile.
**In an RCE simulation**
Create an instance of an ozone class, *e.g.* :py:class:`OzoneHeight`,
and use it in an RCE simulation:
>>> import konrad
>>> ozone_fixed_with_height = konrad.ozone.OzoneHeight()
>>> rce = konrad.RCE(atmosphere=..., ozone=ozone_fixed_with_height)
>>> rce.run()
**Run an ozone model**
Create an ozone model, *e.g.* :py:class:`Cariolle`, and run the ozone model for
a fixed temperature profile:
>>> import konrad
>>> ozone_model = konrad.ozone.Cariolle(w=...)
>>> atmosphere = konrad.atmosphere.Atmosphere(plev=...)
>>> atmosphere['T'][0] = ... # set the temperature profile
>>> atmosphere['O3'][0] = ... # set the initial ozone profile
>>> for iteration in range(0, ...):
>>> ozone_model(atmosphere=atmosphere, timestep=...)
>>> final_ozone_profile = atmosphere['O3'][-1]
Note that to use :py:class:`Cariolle` and :py:class:`Simotrostra`, the
simotrostra package needs to be installed. It can be found here:
https://gitlab.com/simple-stratospheric-chemistry/simotrostra
"""
import os
import abc
import logging
import numpy as np
import numbers
from netCDF4 import Dataset
from scipy.interpolate import interp1d
from konrad import constants
from konrad.component import Component
__all__ = [
"Ozone",
"OzonePressure",
"OzoneHeight",
"OzoneNormedPressure",
"OzoneRedistributed",
"Cariolle",
"Simotrostra",
]
logger = logging.getLogger(__name__)
[docs]class Ozone(Component, metaclass=abc.ABCMeta):
"""Base class to define abstract methods for ozone treatments."""
[docs] def __init__(self):
"""
Parameters:
initial_ozone (ndarray): initial ozone vmr profile
"""
self["initial_ozone"] = (("plev",), None)
@abc.abstractmethod
def __call__(self, atmosphere, convection, timestep, zenith):
"""Updates the ozone profile within the atmosphere class.
Parameters:
atmosphere (konrad.atmosphere): atmosphere model containing ozone
concentration profile, height, temperature, pressure and half
pressure levels at the current timestep
convection (konrad.convection): convection model containing
information about the convective top
timestep (float): timestep of run [days]
zenith (float): solar zenith angle,
angle of the Sun to the vertical [degrees]
"""
[docs]class OzonePressure(Ozone):
"""Ozone fixed with pressure, no adjustment needed."""
def __call__(self, **kwargs):
return
[docs]class OzoneHeight(Ozone):
"""Ozone fixed with height."""
[docs] def __init__(self):
self._f = None
def __call__(self, atmosphere, **kwargs):
if self._f is None:
self._f = interp1d(
atmosphere["z"][0, :],
atmosphere["O3"],
fill_value="extrapolate",
)
atmosphere["O3"] = (("time", "plev"), self._f(atmosphere["z"][0, :]))
[docs]class OzoneNormedPressure(Ozone):
"""Ozone shifts with the normalisation level."""
[docs] def __init__(self, norm_level=None, coupling="convective_top"):
"""
Parameters:
norm_level (float): pressure for the normalisation
normally chosen as the convective top pressure at the start of
the simulation [Pa]
coupling (str): Mechanism to determine the normalisation level:
* "convective_top": Coupled to the convective top.
* "cold_point": Coupled to the cold point tropopause.
"""
self.norm_level = norm_level
self._f = None
self.coupling = coupling
[docs] def get_norm_level(self, atmosphere, convection):
if self.coupling == "convective_top":
# TODO: what if there is no convective top
return convection.get("convective_top_plev")[0]
elif self.coupling == "cold_point":
return atmosphere.get_cold_point_plev(interpolate=True)
else:
raise ValueError("Invalid coupling mechanism.")
def __call__(self, atmosphere, convection, **kwargs):
if self.norm_level is None:
self.norm_level = self.get_norm_level(atmosphere, convection)
if self._f is None:
self._f = interp1d(
atmosphere["plev"] / self.norm_level,
atmosphere["O3"][0, :],
fill_value="extrapolate",
)
norm_new = self.get_norm_level(atmosphere, convection)
atmosphere["O3"] = (
("time", "plev"),
self._f(atmosphere["plev"] / norm_new).clip(min=0).reshape(1, -1),
)
[docs]class OzoneRedistributed(Ozone):
"""Ozone redistribution following method by Hardiman et al. (2019)"""
[docs] def __init__(self):
self._f = None
[docs] def get_tropopause_index(self, atmosphere):
"""Get thermal and ozone tropopause indexes"""
z = atmosphere["z"][0, :] / 1000
T = atmosphere["T"][0, :]
lapse_rate_K_per_km = np.gradient(T, z)
mask1 = np.where(lapse_rate_K_per_km > -2)[0]
thermal_tropopause = False
i = 0
while not thermal_tropopause:
index = mask1[i]
limit_test = np.where(z - z[index] >= 2)[0][0]
region = lapse_rate_K_per_km[index : limit_test + 1]
if np.all(region >= -2):
thermal_tropopause = index
i += 1
ozone_tropopause = np.where(z - z[thermal_tropopause] > -1)[0][0]
ozone_tropopause_sub = np.where(z - z[ozone_tropopause] > -2)[0][0]
ozone_tropopause_super = np.where(z - z[thermal_tropopause] > 2)[0][0]
return [
thermal_tropopause,
ozone_tropopause_sub,
ozone_tropopause,
ozone_tropopause_super,
]
[docs] def redistribute_ozone(self, atmosphere):
"""Redistribute ozone according to Hardiman"""
from typhon.physics import density as rhopT
import copy
tropopause = self.get_tropopause_index(atmosphere)
ozone = copy.deepcopy(atmosphere["O3"][0, :])
ozone_val = 1.32e-7 * (28.9644 / 47.9982)
ozone_n = ozone[:]
if ozone[tropopause[2]] > ozone_val:
mask1 = ozone - ozone_val > 0
mask2 = np.where(ozone)[0] <= tropopause[2]
mask1 = np.logical_and(mask1, mask2)
ozone_n[mask1] = ozone_val
elif ozone[tropopause[2]] < ozone_val:
plevs = atmosphere["plev"][tropopause[1] : tropopause[2] + 1]
logO30 = np.log(ozone[tropopause[1]])
logO31 = np.log(ozone_val)
m = (logO31 - logO30) / (plevs[-1] - plevs[0])
O3 = np.exp((m * (plevs - plevs[0])) + logO30)
ozone_n[tropopause[1] : tropopause[2] + 1] = O3
plevs = atmosphere["plev"][tropopause[2] + 1 : tropopause[3] + 1]
logO30 = np.log(ozone_val)
logO31 = np.log(ozone[tropopause[3]])
m = (logO31 - logO30) / (plevs[-1] - plevs[0])
O3 = np.exp((m * (plevs - plevs[0])) + logO30)
ozone_n[tropopause[2] + 1 : tropopause[3] + 1] = O3
rho = rhopT(atmosphere["plev"], atmosphere["T"][0, :])
dp = np.hstack(
(
np.array([atmosphere["plev"][0] - atmosphere["phlev"][0]]),
np.diff(atmosphere["plev"]),
)
)
odiff = ozone_n[:] - ozone[:]
total_odiff = -rho * odiff * dp
total_odiff = np.sum(total_odiff[0 : tropopause[0] + 1])
total_ozone_strato = -rho * ozone[:] * dp
total_ozone_strato = np.sum(total_ozone_strato[tropopause[0] + 1 :])
factor_ozone_strato = 1 + (total_odiff / total_ozone_strato)
ozone_n[tropopause[0] :] *= factor_ozone_strato
return ozone_n
def __call__(self, atmosphere, **kwargs):
ozone_new = self.redistribute_ozone(atmosphere)
atmosphere["O3"] = (("time", "plev"), ozone_new.reshape(1, -1))
[docs]class Cariolle(Ozone):
"""Implementation of the Cariolle ozone scheme for the tropics."""
[docs] def __init__(self, w=0, is_coupled_upwelling=False):
"""
Parameters:
w (ndarray / int / float): upwelling velocity [mm / s]
is_coupled_upwelling (bool): whether to couple to upwelling model
"""
super().__init__()
if not is_coupled_upwelling:
self.w = w * 86.4 # in m / day
else:
self.w = None
self._is_coupled_upwelling = is_coupled_upwelling
[docs] def ozone_transport(self, o3, z, upwelling):
"""Rate of change of ozone is calculated based on the ozone gradient
and an upwelling velocity.
Parameters:
o3 (ndarray): ozone concentration [ppv]
z (ndarray): height [m]
upwelling (konrad.upwelling): upwelling model
Returns:
ndarray: change in ozone concentration [ppv / day]
"""
if self.w == 0: # no transport
return np.zeros(len(z))
if self._is_coupled_upwelling: # take value from upwelling class
w_array = upwelling._w
elif isinstance(self.w, np.ndarray): # use input array
w_array = self.w
elif isinstance(self.w, numbers.Number): # w is a single value
# transform to an array to apply transport everywhere
w = self.w
numlevels = len(z)
w_factor = np.ones(numlevels)
w_array = w * w_factor
do3dz = np.gradient(o3, z)
return -w_array * do3dz
[docs] def get_params(self, p):
cariolle_data = Dataset(
os.path.join(os.path.dirname(__file__), "data/Cariolle_data.nc")
)
p_data = cariolle_data["p"][:]
alist = []
for param_num in range(1, 8):
a = cariolle_data[f"A{param_num}"][:]
alist.append(interp1d(p_data, a, fill_value="extrapolate")(p))
return alist
[docs] def density_of_molecules(self, p, T):
"""
Parameters:
p (ndarray): pressure levels [Pa]
T (ndarray): temperature values [K]
Returns:
ndarray: density of molecules [number of molecules / m3]
"""
return (constants.avogadro * p) / (constants.molar_gas_constant_dry_air * T)
[docs] def overhead_molecules(self, gas, p, z, T):
"""
Parameters:
gas (ndarray): gas concentration [ppv] corresponding to levels p
p (ndarray): pressure levels [Pa]
z (ndarray): height values [m]
T (ndarray): temperature values [K]
Returns:
ndarray: number of molecules per m2 in overhead column
"""
# molecules / m2 of air in each layer
molecules_air = self.density_of_molecules(p, T) * np.gradient(z)
# overhead column in molecules / m2
col = np.flipud(np.cumsum(np.flipud(gas * molecules_air)))
return col
def __call__(self, atmosphere, timestep, upwelling, **kwargs):
T = atmosphere["T"][0, :]
p = atmosphere["plev"] # [Pa]
o3 = atmosphere["O3"][0, :] # moles of ozone / moles of air
z = atmosphere["z"][0, :] # m
o3col = self.overhead_molecules(o3, p, z, T) * 10 ** -4 # in molecules / cm2
A1, A2, A3, A4, A5, A6, A7 = self.get_params(p)
# A7 is in molecules / cm2
# tendency of ozone volume mixing ratio per second
do3dt = A1 + A2 * (o3 - A3) + A4 * (T - A5) + A6 * (o3col - A7)
# transport term
transport_ox = self.ozone_transport(o3, z, upwelling)
atmosphere["O3"] = (
("time", "plev"),
(o3 + (do3dt * 24 * 60 ** 2 + transport_ox) * timestep).reshape(1, -1),
)
[docs]class Simotrostra(Cariolle):
"""Wrapper for Ed Charlesworth's simple chemistry scheme."""
[docs] def __init__(self, w=0, is_coupled_upwelling=False):
"""
Parameters:
w (ndarray / int / float): upwelling velocity [mm / s]
is_coupled_upwelling (bool): whether to couple to upwelling model
"""
super().__init__(w=w, is_coupled_upwelling=is_coupled_upwelling)
try:
from simotrostra import Simotrostra
except ModuleNotFoundError:
raise ModuleNotFoundError(
"No module named 'simotrostra'. You need to install it in order"
" to use this ozone class.\n"
"It can be found here:\n"
"https://gitlab.com/simple-stratospheric-chemistry/simotrostra"
)
self._ozone = Simotrostra()
[docs] def simotrostra_profile(self, o3, atmosphere, timestep, zenith, upwelling):
"""
Parameters:
o3 (ndarray): ozone profile
atmosphere (konrad.atmosphere)
timestep (float): timestep of run [days]
zenith (float): solar zenith angle,
angle of the Sun to the vertical [degrees]
upwelling (konrad.upwelling): upwelling model
Returns:
ndarray: new ozone profile
list of ndarrays: source and sink terms
"""
z = atmosphere["z"][-1, :]
p, phlev = atmosphere["plev"], atmosphere["phlev"]
T = atmosphere["T"][-1, :]
source, sink_ox, sink_nox, sink_hox = self._ozone.tendencies(
z, p, phlev, T, o3, zenith
)
transport_ox = self.ozone_transport(o3, z, upwelling)
do3dt = source - sink_ox - sink_nox + transport_ox - sink_hox
o3_new = o3 + do3dt * timestep
# prevent concentrations getting too low - set tropospheric value
o3_new.clip(min=4e-8, out=o3_new)
return o3_new, [source, sink_ox, sink_nox, transport_ox, sink_hox]
[docs] def store_sink_terms(self, sink_terms):
"""
Parameters:
sink_terms (list of ndarrays): source and sink terms [ppv / day]
"""
source, sink_ox, sink_nox, transport_ox, sink_hox = sink_terms
for term, tendency in [
("ozone_source", source),
("ozone_sink_ox", sink_ox),
("ozone_sink_nox", sink_nox),
("ozone_transport", transport_ox),
("ozone_sink_hox", sink_hox),
]:
if term in self.data_vars:
self.set(term, tendency)
else:
self.create_variable(term, tendency)
def __call__(self, atmosphere, timestep, upwelling, zenith, **kwargs):
o3 = atmosphere["O3"][-1, :]
o3_new, sink_terms = self.simotrostra_profile(
o3, atmosphere, timestep, zenith, upwelling
)
atmosphere["O3"] = (("time", "plev"), o3_new.reshape(1, -1))
self.store_sink_terms(sink_terms)