
Metrics Engine Enrichers

Procedural escape hatch for the otherwise-declarative metrics engine. When a metric or dimension needs Python — config lookup, multi-step compute, validation — register an Enricher and bind it from YAML.

Why enrichers exist

YAML covers ~95% of cases: SQL columns, expr: per-row math, formulas over aggregated columns. The rest needs code:

  • Dimension lookups: parent_asin resolved from child_asin via new_asins
  • Cross-source joins: dynamic_attributes_values + rpt_* need a shared key
  • Per-row Python compute: MB commission tier-picking depends on per-(ASIN, period) sales total
  • Validation: error when a required: metric returns null

Enrichers are stage-typed so they slot into the pipeline at exactly the right point.

Pipeline stages

The executor calls run_enrichers_at(stage, ctx) at every transition:

| Stage | What ctx holds | Used for |
| --- | --- | --- |
| post_resolve | resolved (metrics, dims, internal_dims) | Inject required dims/metrics for downstream stages |
| post_plan | plan (table queries, join plan) | Modify SELECT columns, add CTEs (no current use) |
| post_fetch | dataframes: dict[str, pl.DataFrame] | Per-table transforms (no current use; dynamic_attribute strategy does this internally) |
| post_join | df: pl.DataFrame (joined, source-row grain) | Most common: add metric/dim columns |
| post_collapse | df (collapsed to user-visible dims) | Rare: runs between collapse and aggregator |
| post_aggregate | df (after formulas) | Final touch-ups |
| pre_output | df (renamed) | Output shaping |

Within a stage: dim enrichers run first, then metric enrichers (so metric enrichers can use derived dim columns). Registration order is preserved within each kind.
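
The ordering rule above can be sketched as a two-pass loop over a registry. This is a minimal illustration, not the engine's actual code: the Ctx class, the make() helper, the module-level _REGISTRY list, and the string constants are hypothetical stand-ins that only borrow the names used in this document.

```python
from dataclasses import dataclass, field

KIND_DIM = "dim"        # assumed constant values, for illustration only
KIND_METRIC = "metric"


@dataclass
class Ctx:
    """Hypothetical stand-in for EnricherContext; records which enrichers ran."""
    ran: list = field(default_factory=list)


class Enricher:
    stage = "post_join"
    kind = KIND_METRIC
    name = "base"

    def should_run(self, ctx: Ctx) -> bool:
        return True

    def run(self, ctx: Ctx) -> Ctx:
        ctx.ran.append(self.name)
        return ctx


_REGISTRY: list = []  # list order is registration order


def register(enricher: Enricher) -> None:
    _REGISTRY.append(enricher)


def run_enrichers_at(stage: str, ctx: Ctx) -> Ctx:
    # Pass 1 runs dim enrichers, pass 2 runs metric enrichers.
    # Each pass walks the registry in registration order.
    for kind in (KIND_DIM, KIND_METRIC):
        for enricher in _REGISTRY:
            if enricher.stage == stage and enricher.kind == kind and enricher.should_run(ctx):
                ctx = enricher.run(ctx)
    return ctx


def make(name: str, kind: str, stage: str = "post_join") -> Enricher:
    """Hypothetical helper that builds a named enricher for the demo."""
    e = Enricher()
    e.name, e.kind, e.stage = name, kind, stage
    return e


for e in (make("fee", KIND_METRIC), make("parent_asin", KIND_DIM),
          make("inject", KIND_DIM, "post_resolve"), make("margin", KIND_METRIC)):
    register(e)

order = run_enrichers_at("post_join", Ctx()).ran
print(order)  # ['parent_asin', 'fee', 'margin']
```

Although "fee" was registered first, the dim enricher "parent_asin" runs ahead of it, and the two metric enrichers keep their relative registration order.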

How dimensionality is handled

The same enricher can produce correct numbers at any grain the user asks for (dimensions=[], [seller_id], [parent_asin], [child_asin], [sku], …) because of how the pipeline stages interlock around it.

What grain does an enricher see?

post_join enrichers operate on the source row grain — for rpt_pnl_sku_economics that's (period, sku, asin) per row, even when the user asked only dimensions=[seller_id]. The internal-dim collapse step runs after enrichers, so identifier columns like asin are still present when the enricher executes.

Where do those columns come from?

Two paths get a column into the DF:

  1. User-requested dim — planner adds it to dimension_columns for each queried table; SQL emits SELECT … GROUP BY for it.
  2. Source-injection enricher at post_resolve — mutates resolved.all_dimensions and resolved.internal_dimensions. Planner treats it identically to a user dim during SQL emission. The collapse step strips it after enrichers run.

The pattern: if your enricher needs a column the user might not have asked for, register a sibling post_resolve source enricher to inject it. DynamicAttributeSourceEnricher and MbCommissionSourceEnricher both follow this.

How does the enricher's output get dimensioned?

Depends on the kind:

  • Dim enrichers (e.g. ParentAsinEnricher) — look up new column via DB query, left-join into DF, then call self._reaggregate(df, resolved, trace) which re-groups by user-visible dims + temporal cols (dropping internal sources along the way). Re-aggregation respects each metric's aggregation_rule.
  • Metric enrichers (e.g. MbCommissionEnricher) — add a column to the DF at source row grain. Do not re-aggregate. Trust the downstream _collapse_internal_dims + aggregator.aggregate steps to roll the column up according to the metric's declared aggregation_rule (SUM / Average / Recalculate).

This is why metric enrichers must produce values at source row grain: SUM aggregation naturally rolls per-ASIN/per-SKU values up to whatever grain the user requested.

The grouping-invariance property

A metric whose value is f(group_constant, row_value) and whose aggregation_rule is SUM will produce the same total regardless of grain. Concretely for MB commission:

brand_period_total = SUM(sales of brand for period) ← computed once before per-row loop
rate = pick_tier(config, brand_period_total) ← same rate for every row in (brand, period)
row_fee = row_sales × rate / 100 ← varies per row

Then SUM(row_fee) over any subset of rows equals (rate / 100) × SUM(row_sales of that subset), which is the brand-period rate applied directly to that subset's sales. Because the rate is fixed before any grouping happens, the fee at SKU grain, at ASIN grain, at parent_asin grain, and at seller grain are all mutually consistent.

Worked example — ASIN X with 3 SKUs

Source rows (monthly, brand total $12,809.59, tiers [3% / 6% @ 1k / 9% @ 20k]):

| sku | asin | sales |
| --- | --- | --- |
| SKU-A | X | $100 |
| SKU-B | X | $200 |
| SKU-C | X | $300 |

In the enricher:

  • Brand-period total = $12,809.59 → tier rate = 6%
  • Row 1: $100 × 6% = $6
  • Row 2: $200 × 6% = $12
  • Row 3: $300 × 6% = $18

After collapse + aggregate at the user's requested grain:

| dimensions= | rows for ASIN X | SUM(fee) | sanity check |
| --- | --- | --- | --- |
| [sku] | 3 (per SKU) | $6 + $12 + $18 = $36 | each = sku_sales × 6% ✓ |
| [child_asin] | 1 (one row for X) | $36 | = $600 × 6% ✓ |
| [] (seller-level) | 1 (sum across all ASINs) | $12,809.59 × 6% = $768.58 | total matches brand-rate calc ✓ |

Same arithmetic produces the right answer at every grain — no special handling per dim choice.
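
The worked example can be replayed in a few lines of plain Python. This is a sketch under stated assumptions: pick_tier here is a hypothetical re-implementation (the real one is not shown in this document), tier thresholds are treated as inclusive lower bounds, and sum_fee_by stands in for the collapse + aggregate steps with a SUM rule.

```python
from collections import defaultdict
from decimal import Decimal

# (threshold, rate in percent), ascending; mirrors [3% / 6% @ 1k / 9% @ 20k].
TIERS = [
    (Decimal("0"), Decimal("3")),
    (Decimal("1000"), Decimal("6")),
    (Decimal("20000"), Decimal("9")),
]


def pick_tier(tiers, total):
    """Hypothetical tier pick: rate of the highest tier whose threshold <= total."""
    rate = tiers[0][1]
    for threshold, tier_rate in tiers:
        if total >= threshold:
            rate = tier_rate
    return rate


brand_period_total = Decimal("12809.59")
rate = pick_tier(TIERS, brand_period_total)  # computed once, before the per-row loop

rows = [
    {"sku": "SKU-A", "asin": "X", "sales": Decimal("100")},
    {"sku": "SKU-B", "asin": "X", "sales": Decimal("200")},
    {"sku": "SKU-C", "asin": "X", "sales": Decimal("300")},
]
for row in rows:
    row["fee"] = row["sales"] * rate / 100  # fee at source row grain


def sum_fee_by(rows, dims):
    """Stand-in for collapse + SUM aggregation at the requested grain."""
    out = defaultdict(Decimal)
    for row in rows:
        out[tuple(row[d] for d in dims)] += row["fee"]
    return dict(out)


by_sku = sum_fee_by(rows, ["sku"])
by_asin = sum_fee_by(rows, ["asin"])
seller_fee = (brand_period_total * rate / 100).quantize(Decimal("0.01"))

print(by_sku)      # fees of 6, 12 and 18 per SKU
print(by_asin)     # a single fee of 36 for ASIN X
print(seller_fee)  # 768.58
```

Decimal is used only to keep the dollar amounts exact in the demo; the document does not say which numeric type the engine uses.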

Edge case: ASIN override with sales-range tiers

For monthly/quarterly, the current implementation picks the tier from the brand-level period total for both override and brand-default ASINs. If an ASIN's override config uses different tier thresholds than the brand default, picking the rate from the brand total can yield a different rate than picking from that ASIN's own sales. Documented limitation — see PnL caveats.
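
To make the limitation concrete, here is a small sketch with an invented override config. The override thresholds and the $600 ASIN sales figure are hypothetical, chosen only to show the two rates diverging; pick_tier is again an illustrative stand-in with inclusive thresholds.

```python
def pick_tier(tiers, total):
    """Hypothetical tier pick: rate of the highest tier whose threshold <= total."""
    rate = tiers[0][1]
    for threshold, tier_rate in tiers:
        if total >= threshold:
            rate = tier_rate
    return rate


# Hypothetical ASIN override: [2% / 4% @ 500 / 8% @ 5k] instead of the brand default.
override_tiers = [(0, 2), (500, 4), (5000, 8)]

brand_period_total = 12809.59  # what the current implementation feeds into the tier pick
asin_period_sales = 600.00     # the override ASIN's own sales for the same period

rate_from_brand_total = pick_tier(override_tiers, brand_period_total)
rate_from_asin_sales = pick_tier(override_tiers, asin_period_sales)

print(rate_from_brand_total, rate_from_asin_sales)  # 8 4
```

With these made-up numbers the override ASIN is charged at 8% where its own sales would have selected 4%, which is the gap the caveat describes.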

Enricher anatomy

from app.services.seller_metrics_engine.engine.enricher.base import (
    Enricher, EnricherContext,
    STAGE_POST_JOIN, KIND_METRIC,
)

class MyEnricher(Enricher):
    stage = STAGE_POST_JOIN
    kind = KIND_METRIC
    name = "my_enricher"  # YAML binds via `enricher: my_enricher`

    def should_run(self, ctx: EnricherContext) -> bool:
        # Default: True if any metric/dim names this enricher via `enricher:`
        return super().should_run(ctx)

    def run(self, ctx: EnricherContext) -> EnricherContext:
        # Mutate ctx.df / ctx.resolved / ctx.dataframes as needed
        return ctx

Register in app/services/seller_metrics_engine/engine/enricher/__init__.py:

register(MyEnricher())

Bind from YAML:

- key: my_metric
  enricher: my_enricher
  ...

Built-in enrichers

DynamicAttributeSourceEnricher (post_resolve, dim)

Why: When a query mixes dynamic_attributes_values and rpt_* metric sources, the joiner needs a shared key. ASIN is the natural one.

What: Injects child_asin as an internal dimension so both tables include it in their GROUP BY, enabling the LEFT JOIN.

Activation: Auto-detects mixed sources in resolved.all_metrics.

MbCommissionSourceEnricher (post_resolve, dim)

Why: MB commission compute needs per-(ASIN, period) sales rows. A query asking only dimensions=[seller_id] would otherwise never include ASIN in GROUP BY.

What: Injects child_asin as internal when pnl_mb_commission_amt is requested.

ParentAsinEnricher (post_join, dim)

Why: parent_asin is native on some tables (rpt_pnl_sku_economics, rpt_br_detail_page_sales_traffic_by_child) but not most SP tables. When you query parent_asin against rpt_sponsored_products_advertised_product, the engine has only advertised_asin (child).

What: Looks up (asin, parent_asin) from new_asins, left-joins into the result DF, re-aggregates by the requested dims.

Activation: Resolver marks parent_asin in enricher_dimensions only when NOT natively present in all queried tables. (YAML's enricher: parent_asin field is naming, not force-run.)

MbCommissionEnricher (post_join, metric)

Why: Per-ASIN commission can't be expressed cleanly in our flat-SELECT SQL strategy (sales-range tier-pick needs aggregate-of-aggregate).

What:

  1. Resolves config per ASIN via get_latest_mb_commission_batch (ASIN override → brand default, latest-effective_from wins regardless of query period — see PnL config-resolution semantics).
  2. For each DF row:
    • Static config → compute_mb_fee(config, sales) from app/schemas/mb_commission.py.
    • Sales-range config → granularity-aware tier-pick:
      • Monthly / quarterly → tier from brand-level period sales total (matches billing semantics).
      • Weekly → highest tier rate applied to every row (conservative proxy; weekly brand totals rarely cross higher thresholds even when monthly billing does). See PnL caveat for the rationale.
  3. Writes pnl_mb_commission_amt column. Null when no config — the required-metric validator then raises if pnl_mb_commission_amt is marked required: true.

Single DB lookup per query (configs aren't period-scoped under V1's latest-wins resolution).
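
The granularity-aware tier pick in step 2 and the null-on-missing-config behaviour in step 3 can be sketched as follows. This is an illustration only: pick_rate and row_fee are hypothetical names, the config is reduced to a bare list of (threshold, rate) tiers, and static configs and config resolution are left out.

```python
from typing import Optional, Sequence, Tuple

Tier = Tuple[float, float]  # (sales threshold, rate in percent), ascending


def pick_rate(tiers: Sequence[Tier], granularity: str, brand_period_total: float) -> float:
    """Hypothetical rate pick for a sales-range config."""
    if granularity == "weekly":
        # Conservative proxy: apply the highest tier rate to every row.
        return max(rate for _, rate in tiers)
    # Monthly / quarterly: tier chosen from the brand-level period total.
    rate = tiers[0][1]
    for threshold, tier_rate in tiers:
        if brand_period_total >= threshold:
            rate = tier_rate
    return rate


def row_fee(tiers: Optional[Sequence[Tier]], granularity: str,
            brand_period_total: float, row_sales: float) -> Optional[float]:
    if not tiers:
        return None  # no config: the column stays null for this row
    return row_sales * pick_rate(tiers, granularity, brand_period_total) / 100


TIERS = [(0, 3), (1000, 6), (20000, 9)]

monthly_fee = row_fee(TIERS, "monthly", 12809.59, 100.0)
weekly_fee = row_fee(TIERS, "weekly", 2900.00, 100.0)
missing_fee = row_fee(None, "monthly", 12809.59, 100.0)
print(monthly_fee, weekly_fee, missing_fee)  # 6.0 9.0 None
```

Note that the weekly branch ignores the period total entirely, which is what makes it a proxy for the monthly billing outcome and not a per-week tier calculation.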

Patterns

Add a new dim enricher (lookup pattern)

Mirror ParentAsinEnricher:

  1. Subclass BaseEnricher, set dim_key="my_dim" and source_dimension="child_asin" (or whatever your input is).
  2. Implement enrich(df, session, marketplace, source_column, filters, resolved, trace):
    • Use source_column (looked up by the base class from source_dimension's table_column_map) as the join key.
    • Run one DB query per query (don't loop).
    • Call self._reaggregate(df, resolved, trace) at the end to re-group.
  3. Register in enricher/__init__.py.
  4. In dimensions.yaml, add enricher: my_dim to the dimension entry.
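
The data flow of step 2 (one lookup, left join, re-group) is sketched below in plain Python so it runs without a database or polars. Everything here is a stand-in: fetch_parent_map replaces the single query against new_asins, lists of dicts replace pl.DataFrame, and reaggregate only handles SUM, whereas the real _reaggregate respects each metric's aggregation_rule.

```python
from collections import defaultdict

# Joined rows at source grain: the child ASIN column is still present.
rows = [
    {"seller_id": "S1", "advertised_asin": "A1", "spend": 10.0},
    {"seller_id": "S1", "advertised_asin": "A2", "spend": 5.0},
    {"seller_id": "S1", "advertised_asin": "A3", "spend": 2.5},
]


def fetch_parent_map(asins):
    """Hypothetical single lookup; stands in for one query against new_asins."""
    table = {"A1": "P1", "A2": "P1"}  # A3 has no parent row
    return {a: table[a] for a in asins if a in table}


def enrich(rows, source_column, dim_key):
    # One lookup for all distinct keys, never one query per row.
    lookup = fetch_parent_map({r[source_column] for r in rows})
    # Left join: rows without a match keep None for the new dimension.
    return [{**r, dim_key: lookup.get(r[source_column])} for r in rows]


def reaggregate(rows, visible_dims, sum_metrics):
    """SUM-only stand-in for re-grouping by the user-visible dims."""
    grouped = defaultdict(lambda: defaultdict(float))
    for r in rows:
        key = tuple(r[d] for d in visible_dims)
        for m in sum_metrics:
            grouped[key][m] += r[m]
    return [dict(zip(visible_dims, key), **vals) for key, vals in grouped.items()]


enriched = enrich(rows, "advertised_asin", "parent_asin")
result = reaggregate(enriched, ["parent_asin"], ["spend"])
print(result)
```

The unmatched ASIN ends up in its own group with a null parent_asin; whether the real enricher keeps or drops such rows is not stated in this document.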

Add a new metric enricher (compute pattern)

Mirror MbCommissionEnricher:

  1. Subclass Enricher with stage=STAGE_POST_JOIN, kind=KIND_METRIC, name="my_metric".
  2. Implement run(ctx):
    • Identify required input columns on ctx.df (handle alias-vs-key naming).
    • Compute the new column.
    • Write to ctx.df and return ctx.
  3. In metrics.yaml: sources: null, formula: null, enricher: my_metric, metric_refs: [<deps>] to force their fetch.
  4. Register.
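
A minimal sketch of step 2, including the alias-vs-key check. FakeContext, first_present, and the 50% placeholder computation are hypothetical; the frame is modelled as a dict of column lists so the example runs without the engine or polars.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class FakeContext:
    """Hypothetical stand-in for EnricherContext; df is a dict of column lists."""
    df: Dict[str, List[Optional[float]]] = field(default_factory=dict)


def first_present(columns, candidates):
    """Return the first candidate name that exists as a column."""
    for name in candidates:
        if name in columns:
            return name
    raise KeyError(f"none of {candidates} found in {sorted(columns)}")


class MyMetricEnricher:
    name = "my_metric"

    def run(self, ctx: FakeContext) -> FakeContext:
        # At post_join the input may still carry its SQL alias, so try key, then alias.
        sales_col = first_present(ctx.df, ["pnl_sales", "sales"])
        # Placeholder compute at source row grain; nulls pass through untouched.
        ctx.df["my_metric"] = [None if v is None else v * 0.5 for v in ctx.df[sales_col]]
        return ctx


ctx = MyMetricEnricher().run(FakeContext(df={"sales": [100.0, None, 40.0]}))
print(ctx.df["my_metric"])  # [50.0, None, 20.0]
```

The enricher writes one value per source row and leaves roll-up to the downstream collapse and aggregation steps.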

Add a source-injection enricher (resolver pattern)

Mirror MbCommissionSourceEnricher when your enricher needs a dim that isn't in the user's request:

  1. Subclass Enricher with stage=STAGE_POST_RESOLVE, kind=KIND_DIM.
  2. should_run: check ctx.resolved.all_metrics for the trigger.
  3. run: load the dim via get_dimension(), add to ctx.resolved.all_dimensions + ctx.resolved.internal_dimensions.

The collapse step will drop the internal dim's column after the metric enricher uses it.
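
The three steps can be sketched like this. Resolved and Ctx are hypothetical stand-ins, and dimensions are plain strings here, whereas the real enricher loads a dimension object via get_dimension(). Skipping the internal marking when the user already requested the dim is an assumption made in this sketch so that a user-visible column is not stripped at collapse.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Resolved:
    """Hypothetical stand-in for the resolver output."""
    all_metrics: List[str] = field(default_factory=list)
    all_dimensions: List[str] = field(default_factory=list)
    internal_dimensions: List[str] = field(default_factory=list)


@dataclass
class Ctx:
    resolved: Resolved


class MySourceEnricher:
    stage = "post_resolve"
    kind = "dim"
    trigger_metric = "my_metric"   # hypothetical metric key
    needed_dim = "child_asin"

    def should_run(self, ctx: Ctx) -> bool:
        return self.trigger_metric in ctx.resolved.all_metrics

    def run(self, ctx: Ctx) -> Ctx:
        r = ctx.resolved
        # Assumption: only inject (and mark internal) when the user did not ask for it.
        if self.needed_dim not in r.all_dimensions:
            r.all_dimensions.append(self.needed_dim)
            r.internal_dimensions.append(self.needed_dim)
        return ctx


enricher = MySourceEnricher()
ctx = Ctx(Resolved(all_metrics=["my_metric"], all_dimensions=["seller_id"]))
if enricher.should_run(ctx):
    ctx = enricher.run(ctx)
print(ctx.resolved.all_dimensions, ctx.resolved.internal_dimensions)
```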

Gotchas

  • Column naming at post_join: aliases haven't been renamed to metric keys yet. pnl_sales may still be in the DF as sales (the alias). Check both.
  • Don't _reaggregate before metric enrichers run: re-grouping drops fine-grained columns (asin) that downstream metric enrichers need. BaseEnricher._reaggregate is for dim enrichers; metric enrichers usually shouldn't call it.
  • should_run for dim enrichers: BaseEnricher.should_run checks resolved.enricher_dimensions. The resolver populates this only when the dim is NOT natively present in all queried tables. So a YAML enricher: binding on a dim is purely declarative — the enricher only fires when actually needed.
  • YAML loaded once at import (mcp_server/seller_metrics_engine/definitions/__init__.py): restart the MCP server after YAML changes.