.. _clustering-guide: Cluster Configuration & Distributed Processing =============================================== All large-scale AODN processing uses `Coiled `_ — a managed Dask cluster service that auto-scales AWS workers. This guide explains how to choose the right settings for ``batch_size``, ``memory_limit``, ``nthreads``, ``n_workers``, and ``worker_vm_types``. Getting these parameters wrong is the most common cause of slow or failing processing jobs. .. contents:: On this page :local: :depth: 2 ---- The ``batch_size`` Paradox -------------------------- .. important:: **Bigger ``batch_size`` does NOT create more parallelism — it creates less.** Each batch is one Dask task. One task runs on one worker. Coiled auto-scales the number of active workers based on the number of tasks waiting in the scheduler queue. The relationship is: .. code-block:: text n_tasks = total_files / batch_size n_active_workers ≈ min(n_tasks, n_workers_max) If you have 1 000 files and ``batch_size=100``, you get 10 tasks. With 20 workers, at most 10 are ever active at the same time — the other 10 stay idle for the whole job. If you use ``batch_size=10``, you get 100 tasks → all 20 workers stay busy for 5 rounds. **Rule of thumb:** .. code-block:: text batch_size ≈ total_files / (n_workers_max × 5) Targeting 5 rounds of work per worker means all workers stay busy and the job finishes efficiently. Targeting more rounds (10–20) is better for jobs with uneven file sizes. **Real example (AC-S hourly, 42 000 files):** - ``batch_size=50``, ``n_workers_max=20`` → 840 tasks → good utilisation ✓ - ``batch_size=5000``, ``n_workers_max=20`` → 8 tasks → 12 workers idle at all times ✗ .. note:: Each batch runs sequentially inside a single worker: all files in the batch are opened, processed, and written to Zarr one after another. A larger batch does not speed up a single worker; it just makes the scheduler give that worker more work before it can share. ---- The ``memory_limit`` Pitfall ----------------------------- ``memory_limit`` in the ``worker_options`` block tells Dask how much RAM the worker has available. When usage approaches this value, Dask spills data to disk to avoid OOM crashes. **If ``memory_limit`` is set higher than the actual VM RAM, Dask never spills**, because it thinks there is still headroom. This leads to: - Workers running out of real RAM silently - OS swap activity → major slowdowns (100× slower than RAM) - Unexpected OOM kills from the OS (no warning, no retry) The correct value is roughly **87.5 % of actual VM RAM** (leaving ~12.5 % for the OS, Python interpreter, and Dask overhead). .. _vm-ram-table: VM RAM reference table ^^^^^^^^^^^^^^^^^^^^^^ +------------------------+----------+---------------------------+ | VM type | Actual | Recommended | | | RAM | ``memory_limit`` | +========================+==========+===========================+ | ``m7i-flex.large`` | 8 GB | **7 GB** | | ``m7i.large`` | | | +------------------------+----------+---------------------------+ | ``m7i-flex.xlarge`` | 16 GB | **14 GB** | | ``m7i.xlarge`` | | | +------------------------+----------+---------------------------+ | ``m7i-flex.2xlarge`` | 32 GB | **28 GB** | | ``m7i.2xlarge`` | | | +------------------------+----------+---------------------------+ | ``m7i-flex.4xlarge`` | 64 GB | **56 GB** | | ``m7i.4xlarge`` | | | +------------------------+----------+---------------------------+ | ``m7i-flex.8xlarge`` | 128 GB | **112 GB** | | ``m7i.8xlarge`` | | | +------------------------+----------+---------------------------+ **Parquet vs. Zarr memory behaviour:** - **Parquet** (timeseries): all files in a batch are fully loaded into a Pandas DataFrame. Peak memory ≈ ``batch_size × avg_file_size × 3``. Keep ``batch_size`` small enough that the whole batch fits in ``memory_limit``. - **Zarr / xarray** (gridded, spectral): files are opened lazily with ``open_mfdataset``. Only the chunks currently being written are in memory. Peak memory is typically 1–3 chunks, regardless of ``batch_size``. ``batch_size`` affects parallelism, not peak memory for Zarr. ---- ``nthreads`` — Thread Count per Worker --------------------------------------- Each Dask worker can run multiple Python threads. Threads share memory but are subject to Python's Global Interpreter Lock (GIL) for CPU-bound code. General guidance: +-----------------------------------+-------------------+-------------------------------------+ | Workload type | Recommended | Reason | | | ``nthreads`` | | +===================================+===================+=====================================+ | Parquet, small NetCDF files | 4–8 | I/O-bound; threads overlap | | (mooring, vessel, Argo) | | download + decode | +-----------------------------------+-------------------+-------------------------------------+ | Standard Zarr (gridded satellite, | 4–8 | Moderate I/O and compute | | mooring timeseries) | | | +-----------------------------------+-------------------+-------------------------------------+ | Spectral Zarr with regridding | 2 | Heavy SciPy interpolation; more | | (ACS, HyperOCR, DALEC) | | threads cause GIL contention | +-----------------------------------+-------------------+-------------------------------------+ | Ocean colour (large tiled arrays, | 16–32 | Dask chunks processed independently;| | ``satellite_ocean_colour_*``) | | each thread works on one tile | +-----------------------------------+-------------------+-------------------------------------+ .. warning:: Setting ``nthreads`` too high on a small VM multiplies memory pressure. A worker with ``nthreads=32`` and ``memory_limit=14GB`` allocates only 437 MB per thread — easily exceeded by any scientific interpolation step. ---- ``n_workers`` — Worker Count ----------------------------- Set as a list ``[min, max]``. Coiled auto-scales between these limits: - ``min`` controls the number of workers that start immediately (warm-up cost). For short jobs set ``min=1``. For jobs with thousands of tasks, set ``min=5–20`` so the cluster doesn't start cold. - ``max`` is the ceiling. Coiled only runs workers when there are tasks to process. Setting a high ``max`` costs nothing if the task queue is small. .. tip:: Do not set ``max`` below ``n_tasks / 5``. If there are fewer workers than tasks ÷ 5, workers will be busy for too many sequential rounds and the job will run slowly. ---- ``worker_vm_types`` — VM Selection ------------------------------------ Choose the instance type based on the data format and spectral complexity: +----------------------+--------------------------------------+------------------------------------+ | VM type | Best for | Notes | +======================+======================================+====================================+ | ``m7i-flex.large`` | Scalar parquet timeseries | 8 GB RAM; light I/O-only workloads | | (8 GB RAM) | (WQM, EcoTriplet, small vessel data) | | +----------------------+--------------------------------------+------------------------------------+ | ``m7i-flex.xlarge`` | Standard Zarr, moderate NetCDF | 16 GB RAM; most GHRSST datasets, | | (16 GB RAM) | (mooring, glider, radar, GHRSST) | Argo, most mooring collections | +----------------------+--------------------------------------+------------------------------------+ | ``m7i.2xlarge`` | Spectral Zarr, large gridded NetCDF | 32 GB RAM; ACS, HyperOCR, DALEC, | | (32 GB RAM) | (LJCO instruments, satellite L4) | large GHRSST composites | +----------------------+--------------------------------------+------------------------------------+ | ``m7i.4xlarge`` | High-resolution ocean colour, | 64 GB RAM; multi-variable tiled | | (64 GB RAM) | multi-band satellite products | products with large arrays | +----------------------+--------------------------------------+------------------------------------+ Use the ``m7i-flex.*`` variants where possible — they are `Flex` instances that can burst above their baseline CPU for short periods, which helps with decompression and interpolation spikes. ---- Putting It All Together — Configuration Reference -------------------------------------------------- The full Coiled configuration block in a dataset JSON looks like this: .. code-block:: json { "run_settings": { "batch_size": 50, "coiled_cluster_options": { "n_workers": [5, 40], "scheduler_options": {"idle_timeout": "2 hours"}, "worker_vm_types": "m7i-flex.xlarge", "allow_ingress_from": "everyone", "compute_purchase_option": "spot_with_fallback", "worker_options": { "nthreads": 4, "memory_limit": "14GB" } } } } ---- Worked Examples --------------- Argo float profiles (Parquet, ~500 000 files across 11 DACs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Each profile NetCDF is ~1.2 MB. Loaded fully into Pandas (parquet format). .. code-block:: json { "batch_size": 300, "coiled_cluster_options": { "n_workers": [2, 80], "worker_vm_types": "m7i-flex.xlarge", "worker_options": {"nthreads": 8, "memory_limit": "12GB"} } } - ``batch_size=300``: 300 × 1.2 MB × 3× expansion ≈ 1.1 GB per batch → well within 12 GB - ``n_workers_max=80``: 500 000 / 300 ≈ 1 667 tasks → 80 workers → ~21 rounds - ``nthreads=8``: parquet loading is I/O-bound; threads overlap S3 downloads .. note:: Argo processing is inherently long due to the sheer file count. With 1 667 tasks and 80 workers at ~3 min/task, expect ~60 minutes. This is expected behaviour, not a bug. AC-S hourly (Spectral Zarr, 42 000 files, 707-wavelength regridding) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Each hourly file is ~4 MB. Heavy SciPy interpolation during preprocessing. .. code-block:: json { "batch_size": 5, "coiled_cluster_options": { "n_workers": [1, 40], "worker_vm_types": "m7i.2xlarge", "worker_options": {"nthreads": 2, "memory_limit": "28GB"} } } - ``batch_size=5``: 42 000 / 5 = 8 400 tasks → 40 workers → 210 rounds, each tiny - ``nthreads=2``: SciPy ``interp`` is CPU-bound; more threads cause GIL contention - ``memory_limit=28GB``: 87.5 % of actual 32 GB RAM → safe Dask spill threshold .. warning:: A previous misconfiguration had ``target_wavelength_grids.step=0.1``, creating 3 531 wavelength points instead of 707. This caused each batch to load ~65 GB of data and the job ran for 12+ hours without completing. Always verify spectral grid sizes match the ``dimensions.chunk`` and ``dimensions.size`` values in the config. GHRSST L3S 1-day (Gridded Zarr, ~15 000 files, 139 MB each) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Large gridded files opened lazily with xarray — peak memory per worker ≈ 1–2 tiles. .. code-block:: json { "batch_size": 60, "coiled_cluster_options": { "n_workers": [35, 120], "worker_vm_types": "m7i-flex.xlarge", "worker_options": {"nthreads": 2, "memory_limit": "14GB"} } } - ``batch_size=60``: 15 000 / 60 = 250 tasks → 120 workers → 2 rounds (fast!) - ``nthreads=2``: xarray zarr writes benefit from 2 threads for I/O overlap - Lazy loading: only active chunks are in memory, so 60 × 139 MB is not loaded at once ---- Troubleshooting --------------- **Processing is very slow despite many workers** Check that ``n_tasks ≫ n_workers_max``. If ``total_files / batch_size < n_workers_max``, most workers are idle. **Reduce** ``batch_size``. **Workers keep crashing with OOM** 1. Check ``memory_limit`` does not exceed actual VM RAM (see :ref:`vm-ram-table`). 2. For Parquet: reduce ``batch_size`` (each batch is fully loaded). 3. For spectral Zarr with ``target_wavelength_grids``: verify the ``step`` produces the expected number of points — a small step creates massive arrays. **Very high CPU but low throughput (GIL contention)** Reduce ``nthreads``. For spectral interpolation workloads, ``nthreads=2`` is usually optimal. Additional threads queue behind the GIL and add overhead without adding throughput. **Job completes quickly but output Zarr store is missing data** The ``n_workers_max`` might have been hit before all tasks were queued. Check Coiled dashboard for task failures. If workers were killed by OOM (no graceful error), reduce ``batch_size`` and/or upgrade ``worker_vm_types``. Additional Resources -------------------- - :ref:`dataset-config-doc` — Full dataset configuration reference including ``coiled_cluster_options`` schema - `Coiled Documentation `_ - `Dask Distributed Documentation `_