Skip to content

Merge Rules#

How Merging Works#

graph TD
    A[Source 1: defaults.yaml] --> D[Load & Parse]
    B[Source 2: overrides.yaml] --> E[Load & Parse]
    C[Source 3: ENV vars] --> F[Load & Parse]
    D --> G[Raw Dict 1]
    E --> H[Raw Dict 2]
    F --> I[Raw Dict 3]
    G --> J{Merge Strategy}
    H --> J
    I --> J
    J --> K[Merged Dict]
    K --> L[Type Conversion]
    L --> M[Validation]
    M --> N[Dataclass Instance]

Per-Field Merge Strategies#

Override the global strategy for individual fields using field_merges. Each value can be one of the built-in strategy names below, or any callable or custom class implementing FieldMergeStrategy.

Available field merge strategies:

Strategy Behavior
"first_wins" Keep the value from the first source
"last_wins" Keep the value from the last source
"append" Concatenate lists: base + override
"append_unique" Concatenate lists, removing duplicates
"prepend" Concatenate lists: override + base
"prepend_unique" Concatenate lists: override + base, removing duplicates

Given two sources with overlapping tags:

tags:
  - "web"
  - "default"
tags:
  - "web"
  - "api"

Each strategy produces a different result:

"""Per-field merge — FIRST_WINS keeps tags from the first source."""

from dataclasses import dataclass
from pathlib import Path

import dature

SOURCES_DIR = Path(__file__).parent / "sources"


@dataclass
class Config:
    """Schema with a single list field whose merge strategy is set per field."""

    tags: list[str]


# Both YAML sources define `tags`; the per-field strategy decides how the two
# lists are combined.
config = dature.load(
    dature.Yaml12Source(file=SOURCES_DIR / "merging_field_base.yaml"),
    dature.Yaml12Source(file=SOURCES_DIR / "merging_field_override.yaml"),
    schema=Config,
    field_merges={dature.F[Config].tags: "first_wins"},
)

# "first_wins": the list from the first source is kept as-is.
assert config.tags == ["web", "default"]
"""Per-field merge — LAST_WINS keeps tags from the last source."""

from dataclasses import dataclass
from pathlib import Path

import dature

SOURCES_DIR = Path(__file__).parent / "sources"


@dataclass
class Config:
    """Schema with a single list field whose merge strategy is set per field."""

    tags: list[str]


# Both YAML sources define `tags`; the per-field strategy decides how the two
# lists are combined.
config = dature.load(
    dature.Yaml12Source(file=SOURCES_DIR / "merging_field_base.yaml"),
    dature.Yaml12Source(file=SOURCES_DIR / "merging_field_override.yaml"),
    schema=Config,
    field_merges={dature.F[Config].tags: "last_wins"},
)

# "last_wins": the list from the last source replaces the earlier one.
assert config.tags == ["web", "api"]
"""Per-field merge — APPEND concatenates lists from all sources."""

from dataclasses import dataclass
from pathlib import Path

import dature

SOURCES_DIR = Path(__file__).parent / "sources"


@dataclass
class Config:
    """Schema with a single list field whose merge strategy is set per field."""

    tags: list[str]


# Both YAML sources define `tags`; the per-field strategy decides how the two
# lists are combined.
config = dature.load(
    dature.Yaml12Source(file=SOURCES_DIR / "merging_field_base.yaml"),
    dature.Yaml12Source(file=SOURCES_DIR / "merging_field_override.yaml"),
    schema=Config,
    field_merges={dature.F[Config].tags: "append"},
)

# "append": base + override, duplicates ("web") are kept.
assert config.tags == ["web", "default", "web", "api"]
"""Per-field merge — APPEND_UNIQUE concatenates lists, removing duplicates."""

from dataclasses import dataclass
from pathlib import Path

import dature

SOURCES_DIR = Path(__file__).parent / "sources"


@dataclass
class Config:
    """Schema with a single list field whose merge strategy is set per field."""

    tags: list[str]


