最新の脅威情報を効率的に収集する方法
統計によると、Verizon DBIR 2024 は侵害の約68%に人的要素が関与すると報告し¹、Mandiant(Google Cloud)の年次レポート M‑Trends 2024 では潜伏期間(dwell time)の中央値が約10日まで短縮したとされます²。脅威アクターのサイクルが短くなるほど、古いインジケータは価値を失い、更新頻度と正確性が防御力を左右します。公開レポートやRFCが示す傾向として、IOC(Indicator of Compromise: 侵害の痕跡)の有効期間は数時間から数日単位で劣化しやすく³⁴、単純なRSS購読やメール転送では取りこぼしとノイズが積み上がります。そこで本稿では、CTOやエンジニアリーダーが運用可能な現実解として、標準プロトコルのTAXII/STIX⁵⁶(TAXIIは配信プロトコル、STIXは表現フォーマット)を核に、MISP⁹¹¹やOpenCTI⁸といったプラットフォームを組み合わせ、収集・正規化・重複排除・信頼度評価・配信までを最適化するアーキテクチャと実装を、セキュリティ運用のKPIに結び付ける視点で解説します。
なぜ「集める前に設計」が効くのか:ノイズと鮮度の経済学
脅威情報の運用効率を語る前に、何を集めるのかを定義することが最重要です。一次情報としてはTAXIIで配布されるSTIXオブジェクト⁵⁶(標準化された脅威情報)、あるいはコミュニティ主導のMISPイベント⁹¹¹が挙げられます。二次情報としてはブログ、脆弱性の脅威優先度解説、レピュテーションのスコアなどがあります。設計の要点は、IOC中心の高速ループと、戦術・技術・手順(TTP)中心の低速ループを分けることにあります。前者はDNSやプロキシ、EDRでのブロックや検知に直結し、後者は検出ロジックやハントクエリの更新に効きます。両者を同一バスで扱うと、更新周期が衝突しスループットを落とします。
さらに重要なのは、重複排除と信頼度の早期付与です。同じIPアドレスが複数のソースから届いたとき、最初に到着した情報が必ずしも最良とは限りません。値とタイプに加え、source、observed_time、valid_from、confidenceをキー化してマージする方針を最初から持ち込み、スコアは「ソースの信用」「観測の新しさ」「過去の誤検知率」などの重みに分解します。これにより、SIEMやEDRへ渡す前にノイズを減らし、下流でのアラート疲れを抑えられます。更新の鮮度はキャッシュ制御とETag/Last-Modified(HTTPの条件付き取得)で担保し、Pull型の間引きではなく、可能ならPush/ストリーミングで遅延を詰めます¹⁰。設計段階でこの2点を明文化するだけで、運用のコストは大きく変わります。関連する設計の考え方は、たとえば SIEMコスト最適化の設計 や、検出ロジックに関係する シフトレフトの脅威モデリング にも直結します。
情報源の選定と接続:TAXII、MISP、OpenCTIを骨格にする
プロトコルとプラットフォームを標準で固めると、運用とベンダーロックの両方を避けやすくなります。TAXII 2.1はSTIX 2.1オブジェクトの配信に長け⁵⁶、MITRE ATT&CKや各種フィードの接続点として機能します⁷。MISP⁹¹¹はコミュニティ・ISAC連携の実務で使いやすく、イベント単位の整備がしやすいのが利点です。OpenCTI⁸はGraphQL APIで関係性を扱いやすく、キャンペーンとインフラ、マルウェアファミリの紐づけに強みがあります。以下では、それぞれを安全に、そして運用効率の観点で接続するコードを示します。
MITRE ATT&CK TAXII 2.1からの取得(Python)
import asyncio
from taxii2client.v21 import Server
from stix2 import MemoryStore
ATTACK_TAXII_URL = "https://cti-taxii.mitre.org/taxii/"
async def fetch_attack_enterprise():
server = Server(ATTACK_TAXII_URL)
api_root = server.api_roots[0]
collections = {c.title: c for c in api_root.collections}
enterprise = collections.get("Enterprise ATT&CK")
ms = MemoryStore()
# ページング対応
for bundle in enterprise.get_objects():
ms.add(bundle.get('objects', []))
techniques = ms.query([{"type": "attack-pattern"}])
print(f"techniques: {len(techniques)}")
if __name__ == "__main__":
asyncio.run(fetch_attack_enterprise())
エラー時は429や5xxが返ることがあるため、ユーザーエージェント明示と指数バックオフを併用します。MemoryStoreで一旦集約し、必要なタイプだけ抽出すると後段の正規化が軽くなります。
OpenCTI GraphQLからインジケータ取得(Python)
import os
import requests
OPENCTI_URL = os.getenv("OPENCTI_URL")
OPENCTI_TOKEN = os.getenv("OPENCTI_TOKEN")
QUERY = {
"query": """
query Indicators($first: Int!, $orderBy: IndicatorsOrdering){
indicators(first: $first, orderBy: $orderBy){
edges{ node{ id indicator_value valid_from confidence pattern_type createdBy{ name } labels{ value } } }
}
}
""",
"variables": {"first": 200, "orderBy": "valid_from"}
}
headers = {"Authorization": f"Bearer {OPENCTI_TOKEN}", "Content-Type": "application/json"}
resp = requests.post(f"{OPENCTI_URL}/graphql", json=QUERY, headers=headers, timeout=30)
resp.raise_for_status()
data = resp.json()
for edge in data["data"]["indicators"]["edges"]:
n = edge["node"]
print(n["indicator_value"], n["confidence"]) # 正規化は後段で実施
GraphQLは選択フィールドを絞りやすく、ペイロードの軽量化に寄与します。無駄のない取得には、必要なフィールドのみ取得するポリシーが有効です。
MISPからイベントを取得して正規化(Python, PyMISP)
from pymisp import ExpandedPyMISP
import os
MISP_URL = os.getenv("MISP_URL")
MISP_KEY = os.getenv("MISP_KEY")
VERIFY_CERT = True
misp = ExpandedPyMISP(MISP_URL, MISP_KEY, VERIFY_CERT)
# タグでスコープを制限し、ノイズを抑制
events = misp.search(controller='events', tags=['tlp:green'], published=True, limit=100)
normalized = []
for e in events:
for attr in e['Event'].get('Attribute', []):
normalized.append({
'type': attr.get('type'),
'value': attr.get('value'),
'source': 'misp',
'event_id': e['Event']['id'],
'tlp': [t['name'] for t in e['Event'].get('Tag', [])],
})
print(f"normalized: {len(normalized)}")
MISPはタグ運用が品質に直結します。TLPの取り扱いを厳密にし、配信先ごとに許容レベルをマッピングすると、情報共有とコンプライアンスの両立が容易になります⁹¹¹。
収集パイプラインの実装:正規化、重複排除、配信の自動化
パイプラインは、取り込み、正規化、スコアリング、重複排除、配信、測定のロープで構成します。ストレージは書き捨てキャッシュと永続ストアを分離し、検索要件が強ければElasticsearchやOpenSearch、配信のファンアウトにはKafkaやSNS/SQSを使います。ここで鍵になるのは、IOCsを「タイプ、値、ソース、タイムスタンプ」の複合キーでユニーク化し、最小限のフィールドセットに落とす正規化です。
STIXインジケータの正規化と重複排除(Python)
from datetime import datetime
from hashlib import sha256
ESSENTIAL_FIELDS = ("type", "value", "source", "valid_from", "confidence", "labels")
def normalize_stix_indicator(obj: dict, source: str) -> dict:
stype = obj.get("pattern_type") or obj.get("type")
value = obj.get("indicator_value") or obj.get("value")
valid_from = obj.get("valid_from") or obj.get("created")
labels = [l["value"] for l in obj.get("labels", [])] if isinstance(obj.get("labels"), list) else obj.get("labels") or []
confidence = obj.get("confidence") or 50
return {
"type": stype,
"value": value,
"source": source,
"valid_from": valid_from,
"confidence": confidence,
"labels": labels,
}
def dedup_key(doc: dict) -> str:
base = f"{doc['type']}|{doc['value']}|{doc['source']}"
return sha256(base.encode()).hexdigest()
# マージ方針:新しいvalid_fromを優先し、confidenceは最大値を保持
def merge(a: dict, b: dict) -> dict:
newer = max(a.get("valid_from", ""), b.get("valid_from", ""))
labels = sorted(set((a.get("labels") or []) + (b.get("labels") or [])))
return {
**a,
**b,
"valid_from": newer,
"confidence": max(a.get("confidence", 0), b.get("confidence", 0)),
"labels": labels,
}
キーとマージ戦略を明文化すると、ソース追加のたびにバグを産まなくなります。confidenceの定義をチーム内で共有し、検出ロジックやブロックリストに適用する閾値を合わせると、運用の一貫性が高まります。
Kafkaへ配信し、消費側でSIEM/EDRへ分岐(Python, aiokafka)
import asyncio
import json
from aiokafka import AIOKafkaProducer
async def publish_to_kafka(records):
producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
await producer.start()
try:
for r in records:
await producer.send_and_wait("threat.intel", json.dumps(r).encode())
finally:
await producer.stop()
if __name__ == "__main__":
sample = [{"type": "ipv4-addr", "value": "203.0.113.5", "source": "misp", "valid_from": "2025-08-01T00:00:00Z", "confidence": 70, "labels": ["malicious"]}]
asyncio.run(publish_to_kafka(sample))
消費側では、EDRのカスタムIOC機能や、SIEMのルックアップテーブルへ転送します。配信先ごとのフォーマットは早い段階で吸収し、ブローカーからの下流変換で対応すると、上流の変更が最小で済みます。ストリーミング配信は遅延を抑え、IOCの半減期に追従しやすくなります。
堅牢化:レート制御、リトライ、ETag/If-Modified-Since
import time
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
session = requests.Session()
retries = Retry(total=5, backoff_factor=0.5, status_forcelist=[429, 500, 502, 503, 504])
session.mount("https://", HTTPAdapter(max_retries=retries))
etag_cache = {}
def get_with_cache(url):
headers = {}
if url in etag_cache:
headers["If-None-Match"] = etag_cache[url]
resp = session.get(url, headers=headers, timeout=30)
if resp.status_code == 304:
return None
resp.raise_for_status()
if 'ETag' in resp.headers:
etag_cache[url] = resp.headers['ETag']
return resp.json()
過剰取得の抑制は運用効率の要です。HTTPの基本機能を活用し、スケジューリングにはAirflowやArgo Workflowsを使うと、監査性と再現性が担保されます。イベント駆動の更新は、更新差分のみを配るソースで特に効果的です。ストリーミングでの設計は、トピックの保持期間と消費遅延をKPIとして管理しましょう。参考としてイベント基盤の設計は Kafkaのイベントストリーミング設計 が役立ちます。
活用面の最適化:検出精度、ノイズ削減、ROIの紐づけ
収集は手段であり、目的は検出力の向上と運用の省力化です。まずは検出面から見直します。IOCは単体でのブロックや一致検知に使いつつ、TTPはハントやルール生成に反映します。SigmaやYARAといった検知ルール言語や、各種クエリ言語に落とすとき、confidence閾値や初回観測時刻による重み付けを維持すれば、アラートの優先度付けがしやすくなります。
SIEMでの参照結合(Splunk SPL例)
| inputlookup threat_ioc.csv \
| where confidence >= 60 \
| rename value as ioc \
| eval ioc_type=type \
| join type=ioc_type [ search index=proxy sourcetype=zscaler \
| eval ioc=coalesce(dest_ip, url) ] \
| stats count by ioc, src_ip, user, _time
参照はルックアップやKVストアで短周期に更新します。高頻度更新の負荷は、ルックアップをインメモリ化するか、サマリーデータモデルに近い形で緩和できます。
Microsoft Sentinelでの参照(KQL例)
let ioc=externaldata(value:string, type:string, confidence:int)
['https://storage.example.com/ioc.csv']
with(format='csv');
ProxyLogs
| where confidence >= 60
| extend indicator = iff(type == 'url', Url, tostring(DestinationIP))
| join kind=inner (ioc) on $left.indicator == $right.value
| summarize by value, User, Computer, TimeGenerated
クラウドSIEMではストレージのコスト最適化が効きます。低コストストレージに置いたIOCを必要時に参照し、頻出のスコア帯だけホット化すると、検出精度とコストのバランスが取りやすくなります。
ベンチマークとKPIの設計
取り込みから下流配信までのエンドツーエンド遅延、1秒あたりのインジケータ処理数、重複排除率、誤検知率、アラート到達までの時間を主要KPIに据えます。一般的に、非同期処理とバルク処理を併用すると、数千〜数万件規模のIOCでも短時間で正規化・配信しやすくなり、スキップ条件やETagを適用すると無駄な転送量を抑えられます。以下はローカル計測用の雛形です。
import time
def measure_throughput(process_fn, batch):
start = time.time()
processed = [process_fn(x) for x in batch]
dt = time.time() - start
tps = len(processed) / dt if dt else 0
return tps, dt
# 使い方:tps, elapsed = measure_throughput(normalize_stix_indicator, sample_batch)
KPIは運用とビジネスを同じ言語にします。試算例として、アナリストの一日あたりの手作業30分短縮が月20営業日で約10時間、時給8,000円換算で月約8万円、年換算で約96万円に相当します。これをパイプラインの運用コストと比較し、パフォーマンス改善の優先度を決めます。より広い予算設計の考え方は、例えば セキュリティ投資のROI設計 が参考になります。
運用の「続ける仕組み」:ガバナンス、品質、レジリエンス
継続運用の肝は、ガバナンスとSLOの明確化です。信頼度の定義、許容できる誤検知率、更新遅延の上限、停止時のフォールバックをドキュメント化し、変更管理のPull Requestに紐づけます。障害に備えて、重要フィードは二重化し、片系が落ちてもキャッシュで24時間は持つようにします。インシデント時には、IOCの強制昇格や暫定ブロックを行う緊急ルートを設け、後から正規フローに畳み込みます。最後に、人のスキルとツールの関係です。アナリストがTTPを理解し、検出ロジックへ還元するループは自動化できません。自動化が稼いだ時間を、脅威ハンティングやプレイブック改善に投資できるよう、可視化ダッシュボードで時間配分を見える化すると効果が出やすくなります。
まとめ:速さと品質を両立させる現実解
攻撃の速度が増すほど、脅威情報の価値は鮮度と品質に依存します。標準プロトコルとオープンなプラットフォームで骨格を作り、正規化と重複排除を中核に据え、配信はストリーミングで遅延を抑える。信頼度とTLPの扱いを明確にし、SIEMやEDRではスコアに応じた優先度付けを徹底する。これだけで、検出精度と運用コストの両立は現実的になります。チームにとって最初の一歩は、今日扱っているフィードの棚卸しと、confidenceの定義合わせから始めるのが効果的です。その上で、小さなパイプラインを一つ動かし、KPIを測る。次に何を止め、何を強化するかが、データで見えるようになります。あなたの組織では、どのKPIから改善を始めますか。まずは既存のフィードにETag対応を加え、30日以内に重複排除率と遅延の実測をダッシュボード化してみてください。現場の負荷を下げつつ品質を高める、最短ルートになります。
参考文献
- Verizon. 2024 Data Breach Investigations Report – Newsroom summary. 2024. https://www.verizon.com/about/news/2024-data-breach-investigations-report-vulnerability-exploitation-boom
- Google Cloud Security (Mandiant). M‑Trends 2024: Key findings on median dwell time. 2024. https://cloud.google.com/blog/topics/threat-intelligence/m-trends-2024
- Dragos. End of life of an Indicator of Compromise (IOC). https://www.dragos.com/blog/end-of-life-of-an-indicator-of-compromise-ioc/
- IETF. RFC 9424: Operational Considerations for Cyber Threat Intelligence Indicators. https://www.ietf.org/rfc/rfc9424.html
- OASIS. TAXII Version 2.1. 2021. https://docs.oasis-open.org/cti/taxii/v2.1/taxii-v2.1.html
- OASIS. STIX Version 2.1. 2021. https://docs.oasis-open.org/cti/stix/v2.1/stix-v2.1.html
- MITRE ATT&CK. TAXII 2.1 Server. https://cti-taxii.mitre.org/taxii/
- OpenCTI Documentation. https://docs.opencti.io/
- MISP Project. Malware Information Sharing Platform (MISP). https://www.misp-project.org/
- IETF. RFC 9110: HTTP Semantics (ETag/If-Modified-Since). 2022. https://www.rfc-editor.org/rfc/rfc9110
- MyCERT. A threat intelligence platform for information sharing (MISP). https://www.mycert.org.my/portal/full?id=89df3ade-427f-4ab0-ba1f-219850d31c76