<!-- canonical: https://docs.axelabs.ai/services/stream -->
<!-- source: content/services/stream.mdx -->

---
title: Stream
description: S&OP 통합 MCP — sales, inventory, settlement. 22개 도구, Postgres 16 SSOT.
---

# Stream

**한 줄 소개**: Sales · Operations · Settlement 통합 MCP. 매출·재고·정산·환불의 전체 흐름을 책임짐. realchoice (truvia) 운영의 SSOT. 28개 MCP 도구.

## 정체성

> "주문이 들어와 재고가 나가고 매출이 정산되고 환불이 처리되는 모든 *흐름* 을 책임지는 MCP."

3 가지 의미:
1. **stream of data** — 매출·재고·정산·물류·발주 데이터의 흐름
2. **cash flow stream** — 들어오는 매출 → 나가는 발주·정산·환불
3. **magnet(당기다) ↔ stream(흐르다)** — 마케팅이 수요를 당기면 운영이 공급을 흐르게 함

## 기술 스택

| 항목 | 값 |
|---|---|
| 언어 | Python 3.12 |
| 프레임워크 | FastMCP (stdio + SSE) 1.27.1 |
| DB | PostgreSQL 16 (`truvia_ssot`, host port 5433) |
| DB driver | psycopg v3.2+ |
| Scheduler | supercronic v0.2.29 (Docker 내) |
| Slack | slack-bolt 1.28, slack-sdk 3.41 (Socket Mode) |
| Crypto | cryptography 48.0 (pgp_sym_encrypt vault) |
| API 클라이언트 | google-api-python-client, anthropic |

## 포트

| 포트 | 용도 |
|---|---|
| 8780 | MCP HTTP/SSE (`stream-mcp`) |
| 5433 → 5432 | PostgreSQL (`truvia-ssot-db`, internal alias `db`) |

## Docker Compose (3 service)

```yaml
services:
  stream-mcp:
    image: stream:latest
    ports: ["127.0.0.1:8780:8780"]
    command: python -m stream_mcp.server
    # FastMCP SSE 서버 (22개 도구)

  stream-data-ops:
    image: stream:latest
    command: python data_ops/supervisor.py
    # Slack Bolt Socket Mode 슈퍼바이저 (outbound only)

  stream-cron:
    image: stream:latest
    command: supercronic -overlapping /app/crontab
    # 6 cron task

networks:
  ssot:
    external: true                   # truvia-ssot_default 와 공유
```

3 service 모두 동일 image 공유 → 배포 단순화.

## 28 MCP Tools (prefix `stream_*`)

| Domain | Tools |
|---|---|
| **sales** (5) | `get_orders`, `get_revenue`, `top_products`, `refund_summary`, `forecast` |
| **inventory** (4) | `current`, `history`, `low_alerts`, `purchase_order_suggest` |
| **settlement** (3) | `lag`, `match`, `pending` |
| **health** (3) | `freshness`, `alerts`, `db_status` |
| **bridge** (2) | `ads_get_spend`, `ads_get_roas` (magnet adapter) |
| **signals** (3) | `emit_purchase_signal`, `emit_inventory_alert`, `signals_recent` |
| **admin** (7) | `list_tenants`, `tenant_get/create/update`, `api_key_create/rotate`, `unified_inventory` |
| **meta (1)** | `stream_unified_inventory` (자기진단) |

## 데이터 수집 (6 collectors)

매시 정각 (`hourly_collect.py`):

| 채널 | 모듈 | API |
|---|---|---|
| 쿠팡 | `data_ops/collectors/coupang.py` | OPEN API (발주서, 매출, 반품) |
| 네이버 커머스 | `data_ops/collectors/naver_commerce.py` | 주문 sync API |
| 네이버 검색광고 | `data_ops/collectors/naver_searchad.py` | (현재 미작동, magnet 으로 이전 예정) |
| Meta Ads | `data_ops/collectors/meta_ads.py` | Marketing API (캠페인, 일일 insights) |
| Magnet | `data_ops/collectors/magnet.py` | magnet 폴더 JSONL ingest |

## 데이터 모델 (핵심)