# Both YAML sources define `tags`; the per-field strategy decides how the two
# lists are combined.
config = dature.load(
    dature.Yaml12Source(file=SOURCES_DIR / "merging_field_base.yaml"),
    dature.Yaml12Source(file=SOURCES_DIR / "merging_field_override.yaml"),
    schema=Config,
    field_merges={dature.F[Config].tags: "append_unique"},
)

# "append_unique": base + override with the repeated "web" dropped.
assert config.tags == ["web", "default", "api"]
"""Per-field merge — PREPEND puts override list before base list."""

from dataclasses import dataclass
from pathlib import Path

import dature

SOURCES_DIR = Path(__file__).parent / "sources"


@dataclass
class Config:
    """Schema with a single list field whose merge strategy is set per field."""

    tags: list[str]


# Both YAML sources define `tags`; the per-field strategy decides how the two
# lists are combined.
config = dature.load(
    dature.Yaml12Source(file=SOURCES_DIR / "merging_field_base.yaml"),
    dature.Yaml12Source(file=SOURCES_DIR / "merging_field_override.yaml"),
    schema=Config,
    field_merges={dature.F[Config].tags: "prepend"},
)

# "prepend": override + base, duplicates ("web") are kept.
assert config.tags == ["web", "api", "web", "default"]
"""Per-field merge — PREPEND_UNIQUE prepends override, drops duplicates."""

from dataclasses import dataclass
from pathlib import Path

import dature

SOURCES_DIR = Path(__file__).parent / "sources"


@dataclass
class Config:
    """Schema with a single list field whose merge strategy is set per field."""

    tags: list[str]


# Both YAML sources define `tags`; the per-field strategy decides how the two
# lists are combined.
config = dature.load(
    dature.Yaml12Source(file=SOURCES_DIR / "merging_field_base.yaml"),
    dature.Yaml12Source(file=SOURCES_DIR / "merging_field_override.yaml"),
    schema=Config,
    field_merges={dature.F[Config].tags: "prepend_unique"},
)

# "prepend_unique": override + base with the repeated "web" dropped.
assert config.tags == ["web", "api", "default"]

Nested fields are supported: dature.F[Config].database.host.

With raise_on_conflict#

Fields with an explicit strategy are excluded from conflict detection:

"""RAISE_ON_CONFLICT with per-field override."""

from dataclasses import dataclass
from pathlib import Path

import dature

SHARED_DIR = Path(__file__).parents[2] / "shared"


@dataclass
class Config:
    """Schema whose three fields each receive an explicit per-field strategy."""

    host: str
    port: int
    tags: list[str]


# The global strategy is "raise_on_conflict", but every field listed in
# `field_merges` has an explicit strategy and is therefore excluded from
# conflict detection.
config = dature.load(
    dature.Yaml12Source(file=SHARED_DIR / "common_defaults.yaml"),
    dature.Yaml12Source(file=SHARED_DIR / "common_overrides.yaml"),
    schema=Config,
    strategy="raise_on_conflict",
    field_merges={
        dature.F[Config].host: "last_wins",
        dature.F[Config].port: "last_wins",
        dature.F[Config].tags: "append_unique",
    },
)

# host/port come from the overrides file; tags are defaults + overrides.
assert config.host == "production.example.com"
assert config.port == 8080
assert config.tags == ["default", "web", "api"]
host: "localhost"
port: 3000
tags:
  - "default"
host: "production.example.com"
port: 8080
tags:
  - "web"
  - "api"

Custom Field Strategy#

The FieldMergeStrategy Protocol#

Any callable that takes a list[JSONValue] (one value per source) and returns the merged value satisfies the public FieldMergeStrategy Protocol:

@runtime_checkable
class FieldMergeStrategy(Protocol):
    """Callable contract for a per-field merge strategy."""

    # `values` holds one entry per source; the return value is the merged result.
    def __call__(self, values: list[JSONValue]) -> JSONValue: ...

The built-in field strategies are also exposed as classes from dature.strategies.field: FieldFirstWins, FieldLastWins, FieldAppend, FieldAppendUnique, FieldPrepend, FieldPrependUnique. They satisfy the same Protocol, so you can pass them directly to field_merges or compose them inside your own strategy.

