Source code for thuner.analyze.mcs

"""
Functions for analyzing MCSs. In particular, for implementing the methodologies
presented in the following papers:

Short et al. (2023), Objectively diagnosing characteristics of mesoscale organization
from radar reflectivity and ambient winds. https://dx.doi.org/10.1175/MWR-D-22-0146.1
"""

import numpy as np
import pandas as pd
from pathlib import Path
from pydantic import Field
from thuner.attribute.utils import read_attribute_csv
import thuner.analyze.utils as utils
import thuner.write as write
import thuner.attribute.core as core
import thuner.log as log
from thuner.utils import BaseOptions
from thuner.option.attribute import Attribute, AttributeType

logger = log.setup_logger(__name__)

__all__ = [
    "process_velocities",
    "quality_control",
    "classify_all",
    "classify_angles",
    "classify_small_offsets",
    "classify_ambiguous",
    "AnalysisOptions",
]


[docs] def process_velocities( output_directory, window_size=6, analysis_directory=None, profile_dataset="era5_pl" ): """ Process winds and velocities for analysis. Parameters ---------- output_directory : str Path to the thuner run output directory. """ if analysis_directory is None: analysis_directory = output_directory / "analysis" options = utils.read_options(output_directory) mcs_options = options["track"].levels[1].object_by_name("mcs") member_objects = mcs_options.grouping.member_objects convective_label = member_objects[0] convective_options = options["track"].levels[0].object_by_name(convective_label) altitudes = convective_options.detection.altitudes filepath = output_directory / "attributes/mcs/core.csv" velocities = read_attribute_csv(filepath, columns=["u_flow", "v_flow"]) velocities = utils.temporal_smooth(velocities, window_size=window_size) velocities = velocities.rename(columns={"u_flow": "u", "v_flow": "v"}) if profile_dataset is not None: filepath = output_directory / f"attributes/mcs/{profile_dataset}_profile.csv" winds = read_attribute_csv(filepath, columns=["u", "v"]) winds = winds.xs(0, level="time_offset").sort_index() indexer = pd.IndexSlice[:, :, altitudes[0] : altitudes[1]] mean_winds = winds.loc[indexer].groupby(["time", "universal_id"]).mean() new_names = {"u": "u_ambient", "v": "v_ambient"} mean_winds = mean_winds.rename(columns=new_names) # Calculate a shear vector as the difference between the winds at the top and # bottom of layer used to detect convective echoes top = winds.xs(altitudes[1], level="altitude") bottom = winds.xs(altitudes[0], level="altitude") shear = top - bottom new_names = {"u": "u_shear", "v": "v_shear"} shear = shear.rename(columns=new_names) # Calculate system wind-relative velocities new_names_vel = {"u": "u_relative", "v": "v_relative"} renamed_velocities = velocities.rename(columns=new_names_vel) new_names_mean = {"u_ambient": "u_relative", "v_ambient": "v_relative"} renamed_mean_winds = mean_winds.rename(columns=new_names_mean) relative_velocities = renamed_velocities - renamed_mean_winds velocities_list = [velocities, mean_winds, shear, relative_velocities] else: velocities_list = [velocities] # Check if dataframes aligned if profile_dataset is not None and not velocities.index.equals(mean_winds.index): raise ValueError("Dataframes are not aligned. Perhaps enforce alignment first?") all_velocities = pd.concat(velocities_list, axis=1) # Create metadata for the attributes names = ["u", "v", "u_shear", "v_shear", "u_ambient", "v_ambient"] names += ["u_relative", "v_relative"] descriptions = [ "System ground relative zonal velocity.", "System ground relative meridional velocity.", f"Ambient zonal shear between {altitudes[0]} and {altitudes[1]} m.", f"Ambient meridional between {altitudes[0]} and {altitudes[1]} m.", f"Mean ambient zonal winds from {altitudes[0]} and {altitudes[1]} m.", f"Mean ambient meridional winds from {altitudes[0]} and {altitudes[1]} m.", "System wind relative zonal velocity.", "System wind relative meridional velocity.", ] if "u_shear" not in all_velocities.columns: names = names[:2] descriptions = descriptions[:2] data_type, precision, units, retrieval = float, 1, "m/s", None attributes = [] for name, description in zip(names, descriptions): kwargs = {"name": name, "retrieval": retrieval, "data_type": data_type} kwargs.update({"precision": precision, "description": description}) kwargs.update({"units": units}) attributes.append(Attribute(**kwargs)) attributes.append(core.time()) attributes.append(core.record_universal_id()) attribute_type = AttributeType(name="velocities", attributes=attributes) filepath = analysis_directory / "velocities.csv" all_velocities = write.attribute.write_csv(filepath, all_velocities, attribute_type)
_summary = { "window_size": "Window size for temporal smoothing of velocities.", "min_area": "Minimum area of MCS in km^2.", "max_area": "Maximum area of MCS in km^2.", "max_boundary_overlap": "Maximum fraction of system member object pixels touching boundary.", "min_major_axis_length": "Minimum major axis length of MCS in km.", "min_axis_ratio": "Minimum axis ratio of MCS.", "min_duration": "Minimum duration of MCS in minutes.", "min_offset": "Minimum stratiform offset in km.", "min_shear": "Minimum shear in m/s.", "min_velocity": "Minimum velocity in m/s.", "min_relative_velocity": "Minimum relative velocity in m/s.", "quadrant_buffer_angle": "Buffer angle in degrees for quadrant based classification.", }
[docs] class AnalysisOptions(BaseOptions): """Options for convective system analysis.""" window_size: int = Field(6, description=_summary["window_size"], ge=1) min_area: float = Field(1e2, description=_summary["min_area"], ge=0) max_area: float = Field(np.inf, description=_summary["max_area"], gt=0) max_boundary_overlap: float = Field( 1e-3, description=_summary["max_boundary_overlap"], gt=0 ) min_major_axis_length: float = Field( 25, description=_summary["min_major_axis_length"], ge=0 ) min_axis_ratio: float = Field(2, description=_summary["min_axis_ratio"], ge=0) min_duration: float = Field(30, description=_summary["min_duration"], ge=0) min_offset: float = Field(10, description=_summary["min_offset"], ge=0) min_shear: float = Field(2, description=_summary["min_shear"], ge=0) min_velocity: float = Field(5, description=_summary["min_velocity"], ge=0) min_relative_velocity: float = Field( 2, description=_summary["min_relative_velocity"], ge=0 ) quadrant_buffer_angle: float = Field( 10, description=_summary["quadrant_buffer_angle"], ge=0 )
[docs] def quality_control( output_directory, analysis_options: AnalysisOptions, analysis_directory=None ): """ Perform quality control on MCSs based on the provided options. Parameters ---------- output_directory : str Path to the thuner run output directory. analysis_options : AnalysisOptions Options for analysis and quality control checks. Returns ------- pd.DataFrame DataFrame describing quality control checks. """ output_directory = Path(output_directory) if analysis_directory is None: analysis_directory = output_directory / "analysis" options = utils.read_options(output_directory) mcs_options = options["track"].levels[1].object_by_name("mcs") member_objects = mcs_options.grouping.member_objects convective_label = member_objects[0] anvil_label = member_objects[1] # Determine if the system is sufficiently contained within the domain filepath = output_directory / f"attributes/mcs/{convective_label}/quality.csv" convective = read_attribute_csv(filepath) filepath = output_directory / f"attributes/mcs/{anvil_label}/quality.csv" anvil = read_attribute_csv(filepath) filepath = output_directory / "attributes/mcs/core.csv" mcs = read_attribute_csv(filepath, columns=["parents"]) max_boundary_overlap = analysis_options.max_boundary_overlap convective = convective.rename(columns={"boundary_overlap": "convective_contained"}) anvil = anvil.rename(columns={"boundary_overlap": "anvil_contained"}) convective_check = convective["convective_contained"] <= max_boundary_overlap anvil_check = anvil["anvil_contained"] < max_boundary_overlap # Check if velocity/shear vectors are sufficiently large filepath = analysis_directory / "velocities.csv" velocities = read_attribute_csv(filepath) velocity_magnitude = velocities[["u", "v"]].pow(2).sum(axis=1).pow(0.5) velocity_check = velocity_magnitude >= analysis_options.min_velocity velocity_check.name = "velocity" if "u_shear" in velocities.columns: shear_magnitude = velocities[["u_shear", "v_shear"]].pow(2).sum(axis=1).pow(0.5) shear_check = shear_magnitude >= analysis_options.min_shear shear_check.name = "shear" relative_velocity = velocities[["u_relative", "v_relative"]] relative_velocity_magnitude = relative_velocity.pow(2).sum(axis=1).pow(0.5) min_relative_velocity = analysis_options.min_relative_velocity relative_velocity_check = relative_velocity_magnitude >= min_relative_velocity relative_velocity_check.name = "relative_velocity" # Check system area is of appropriate size, treating the system area as the maximum # area of the member objects all_areas = [] for obj in member_objects: filepath = output_directory / f"attributes/mcs/{obj}/core.csv" area = read_attribute_csv(filepath, columns=["area"]) area = area.rename(columns={"area": f"{obj}_area"}) all_areas.append(area) area = pd.concat(all_areas, axis=1).max(axis=1) min_area, max_area = analysis_options.min_area, analysis_options.max_area area_check = (area >= min_area) & (area <= max_area) area_check.name = "area" # Check the stratiform offset is sufficiently large filepath = output_directory / f"attributes/mcs/group.csv" offset = read_attribute_csv(filepath, columns=["x_offset", "y_offset"]) offset_magnitude = offset.pow(2).sum(axis=1).pow(0.5) offset_check = offset_magnitude >= analysis_options.min_offset offset_check.name = "offset" # Check the duration of the system is sufficiently long # First get the duration of each object from the velocity dataframe id_group = velocities.reset_index().groupby("universal_id")["time"] duration = id_group.agg(lambda x: x.max() - x.min()) duration_check = duration >= np.timedelta64(analysis_options.min_duration, "m") duration_check.name = "duration" dummy_df = velocities[[]].reset_index() merge_kwargs = {"on": "universal_id", "how": "left"} duration_check = dummy_df.merge(duration_check, **merge_kwargs) duration_check = duration_check.set_index(velocities.index.names) # Check if the object fails boundary overlap checks when first detected both_contained = pd.concat([convective_check, anvil_check], axis=1).all(axis=1) id_group = both_contained.reset_index().groupby("universal_id") initial_check = id_group.agg(lambda x: x.iloc[0]) initial_check = initial_check.drop(columns="time") new_name = {0: "initially_contained"} initial_check = initial_check.rename(columns=new_name) dummy_df = velocities[[]].reset_index() initial_check = dummy_df.merge(initial_check, **merge_kwargs) initial_check = initial_check.set_index(velocities.index.names) # Check whether the object has parents. When plotting we may only wish to filter out # short duration objects if they are not part of a larger system parents_check = mcs.reset_index().groupby("universal_id")["parents"] parents_check = parents_check.agg(lambda x: x.notna().any()) parents_check = dummy_df.merge(parents_check, on="universal_id", how="left") parents_check = parents_check.set_index(velocities.index.names) # Record whether the given object has children, using the parents column has_parents = mcs["parents"].dropna() children_check = pd.Series(False, index=velocities.index, name="children") children_check = children_check.reset_index() for i in range(len(has_parents)): parents = [int(p) for p in has_parents.iloc[i].split(" ")] for parent in parents: row_cond = children_check["universal_id"] == parent children_check.loc[row_cond, "children"] = True children_check = children_check.set_index(velocities.index.names) # Check the linearity of the system filepath = output_directory / f"attributes/mcs/{convective_label}/ellipse.csv" ellipse = read_attribute_csv(filepath, columns=["major", "minor"]) major_check = ellipse["major"] >= analysis_options.min_major_axis_length major_check.name = "major_axis" axis_ratio = ellipse["major"] / ellipse["minor"] axis_ratio_check = axis_ratio >= analysis_options.min_axis_ratio axis_ratio_check.name = "axis_ratio" names = ["convective_contained", "anvil_contained", "initially_contained"] names += ["velocity", "shear", "relative_velocity", "area", "offset"] names += ["major_axis", "axis_ratio", "duration", "parents", "children"] descriptions = [ "Is the system convective region sufficiently contained within the domain?", "Is the system anvil region sufficiently contained within the domain?", "Is the system contained within the domain when first detected?", "Is the system velocity sufficiently large?", "Is the system shear sufficiently large?", "Is the system relative velocity sufficiently large?", "Is the system area sufficiently large?", "Is the system stratiform offset sufficiently large?", "Is the system major axis length sufficiently large?", "Is the system axis ratio sufficiently large?", "Is the system duration sufficiently long?", "Does the system have parent systems?", "Does the system have children systems?", ] if "u_shear" not in velocities.columns: names.remove("shear") names.remove("relative_velocity") descriptions.remove("Is the system shear sufficiently large?") descriptions.remove("Is the system relative velocity sufficiently large?") data_type, precision, units, retrieval = bool, None, None, None attributes = [] for name, description in zip(names, descriptions): kwargs = {"name": name, "retrieval": retrieval, "data_type": data_type} kwargs.update({"precision": precision, "description": description}) kwargs.update({"units": units}) attributes.append(Attribute(**kwargs)) attributes.append(core.time()) attributes.append(core.record_universal_id()) attribute_type = AttributeType(name="quality", attributes=attributes) filepath = analysis_directory / "quality.csv" quality = [convective_check, anvil_check, initial_check, velocity_check, area_check] quality += [offset_check, major_check, axis_ratio_check, duration_check] quality += [parents_check, children_check] if "u_shear" in velocities.columns: quality += [shear_check, relative_velocity_check] quality = pd.concat(quality, axis=1) quality = write.attribute.write_csv(filepath, quality, attribute_type)
ambiguity_quality_dispatcher = { "stratiform_offset": ["velocity"], "inflow": ["velocity", "relative_velocity"], "relative_stratiform_offset": ["relative_velocity"], "tilt": ["shear"], "propagation": ["shear", "relative_velocity"], }
[docs] def classify_all( output_directory, analysis_options: AnalysisOptions, analysis_directory=None, offset_filepath=None, velocities_filepath=None, quality_filepath=None, classify_small_offsets=False, classify_ambiguous=False, ): """ Classify MCSs based on quadrants, as described in Short et al. (2023). Parameters ---------- output_directory : str Path to the thuner run output directory. analysis_options : dict Dictionary of quality control options. Returns ------- pd.DataFrame DataFrame describing MCS classifications. """ if analysis_directory is None: analysis_directory = output_directory / "analysis" if velocities_filepath is None: velocities_filepath = analysis_directory / "velocities.csv" velocities = read_attribute_csv(velocities_filepath) if offset_filepath is None: offset_filepath = output_directory / "attributes/mcs/group.csv" offset = read_attribute_csv(offset_filepath, columns=["x_offset", "y_offset"]) if quality_filepath is None: quality_filepath = analysis_directory / "quality.csv" quality = read_attribute_csv(quality_filepath) u, v = velocities["u"], velocities["v"] u_shear, v_shear = velocities["u_shear"], velocities["v_shear"] u_relative, v_relative = velocities["u_relative"], velocities["v_relative"] x_offset, y_offset = offset["x_offset"], offset["y_offset"] names = ["stratiform_offset", "inflow", "relative_stratiform_offset"] names += ["tilt", "propagation"] descriptions = [ "Stratiform offset classification.", "Inflow classification.", "Relative stratiform offset classification.", "System 'tilt' relative to shear classification.", "System propagation relative to shear classification.", ] data_type, precision, units, retrieval = float, 1, "m/s", None attributes = [] for name, description in zip(names, descriptions): kwargs = {"name": name, "retrieval": retrieval, "data_type": data_type} kwargs.update({"precision": precision, "description": description}) kwargs.update({"units": units}) attributes.append(Attribute(**kwargs)) attributes.append(core.time()) attributes.append(core.record_universal_id()) attribute_type = AttributeType(name="velocities", attributes=attributes) data_type, precision, units, retrieval = str, None, None, None attributes = [] for name, description in zip(names, descriptions): kwargs = {"name": name, "retrieval": retrieval, "data_type": data_type} kwargs.update({"precision": precision, "description": description}) kwargs.update({"units": units}) attributes.append(Attribute(**kwargs)) attributes.append(core.time()) attributes.append(core.record_universal_id()) attribute_type = AttributeType(name="classification", attributes=attributes) labels = [["leading", "right", "trailing", "left"]] labels += [["front", "right", "rear", "left"]] labels += [["leading", "right", "trailing", "left"]] labels += [["down-shear", "shear-perpendicular", "up-shear", "shear-perpendicular"]] labels += [["down-shear", "shear-perpendicular", "up-shear", "shear-perpendicular"]] # Vector 2 defines the center of the first quadrant u2_list = [u, u, u_relative, u_shear, u_shear] v2_list = [v, v, v_relative, v_shear, v_shear] # The quadrant vector 1 falls in determines the classification u1_list = [x_offset, u_relative, x_offset, x_offset, u_relative] v1_list = [y_offset, v_relative, y_offset, y_offset, v_relative] # Offset classifications offset_names = ["stratiform_offset", "relative_stratiform_offset", "tilt"] all_classifications = [] for i in range(len(u2_list)): angles = utils.get_angle(u1_list[i], v1_list[i], u2_list[i], v2_list[i]) classified = classify_angles(names[i], angles, labels[i]) if classify_small_offsets and names[i] in offset_names: args = [classified, x_offset, y_offset, analysis_options.min_offset] classified = classify_small_offsets(*args) if classify_ambiguous: unambiguous = quality[ambiguity_quality_dispatcher[names[i]]].all(axis=1) classified = classify_ambiguous(classified, unambiguous) all_classifications.append(classified) classifications = pd.concat(all_classifications, axis=1) filepath = analysis_directory / "classification.csv" args = [filepath, classifications, attribute_type] classifications = write.attribute.write_csv(*args)
[docs] def classify_angles(name, angles, category_labels): """ Classify the quadrants based on the angles between the vectors. Parameters ---------- name : str Name of the classification angles : pd.DataFrame DataFrame of angles between vectors. category_labels : list List of category labels, [front_label, right_label, rear_label, left_label]. Returns ------- pd.Series Series of classifications. """ classification = pd.Series(pd.NA, index=angles.index, name=name, dtype=str) front_cond = (-np.pi / 4 < angles) & (angles <= np.pi / 4) classification[front_cond] = category_labels[0] right_cond = (np.pi / 4 < angles) & (angles <= 3 * np.pi / 4) classification[right_cond] = category_labels[1] rear_cond = (3 * np.pi / 4 < angles) | (angles <= -3 * np.pi / 4) classification[rear_cond] = category_labels[2] left_cond = (-3 * np.pi / 4 < angles) & (angles <= -np.pi / 4) classification[left_cond] = category_labels[3] return classification
[docs] def classify_small_offsets(classified, x_offset, y_offset, min_offset): """ Classify small offsets as 'centered' if the offset is less than the minimum offset. """ offset_magnitude = np.sqrt(x_offset**2 + y_offset**2) classified[offset_magnitude < min_offset] = "centered" return classified
[docs] def classify_ambiguous(classified, unambiguous): """ Classify ambiguous classifications as 'ambiguous'. """ classified[~unambiguous] = "ambiguous" return classified