| 테이블 | 역할 |
|---|---|
| `orders` | 1주문 = 1행 (채널 native ID, 결제/배송/정산 시점) |
| `order_items` | 1주문 N라인 (SKU, 옵션, 수량, 금액) |
| `order_events` | 라이프사이클 상태 변경 로그 |
| `products` | 채널 노출 카탈로그 |
| `product_options` | 상품 옵션 |
| `inventory_supplies` | 내부 부자재 |
| `settlements` | 정산 raw data |
| `settle_cases` | 정산 정형화 |
| `settle_commissions` | 정산 수수료 |
| `vat_cases` | 부가세 |
| `raw_channel_payloads` | API 원본 영구 보존 (180일 retention) |
| `ad_spend_daily` | 광고비 일일 (magnet owner, stream read mirror) |
| `secrets` | Vault (pgp_sym_encrypt + audit_log) |

### Materialized Views

- `v_order_lifecycle` — 1주문 6단계 (paid → recognized → settled) + lag
- `v_revenue_by_stage_monthly` — 월별 (결제·인식·정산 3 시점 비교)
- `v_settlement_lag_stats` — 정산 지연 분석
- `v_business_flow_monthly` — 종합 월간
- `v_data_freshness` — 채널별 데이터 최신성

## Magnet 신호 교환

3 채널 양방향:

| 신호 | 발신자 | 수신자 | 채널 |
|---|---|---|---|
| `purchase_signal` | stream `emit_purchase_signal` | Meta CAPI | `data/signals/&lt;channel&gt;-YYYY-MM.jsonl` + Meta API |
| `inventory_alert` | stream `emit_inventory_alert` | magnet (광고 일시정지) | JSONL + pg_notify |
| `ad_spend_mirror` | magnet `ad_spend_daily` | stream read mirror | DB direct (magnet_client.py) |

## Crontab (6 task)

| 주기 | 작업 |
|---|---|
| 매시 정각 | `hourly_collect.py` — 6개 collector |
| 매 5분 | `watchdog.sh` — Slack 봇 BrokenPipe 감지 |
| 매 30분 | `health_check.sh` — freshness 점검 |
| 매일 09:10 | `magnet_alerts.py` — daily summary + ROAS alert |
| 매일 00:05 | `logrotate.sh` |
| 매주 일요일 04:30 | `retention.py --days 180 --vacuum` |

## 환경 변수

```bash
# DB
STREAM_DB_HOST=db
STREAM_DB_PORT=5432
STREAM_DB_NAME=truvia_ssot
STREAM_DB_USER=agent11
STREAM_DB_PASSWORD=

# Secrets vault
TRUVIA_SECRETS_MASTER_KEY=                # pgp_sym_encrypt master

# MCP
STREAM_TRANSPORT=sse
STREAM_HTTP_HOST=0.0.0.0
STREAM_HTTP_PORT=8780

# Slack
DATA_OPS_SLACK_BOT_TOKEN=
DATA_OPS_CHANNEL=#data-ops

# Magnet adapter
MAGNET_REPO_ROOT=/Users/realchoice/magnet

# Signals
STREAM_SIGNALS_DIR=/Users/realchoice/stream/data/signals
```

env file 위치: `~/.config/truvia/stream.env` (host 저장, chat transcript 영역 외, .gitignore 외부).

## 외부 의존성

| 시스템 | 통합 |
|---|---|
| PostgreSQL 16 truvia_ssot | data SoT (read/write) |
| Slack workspace | Bolt Socket Mode + #data-ops alert |
| 쿠팡 OPEN API | 발주서, 매출, 반품·교환 |
| 네이버 커머스 API | 주문 sync |
| Meta Ads Manager | 캠페인, insights |
| Meta CAPI | purchase event emit |
| Google Sheets API | 자동 export |
| magnet MCP | ad_spend mirror, signal receiver |

## 운영 노트

- **GitHub repo**: `soohunkang/stream` (private)
- **이전 4 MCP → 통합** (2026-05-11 stream + magnet)
- **LaunchAgent 8종 archived** — Docker Compose 라운드 7 수렴
- **Multi-tenant scaffold** 준비 (`_set_tenant_context()`, schema-multi-tenant.sql)

## 관련 문서

- [Magnet service](/services/magnet) — 마케팅 자동화, 신호 발신자
- [/Users/axe/stream/docs/](https://github.com/soohunkang/stream/tree/main/docs) — 16개 운영 문서