Examples#

Pick a plain function for one-off logic, or a class for a named, reusable reducer:

"""Callable merge — custom merge function for a field."""

from dataclasses import dataclass
from pathlib import Path
from typing import Any

import dature

SHARED_DIR = Path(__file__).parents[2] / "shared"


@dataclass
class Config:
    """Schema loaded by this example: host, port and a list of tags."""

    host: str
    port: int
    tags: list[str]


def merge_tags(values: list[Any]) -> list[str]:
    """Return the sorted, duplicate-free union of every source's tag list.

    Args:
        values: One iterable of tags per source.

    Returns:
        All distinct tags in ascending order.
    """
    unique_tags = set()
    for source_tags in values:
        unique_tags.update(source_tags)
    return sorted(unique_tags)


# Build the two file sources up front, in the order they are merged.
defaults_source = dature.Yaml12Source(file=SHARED_DIR / "common_defaults.yaml")
overrides_source = dature.Yaml12Source(file=SHARED_DIR / "common_overrides.yaml")

config = dature.load(
    defaults_source,
    overrides_source,
    schema=Config,
    strategy="last_wins",
    field_merges={dature.F[Config].tags: merge_tags},
)

# host/port follow the global "last_wins"; tags go through merge_tags.
assert (config.host, config.port) == ("production.example.com", 8080)
assert config.tags == ["api", "default", "web"]
"""Custom field strategy — a class implementing FieldMergeStrategy."""

from dataclasses import dataclass
from pathlib import Path
from typing import cast

import dature
from dature.strategies.field import FieldMergeStrategy
from dature.types import JSONValue

SHARED_DIR = Path(__file__).parents[2] / "shared"


@dataclass
class Config:
    """Schema loaded by this example: host, port and a list of tags."""

    host: str
    port: int
    tags: list[str]


class SortedUnion:
    """Merge list values from every source into one sorted, duplicate-free list."""

    def __call__(self, values: list[JSONValue]) -> JSONValue:
        # Non-list entries are ignored; items are stringified before deduplication.
        unique_items = {
            str(item)
            for source_value in values
            if isinstance(source_value, list)
            for item in source_value
        }
        return cast("JSONValue", sorted(unique_items))


# The annotation lets a type checker verify the instance against the public Protocol.
strategy: FieldMergeStrategy = SortedUnion()

# Build the two file sources up front, in the order they are merged.
defaults_source = dature.Yaml12Source(file=SHARED_DIR / "common_defaults.yaml")
overrides_source = dature.Yaml12Source(file=SHARED_DIR / "common_overrides.yaml")

config = dature.load(
    defaults_source,
    overrides_source,
    schema=Config,
    field_merges={dature.F[Config].tags: strategy},
)

# Tags from both files, deduplicated and sorted by SortedUnion.
assert config.tags == ["api", "default", "web"]
host: "localhost"
port: 3000
tags:
  - "default"
host: "production.example.com"
port: 8080
tags:
  - "web"
  - "api"

Custom Source Strategy#

The global strategy parameter accepts not only the names from Merge Strategies but also any object implementing the public SourceMergeStrategy Protocol:

class SourceMergeStrategy(Protocol):
    """Callable contract for a custom global (source-level) merge strategy."""

    # Receives the Source instances plus a LoadCtx helper and returns the merged data.
    def __call__(self, sources: Sequence[Source], ctx: LoadCtx) -> JSONValue: ...

The strategy receives the raw Source instances (not pre-loaded data) and a LoadCtx helper. The primary API for applying a source to the running base is ctx.merge(source=src, base=base, op=...) — it loads the source (cached), runs the merge op (default deep_merge_last_wins), and registers the step so debug logs and LoadReport.field_origins are populated correctly. A minimal custom strategy is one loop:

class MyCustom:
    """Minimal source strategy: fold each source into the result via ``ctx.merge``."""

    def __call__(self, sources, ctx):
        merged = {}
        for source in sources:
            # Each step hands the running result back in as the new base.
            merged = ctx.merge(source=source, base=merged)
        return merged

