Category: Content & Media. No API key required.

signal-factory-core

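An AI-driven Signal Factory core that processes observability signals and maintains a knowledge graph. Use cases: (1) developing signal engines (freshness, drift, contract, data quality, volume, anomaly); (2) configuring the Signal Router for normalization and routing; (3) designing Neptune graph schemas for assets and lineage; (4) implementing DynamoDB state management for event processing. Triggers: "create a signal engine", "configure the signal router", "design a graph schema", "implement signal processing".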

Author: jakexiaohub (GitHub)

Signal Factory Core

The Signal Factory Core is the central processing hub that transforms raw telemetry into actionable signals, maintains the knowledge graph in Neptune, and powers the RCA Copilot with pre-computed incident context.

Architecture Overview

┌─────────────────────────────────────────────────────────────────┐
│                      SIGNAL FACTORY CORE                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────────────┐ │
│  │   Signal    │───▶│  Event Bus  │───▶│   Signal Engines    │ │
│  │   Router    │    │ (Kinesis/   │    │ ┌─────┐ ┌─────┐    │ │
│  │             │    │    MSK)     │    │ │Fresh│ │ Vol │    │ │
│  └─────────────┘    └─────────────┘    │ └─────┘ └─────┘    │ │
│        │                               │ ┌─────┐ ┌─────┐    │ │
│        │ Normalization                 │ │Drift│ │ DQ  │    │ │
│        │ + Correlation                 │ └─────┘ └─────┘    │ │
│        │ + URN Resolution              │ ┌─────┐ ┌─────┐    │ │
│        ▼                               │ │Anom │ │Cost │    │ │
│  ┌─────────────┐                       │ └─────┘ └─────┘    │ │
│  │  Canonical  │                       └─────────────────────┘ │
│  │   Signal    │                               │               │
│  │   Event     │                               ▼               │
│  └─────────────┘                       ┌───────────────────┐   │
│                                        │ State & Graph     │   │
│                                        │ ┌─────┐ ┌─────┐  │   │
│                                        │ │DDB  │ │Nept │  │   │
│                                        │ └─────┘ └─────┘  │   │
│                                        └───────────────────┘   │
└─────────────────────────────────────────────────────────────────┘

Signal Router

Responsibilities

  1. Normalization: Convert raw inputs into canonical Signal Events
  2. URN Resolution: Standardize asset identifiers
  3. Correlation Extraction: Extract trace_id, handoff_ids, lineage refs
  4. Schema Resolution: Bind events to schema versions
  5. Routing: Direct signals to appropriate engines
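
The responsibilities above could be sketched roughly as follows. This is a minimal illustration and not the contents of scripts/signal_router.py: the raw input shape (`env`, `cluster`, `topic`, `metric`, `headers`), the `resolve_urn` helper, and the `ENGINE_ROUTES` table are all assumptions made for the example, and schema resolution is left out.

```python
from datetime import datetime, timezone
from typing import Any

# Hypothetical routing table (signal_type -> engine names); not from the source.
ENGINE_ROUTES = {
    "volume": ["volume", "anomaly"],
    "freshness": ["freshness"],
    "schema": ["drift", "contract"],
}


def resolve_urn(env: str, cluster: str, topic: str) -> str:
    """Build a Kafka asset URN in the urn:kafka:<env>:<cluster>:<topic> form."""
    return f"urn:kafka:{env}:{cluster}:{topic}".lower()


def normalize(raw: dict[str, Any]) -> dict[str, Any]:
    """Turn a raw telemetry record (assumed shape) into a canonical Signal Event."""
    headers = raw.get("headers", {})
    return {
        "event_type": "SignalEvent",
        # Fall back to the current UTC time if the producer sent no timestamp.
        "timestamp": raw.get("timestamp") or datetime.now(timezone.utc).isoformat(),
        "source": {
            "type": "kafka",
            "asset_urn": resolve_urn(raw["env"], raw["cluster"], raw["topic"]),
        },
        "correlation": {
            "trace_id": headers.get("trace_id"),
            "handoff_ids": {
                "kafka_offset": raw.get("offset"),
                "partition": raw.get("partition"),
            },
        },
        "payload": {"signal_type": raw["metric"], "value": raw["value"]},
    }


def route(event: dict[str, Any]) -> list[str]:
    """Return the engines that should receive this event (empty if unknown)."""
    return ENGINE_ROUTES.get(event["payload"]["signal_type"], [])
```

Keeping normalization and routing as separate pure functions makes each step testable without a running event bus.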

Canonical Signal Event Schema

{
  "event_id": "sig-2026-01-04-001",
  "event_type": "SignalEvent",
  "timestamp": "2026-01-04T10:00:00Z",
  "source": {
    "type": "kafka",
    "asset_urn": "urn:kafka:prod:msk:orders_enriched",
    "service_urn": "urn:svc:prod:commerce:orders-enricher"
  },
  "correlation": {
    "trace_id": "abc123",
    "span_id": "def456",
    "handoff_ids": {
      "kafka_offset": 12345,
      "partition": 3
    },
    "lineage_ref": "map-orders-enriched-v7"
  },
  "payload": {
    "signal_type": "volume",
    "value": 1523,
    "unit": "messages_per_minute"
  },
  "schema": {
    "id": "schema-v42",
    "version": 42,
    "registry": "confluent"
  }
}
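
The engine snippets in this document read events through attributes such as `event.source.asset_urn` and `event.payload.value`. One way to get that shape from the JSON above is a set of dataclasses. The class names `Source`, `Payload`, `SchemaRef` and the `parse_signal_event` helper are assumptions for illustration, and the `correlation` block is omitted for brevity.

```python
import json
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class Source:
    type: str
    asset_urn: str
    service_urn: str


@dataclass(frozen=True)
class Payload:
    signal_type: str
    value: float
    unit: str


@dataclass(frozen=True)
class SchemaRef:
    id: str
    version: int
    registry: str


@dataclass(frozen=True)
class SignalEvent:
    event_id: str
    timestamp: datetime
    source: Source
    payload: Payload
    schema: SchemaRef


def parse_signal_event(raw_json: str) -> SignalEvent:
    """Parse a canonical Signal Event; unknown keys in a sub-object raise TypeError."""
    doc = json.loads(raw_json)
    # fromisoformat() only accepts a trailing "Z" from Python 3.11 on,
    # so rewrite it as an explicit UTC offset for older interpreters.
    ts = datetime.fromisoformat(doc["timestamp"].replace("Z", "+00:00"))
    return SignalEvent(
        event_id=doc["event_id"],
        timestamp=ts,
        source=Source(**doc["source"]),
        payload=Payload(**doc["payload"]),
        schema=SchemaRef(**doc["schema"]),
    )
```

Parsing the timestamp into a timezone-aware `datetime` up front lets downstream engines subtract timestamps directly.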

Signal Engines

Freshness Engine

Detects stale data based on expected arrival times.

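class FreshnessEngine:
    # SignalEvent, SignalState and FreshnessEvidence are the project's own types;
    # get_asset() and get_last_update() are async lookups defined elsewhere.
    async def process(self, event: SignalEvent) -> SignalState:
        asset = await self.get_asset(event.source.asset_urn)
        sla = asset.contracts.freshness.max_delay_minutes

        last_update = await self.get_last_update(asset.urn)
        # Delay in minutes (timedelta exposes total_seconds(), not .minutes).
        # Assumes event.timestamp and last_update are both datetime objects.
        delay = (event.timestamp - last_update).total_seconds() / 60

        if delay > sla:
            return SignalState(
                asset_urn=asset.urn,
                signal_type="freshness",
                status="BREACH",
                severity=self.calculate_severity(delay, sla),
                evidence=FreshnessEvidence(
                    expected_delay=sla,
                    actual_delay=delay,
                    last_update=last_update
                )
            )
        # Identify the asset and signal on the healthy path too.
        return SignalState(asset_urn=asset.urn, signal_type="freshness", status="OK")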
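
The engine calls `calculate_severity(delay, sla)` without showing it. A minimal sketch of what such a helper might look like is given below; the 2x and 1.5x cut-offs and the P1/P2/P3 labels are illustrative assumptions, not values taken from the Signal Factory.

```python
def calculate_severity(delay_minutes: float, sla_minutes: float) -> str:
    """Map how far a delay overshoots the SLA to a severity label.

    The thresholds here are hypothetical; tune them per asset tier.
    """
    if sla_minutes <= 0:
        raise ValueError("sla_minutes must be positive")
    ratio = delay_minutes / sla_minutes
    if ratio >= 2.0:
        return "P1"
    if ratio >= 1.5:
        return "P2"
    return "P3"
```

Expressing severity as a ratio of the SLA keeps one rule usable for assets with very different freshness contracts.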

Volume Engine

Detects anomalies in data volume.

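import math

class VolumeEngine:
    async def process(self, event: SignalEvent) -> SignalState:
        asset = await self.get_asset(event.source.asset_urn)
        baseline = await self.get_baseline(asset.urn, window="7d")

        current = event.payload.value
        if baseline.stddev > 0:
            z_score = (current - baseline.mean) / baseline.stddev
        elif current == baseline.mean:
            z_score = 0.0
        else:
            # Flat baseline: any deviation counts as an unbounded anomaly.
            z_score = math.copysign(math.inf, current - baseline.mean)

        if abs(z_score) > 3:  # 3-sigma anomaly
            return SignalState(
                asset_urn=asset.urn,
                signal_type="volume",
                status="ANOMALY",
                evidence=VolumeEvidence(
                    expected=baseline.mean,
                    actual=current,
                    z_score=z_score
                )
            )
        # Without this the method would return None on the healthy path.
        return SignalState(asset_urn=asset.urn, signal_type="volume", status="OK")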
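
The baseline lookup is not shown in the source. As a rough sketch, assuming the 7-day window has already been fetched as a list of per-minute message counts, the mean and sample standard deviation can come from the standard library. `Baseline`, `compute_baseline`, and `z_score` are hypothetical names.

```python
import math
import statistics
from dataclasses import dataclass


@dataclass
class Baseline:
    mean: float
    stddev: float


def compute_baseline(samples: list[float]) -> Baseline:
    """Summarize historical volume samples as mean and sample stddev."""
    if len(samples) < 2:
        raise ValueError("need at least two samples to estimate a baseline")
    return Baseline(mean=statistics.fmean(samples), stddev=statistics.stdev(samples))


def z_score(current: float, baseline: Baseline) -> float:
    """Standard score of the current value, guarding against a flat baseline."""
    if baseline.stddev == 0:
        if current == baseline.mean:
            return 0.0
        return math.inf if current > baseline.mean else -math.inf
    return (current - baseline.mean) / baseline.stddev
```

A z-score assumes roughly normal, stationary volume; topics with strong daily or weekly seasonality would likely need a per-hour baseline instead.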

Schema Drift Engine

Detects breaking schema changes.

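class DriftEngine:
    async def process(self, event: SignalEvent) -> SignalState:
        ok = SignalState(
            asset_urn=event.source.asset_urn, signal_type="drift", status="OK"
        )
        if event.schema.version <= 1:
            return ok  # first version: there is no predecessor to compare with

        current = await self.get_schema(event.schema.id)
        # Assumes schema ids follow "schema-v<version>" with consecutive versions.
        previous = await self.get_schema(f"schema-v{event.schema.version - 1}")

        compatibility = await self.check_compatibility(previous, current)

        if not compatibility.is_backward:
            return SignalState(
                asset_urn=event.source.asset_urn,
                signal_type="drift",
                status="DRIFT",
                evidence=DriftEvidence(
                    breaking_changes=compatibility.changes,
                    affected_consumers=await self.get_consumers(event.source.asset_urn)
                )
            )
        return ok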
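
The `check_compatibility` call is left undefined in the source. The sketch below shows one simplified way to test backward compatibility in the schema-registry sense (readers on the new schema can still read data written with the previous one). It assumes flat schemas given as a mapping of field name to `{"type": ..., "default": ...}`, and it ignores type promotion, unions, and nested records, which a real registry handles.

```python
from dataclasses import dataclass, field


@dataclass
class Compatibility:
    is_backward: bool
    changes: list[str] = field(default_factory=list)


def check_compatibility(previous: dict, current: dict) -> Compatibility:
    """Compare two flat schemas and list changes that break backward compatibility.

    Removing a field is allowed (new readers simply ignore it); adding a field
    without a default or changing a field's type is treated as breaking.
    """
    changes = []
    for name, spec in current.items():
        old = previous.get(name)
        if old is None:
            if "default" not in spec:
                changes.append(f"added required field '{name}'")
        elif old["type"] != spec["type"]:
            changes.append(
                f"field '{name}' changed type {old['type']} -> {spec['type']}"
            )
    return Compatibility(is_backward=not changes, changes=changes)
```

In practice the registry named in the event (`confluent` in the example) can usually answer this question itself; a local check like this is mainly useful for producing human-readable evidence.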

Contract Engine

Validates data against contracts.
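
The source does not describe the contract format, so the following is only a sketch under an assumed shape: a contract is a dict with a `fields` mapping from field name to an expected Python type and an optional `required` flag. Both the shape and the `validate_record` name are hypothetical.

```python
def validate_record(record: dict, contract: dict) -> list[str]:
    """Return a list of violations; an empty list means the record conforms."""
    violations = []
    for name, rule in contract["fields"].items():
        value = record.get(name)
        if value is None:
            # Fields are required unless the contract says otherwise.
            if rule.get("required", True):
                violations.append(f"missing required field '{name}'")
            continue
        if not isinstance(value, rule["type"]):
            violations.append(
                f"field '{name}' expected {rule['type'].__name__}, "
                f"got {type(value).__name__}"
            )
    return violations
```

Returning every violation rather than failing on the first gives the engine richer evidence to attach to a breach.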

DQ Engine

Processes Deequ results and quality metrics.

Anomaly Engine

ML-based anomaly detection across all metrics.

Cost Engine

Tracks and allocates compute costs.

Neptune Graph Schema

Node Types

// Asset nodes
g.addV('Asset')
  .property('urn', 'urn:kafka:prod:msk:orders_enriched')
  .property('type', 'kafka-topic')
  .property('tier', 1)
  .property('owner', 'orders-team')

// Service nodes
g.addV('Service')
  .property('urn', 'urn:svc:prod:commerce:orders-enricher')
  .property('language', 'java')
  .property('framework', 'spring-boot')

// Incident nodes
g.addV('Incident')
  .property('id', 'INC-2026-01-04-001')
  .property('status', 'ACTIVE')
  .property('severity', 'P1')

// Evidence nodes
g.addV('Evidence')
  .property('type', 'schema_drift')
  .property('confidence', 0.95)

Edge Types

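// Lineage edges (service and topic stand for previously resolved vertices;
// both edges point in the direction of data flow)
g.V(service).addE('PRODUCES').to(topic)    // service -> topic
g.V(service).addE('CONSUMES').from(topic)  // topic -> service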

// Incident edges
g.V(incident).addE('AFFECTS').to(asset)
g.V(incident).addE('SUPPORTED_BY').to(evidence)

// Schema edges
g.V(schema_v42).addE('SUPERSEDES').to(schema_v41)
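
Lineage edges like these are what a downstream-impact (blast radius) lookup would walk. As a plain-Python illustration of that traversal, not Gremlin and not the contents of scripts/graph_writer.py, the sketch below assumes edges are available as `(source, label, target)` tuples pointing in the direction of data flow. The node names are made up for the example.

```python
from collections import deque


def blast_radius(edges: list[tuple[str, str, str]], start: str) -> list[str]:
    """Breadth-first walk of out-edges from `start`, returning reachable nodes.

    Nodes are returned in discovery order and `start` itself is excluded.
    """
    adjacency: dict[str, list[str]] = {}
    for src, _label, dst in edges:
        adjacency.setdefault(src, []).append(dst)

    seen = {start}
    order: list[str] = []
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for nxt in adjacency.get(node, []):
            if nxt not in seen:  # guards against cycles in the lineage graph
                seen.add(nxt)
                order.append(nxt)
                queue.append(nxt)
    return order
```

The `seen` set matters because lineage graphs can contain cycles, for example a service that consumes from and produces to the same topic.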

DynamoDB State Tables

SignalState Table

{
  "pk": "urn:kafka:prod:msk:orders_enriched",
  "sk": "signal#freshness",
  "status": "OK",
  "last_value": 1523,
  "last_update": "2026-01-04T10:00:00Z",
  "ttl": 1767607200
}
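
A sketch of how such an item might be assembled is shown below. It only builds the dict and makes no DynamoDB call. The `build_signal_state_item` name and the 24-hour default retention are assumptions; the one firm constraint is that a DynamoDB TTL attribute must be a number holding Unix epoch seconds.

```python
from datetime import datetime, timedelta, timezone


def build_signal_state_item(
    asset_urn: str,
    signal_type: str,
    status: str,
    last_value: float,
    observed_at: datetime,
    retention: timedelta = timedelta(hours=24),  # hypothetical default
) -> dict:
    """Build a SignalState item keyed by asset URN and signal type."""
    if observed_at.tzinfo is None:
        # Naive datetimes would be interpreted in local time by timestamp().
        raise ValueError("observed_at must be timezone-aware")
    observed_utc = observed_at.astimezone(timezone.utc)
    return {
        "pk": asset_urn,
        "sk": f"signal#{signal_type}",
        "status": status,
        "last_value": last_value,
        "last_update": observed_utc.strftime("%Y-%m-%dT%H:%M:%SZ"),
        # Expiry as epoch seconds, the format DynamoDB TTL expects.
        "ttl": int((observed_utc + retention).timestamp()),
    }
```

Deriving `ttl` from the observation time rather than typing it by hand avoids expiry values that fall before `last_update`.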

IncidentContextCache Table

{
  "pk": "INC-2026-01-04-001",
  "incident_id": "INC-2026-01-04-001",
  "primary_asset": "urn:kafka:prod:msk:orders_enriched",
  "top_evidence": ["schema_drift_v42", "volume_drop_10am"],
  "blast_radius": ["consumer-a", "consumer-b", "delta-table"],
  "timeline": [...],
  "updated_at": "2026-01-04T10:05:00Z"
}

Scripts

  • scripts/signal_router.py: Main routing and normalization
  • scripts/freshness_engine.py: Freshness signal processing
  • scripts/volume_engine.py: Volume anomaly detection
  • scripts/drift_engine.py: Schema drift detection
  • scripts/graph_writer.py: Neptune graph operations
  • scripts/state_manager.py: DynamoDB state operations

References

  • references/signal-schema.json: Canonical signal event schema
  • references/graph-schema.md: Neptune graph model
  • references/engine-configs/: Per-engine configuration

Configuration

signal_factory:
  router:
    input_topic: "raw-telemetry"
    output_topic: "canonical-signals"
  engines:
    freshness:
      enabled: true
      check_interval_seconds: 60
    volume:
      enabled: true
      baseline_window_days: 7
    drift:
      enabled: true
  storage:
    neptune:
      # Example value; replace with your own cluster's Gremlin endpoint.
      endpoint: "wss://neptune.us-east-1.amazonaws.com:8182/gremlin"
    dynamodb:
      signal_state_table: "SignalState"
      incident_cache_table: "IncidentContextCache"
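
Once this YAML has been parsed into a dict (the parser is left out here to keep the example dependency-free), selecting which engines to start could look like the sketch below. The `enabled_engines` helper is hypothetical.

```python
def enabled_engines(config: dict) -> list[str]:
    """Return the names of engines whose `enabled` flag is true, sorted."""
    engines = config.get("signal_factory", {}).get("engines", {})
    return sorted(
        name for name, opts in engines.items() if opts.get("enabled", False)
    )
```

Treating a missing `enabled` key as false means a newly added engine stays off until it is switched on explicitly.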