Source code for aodn_cloud_optimised.bin.generic_cloud_optimised_creation

#!/usr/bin/env python3

"""
Runner to generate cloud-optimised datasets from S3 files, based on a dataset config JSON.

Example usage:
  generic_cloud_optimised_creation --config mooring_hourly_timeseries_delayed_qc
  generic_cloud_optimised_creation --config satellite_chlorophylla_gsm_1day_aqua --json-overwrite '{"run_settings": {"cluster": {"mode": null}, "raise_error": true, "clear_existing_data": false}}'  # useful for single-file processing on Prefect
  generic_cloud_optimised_creation --config satellite_chlorophylla_gsm_1day_aqua --json-overwrite '{"run_settings": {"cluster": {"mode": null}, "raise_error": true, "clear_existing_data": false, "force_previous_parquet_deletion": true}}'  # useful for Parquet datasets: overwrite data from input files that were already processed
  generic_cloud_optimised_creation --config satellite_chlorophylla_gsm_1day_aqua --json-overwrite @overrides.json  # read the overrides from a JSON file instead of an inline string
"""

import argparse
import json
import logging
import re
import sys
import warnings
from pathlib import Path, PurePosixPath
from typing import Any, List, Optional
from urllib.parse import urlparse

from pydantic import ValidationError

from aodn_cloud_optimised.lib import clusterLib
from aodn_cloud_optimised.lib.CommonHandler import cloud_optimised_creation
from aodn_cloud_optimised.lib.config import (
    load_dataset_config,
    load_variable_from_config,
)
from aodn_cloud_optimised.lib.run_summary import RunSummary
from aodn_cloud_optimised.lib.s3Tools import boto3_from_opts_dict, s3_ls

logger = logging.getLogger(__name__)
from aodn_cloud_optimised.bin.config.model import DatasetConfig, PathConfig


def load_config_and_validate(config_filename: str) -> DatasetConfig:
    """Resolve, load and validate a dataset configuration.

    ``config_filename`` is either a path to an existing file, or the name of a
    configuration shipped in the package's ``config/dataset/`` directory
    (``../config/dataset/`` relative to this module). The ``.json`` extension
    is optional for bundled names. Path resolution is delegated to
    ``resolve_dataset_config_path``.

    Args:
        config_filename: A config file path, or a bare config name.

    Returns:
        The configuration validated against the ``DatasetConfig`` model.

    Raises:
        FileNotFoundError: If no matching configuration file exists.
        pydantic.ValidationError: If the configuration content is invalid.
    """
    return DatasetConfig.model_validate(
        load_dataset_config(str(resolve_dataset_config_path(config_filename)))
    )
