Merge Rules#
How Merging Works#
graph TD
A[Source 1: defaults.yaml] --> D[Load & Parse]
B[Source 2: overrides.yaml] --> E[Load & Parse]
C[Source 3: ENV vars] --> F[Load & Parse]
D --> G[Raw Dict 1]
E --> H[Raw Dict 2]
F --> I[Raw Dict 3]
G --> J{Merge Strategy}
H --> J
I --> J
J --> K[Merged Dict]
K --> L[Type Conversion]
L --> M[Validation]
M --> N[Dataclass Instance] Per-Field Merge Strategies#
Override the global strategy for individual fields using field_merges. Each value can be one of the built-in strategy names below, or any callable or custom class implementing FieldMergeStrategy.
Available field merge strategies:
| Strategy | Behavior |
|---|---|
"first_wins" | Keep the value from the first source |
"last_wins" | Keep the value from the last source |
"append" | Concatenate lists: base + override |
"append_unique" | Concatenate lists, removing duplicates |
"prepend" | Concatenate lists: override + base |
"prepend_unique" | Concatenate lists in reverse order, removing duplicates |
Given two sources with overlapping tags:
Each strategy produces a different result:
"""Per-field merge — FIRST_WINS keeps tags from the first source."""
from dataclasses import dataclass
from pathlib import Path
import dature
SOURCES_DIR = Path(__file__).parent / "sources"
@dataclass
class Config:
tags: list[str]
config = dature.load(
dature.Yaml12Source(file=SOURCES_DIR / "merging_field_base.yaml"),
dature.Yaml12Source(file=SOURCES_DIR / "merging_field_override.yaml"),
schema=Config,
field_merges={dature.F[Config].tags: "first_wins"},
)
assert config.tags == ["web", "default"]
assert config.tags == ["web", "default"]
"""Per-field merge — LAST_WINS keeps tags from the last source."""
from dataclasses import dataclass
from pathlib import Path
import dature
SOURCES_DIR = Path(__file__).parent / "sources"
@dataclass
class Config:
tags: list[str]
config = dature.load(
dature.Yaml12Source(file=SOURCES_DIR / "merging_field_base.yaml"),
dature.Yaml12Source(file=SOURCES_DIR / "merging_field_override.yaml"),
schema=Config,
field_merges={dature.F[Config].tags: "last_wins"},
)
assert config.tags == ["web", "api"]
assert config.tags == ["web", "api"]
"""Per-field merge — APPEND concatenates lists from all sources."""
from dataclasses import dataclass
from pathlib import Path
import dature
SOURCES_DIR = Path(__file__).parent / "sources"
@dataclass
class Config:
tags: list[str]
config = dature.load(
dature.Yaml12Source(file=SOURCES_DIR / "merging_field_base.yaml"),
dature.Yaml12Source(file=SOURCES_DIR / "merging_field_override.yaml"),
schema=Config,
field_merges={dature.F[Config].tags: "append"},
)
assert config.tags == ["web", "default", "web", "api"]
assert config.tags == ["web", "default", "web", "api"]
"""Per-field merge — APPEND_UNIQUE concatenates lists, removing duplicates."""
from dataclasses import dataclass
from pathlib import Path
import dature
SOURCES_DIR = Path(__file__).parent / "sources"
@dataclass
class Config:
tags: list[str]
config = dature.load(
dature.Yaml12Source(file=SOURCES_DIR / "merging_field_base.yaml"),
dature.Yaml12Source(file=SOURCES_DIR / "merging_field_override.yaml"),
schema=Config,
field_merges={dature.F[Config].tags: "append_unique"},
)
assert config.tags == ["web", "default", "api"]
assert config.tags == ["web", "default", "api"]
"""Per-field merge — PREPEND puts override list before base list."""
from dataclasses import dataclass
from pathlib import Path
import dature
SOURCES_DIR = Path(__file__).parent / "sources"
@dataclass
class Config:
tags: list[str]
config = dature.load(
dature.Yaml12Source(file=SOURCES_DIR / "merging_field_base.yaml"),
dature.Yaml12Source(file=SOURCES_DIR / "merging_field_override.yaml"),
schema=Config,
field_merges={dature.F[Config].tags: "prepend"},
)
assert config.tags == ["web", "api", "web", "default"]
assert config.tags == ["web", "api", "web", "default"]
"""Per-field merge — PREPEND_UNIQUE prepends override, drops duplicates."""
from dataclasses import dataclass
from pathlib import Path
import dature
SOURCES_DIR = Path(__file__).parent / "sources"
@dataclass
class Config:
tags: list[str]
config = dature.load(
dature.Yaml12Source(file=SOURCES_DIR / "merging_field_base.yaml"),
dature.Yaml12Source(file=SOURCES_DIR / "merging_field_override.yaml"),
schema=Config,
field_merges={dature.F[Config].tags: "prepend_unique"},
)
assert config.tags == ["web", "api", "default"]
assert config.tags == ["web", "api", "default"]
Nested fields are supported: dature.F[Config].database.host.
With raise_on_conflict#
Fields with an explicit strategy are excluded from conflict detection:
"""RAISE_ON_CONFLICT with per-field override."""
from dataclasses import dataclass
from pathlib import Path
import dature
SHARED_DIR = Path(__file__).parents[2] / "shared"
@dataclass
class Config:
host: str
port: int
tags: list[str]
config = dature.load(
dature.Yaml12Source(file=SHARED_DIR / "common_defaults.yaml"),
dature.Yaml12Source(file=SHARED_DIR / "common_overrides.yaml"),
schema=Config,
strategy="raise_on_conflict",
field_merges={
dature.F[Config].host: "last_wins",
dature.F[Config].port: "last_wins",
dature.F[Config].tags: "append_unique",
},
)
assert config.host == "production.example.com"
assert config.port == 8080
assert config.tags == ["default", "web", "api"]
assert config.tags == ["default", "web", "api"]
assert config.tags == ["default", "web", "api"]
Custom Field Strategy#
The FieldMergeStrategy Protocol#
Any callable that takes a list[JSONValue] (one value per source) and returns the merged value satisfies the public FieldMergeStrategy Protocol:
@runtime_checkable
class FieldMergeStrategy(Protocol):
def __call__(self, values: list[JSONValue]) -> JSONValue: ...
The built-in field strategies are also exposed as classes from dature.strategies.field: FieldFirstWins, FieldLastWins, FieldAppend, FieldAppendUnique, FieldPrepend, FieldPrependUnique. They satisfy the same Protocol, so you can pass them directly to field_merges or compose them inside your own strategy.
Examples#
Pick a plain function for one-off logic, or a class for a named, reusable reducer:
"""Callable merge — custom merge function for a field."""
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import dature
SHARED_DIR = Path(__file__).parents[2] / "shared"
@dataclass
class Config:
host: str
port: int
tags: list[str]
def merge_tags(values: list[Any]) -> list[str]:
return sorted({v for lst in values for v in lst})
config = dature.load(
dature.Yaml12Source(file=SHARED_DIR / "common_defaults.yaml"),
dature.Yaml12Source(file=SHARED_DIR / "common_overrides.yaml"),
schema=Config,
strategy="last_wins",
field_merges={dature.F[Config].tags: merge_tags},
)
assert config.host == "production.example.com"
assert config.port == 8080
assert config.tags == ["api", "default", "web"]
"""Custom field strategy — a class implementing FieldMergeStrategy."""
from dataclasses import dataclass
from pathlib import Path
from typing import cast
import dature
from dature.strategies.field import FieldMergeStrategy
from dature.types import JSONValue
SHARED_DIR = Path(__file__).parents[2] / "shared"
@dataclass
class Config:
host: str
port: int
tags: list[str]
class SortedUnion:
"""Concatenate lists across all sources, deduplicate, sort."""
def __call__(self, values: list[JSONValue]) -> JSONValue:
merged: set[str] = set()
for chunk in values:
if isinstance(chunk, list):
merged.update(str(v) for v in chunk)
return cast("JSONValue", sorted(merged))
# Type-check that the class satisfies the public Protocol.
strategy: FieldMergeStrategy = SortedUnion()
config = dature.load(
dature.Yaml12Source(file=SHARED_DIR / "common_defaults.yaml"),
dature.Yaml12Source(file=SHARED_DIR / "common_overrides.yaml"),
schema=Config,
field_merges={dature.F[Config].tags: strategy},
)
assert config.tags == ["api", "default", "web"]
Custom Source Strategy#
The global strategy parameter accepts not only the names from Merge Strategies but also any object implementing the public SourceMergeStrategy Protocol:
class SourceMergeStrategy(Protocol):
def __call__(self, sources: Sequence[Source], ctx: LoadCtx) -> JSONValue: ...
The strategy receives the raw Source instances (not pre-loaded data) and a LoadCtx helper. The primary API for applying a source to the running base is ctx.merge(source=src, base=base, op=...) — it loads the source (cached), runs the merge op (default deep_merge_last_wins), and registers the step so debug logs and LoadReport.field_origins are populated correctly. A minimal custom strategy is one loop:
class MyCustom:
def __call__(self, sources, ctx):
base = {}
for src in sources:
base = ctx.merge(source=src, base=base)
return base
Override op to plug in your own merge function — e.g. shallow overlay for env on top of files:
"""Custom source strategy — files merge `last_wins`, env overrides on top."""
from collections.abc import Sequence
from dataclasses import dataclass
from pathlib import Path
import dature
from dature.strategies import LoadCtx, SourceMergeStrategy
from dature.types import JSONValue
SHARED_DIR = Path(__file__).parents[2] / "shared"
@dataclass
class Config:
host: str
port: int
tags: list[str]
def _dict_overlay(a: JSONValue, b: JSONValue) -> JSONValue:
"""Shallow overlay: top-level keys of b replace those of a."""
return {**a, **b} if isinstance(a, dict) and isinstance(b, dict) else b
class EnvOverrides:
"""Files merge `last_wins`; env sources overlay shallowly on top."""
def __call__(
self,
sources: Sequence[dature.Source],
ctx: LoadCtx,
) -> JSONValue:
base: JSONValue = {}
for idx, s in enumerate(sources):
if isinstance(s, dature.EnvSource):
base = ctx.merge(source_idx=idx, base=base, op=_dict_overlay)
else:
base = ctx.merge(source_idx=idx, base=base)
return base
# Type-check that the class satisfies the public Protocol.
strategy: SourceMergeStrategy = EnvOverrides()
config = dature.load(
dature.Yaml12Source(file=SHARED_DIR / "common_defaults.yaml"),
dature.Yaml12Source(file=SHARED_DIR / "common_overrides.yaml"),
schema=Config,
strategy=strategy,
)
# `last_wins` between two file sources — the override file wins.
assert config.host == "production.example.com"
assert config.port == 8080
isinstance(src, EnvSource) (or any other concrete Source subclass) lets the strategy dispatch on source type — useful when env variables should override file content rather than merge with it. Pass skip_on_error=True to ctx.merge(...) (or ctx.load(...)) if you want broken sources to be skipped silently regardless of skip_if_broken (this is what SourceFirstFound does internally).
ctx.merge is the single hook — once your strategy funnels every per-source step through it, debug logs ([Cls] Merge step N ..., State after step N: ...) and LoadReport.field_origins are populated automatically; there's no separate registration call to remember.
Skipping Broken Sources#
Skip sources that fail to load (missing file, invalid syntax):
"""skip_broken_sources — continue loading when a source is missing."""
from dataclasses import dataclass
from pathlib import Path
import dature
SOURCES_DIR = Path(__file__).parent / "sources"
SHARED_DIR = Path(__file__).parents[2] / "shared"
@dataclass
class Config:
host: str
port: int
debug: bool = False
config = dature.load(
dature.Yaml12Source(file=SHARED_DIR / "common_defaults.yaml"),
dature.Yaml12Source(
file=SOURCES_DIR / "nonexistent.yaml",
skip_if_broken=True,
),
schema=Config,
)
assert config.host == "localhost"
assert config.port == 3000
assert config.debug is False
Override per source with skip_if_broken on Source (takes priority over the global flag):
"""skip_if_broken per source — override the global flag per Source."""
from dataclasses import dataclass
from pathlib import Path
import dature
SOURCES_DIR = Path(__file__).parent / "sources"
SHARED_DIR = Path(__file__).parents[2] / "shared"
@dataclass
class Config:
host: str
port: int
tags: list[str]
config = dature.load(
dature.Yaml12Source(
file=SHARED_DIR / "common_defaults.yaml",
), # uses global
dature.Yaml12Source(
file=SOURCES_DIR / "optional.yaml",
skip_if_broken=True,
), # always skip if broken
dature.Yaml12Source(
file=SHARED_DIR / "common_overrides.yaml",
skip_if_broken=False,
), # never skip, even if global is True
schema=Config,
skip_broken_sources=True, # global default
)
assert config.host == "production.example.com"
assert config.port == 8080
If all sources fail to load, a ValueError is raised.
Skipping Invalid Fields#
Drop fields with invalid values and let other sources or defaults fill them in:
"""skip_invalid_fields — drop invalid fields, let defaults fill in."""
from dataclasses import dataclass
from pathlib import Path
import dature
SOURCES_DIR = Path(__file__).parent / "sources"
@dataclass
class Config:
host: str
port: int = 3000
config = dature.load(
dature.Yaml12Source(
file=SOURCES_DIR / "merging_skip_invalid_defaults.yaml",
skip_field_if_invalid=True,
),
schema=Config,
)
assert config.host == "localhost"
assert config.port == 3000
Restrict skipping to specific fields:
"""skip_field_if_invalid per field — restrict skipping to specific fields."""
from dataclasses import dataclass
from pathlib import Path
import dature
SOURCES_DIR = Path(__file__).parent / "sources"
@dataclass
class Config:
host: str
port: int
timeout: int
config = dature.load(
dature.Yaml12Source(
file=SOURCES_DIR / "merging_skip_invalid_per_field_defaults.yaml",
),
dature.Yaml12Source(
file=SOURCES_DIR / "merging_skip_invalid_per_field_overrides.yaml",
skip_field_if_invalid=(dature.F[Config].port, dature.F[Config].timeout),
),
schema=Config,
)
assert config.host == "production.example.com"
assert config.port == 3000
assert config.timeout == 30
Only port and timeout will be skipped if invalid; other fields still raise errors.
If a required field is invalid in all sources and has no default: