Source code for aodn_cloud_optimised.lib.AnmnHourlyTsHandler

from typing import Generator, Tuple

import pandas as pd
import xarray as xr

from .GenericParquetHandler import GenericHandler


[docs] class AnmnHourlyTsHandler(GenericHandler): def __init__(self, **kwargs): super().__init__(**kwargs)
[docs] def preprocess_data( self, netcdf_fp ) -> Generator[Tuple[pd.DataFrame, xr.Dataset], None, None]: """ Preprocess a NetCDF file containing Mooring Hourly timeseries product data. This method reads a NetCDF file, typically used for Mooring Hourly timeseries products, and processes it to yield a tuple of a pandas DataFrame and an xarray Dataset. The DataFrame contains the profile data with instrument information merged based on the 'instrument_index'. This method ensures proper handling of the dataset using a context manager and checks for expected dimensions and variables. :param netcdf_fp: Path to the input NetCDF file, or an open S3 file object (using s3fs). :return: Generator yielding tuples of (DataFrame, Dataset) where DataFrame contains the profile data with instrument information, and Dataset is the corresponding xarray Dataset. """ # Use open_dataset as a context manager to ensure proper handling of the dataset with xr.open_dataset(netcdf_fp, engine="h5netcdf") as ds: # Convert xarray to pandas DataFrame assert set(ds.dims) == { "OBSERVATION", "INSTRUMENT", }, f"Unexpected dimensions {set(ds.dims)}" df = ds.drop_dims("INSTRUMENT").to_dataframe() instrument_info = ds.drop_dims("OBSERVATION").to_dataframe() assert df.shape[1] + instrument_info.shape[1] == len( ds.variables ), "Some variable depends on both dimensions" df = df.join(instrument_info, on="instrument_index") assert df.shape[1] == len( ds.variables ), "Something went wrong with the join" # Decode strings from bytes for col, dtype in df.dtypes.items(): if dtype == object: df[col] = df[col].astype(str) yield df, ds