Source code for thuner.analyze.utils

"""Utility functions for analyzing thuner output."""

from pathlib import Path
import yaml
import glob
import numpy as np
import thuner.option as option
import thuner.attribute.core as core
from thuner.attribute.utils import read_attribute_csv
from thuner.option.attribute import Attribute, AttributeType
import thuner.write as write
import pandas as pd


__all__ = ["read_options"]


def quality_control(
    object_name,
    output_directory,
    analysis_options,
    analysis_directory=None,
):
    """
    Perform quality control on MCSs based on the provided options.

    Parameters
    ----------
    output_directory : str
        Path to the thuner run output directory.
    analysis_options : AnalysisOptions
        Options for analysis and quality control checks.

    Returns
    -------
    pd.DataFrame
        DataFrame describing quality control checks.
    """

    output_directory = Path(output_directory)
    if analysis_directory is None:
        analysis_directory = output_directory / "analysis"

    # Determine if the system is sufficiently contained within the domain
    filepath = output_directory / f"attributes/{object_name}/quality.csv"
    quality = read_attribute_csv(filepath)

    max_boundary_overlap = analysis_options.max_boundary_overlap
    quality = quality.rename(columns={"boundary_overlap": "contained"})
    overlap_check = quality["contained"] <= max_boundary_overlap

    # Check if velocity/shear vectors are sufficiently large
    filepath = analysis_directory / "velocities.csv"
    velocities = read_attribute_csv(filepath)
    velocity_magnitude = velocities[["u", "v"]].pow(2).sum(axis=1).pow(0.5)
    velocity_check = velocity_magnitude >= analysis_options.min_velocity
    velocity_check.name = "velocity"

    # Check system area is of appropriate size, treating the system area as the maximum
    # area of the member objects
    filepath = output_directory / f"attributes/{object_name}/core.csv"
    parents = read_attribute_csv(filepath, columns=["parents"])
    area = read_attribute_csv(filepath, columns=["area"])
    area = area.rename(columns={"area": f"{object_name}_area"})

    min_area, max_area = analysis_options.min_area, analysis_options.max_area
    area_check = (area >= min_area) & (area <= max_area)
    area_check.name = "area"

    # Check the duration of the system is sufficiently long
    # First get the duration of each object from the velocity dataframe
    id_group = velocities.reset_index().groupby("universal_id")["time"]
    duration = id_group.agg(lambda x: x.max() - x.min())
    duration_check = duration >= np.timedelta64(analysis_options.min_duration, "m")
    duration_check.name = "duration"
    dummy_df = velocities[[]].reset_index()
    merge_kwargs = {"on": "universal_id", "how": "left"}
    duration_check = dummy_df.merge(duration_check, **merge_kwargs)
    duration_check = duration_check.set_index(velocities.index.names)

    # Check if the object fails boundary overlap checks when first detected
    id_group = overlap_check.reset_index().groupby("universal_id")
    initially_contained = id_group.agg(lambda x: x.iloc[0])
    initially_contained = initially_contained.drop(columns="time")
    new_name = {"contained": "initially_contained"}
    initially_contained = initially_contained.rename(columns=new_name)
    dummy_df = velocities[[]].reset_index()
    initially_contained = dummy_df.merge(initially_contained, **merge_kwargs)
    initially_contained = initially_contained.set_index(velocities.index.names)

    # Check whether the object has parents. When plotting we may only wish to filter out
    # short duration objects if they are not part of a larger system
    parents_check = parents.reset_index().groupby("universal_id")["parents"]
    parents_check = parents_check.agg(lambda x: x.notna().any())
    parents_check = dummy_df.merge(parents_check, on="universal_id", how="left")
    parents_check = parents_check.set_index(velocities.index.names)

    # Record whether the given object has children, using the parents column
    has_parents = parents["parents"].dropna()
    children_check = pd.Series(False, index=velocities.index, name="children")
    children_check = children_check.reset_index()
    for i in range(len(has_parents)):
        parents = [int(p) for p in has_parents.iloc[i].split(" ")]
        for parent in parents:
            row_cond = children_check["universal_id"] == parent
            children_check.loc[row_cond, "children"] = True
    children_check = children_check.set_index(velocities.index.names)

    # Check the linearity of the system
    filepath = output_directory / f"attributes/{object_name}/ellipse.csv"
    ellipse = read_attribute_csv(filepath, columns=["major", "minor"])
    major_check = ellipse["major"] >= analysis_options.min_major_axis_length
    major_check.name = "major_axis"
    axis_ratio = ellipse["major"] / ellipse["minor"]
    axis_ratio_check = axis_ratio >= analysis_options.min_axis_ratio
    axis_ratio_check.name = "axis_ratio"

    names = ["contained", "initially_contained", "velocity", "area"]
    names += ["major_axis", "axis_ratio", "duration", "parents", "children"]
    descriptions = [
        "Is the object sufficiently contained within the domain?",
        "Is the object contained within the domain when first detected?",
        "Is the object velocity sufficiently large?",
        "Is the object area sufficiently large?",
        "Is the object major axis length sufficiently large?",
        "Is the object axis ratio sufficiently large?",
        "Is the object duration sufficiently long?",
        "Does the object have parents?",
        "Does the object have children?",
    ]

    data_type, precision, units, retrieval = bool, None, None, None
    attributes = []
    for name, description in zip(names, descriptions):
        kwargs = {"name": name, "retrieval": retrieval, "data_type": data_type}
        kwargs.update({"precision": precision, "description": description})
        kwargs.update({"units": units})
        attributes.append(Attribute(**kwargs))

    attributes.append(core.time())
    attributes.append(core.record_universal_id())
    attribute_type = AttributeType(name="quality", attributes=attributes)
    filepath = analysis_directory / "quality.csv"
    quality = [overlap_check, initially_contained, velocity_check, area_check]
    quality += [major_check, axis_ratio_check, duration_check]
    quality += [parents_check, children_check]
    quality = pd.concat(quality, axis=1)
    quality = write.attribute.write_csv(filepath, quality, attribute_type)


