Article

最新の脅威情報を効率的に収集する方法

高田晃太郎
最新の脅威情報を効率的に収集する方法

統計によると、Verizon DBIR 2024 は侵害の約68%に人的要素が関与すると報告し¹、Mandiant(Google Cloud)の年次レポート M‑Trends 2024 では潜伏期間(dwell time)の中央値が約10日まで短縮したとされます²。脅威アクターのサイクルが短くなるほど、古いインジケータは価値を失い、更新頻度と正確性が防御力を左右します。公開レポートやRFCが示す傾向として、IOC(Indicator of Compromise: 侵害の痕跡)の有効期間は数時間から数日単位で劣化しやすく³⁴、単純なRSS購読やメール転送では取りこぼしとノイズが積み上がります。そこで本稿では、CTOやエンジニアリーダーが運用可能な現実解として、標準プロトコルのTAXII/STIX⁵⁶(TAXIIは配信プロトコル、STIXは表現フォーマット)を核に、MISP⁹¹¹やOpenCTI⁸といったプラットフォームを組み合わせ、収集・正規化・重複排除・信頼度評価・配信までを最適化するアーキテクチャと実装を、セキュリティ運用のKPIに結び付ける視点で解説します。

なぜ「集める前に設計」が効くのか:ノイズと鮮度の経済学

脅威情報の運用効率を語る前に、何を集めるのかを定義することが最重要です。一次情報としてはTAXIIで配布されるSTIXオブジェクト⁵⁶(標準化された脅威情報)、あるいはコミュニティ主導のMISPイベント⁹¹¹が挙げられます。二次情報としてはブログ、脆弱性の脅威優先度解説、レピュテーションのスコアなどがあります。設計の要点は、IOC中心の高速ループと、戦術・技術・手順(TTP)中心の低速ループを分けることにあります。前者はDNSやプロキシ、EDRでのブロックや検知に直結し、後者は検出ロジックやハントクエリの更新に効きます。両者を同一バスで扱うと、更新周期が衝突しスループットを落とします。

さらに重要なのは、重複排除と信頼度の早期付与です。同じIPアドレスが複数のソースから届いたとき、最初に到着した情報が必ずしも最良とは限りません。値とタイプに加え、source、observed_time、valid_from、confidenceをキー化してマージする方針を最初から持ち込み、スコアは「ソースの信用」「観測の新しさ」「過去の誤検知率」などの重みに分解します。これにより、SIEMやEDRへ渡す前にノイズを減らし、下流でのアラート疲れを抑えられます。更新の鮮度はキャッシュ制御とETag/Last-Modified(HTTPの条件付き取得)で担保し、Pull型の間引きではなく、可能ならPush/ストリーミングで遅延を詰めます¹⁰。設計段階でこの2点を明文化するだけで、運用のコストは大きく変わります。関連する設計の考え方は、たとえば SIEMコスト最適化の設計 や、検出ロジックに関係する シフトレフトの脅威モデリング にも直結します。

情報源の選定と接続:TAXII、MISP、OpenCTIを骨格にする

プロトコルとプラットフォームを標準で固めると、運用とベンダーロックの両方を避けやすくなります。TAXII 2.1はSTIX 2.1オブジェクトの配信に長け⁵⁶、MITRE ATT&CKや各種フィードの接続点として機能します⁷。MISP⁹¹¹はコミュニティ・ISAC連携の実務で使いやすく、イベント単位の整備がしやすいのが利点です。OpenCTI⁸はGraphQL APIで関係性を扱いやすく、キャンペーンとインフラ、マルウェアファミリの紐づけに強みがあります。以下では、それぞれを安全に、そして運用効率の観点で接続するコードを示します。

MITRE ATT&CK TAXII 2.1からの取得(Python)

import asyncio
from taxii2client.v21 import Server
from stix2 import MemoryStore

ATTACK_TAXII_URL = "https://cti-taxii.mitre.org/taxii/"

async def fetch_attack_enterprise():
    server = Server(ATTACK_TAXII_URL)
    api_root = server.api_roots[0]
    collections = {c.title: c for c in api_root.collections}
    enterprise = collections.get("Enterprise ATT&CK")
    ms = MemoryStore()
    # ページング対応
    for bundle in enterprise.get_objects():
        ms.add(bundle.get('objects', []))
    techniques = ms.query([{"type": "attack-pattern"}])
    print(f"techniques: {len(techniques)}")

