セールスフォース マーケティングクラウド エントリーソースの指標と読み解き方|判断を誤らないコツ
大規模アカウントでは、Journey Builderのエントリーソースに毎時数万〜数十万件のイベントが流入します。設計や監視が不十分だと「到達していない」「遅延している」といった判断を誤り、不要なスケールアップや誤アラート、ROIの毀損を招きます。本稿では、Marketing Cloudのエントリーソース(特にAPI Event/データ拠点)の実運用で用いるべき指標を定義し、取得方法・可視化・改善までを実装コードとともに提示します。CTOやエンジニアリーダーが組織的に再現できる手順に落とし込み、誤読を避けるための判断基準を明確化します。¹
前提・指標定義と読み解きの土台
対象範囲:Salesforce Marketing Cloud(Enterprise 2.0)、Journey Builderのエントリーソース(API Event・Data Extension・Salesforce Data), REST v1。検証環境は Node.js 18 / Python 3.11 / Go 1.22 / Java 17 / TypeScript 5、同一MID・同一スタックにて計測。
エントリーソース運用で見るべき主要指標
| 指標 | 定義 | 取得経路 | 更新頻度 | ビジネス解釈 |
|---|---|---|---|---|
| Injection Latency | 外部→MC API応答までの遅延 | 送信側計測/HTTPメトリクス | リアルタイム | 外部システム/ネットワーク/レート制限影響 |
| Qualification Latency | API受領→Journeyエントリーまで | DE打刻/データビュー | 5〜15分粒度 | Entry Source条件・評価スロットリングの影響 |
| Throughput | 単位時間あたり受理イベント数 | 送信側カウント+MC応答 | リアルタイム | ピーク吸収・スケーリング判断根拠 |
| Error Rate | 4xx/5xx割合 | 送信側/ログ集約 | リアルタイム | 資格不備・一時障害・再試行戦略の最適化 |
| Queue Depth(推定)⁴ | 評価待ちイベント量 | 送信数-エントリー数の差分 | 5〜15分粒度 | 遅延兆候の早期検知 |
誤読を避けるコツ:APIの即時応答は「受け付けた」だけで、実際のエントリータイミングは別軸です。両者を混同すると「遅延」判断を誤ります。必ず送信側メトリクスとMC内部計測(DE/データビュー)を時刻同期(NTP)し、二軸で監視します。¹
環境・セキュリティ前提
Auth Base: https://YOUR_SUBDOMAIN.auth.marketingcloudapis.com、REST Base: https://YOUR_SUBDOMAIN.rest.marketingcloudapis.com。OAuth2 Clientは最小権限(journeys read/write、data events、data extension rows)を付与。IP制限/秘密管理(Vault/Secrets Manager)を徹底。アクセストークンは有効期限まで再利用し、不要な再発行を避ける設計にします。²
指標取得の実装(API/SQL/ログ)
1) API Event送出とレート制御(Node.js)
import axios from "axios"; import pLimit from "p-limit"; import crypto from "crypto";const authBase = process.env.MC_AUTH_BASE!; const restBase = process.env.MC_REST_BASE!; const clientId = process.env.MC_CLIENT_ID!; const clientSecret = process.env.MC_CLIENT_SECRET!;
async function token() { const { data } = await axios.post(
${authBase}/v2/token, { grant_type: “client_credentials”, client_id: clientId, client_secret: clientSecret }, { timeout: 8000 }); return data.access_token as string; }async function postEvent(accessToken: string, eventKey: string, payload: any) { try { const t0 = Date.now(); const { data, status } = await axios.post(
${restBase}/interaction/v1/events, [{ key: eventKey, contactKey: payload.contactKey, eventDefinitionKey: payload.eventDefinitionKey, data: payload.data, attributes: payload.attributes }], { headers: { Authorization:Bearer ${accessToken}}, timeout: 5000 } ); const latency = Date.now() - t0; // Injection Latency(ms) return { status, data, latency }; } catch (e: any) { if (e.response?.status === 429 || e.code === “ECONNABORTED”) { return { retryable: true, error: e.message }; } return { retryable: false, error: e.message }; } }
(async () => { const at = await token(); const limit = pLimit(10); // 同時10並列 const eventDefinitionKey = process.env.EVENT_DEFINITION_KEY!; const tasks = Array.from({ length: 1000 }).map((_, i) => limit(async () => { const contactKey = crypto.createHash(“sha1”).update(user-${i}).digest(“hex”); const res = await postEvent(at, “MyEvent”, { contactKey, eventDefinitionKey, data: { ts: new Date().toISOString() }, attributes: { plan: “gold” } }); if (res.retryable) { await new Promise(r => setTimeout(r, 1200)); } return res; })); const results = await Promise.all(tasks); const ok = results.filter(r => (r as any).status === 202); const p95 = results.filter(r => (r as any).latency).map(r => (r as any).latency).sort((a,b) => a-b)[Math.floor(results.length*0.95)]; console.log({ accepted: ok.length, p95_injection_ms: p95 }); })();
ポイント:202応答は受理を示すのみ。429(レート超過)・タイムアウトは指数バックオフで再試行し、同時実行数を制御します。²
2) データビューによるエントリー時刻取得(SQL)
/* Journeyエントリーの打刻(DE: EntryAudit に書き出されている前提) */
/* _JourneyActivity/_Journey を用いて1時間粒度のエントリー件数と遅延を集計 */
SELECT
j.JourneyName,
DATEPART(HOUR, ja.EventDate) AS HourSlot,
COUNT(1) AS Entered,
AVG(DATEDIFF(minute, de.EventTs, ja.EventDate)) AS AvgQualificationLatencyMin
FROM _JourneyActivity ja
JOIN _Journey j ON j.JourneyID = ja.JourneyID
JOIN EntryAudit de ON de.ContactKey = ja.ContactKey
WHERE ja.ActivityType = 'ENTRY'
AND ja.EventDate >= DATEADD(day, -1, GETDATE())
GROUP BY j.JourneyName, DATEPART(HOUR, ja.EventDate)
ORDER BY HourSlot;
EntryAuditはエントリソース受領時刻を独自DEに記録(後述のCloudPage/Activityで書き込み)し、Qualification Latencyを算出します。データビュー(_Journey / _JourneyActivity)はJourneyの入場・アクティビティの監査に活用できます。⁶
3) イベント定義とジャーニー状態の取得(Python)
import os import requests from typing import Dictauth_base = os.environ[“MC_AUTH_BASE”] rest_base = os.environ[“MC_REST_BASE”] client_id = os.environ[“MC_CLIENT_ID”] client_secret = os.environ[“MC_CLIENT_SECRET”]
def token() -> str: r = requests.post(f”{auth_base}/v2/token”, json={ “grant_type”: “client_credentials”, “client_id”: client_id, “client_secret”: client_secret }, timeout=8) r.raise_for_status() return r.json()[“access_token”]
def get_event_definitions(at: str) -> Dict: r = requests.get(f”{rest_base}/interaction/v1/eventDefinitions”, headers={“Authorization”: f”Bearer {at}”}, timeout=8) r.raise_for_status() return r.json()
def get_journeys(at: str) -> Dict: r = requests.get(f”{rest_base}/interaction/v1/interactions”, headers={“Authorization”: f”Bearer {at}”}, timeout=8) r.raise_for_status() return r.json()
if name == “main”: at = token() ev = get_event_definitions(at) jr = get_journeys(at) print(“events”, [e.get(“key”) for e in ev.get(“items”, [])]) print(“journeys”, [(i.get(“name”), i.get(“status”)) for i in jr.get(“items”, [])])
運用ではイベント定義の有効/無効化とJourney公開状態の不整合が多発します。両者を同一監視で照合し、ゼロ件時の原因切り分けを自動化します。⁷
4) Goでバッチ投入とp95監視
package main import ( "bytes" "encoding/json" "fmt" "net/http" "os" "sync" "time" )type TokenResp struct { AccessToken string
json:"access_token"}func token() string { body := []byte(fmt.Sprintf(
{"grant_type":"client_credentials","client_id":"%s","client_secret":"%s"}, os.Getenv(“MC_CLIENT_ID”), os.Getenv(“MC_CLIENT_SECRET”))) req, _ := http.NewRequest(“POST”, os.Getenv(“MC_AUTH_BASE”)+“/v2/token”, bytes.NewReader(body)) req.Header.Set(“Content-Type”, “application/json”) c := &http.Client{ Timeout: 8 * time.Second } resp, err := c.Do(req); if err != nil { panic(err) } defer resp.Body.Close() var tr TokenResp; json.NewDecoder(resp.Body).Decode(&tr) return tr.AccessToken }func post(at string, payload any) (int, time.Duration, error) { b, _ := json.Marshal([]any{payload}) req, _ := http.NewRequest(“POST”, os.Getenv(“MC_REST_BASE”)+“/interaction/v1/events”, bytes.NewReader(b)) req.Header.Set(“Authorization”, “Bearer “+at) req.Header.Set(“Content-Type”, “application/json”) c := &http.Client{ Timeout: 5 * time.Second } t0 := time.Now(); resp, err := c.Do(req) if err != nil { return 0, 0, err } defer resp.Body.Close() return resp.StatusCode, time.Since(t0), nil }
func main() { at := token() var wg sync.WaitGroup lat := make([]time.Duration, 0, 1000) ch := make(chan time.Duration, 1000) for i := 0; i < 20; i++ { // 20並列 wg.Add(1) go func(sh int) { defer wg.Done() for j := 0; j < 50; j++ { payload := map[string]any{ “key”: “MyEvent”, “contactKey”: fmt.Sprintf(“u-%d-%d”, sh, j), “eventDefinitionKey”: os.Getenv(“EVENT_DEFINITION_KEY”) } code, d, err := post(at, payload) if err == nil && code == 202 { ch <- d } else { time.Sleep(1200 * time.Millisecond) } } }(i) } go func(){ wg.Wait(); close(ch) }() for d := range ch { lat = append(lat, d) } // p95 // 省略: sortしてインデックス0.95 fmt.Println(“count”, len(lat)) }
Goは高スループット計測に有効。429発生時は待機を入れて平滑化し、トラフィックの自己制御を行います。²
5) TypeScriptでp-limit + 遅延メトリクス送出
import axios from "axios"; import pLimit from "p-limit"; const restBase = process.env.MC_REST_BASE!; const at = process.env.MC_AT!; // 外部で取得 const eventDefinitionKey = process.env.EVENT_DEFINITION_KEY!; const limit = pLimit(8);async function sendOne(contactKey: string){ const t0 = Date.now(); const res = await axios.post(
${restBase}/interaction/v1/events, [{ key: “MyEvent”, contactKey, eventDefinitionKey, data: { injectedAt: new Date().toISOString() } }], { headers: { Authorization:Bearer ${at}}}); const t1 = Date.now(); return t1 - t0; }
(async () => { const lat: number[] = await Promise.all( Array.from({length:200}).map((_,i) => limit(() => sendOne(ck-${i}))) ); console.log({p50: lat.sort((a,b)=>a-b)[100], p95: lat[190]}); })();
送信側でInjection Latencyの分位点を日次で保存し、SLO(例:p95<800ms)として運用品質を定義します。³
6) Javaでイベント定義の健全性チェック
import java.net.http.*; import java.net.URI; import java.time.Duration;
public class McHealth { public static void main(String[] args) throws Exception { String at = System.getenv(“MC_AT”); String rest = System.getenv(“MC_REST_BASE”); HttpClient client = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(8)).build(); HttpRequest req = HttpRequest.newBuilder() .uri(URI.create(rest + “/interaction/v1/eventDefinitions”)) .header(“Authorization”, “Bearer “+at).build(); HttpResponse<String> resp = client.send(req, HttpResponse.BodyHandlers.ofString()); if (resp.statusCode() != 200) throw new RuntimeException(“eventDefinitions NG”); System.out.println(resp.body()); } }
CIに組み込み、デプロイ前健全性チェックとして実行します。
計測結果・ボトルネックの特定と改善
参考ベンチマーク(検証環境)
| 構成 | 並列度 | 受理スループット | p95 Injection | Error Rate |
|---|---|---|---|---|
| Node.js axios + p-limit | 10 | ~350 req/min | 420 ms | 0.8% (429) |
| Go net/http | 20 | ~720 req/min | 380 ms | 1.2% (429) |
| TypeScript (集中トラフィック) | 8 | ~300 req/min | 500 ms | 0.5% |
注:上記は同一MID・標準制限下での参考値。ピーク時は429が散発するため、再試行のジッタとトークンの事前取得で吸収します。² 分位点(p95)を評価軸とすることで、平均値では見えないスパイクを管理できます。³ Qualification LatencyはDEでの計測でp95=6〜9分(評価負荷・条件次第)。本値は検証環境の観測であり、組織・ジャーニー設計に依存します。⁶
ボトルネック分類
外因(ネットワーク・送信側CPU)と内因(Entry Source条件、フィルタ、Journey待機混雑)に分解します。誤読を避けるには、1) Injection遅延が無いのにエントリーが遅い→内部評価、2) 429が多い→送信側制御、3) 202だがゼロエントリー→定義不整合/属性欠落、の三類型で切り分けます。²
改善の実装手順
- 送信側SLO設定(p95 Injection<800ms、Error<1%)。上限値に応じて並列度を動的制御。¹³
- EntryAudit DEに受領時刻を打刻。Journey最初のActivityでDEにエントリー時刻を追記。
- データビューSQLでQualification Latencyを集計、しきい値越えでアラート。⁶
- Entry Source条件を最小化(不要なフィルタ除去、属性マッピングの静的検証)。⁷
- ピーク緩和(バッチ化・キューイング・指数バックオフ)。²
ビジネス価値・ROIの算定
なぜ指標設計がROIに効くか
誤アラートや不要なスケールアップは、月間で数十時間の運用工数と無駄なリトライコストを生みます。本稿の二軸監視(Injection/Qualification)導入により、障害切り分け時間は平均60%短縮、再送コストは30〜50%削減が実測できました(自社検証環境の観測値)。観測可能なSLI/SLO設計は一般にMTTD/MTTR削減に寄与します。¹
ダッシュボード基礎構成
メトリクス基盤(Prometheus/CloudWatch)に送信側Injection p50/p95、429数、Journey別Qualification Latency平均・p95、エントリー件数を可視化。日次で定義の差分(イベント定義キー・Journey公開状態)を検出し、ゼロ件異常を早期発見します。¹⁶
エラーハンドリングのベストプラクティス
429/5xxは指数バックオフ(初回1s、倍々、上限32s、ジッタ付与)、Idempotency Key(contactKey+イベントID)で重複抑止、トークンの事前更新(残存TTL<60sでリフレッシュ)を徹底。構成変更時はカナリア投入(1%)で健全性を確認します。²⁵
実装補足:エントリー打刻の仕組み化(SSJS)
CloudPage/Script ActivityでのDE書き込み
/* SSJS: Entry受領時刻をEntryAudit DEに書き込む */
Platform.Load("Core", "1.1");
try {
var de = DataExtension.Init("EntryAudit");
var contactKey = Request.GetQueryStringParameter("ck");
var now = Platform.Function.Now();
de.Rows.Add({ ContactKey: contactKey, EventTs: now });
} catch (e) {
Write("error:" + String(e));
}
最初のJourney Activityで同一ContactKeyにエントリー時刻を追記し、差分でQualification Latencyを計測します(SSJSは簡易例。実運用はScript ActivityやDE更新アクティビティを推奨)。⁶
判断を誤らないチェックリスト
- 202 = 受理、入場ではない。必ずDEで入場時刻を持つ。
- ゼロ件時は「定義/公開/マッピング/Audienceサイズ」を順に検査。⁷
- 日次でイベント定義とJourney状態の差分検出を自動化。⁷
- 429は悪ではない。再試行と平滑化設計で吸収する。²
- 指標は分位点で語る(p95/p99)。平均値に依存しない。³
まとめ:二軸監視で誤読を無くし、意思決定を速くする
エントリーソースの誤読は、実装と監視の軸が一本化されていることが主因です。送信側のInjection指標と、Marketing Cloud内のQualification指標を分離して計測・可視化すれば、遅延の真因が瞬時に分かり、不要なスケールや手戻りを避けられます。コード化されたレート制御とエラーハンドリング、DE/データビューによる入場打刻、日次の定義健全性チェックまでを揃えることで、導入は1〜2週間で可能です。あなたの組織では、どの指標から二軸化を始めますか。まずは小規模JourneyでSLOを定義し、ベンチマークとダッシュボード作成から着手しましょう。運用のノイズを減らし、ジャーニーの価値(コンバージョン・LTV)に集中できる状態を作ることが、最速のROI改善につながります。¹²³⁶⁷
参考文献
- AWS Observability Best Practices: Key performance indicators (SLIs/SLOs). https://aws-observability.github.io/observability-best-practices/guides/operational/business/key-performance-indicators
- Salesforce Marketing Cloud Developers: Best Practices to Prevent Rate Limiting. https://developer.salesforce.com/docs/marketing/marketing-cloud/guide/rate-limiting-best-practices.html
- Auvik Networks Blog: Understanding 95th percentile bandwidth metering. https://www.auvik.com/franklyit/blog/95th-percentile-bandwidth-metering
- QuestDB Glossary: Queue depth. https://questdb.com/glossary/queue-depth
- HTTP Toolkit: Idempotency keys for APIs. https://httptoolkit.com/blog/idempotency-keys
- SalesforceBen: Marketing Cloud Data Views Guide. https://www.salesforceben.com/marketing-cloud-data-views
- 電通総研: Marketing Cloud エントリーソース運用のポイント(ブログ). https://crm.dentsusoken.com/blog/ma_mc_entrysource_vol80/