def collect_files(
    path_cfg: PathConfig,
    suffix: Optional[str],
    exclude: Optional[str],
    bucket_raw: Optional[str],
    s3_client_opts: Optional[dict] = None,
) -> List[str]:
    """Collect input dataset paths for one configured path.

    Behaviour depends on ``path_cfg.type``, which defaults to ``"files"`` when
    the attribute is absent:

    - ``"files"``: list objects under the S3 prefix with ``s3_ls``. When
      ``path_cfg.filter`` is set, keep only entries matching that regex.
    - ``"parquet"``: return the URI (trailing ``/`` stripped) unchanged. It
      may point at a single Parquet file or a Hive-partitioned dataset root.
    - ``"zarr"``: not implemented yet.

    Args:
        path_cfg: Path configuration (``type``, ``s3_uri``, optional ``filter``).
        suffix: Accepted for interface compatibility but currently NOT applied.
            ``s3_ls`` is always called with ``suffix=None``, so narrowing by
            extension must be done through ``path_cfg.filter`` or ``exclude``.
        exclude: Optional regex string forwarded to ``s3_ls`` to exclude files.
        bucket_raw: Bucket to list when ``path_cfg.s3_uri`` is a bare prefix
            rather than a full ``s3://`` URI.
        s3_client_opts: Optional boto3 S3 client options forwarded to ``s3_ls``.

    Returns:
        For ``"files"``: the entries returned by ``s3_ls``, after regex
        filtering (empty when the filter matches nothing). For ``"parquet"``:
        a one-element list holding the dataset URI.

    Raises:
        ValueError: If ``bucket_raw`` is required but missing, if the type is
            ``"zarr"``, or if the type is unsupported.
    """
    dataset_type = getattr(path_cfg, "type", "files")  # default value
    s3_uri = path_cfg.s3_uri.rstrip("/")

    # Non-listing dataset types are handled up front.
    if dataset_type == "parquet":
        # Single Parquet file or Hive-partitioned dataset: no filters applied.
        return [s3_uri]
    if dataset_type == "zarr":
        raise ValueError("zarr store as an input dataset is not yet implemented")
    if dataset_type != "files":
        raise ValueError(f"Unsupported dataset type: {dataset_type}")

    # 'files' (e.g. NetCDF, CSV): split the URI into bucket + key prefix.
    if s3_uri.startswith("s3://"):
        parsed = urlparse(s3_uri)
        bucket = parsed.netloc
        prefix = parsed.path.lstrip("/")
    else:
        if not bucket_raw:
            raise ValueError(
                "bucket_raw must be provided when s3_uri is not a full S3 URI."
            )
        bucket = bucket_raw
        prefix = s3_uri

    # Normalise the prefix (e.g. collapse duplicate slashes). An empty prefix
    # (bucket root) is left alone: PurePosixPath("") renders as ".", which
    # would restrict the listing to keys starting with ".".
    if prefix:
        prefix = str(PurePosixPath(prefix))

    # NOTE(review): ``suffix`` is intentionally(?) not forwarded here; the
    # original code always passed None. Confirm before wiring it through,
    # since some configs may rely on ``filter`` alone to select files.
    matching_files = s3_ls(
        bucket,
        prefix,
        suffix=None,
        exclude=exclude,
        s3_client_opts=s3_client_opts,
    )

    if path_cfg.filter:
        pattern = path_cfg.filter
        logger.info(f"Filtering files with regex pattern: {pattern}")
        regex = re.compile(pattern)
        matching_files = [f for f in matching_files if regex.search(f)]
        if not matching_files:
            logger.warning(
                f"No files matching {pattern} under {s3_uri}. Skipping this path."
            )
            return []

    logger.info(f"Matched {len(matching_files)} files")
    return matching_files
def json_update(base: dict, updates: dict) -> dict:
    """Deep-merge ``updates`` into ``base`` in place and return ``base``.

    When both sides hold a dict for the same key, the merge recurses into it.
    Otherwise the value from ``updates`` replaces (or adds) the entry in
    ``base``.
    """
    for key, new_value in updates.items():
        current = base.get(key)
        if isinstance(new_value, dict) and isinstance(current, dict):
            # Merge nested mappings instead of replacing them wholesale.
            new_value = json_update(current, new_value)
        base[key] = new_value
    return base
def join_s3_uri(base_uri: str, *parts: str) -> str:
    """Append path components to an S3 URI or a plain POSIX path.

    For ``s3://`` URIs, the bucket is kept and ``parts`` are joined onto the
    key using POSIX path rules, which also collapse duplicate slashes. Any
    other ``base_uri`` is joined as a plain POSIX path.

    Args:
        base_uri: ``s3://bucket/key`` URI or a plain path/prefix.
        *parts: Path components to append.

    Returns:
        The joined URI or path as a string.
    """
    if not base_uri.startswith("s3://"):
        return str(PurePosixPath(base_uri, *parts))

    components = urlparse(base_uri)
    key_path = PurePosixPath(components.path.lstrip("/"), *parts)
    return "s3://" + components.netloc + "/" + str(key_path)
def resolve_dataset_config_path(config_arg: str) -> str:
    """Turn a ``--config`` argument into the path of a dataset config file.

    If ``config_arg`` points at an existing file, that path is returned.
    Otherwise it is treated as the name of a config bundled with the package
    (``<package>/config/dataset/<name>.json``). A missing ``.json`` extension
    is appended.

    Args:
        config_arg: The CLI config argument, either a path or a name.

    Returns:
        The path to the config file as a string.

    Raises:
        FileNotFoundError: If neither location holds the config file.
    """
    direct_path = Path(config_arg)
    if direct_path.is_file():
        return str(direct_path)

    # Fall back to the package's bundled dataset configs.
    file_name = config_arg if config_arg.endswith(".json") else f"{config_arg}.json"
    bundled_path = Path(__file__).parents[1] / "config" / "dataset" / file_name
    if bundled_path.is_file():
        return str(bundled_path)

    raise FileNotFoundError(f"Configuration file not found: {bundled_path}")