if __name__ == "__main__":
    asyncio.run(fetch_attack_enterprise())

エラー時は429や5xxが返ることがあるため、ユーザーエージェント明示と指数バックオフを併用します。MemoryStoreで一旦集約し、必要なタイプだけ抽出すると後段の正規化が軽くなります。

OpenCTI GraphQLからインジケータ取得(Python)

import os
import requests

OPENCTI_URL = os.getenv("OPENCTI_URL")
OPENCTI_TOKEN = os.getenv("OPENCTI_TOKEN")

QUERY = {
  "query": """
  query Indicators($first: Int!, $orderBy: IndicatorsOrdering){
    indicators(first: $first, orderBy: $orderBy){
      edges{ node{ id indicator_value valid_from confidence pattern_type createdBy{ name } labels{ value } } }
    }
  }
  """,
  "variables": {"first": 200, "orderBy": "valid_from"}
}

headers = {"Authorization": f"Bearer {OPENCTI_TOKEN}", "Content-Type": "application/json"}

resp = requests.post(f"{OPENCTI_URL}/graphql", json=QUERY, headers=headers, timeout=30)
resp.raise_for_status()
data = resp.json()
for edge in data["data"]["indicators"]["edges"]:
    n = edge["node"]
    print(n["indicator_value"], n["confidence"])  # 正規化は後段で実施

GraphQLは選択フィールドを絞りやすく、ペイロードの軽量化に寄与します。無駄のない取得には、必要なフィールドのみ取得するポリシーが有効です。

MISPからイベントを取得して正規化(Python, PyMISP)

from pymisp import ExpandedPyMISP
import os

MISP_URL = os.getenv("MISP_URL")
MISP_KEY = os.getenv("MISP_KEY")
VERIFY_CERT = True

misp = ExpandedPyMISP(MISP_URL, MISP_KEY, VERIFY_CERT)

# タグでスコープを制限し、ノイズを抑制
events = misp.search(controller='events', tags=['tlp:green'], published=True, limit=100)

normalized = []
for e in events:
    for attr in e['Event'].get('Attribute', []):
        normalized.append({
            'type': attr.get('type'),
            'value': attr.get('value'),
            'source': 'misp',
            'event_id': e['Event']['id'],
            'tlp': [t['name'] for t in e['Event'].get('Tag', [])],
        })

print(f"normalized: {len(normalized)}")

MISPはタグ運用が品質に直結します。TLPの取り扱いを厳密にし、配信先ごとに許容レベルをマッピングすると、情報共有とコンプライアンスの両立が容易になります⁹¹¹。

収集パイプラインの実装:正規化、重複排除、配信の自動化

パイプラインは、取り込み、正規化、スコアリング、重複排除、配信、測定のロープで構成します。ストレージは書き捨てキャッシュと永続ストアを分離し、検索要件が強ければElasticsearchやOpenSearch、配信のファンアウトにはKafkaやSNS/SQSを使います。ここで鍵になるのは、IOCsを「タイプ、値、ソース、タイムスタンプ」の複合キーでユニーク化し、最小限のフィールドセットに落とす正規化です。

STIXインジケータの正規化と重複排除(Python)

from datetime import datetime
from hashlib import sha256

ESSENTIAL_FIELDS = ("type", "value", "source", "valid_from", "confidence", "labels")

def normalize_stix_indicator(obj: dict, source: str) -> dict:
    stype = obj.get("pattern_type") or obj.get("type")
    value = obj.get("indicator_value") or obj.get("value")
    valid_from = obj.get("valid_from") or obj.get("created")
    labels = [l["value"] for l in obj.get("labels", [])] if isinstance(obj.get("labels"), list) else obj.get("labels") or []
    confidence = obj.get("confidence") or 50
    return {
        "type": stype,
        "value": value,
        "source": source,
        "valid_from": valid_from,
        "confidence": confidence,
        "labels": labels,
    }

def dedup_key(doc: dict) -> str:
    base = f"{doc['type']}|{doc['value']}|{doc['source']}"
    return sha256(base.encode()).hexdigest()

# マージ方針:新しいvalid_fromを優先し、confidenceは最大値を保持
def merge(a: dict, b: dict) -> dict:
    newer = max(a.get("valid_from", ""), b.get("valid_from", ""))
    labels = sorted(set((a.get("labels") or []) + (b.get("labels") or [])))
    return {
        **a,
        **b,
        "valid_from": newer,
        "confidence": max(a.get("confidence", 0), b.get("confidence", 0)),
        "labels": labels,
    }