Override op to plug in your own merge function — e.g. shallow overlay for env on top of files:

"""Custom source strategy — files merge `last_wins`, env overrides on top."""

from collections.abc import Sequence
from dataclasses import dataclass
from pathlib import Path

import dature
from dature.strategies import LoadCtx, SourceMergeStrategy
from dature.types import JSONValue

SHARED_DIR = Path(__file__).parents[2] / "shared"


@dataclass
class Config:
    """Schema loaded by this example: host, port and a list of tags."""

    host: str
    port: int
    tags: list[str]


def _dict_overlay(a: JSONValue, b: JSONValue) -> JSONValue:
    """Overlay ``b`` on ``a`` one level deep.

    When both arguments are dicts, the result holds the keys of ``a`` with the
    top-level keys of ``b`` replacing them; otherwise ``b`` is returned as-is.
    """
    if not (isinstance(a, dict) and isinstance(b, dict)):
        return b
    return {**a, **b}


class EnvOverrides:
    """Files merge `last_wins`; env sources overlay shallowly on top.

    Sources are applied in the order given, each one merged onto the running
    base through ``ctx.merge``.
    """

    def __call__(
        self,
        sources: Sequence[dature.Source],
        ctx: LoadCtx,
    ) -> JSONValue:
        base: JSONValue = {}
        for idx, s in enumerate(sources):
            # NOTE(review): the prose around this example documents the call as
            # ctx.merge(source=src, base=base, op=...), while this code passes
            # source_idx=idx — confirm which keyword LoadCtx.merge accepts.
            if isinstance(s, dature.EnvSource):
                # Env sources: shallow top-level overlay instead of the default op.
                base = ctx.merge(source_idx=idx, base=base, op=_dict_overlay)
            else:
                # Any other source: the default merge op is used.
                base = ctx.merge(source_idx=idx, base=base)
        return base


# The annotation lets a type checker verify the instance against the public Protocol.
strategy: SourceMergeStrategy = EnvOverrides()

# Build the two file sources up front, in the order they are merged.
defaults_source = dature.Yaml12Source(file=SHARED_DIR / "common_defaults.yaml")
overrides_source = dature.Yaml12Source(file=SHARED_DIR / "common_overrides.yaml")

config = dature.load(
    defaults_source,
    overrides_source,
    schema=Config,
    strategy=strategy,
)

# `last_wins` between two file sources — the override file wins.
assert (config.host, config.port) == ("production.example.com", 8080)
host: "localhost"
port: 3000
tags:
  - "default"
host: "production.example.com"
port: 8080
tags:
  - "web"
  - "api"

isinstance(src, EnvSource) (or any other concrete Source subclass) lets the strategy dispatch on source type — useful when env variables should override file content rather than merge with it. Pass skip_on_error=True to ctx.merge(...) (or ctx.load(...)) if you want broken sources to be skipped silently regardless of skip_if_broken (this is what SourceFirstFound does internally).

ctx.merge is the single hook — once your strategy funnels every per-source step through it, debug logs ([Cls] Merge step N ..., State after step N: ...) and LoadReport.field_origins are populated automatically; there's no separate registration call to remember.

Skipping Broken Sources#

Skip sources that fail to load (missing file, invalid syntax):

"""skip_broken_sources — continue loading when a source is missing."""

from dataclasses import dataclass
from pathlib import Path

import dature

SOURCES_DIR = Path(__file__).parent / "sources"
SHARED_DIR = Path(__file__).parents[2] / "shared"


@dataclass
class Config:
    """Schema for this example; `debug` falls back to its default of False."""

    host: str
    port: int
    debug: bool = False


# The second source is marked skip_if_broken, so a failure to load
# "nonexistent.yaml" does not abort the whole load.
defaults_source = dature.Yaml12Source(file=SHARED_DIR / "common_defaults.yaml")
missing_source = dature.Yaml12Source(
    file=SOURCES_DIR / "nonexistent.yaml",
    skip_if_broken=True,
)

config = dature.load(defaults_source, missing_source, schema=Config)