def _parse_json_overwrite(raw: str) -> dict[str, Any]:
    """Parse a ``--json-overwrite`` value into a dict.

    A value starting with ``@`` is treated as the path of a JSON file
    (``@overrides.json``). Anything else is parsed as an inline JSON string.

    Raises:
        OSError: If the ``@`` file cannot be read.
        ValueError: If the content is not valid JSON (``json.JSONDecodeError``
            is a ``ValueError``) or is not a JSON object.
    """
    if raw.startswith("@"):
        with open(raw[1:], "r", encoding="utf-8") as f:
            parsed = json.load(f)
    else:
        parsed = json.loads(raw)
    if not isinstance(parsed, dict):
        raise ValueError("the override must be a JSON object")
    return parsed


def _expand_year_paths(path_cfg: PathConfig) -> List[PathConfig]:
    """Return the path configs to process for one configured path.

    Without a ``year_range``, the path is used as-is. With one, a copy of the
    path config is made per year, with ``s3_uri`` suffixed by that year.
    """
    if not path_cfg.year_range:
        return [path_cfg]
    expanded = []
    for year in path_cfg.year_range:
        path_cfg_year = path_cfg.model_copy()
        if year:
            path_cfg_year.s3_uri = join_s3_uri(path_cfg.s3_uri, str(year))
        expanded.append(path_cfg_year)
    return expanded