キーとマージ戦略を明文化すると、ソース追加のたびにバグを産まなくなります。confidenceの定義をチーム内で共有し、検出ロジックやブロックリストに適用する閾値を合わせると、運用の一貫性が高まります。

Kafkaへ配信し、消費側でSIEM/EDRへ分岐(Python, aiokafka)

import asyncio
import json
from aiokafka import AIOKafkaProducer

async def publish_to_kafka(records):
    producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await producer.start()
    try:
        for r in records:
            await producer.send_and_wait("threat.intel", json.dumps(r).encode())
    finally:
        await producer.stop()

if __name__ == "__main__":
    sample = [{"type": "ipv4-addr", "value": "203.0.113.5", "source": "misp", "valid_from": "2025-08-01T00:00:00Z", "confidence": 70, "labels": ["malicious"]}]
    asyncio.run(publish_to_kafka(sample))

消費側では、EDRのカスタムIOC機能や、SIEMのルックアップテーブルへ転送します。配信先ごとのフォーマットは早い段階で吸収し、ブローカーからの下流変換で対応すると、上流の変更が最小で済みます。ストリーミング配信は遅延を抑え、IOCの半減期に追従しやすくなります。

堅牢化:レート制御、リトライ、ETag/If-Modified-Since

import time
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retries = Retry(total=5, backoff_factor=0.5, status_forcelist=[429, 500, 502, 503, 504])
session.mount("https://", HTTPAdapter(max_retries=retries))

etag_cache = {}

def get_with_cache(url):
    headers = {}
    if url in etag_cache:
        headers["If-None-Match"] = etag_cache[url]
    resp = session.get(url, headers=headers, timeout=30)
    if resp.status_code == 304:
        return None
    resp.raise_for_status()
    if 'ETag' in resp.headers:
        etag_cache[url] = resp.headers['ETag']
    return resp.json()

過剰取得の抑制は運用効率の要です。HTTPの基本機能を活用し、スケジューリングにはAirflowやArgo Workflowsを使うと、監査性と再現性が担保されます。イベント駆動の更新は、更新差分のみを配るソースで特に効果的です。ストリーミングでの設計は、トピックの保持期間と消費遅延をKPIとして管理しましょう。参考としてイベント基盤の設計は Kafkaのイベントストリーミング設計 が役立ちます。

活用面の最適化:検出精度、ノイズ削減、ROIの紐づけ

収集は手段であり、目的は検出力の向上と運用の省力化です。まずは検出面から見直します。IOCは単体でのブロックや一致検知に使いつつ、TTPはハントやルール生成に反映します。SigmaやYARAといった検知ルール言語や、各種クエリ言語に落とすとき、confidence閾値や初回観測時刻による重み付けを維持すれば、アラートの優先度付けがしやすくなります。

SIEMでの参照結合(Splunk SPL例)

| inputlookup threat_ioc.csv \
| where confidence >= 60 \
| rename value as ioc \
| eval ioc_type=type \
| join type=ioc_type [ search index=proxy sourcetype=zscaler \
  | eval ioc=coalesce(dest_ip, url) ] \
| stats count by ioc, src_ip, user, _time

参照はルックアップやKVストアで短周期に更新します。高頻度更新の負荷は、ルックアップをインメモリ化するか、サマリーデータモデルに近い形で緩和できます。

Microsoft Sentinelでの参照(KQL例)

let ioc=externaldata(value:string, type:string, confidence:int)
['https://storage.example.com/ioc.csv']
with(format='csv');
ProxyLogs
| where confidence >= 60
| extend indicator = iff(type == 'url', Url, tostring(DestinationIP))
| join kind=inner (ioc) on $left.indicator == $right.value
| summarize by value, User, Computer, TimeGenerated

クラウドSIEMではストレージのコスト最適化が効きます。低コストストレージに置いたIOCを必要時に参照し、頻出のスコア帯だけホット化すると、検出精度とコストのバランスが取りやすくなります。

ベンチマークとKPIの設計