def smooth_flow_velocities(filepath, output_directory, window_size=6):
    """Smooth the flow velocities."""
    velocities = read_attribute_csv(filepath, columns=["u_flow", "v_flow"])

    velocities = temporal_smooth(velocities, window_size=window_size)
    velocities = velocities.rename(columns={"u_flow": "u", "v_flow": "v"})

    analysis_directory = output_directory / "analysis"

    # Create metadata for the attributes
    names = ["u", "v"]
    descriptions = [
        "System ground relative zonal velocity.",
        "System ground relative meridional velocity.",
    ]

    data_type, precision, units, retrieval = float, 1, "m/s", None
    attributes = []
    for name, description in zip(names, descriptions):
        kwargs = {"name": name, "retrieval": retrieval, "data_type": data_type}
        kwargs.update({"precision": precision, "description": description})
        kwargs.update({"units": units})
        attributes.append(Attribute(**kwargs))
    attributes.append(core.time())
    attributes.append(core.record_universal_id())
    attribute_type = AttributeType(name="velocities", attributes=attributes)
    filepath = analysis_directory / "velocities.csv"
    write.attribute.write_csv(filepath, velocities, attribute_type)


def get_angle(u1, v1, u2, v2):
    """
    Get the angle between two vectors. Angle calculated as second vector direction minus
    first vector direction.
    """

    angle_1 = np.arctan2(v1, u1)
    angle_2 = np.arctan2(v2, u2)
    # Get angle between vectors, but signed so that in range -np.pi to np.pi
    return np.mod(angle_2 - angle_1 + np.pi, 2 * np.pi) - np.pi


[docs] def read_options(output_directory): """Read run options from yml files.""" options_directory = Path(output_directory) / "options" options_filepaths = glob.glob(str(options_directory / "*.yml")) all_options = {} for filepath in options_filepaths: with open(filepath, "r") as file: options = yaml.safe_load(file) name = Path(filepath).stem if name == "track": options = option.track.TrackOptions(**options) if name == "data": options = option.data.DataOptions(**options) if name == "grid": options = option.grid.GridOptions(**options) all_options[name] = options return all_options
def temporal_smooth(df, window_size=6): """ Apply a temporal smoother to each object. """ def smooth_group(group): smoothed = group.rolling(window=window_size, min_periods=1, center=True).mean() return smoothed # Group over all indexes except time, i.e. only smooth over time index indexes_to_group = [idx for idx in df.index.names if idx != "time"] smoothed_df = df.groupby(indexes_to_group, group_keys=False) smoothed_df = smoothed_df.apply(smooth_group) return smoothed_df