def main():
    """CLI entry point: create a cloud-optimised dataset from a dataset config.

    Steps:
      1. Parse arguments, then load and validate the dataset config given by
         ``--config`` (a file path or a bundled config name).
      2. Apply ``--json-overwrite`` (inline JSON or ``@file.json``) on top of
         the validated config and re-validate the result.
      3. Resolve the input/output buckets, the root prefix and the S3 client
         options. ``--test`` switches to the integration-testing
         bucket/prefix.
      4. Collect input files for each configured path (expanded per year when
         ``year_range`` is set). Run ``cloud_optimised_creation`` either once
         per path/year (``cluster.restart_every_path``) or once over all
         deduplicated files.
      5. Render the run summary and exit 0.

    Exits with status 1 in these cases: config errors, invalid overrides, no
    matching files in single-run mode, or a failed run with ``raise_error``
    enabled. argparse exits with status 2 on usage errors such as a missing
    ``--config``.
    """
    parser = argparse.ArgumentParser(
        description="Run cloud-optimised creation using config."
    )
    # Required: every later step depends on a config; a missing value used to
    # crash with a TypeError inside Path(None).
    parser.add_argument(
        "-c", "--config", required=True, help="JSON filename in config/dataset/"
    )
    parser.add_argument(
        "-o",
        "--json-overwrite",
        type=str,
        help='JSON string (or @path/to/file.json) to override config fields. Example: \'{"run_settings": {"cluster": {"mode": null}, "raise_error": true}}\' ',
    )
    parser.add_argument(
        "-t",
        "--test",
        action="store_true",
        help="Use integration testing bucket instead of the default optimised bucket.",
    )
    args = parser.parse_args()

    try:
        config = load_config_and_validate(args.config)
    except FileNotFoundError as e:
        print(f"❌ {e}")  # noqa: T201
        sys.exit(1)
    except ValidationError as e:
        print(f"❌ Validation error in config file:\n{e}")  # noqa: T201
        sys.exit(1)

    overwrite_dict: dict[str, Any] = {}
    if args.json_overwrite:
        try:
            overwrite_dict = _parse_json_overwrite(args.json_overwrite)
        except (OSError, ValueError) as e:
            print(f"❌ Invalid --json-overwrite value:\n{e}")  # noqa: T201
            sys.exit(1)

    config_updated = json_update(config.model_dump(), overwrite_dict)
    try:
        config = DatasetConfig.model_validate(config_updated)
    except ValidationError as e:
        print(f"❌ Validation error after applying --json-overwrite:\n{e}")  # noqa: T201
        sys.exit(1)

    run_settings = config.run_settings

    # Bucket resolution: per-bucket options > config defaults > package defaults.
    s3_bucket_opts = run_settings.s3_bucket_opts or {}
    input_data_opts = s3_bucket_opts.get("input_data") or {}
    output_data_opts = s3_bucket_opts.get("output_data") or {}
    bucket_raw = (
        input_data_opts.get("bucket")
        or run_settings.bucket_raw_default_name
        or load_variable_from_config("BUCKET_RAW_DEFAULT")
    )
    bucket_optimised = (
        output_data_opts.get("bucket")
        or run_settings.optimised_bucket_name
        or load_variable_from_config("BUCKET_OPTIMISED_DEFAULT")
    )
    root_prefix = run_settings.root_prefix_cloud_optimised_path or load_variable_from_config(
        "ROOT_PREFIX_CLOUD_OPTIMISED_PATH"
    )

    # Override for test mode
    if args.test:
        bucket_optimised = load_variable_from_config(
            "BUCKET_INTEGRATION_TESTING_OPTIMISED_DEFAULT"
        )
        root_prefix = load_variable_from_config(
            "ROOT_PREFIX_CLOUD_OPTIMISED_INTEGRATION_TESTING_PATH"
        )

    # Input-bucket-specific S3 options, if set, replace the common ones for listing.
    s3_client_opts = boto3_from_opts_dict(run_settings.s3_fs_common_opts)
    s3_fs_opts_input = input_data_opts.get("s3_fs_opts")
    if s3_fs_opts_input:
        s3_client_opts = boto3_from_opts_dict(s3_fs_opts_input)

    # Raw dict rather than config.model_dump(): model_dump() retains only the
    # validated fields.
    # NOTE(review): --json-overwrite is applied to ``config`` only, not to this
    # raw dict. The overridden run settings are passed explicitly below.
    dataset_config_path = resolve_dataset_config_path(args.config)
    dataset_config = load_dataset_config(dataset_config_path)

    run_summary = RunSummary()
    cluster_mode = run_settings.cluster.mode or clusterLib.ClusterMode.NONE.value

    def collect(path_variant):
        """List the input files for one (possibly year-specific) path config."""
        return collect_files(
            path_variant,
            suffix=run_settings.suffix,
            exclude=run_settings.exclude,
            bucket_raw=bucket_raw,
            s3_client_opts=s3_client_opts,
        )

    def run(files, clear_existing_data):
        """Run one cloud_optimised_creation pass over ``files``."""
        return cloud_optimised_creation(
            files,
            dataset_config=dataset_config,
            run_summary=run_summary,
            handler_class=None,
            clear_existing_data=clear_existing_data,
            force_previous_parquet_deletion=run_settings.force_previous_parquet_deletion,
            cluster_mode=cluster_mode,
            optimised_bucket_name=bucket_optimised,
            root_prefix_cloud_optimised_path=root_prefix,
            raise_error=run_settings.raise_error,
        )

    if run_settings.cluster.restart_every_path:
        # One run per path/year. Only the first run that actually processes
        # files may clear existing output; later runs must not wipe it again.
        clear_flag_added = False
        for path_cfg in run_settings.paths:
            for path_variant in _expand_year_paths(path_cfg):
                matching_files = collect(path_variant)
                if not matching_files:
                    warnings.warn(f"No files matched for path {path_variant.s3_uri}")
                    continue
                result = run(
                    matching_files,
                    clear_existing_data=(
                        run_settings.clear_existing_data and not clear_flag_added
                    ),
                )
                if run_settings.clear_existing_data:
                    clear_flag_added = True
                if not result and run_settings.raise_error:
                    sys.exit(1)
    else:
        # Collect all files from all paths / years and run once
        all_files = []
        for path_cfg in run_settings.paths:
            for path_variant in _expand_year_paths(path_cfg):
                all_files.extend(collect(path_variant))
        all_files = list(dict.fromkeys(all_files))  # Deduplicate, keep order
        if not all_files:
            warnings.warn("No files matched any path/filter combination.")
            sys.exit(1)
        result = run(all_files, clear_existing_data=run_settings.clear_existing_data)
        if not result and run_settings.raise_error:
            sys.exit(1)

    logger_name = dataset_config.get("logger_name", args.config or "generic")
    dataset_name = dataset_config.get("dataset_name", args.config or "")
    run_summary.render(logger_name, dataset_name)
    sys.exit(0)
if __name__ == "__main__": main()