# host/port come from the defaults file; debug keeps the dataclass default.
assert (config.host, config.port) == ("localhost", 3000)
assert config.debug is False
host: "localhost"
port: 3000
tags:
  - "default"

Override per source with skip_if_broken on Source (takes priority over the global flag):

"""skip_if_broken per source — override the global flag per Source."""

from dataclasses import dataclass
from pathlib import Path

import dature

SOURCES_DIR = Path(__file__).parent / "sources"
SHARED_DIR = Path(__file__).parents[2] / "shared"


@dataclass
class Config:
    """Schema loaded by this example: host, port and a list of tags."""

    host: str
    port: int
    tags: list[str]


# No per-source flag: uses the global skip_broken_sources setting.
defaults_source = dature.Yaml12Source(
    file=SHARED_DIR / "common_defaults.yaml",
)
# Always skipped if broken.
optional_source = dature.Yaml12Source(
    file=SOURCES_DIR / "optional.yaml",
    skip_if_broken=True,
)
# Never skipped, even though the global flag is True.
overrides_source = dature.Yaml12Source(
    file=SHARED_DIR / "common_overrides.yaml",
    skip_if_broken=False,
)

config = dature.load(
    defaults_source,
    optional_source,
    overrides_source,
    schema=Config,
    skip_broken_sources=True,  # global default
)

assert (config.host, config.port) == ("production.example.com", 8080)
host: "localhost"
port: 3000
tags:
  - "default"

If all sources fail to load, a ValueError is raised.

Skipping Invalid Fields#

Drop fields with invalid values and let other sources or defaults fill them in:

"""skip_invalid_fields — drop invalid fields, let defaults fill in."""

from dataclasses import dataclass
from pathlib import Path

import dature

SOURCES_DIR = Path(__file__).parent / "sources"


@dataclass
class Config:
    """Schema for this example; `port` has a default of 3000."""

    host: str
    port: int = 3000


# skip_field_if_invalid=True drops fields whose values are invalid, so the
# dataclass default can fill them in.
lenient_source = dature.Yaml12Source(
    file=SOURCES_DIR / "merging_skip_invalid_defaults.yaml",
    skip_field_if_invalid=True,
)

config = dature.load(lenient_source, schema=Config)

# The file's port is "not_a_number", so the default 3000 is used instead.
assert (config.host, config.port) == ("localhost", 3000)
host: "localhost"
port: "not_a_number"

Restrict skipping to specific fields:

"""skip_field_if_invalid per field — restrict skipping to specific fields."""

from dataclasses import dataclass
from pathlib import Path

import dature

SOURCES_DIR = Path(__file__).parent / "sources"


@dataclass
class Config:
    """Schema for this example: three required fields, none with a default."""

    host: str
    port: int
    timeout: int


defaults_source = dature.Yaml12Source(
    file=SOURCES_DIR / "merging_skip_invalid_per_field_defaults.yaml",
)
# Skipping is restricted to `port` and `timeout` for this source only.
overrides_source = dature.Yaml12Source(
    file=SOURCES_DIR / "merging_skip_invalid_per_field_overrides.yaml",
    skip_field_if_invalid=(dature.F[Config].port, dature.F[Config].timeout),
)

config = dature.load(defaults_source, overrides_source, schema=Config)

# host is taken from the overrides file; the invalid port/timeout there are
# skipped, so the values from the defaults file remain.
assert config.host == "production.example.com"
assert (config.port, config.timeout) == (3000, 30)
host: "localhost"
port: 3000
timeout: 30
host: "production.example.com"
port: "not_a_number"
timeout: "invalid"

Only port and timeout will be skipped if invalid; other fields still raise errors.

If a required field is invalid in all sources and has no default, loading fails and the error lists every source in which the value was rejected:

Config loading errors (1)

  [port]  Missing required field (invalid in: yaml 'defaults.yaml', yaml 'overrides.yaml')
   └── FILE 'defaults.yaml', line 3
       port: "not_a_number"
   └── FILE 'overrides.yaml', line 2
       port: "not_a_number_too"