取り込みから下流配信までのエンドツーエンド遅延、1秒あたりのインジケータ処理数、重複排除率、誤検知率、アラート到達までの時間を主要KPIに据えます。一般的に、非同期処理とバルク処理を併用すると、数千〜数万件規模のIOCでも短時間で正規化・配信しやすくなり、スキップ条件やETagを適用すると無駄な転送量を抑えられます。以下はローカル計測用の雛形です。

import time

def measure_throughput(process_fn, batch):
    start = time.time()
    processed = [process_fn(x) for x in batch]
    dt = time.time() - start
    tps = len(processed) / dt if dt else 0
    return tps, dt

# 使い方:tps, elapsed = measure_throughput(normalize_stix_indicator, sample_batch)

KPIは運用とビジネスを同じ言語にします。試算例として、アナリストの一日あたりの手作業30分短縮が月20営業日で約10時間、時給8,000円換算で月約8万円、年換算で約96万円に相当します。これをパイプラインの運用コストと比較し、パフォーマンス改善の優先度を決めます。より広い予算設計の考え方は、例えば セキュリティ投資のROI設計 が参考になります。

運用の「続ける仕組み」:ガバナンス、品質、レジリエンス

継続運用の肝は、ガバナンスとSLOの明確化です。信頼度の定義、許容できる誤検知率、更新遅延の上限、停止時のフォールバックをドキュメント化し、変更管理のPull Requestに紐づけます。障害に備えて、重要フィードは二重化し、片系が落ちてもキャッシュで24時間は持つようにします。インシデント時には、IOCの強制昇格や暫定ブロックを行う緊急ルートを設け、後から正規フローに畳み込みます。最後に、人のスキルとツールの関係です。アナリストがTTPを理解し、検出ロジックへ還元するループは自動化できません。自動化が稼いだ時間を、脅威ハンティングやプレイブック改善に投資できるよう、可視化ダッシュボードで時間配分を見える化すると効果が出やすくなります。

まとめ:速さと品質を両立させる現実解

攻撃の速度が増すほど、脅威情報の価値は鮮度と品質に依存します。標準プロトコルとオープンなプラットフォームで骨格を作り、正規化と重複排除を中核に据え、配信はストリーミングで遅延を抑える。信頼度とTLPの扱いを明確にし、SIEMやEDRではスコアに応じた優先度付けを徹底する。これだけで、検出精度と運用コストの両立は現実的になります。チームにとって最初の一歩は、今日扱っているフィードの棚卸しと、confidenceの定義合わせから始めるのが効果的です。その上で、小さなパイプラインを一つ動かし、KPIを測る。次に何を止め、何を強化するかが、データで見えるようになります。あなたの組織では、どのKPIから改善を始めますか。まずは既存のフィードにETag対応を加え、30日以内に重複排除率と遅延の実測をダッシュボード化してみてください。現場の負荷を下げつつ品質を高める、最短ルートになります。

参考文献

  1. Verizon. 2024 Data Breach Investigations Report – Newsroom summary. 2024. https://www.verizon.com/about/news/2024-data-breach-investigations-report-vulnerability-exploitation-boom
  2. Google Cloud Security (Mandiant). M‑Trends 2024: Key findings on median dwell time. 2024. https://cloud.google.com/blog/topics/threat-intelligence/m-trends-2024
  3. Dragos. End of life of an Indicator of Compromise (IOC). https://www.dragos.com/blog/end-of-life-of-an-indicator-of-compromise-ioc/
  4. IETF. RFC 9424: Operational Considerations for Cyber Threat Intelligence Indicators. https://www.ietf.org/rfc/rfc9424.html
  5. OASIS. TAXII Version 2.1. 2021. https://docs.oasis-open.org/cti/taxii/v2.1/taxii-v2.1.html
  6. OASIS. STIX Version 2.1. 2021. https://docs.oasis-open.org/cti/stix/v2.1/stix-v2.1.html
  7. MITRE ATT&CK. TAXII 2.1 Server. https://cti-taxii.mitre.org/taxii/
  8. OpenCTI Documentation. https://docs.opencti.io/
  9. MISP Project. Malware Information Sharing Platform (MISP). https://www.misp-project.org/
  10. IETF. RFC 9110: HTTP Semantics (ETag/If-Modified-Since). 2022. https://www.rfc-editor.org/rfc/rfc9110
  11. MyCERT. A threat intelligence platform for information sharing (MISP). https://www.mycert.org.my/portal/full?id=89df3ade-427f-4ab0-ba1f-219850d31c76