from typing import Generator, Tuple
import numpy as np
import pandas as pd
import xarray as xr
from .GenericParquetHandler import GenericHandler
[docs]
class ArgoHandler(GenericHandler):
    """Handler for aggregated Argo profile NetCDF files (``*_prof.nc``).

    Extends ``GenericHandler`` with a ``preprocess_data`` step that flattens
    the per-profile / per-level variables of a profile file into a tabular
    DataFrame and a matching xarray Dataset.
    """

    def __init__(self, **kwargs):
        """Forward all keyword arguments unchanged to ``GenericHandler``."""
        super().__init__(**kwargs)

    # TODO: rename JULD variable to TIME? or just copy it so that it's more consistent with other dataset?

    def preprocess_data(
        self, fp
    ) -> Generator[Tuple[pd.DataFrame, xr.Dataset], None, None]:
        """
        Preprocess a NetCDF file containing aggregated profile data.

        This method reads a profile NetCDF file (typically named with a *_prof.nc suffix),
        which is an aggregation of multiple profile files, and yields a single
        tuple of a pandas DataFrame and an xarray Dataset.

        Variables with dimensions ``(N_PROF, N_LEVELS)`` are flattened into one
        column each (one row per profile/level pair). Variables with the single
        dimension ``N_PROF`` are repeated ``N_LEVELS`` times so that they line up
        with the flattened rows. Every other variable is ignored.

        :param fp: Path to the input NetCDF file, or an open S3 file object (using s3fs) of an Argo *_prof.nc file.
        :return: Generator yielding tuples of (DataFrame, Dataset) where DataFrame contains the profile data
                 and Dataset is the corresponding xarray Dataset.
        :raises ValueError: if the file name does not end with ``_prof.nc``.
        """
        # s3fs file objects expose ``.path``; regular file objects and pathlib
        # paths expose ``.name``; a plain string is used as is.
        source_name = getattr(fp, "path", None)
        if source_name is None:
            source_name = getattr(fp, "name", fp)
        source_name = str(source_name)

        if not source_name.endswith("_prof.nc"):
            raise ValueError(
                "Expected an aggregated profile file ending with '_prof.nc', "
                f"got {source_name!r}"
            )

        with xr.open_dataset(fp, engine="scipy") as ds_in:
            # Each N_PROF-only value must be repeated once per level to match
            # the flattened (N_PROF, N_LEVELS) variables.
            n_levels = ds_in.sizes["N_LEVELS"]

            columns = {}
            # Variable attributes are tracked in a plain dict instead of being
            # read back from ``df[col].attrs``: pandas does not guarantee that
            # attrs set on a column Series survive a later column lookup.
            var_attrs = {}

            for varname, var in ds_in.items():
                if var.dims == ("N_PROF", "N_LEVELS"):
                    # profile data (PSAL, PRES ...)
                    columns[varname] = var.values.reshape(var.size)
                elif var.dims == ("N_PROF",):
                    # profile metadata (CYCLE_NUMBER, PLATFORM_NUMBER ...)
                    columns[varname] = np.repeat(var.values, n_levels)
                else:
                    # (N_PROF, N_PARAM), DATE_TIME and any other layout are
                    # not part of the tabular output.
                    continue
                var_attrs[varname] = dict(var.attrs)

            df_profile_data = pd.DataFrame(columns)

            df_profile_data["PLATFORM_NUMBER"] = df_profile_data[
                "PLATFORM_NUMBER"
            ].apply(
                lambda x: int(x.decode("UTF-8").strip()) if isinstance(x, bytes) else x
            )

            # TODO: do NOT call self.convert_df_bytes_to_str(df_profile_data) here.
            # For MEOP data, JULD is an object (opened with scipy) and object data
            # would be converted back to string, producing NaN for the time values.
            # Leaving it out shouldn't break anything for ARGO, but this is to be checked.

            # Store the global attributes of the source dataset on the DataFrame.
            df_profile_data.attrs = ds_in.attrs

            # Best effort only: whether attrs set on a column Series persist
            # depends on the pandas version / copy-on-write mode.
            for col, attrs in var_attrs.items():
                df_profile_data[col].attrs = attrs

            # Rebuild an xarray Dataset from the modified DataFrame and restore
            # the global and per-variable attributes explicitly.
            ds_out = df_profile_data.to_xarray()
            ds_out.attrs.update(df_profile_data.attrs)
            for var_name, attrs in var_attrs.items():
                ds_out[var_name].attrs.update(attrs)

            yield df_profile_data, ds_out