Article

APIを使わないシステム連携の実現方法

高田晃太郎
APIを使わないシステム連携の実現方法

API前提で連携計画を立てると、仕様不足や提供遅延でプロジェクトが止まることは珍しくありません。一般的な中規模の仮想マシンでも、CSV(テキスト形式の表)をSFTP(SSHによるセキュアなファイル転送)で扱う堅実な構成なら、毎時数十万行規模の安定転送は現実的であることが広く確認されています¹。つまり、APIがなくても要件次第で十分なスループットと信頼性を両立できるという事実です。業務改善を急ぐ現場では、ファイル連携やUI自動化(ヘッドレスブラウザやRPAの活用)、データベース経由の変更データキャプチャ(CDC: 変更の増分だけを取り出す手法)といった古典的な手段が、むしろ低コストで短期に効く場面が増えています²⁷。重要なのは、やみくもに繋ぐことではなく、可観測性(メトリクスやログで状態を見える化)と再実行性(失敗時に安全にやり直せる冪等性)を担保し、将来APIへ移行可能なアーキテクチャにしておくことです。

APIがなくても動く現実解を設計する

API非提供のSaaSやパッケージと向き合うとき、最初に決めるべきはデータの受け渡し面です。多くの実運用では、CSVなどのテキストファイルをSFTPで受け渡す方式が枯れており、到達保証を必要十分に満たします²⁸。転送前に暗号化(公開鍵での暗号化は鍵配布が簡単)を施し、受け側で検証してからアトミック(途中状態を見せない)に本番ディレクトリへ移すだけで、部分書き込みのリスクを避けられます。加えて、アプリ側に書き込み手段がない場合は、UI自動化でダウンロード・アップロードを代替できます。ヘッドレスブラウザの選択と選択子戦略(ARIAロールやテキスト優先)、レート制御(過剰アクセスを避ける)、トレーサビリティ(操作と結果を追跡可能に)という三点を丁寧に設計すれば、UI変更時の破断も可視化しやすくなります⁵⁶。さらに、DBに読み取り権限があるなら、スナップショット+差分取り込み、あるいは変更データキャプチャ(CDC)でファイル出力する選択肢も現実的です⁷。直接APIを叩かないという制約の中でも、データの整合性、重複排除、リトライの仕組みを先回りで入れておくことで、日次・時間次の連携を安定運用に乗せられます⁸。

SFTPファイル連携の最小実装

転送前にファイルサイズとチェックサム(内容の要約値)を記録し、受け側で検証してからファイル名を確定させる手順が基本になります。検証メタデータはJSONで添付し、HMAC(鍵付きハッシュ)で改ざん検出を行うと、監査にも強い形になります⁴。

# sftp_put.py
import json
import hashlib
import hmac
import os
from pathlib import Path
import paramiko

SECRET = os.environ.get("MANIFEST_SECRET", "change-me")

def sha256sum(p: Path) -> str:
    h = hashlib.sha256()
    with p.open('rb') as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b''):
            h.update(chunk)
    return h.hexdigest()

def build_manifest(p: Path) -> bytes:
    meta = {
        "filename": p.name,
        "size": p.stat().st_size,
        "sha256": sha256sum(p)
    }
    body = json.dumps(meta, separators=(',', ':'), ensure_ascii=False).encode()
    sig = hmac.new(SECRET.encode(), body, hashlib.sha256).hexdigest()
    envelope = {"meta": meta, "hmac": sig}
    return json.dumps(envelope, ensure_ascii=False).encode()

def sftp_put(host, user, key_path, local_path: Path, remote_dir: str):
    key = paramiko.Ed25519Key.from_private_key_file(key_path)
    transport = paramiko.Transport((host, 22))
    transport.connect(username=user, pkey=key)
    sftp = paramiko.SFTPClient.from_transport(transport)

    tmp_name = f".{local_path.name}.part"
    tmp_remote = f"{remote_dir}/{tmp_name}"
    final_remote = f"{remote_dir}/{local_path.name}"
    manifest_remote = f"{final_remote}.manifest.json"

    try:
        sftp.put(str(local_path), tmp_remote)
        sftp.rename(tmp_remote, final_remote)  # atomic publish
        mf = build_manifest(local_path)
        with sftp.open(manifest_remote, 'w') as f:
            f.write(mf.decode())
    finally:
        sftp.close()
        transport.close()

if __name__ == '__main__':
    sftp_put(
        host="sftp.partner.local",
        user="batch",
        key_path="/secrets/batch.key",
        local_path=Path("/data/outbound/orders_2025-08-30.csv"),
        remote_dir="/inbound/orders"
    )

転送前暗号化と鍵管理

転送路がSFTPでも、機微情報は受け側公開鍵で暗号化しておくと鍵管理がシンプルになります。GnuPG(PGP互換のオープンソース暗号ツール)での前処理は短いコマンドで実現できます³。

# ファイルを受け側の公開鍵で暗号化し、拡張子.gpgを付与
gpg --batch --yes --trust-model always \
    --output orders_2025-08-30.csv.gpg \
    --encrypt --recipient PARTNER_KEY_ID orders_2025-08-30.csv

壊れにくいUI自動化:安定化の勘所

UI自動化は壊れる、という先入観は正しい一面を持ちますが、安定化の余地も大きいのが実情です。データ取得であれば、URL直叩きや非公開の内部APIの使用ではなく、正式なエクスポート画面を経由してダウンロードをトリガーする設計に徹するだけで、規約違反のリスクとメンテ頻度を同時に下げられます。選択子はCSSの構造依存を避け、ARIAロールやテキストベースで指定し、ナビゲーションの待機はイベント駆動で行います⁵⁶。さらに、ヘルスチェック用の軽量フローを別途用意し、失敗したときに影響範囲を狭くできるようにしておくと、朝のバッチが止まっても代替経路で最低限の帳票だけを取りに行く判断が取りやすくなります。

# scrape_export.py (Playwright Python)
import asyncio
from playwright.async_api import async_playwright

async def export_csv(base_url: str, user: str, password: str, download_dir: str):
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        ctx = await browser.new_context(accept_downloads=True)
        page = await ctx.new_page()
        await page.goto(base_url, wait_until="domcontentloaded")
        await page.get_by_label("Email").fill(user)
        await page.get_by_label("Password").fill(password)
        await page.get_by_role("button", name="Sign in").click()
        await page.wait_for_load_state("networkidle")

        await page.get_by_role("link", name="Reports").click()
        await page.get_by_role("button", name="Export CSV").click()

        with page.expect_download() as dl:
            await page.get_by_role("menuitem", name="Orders - last 24h").click()
        download = await dl.value
        path = await download.path()
        await download.save_as(f"{download_dir}/orders_latest.csv")
        await browser.close()

if __name__ == "__main__":
    asyncio.run(export_csv(
        base_url="https://console.vendor.example/login",
        user="robot@example.com",
        password="***",
        download_dir="/data/inbound"
    ))

耐久性を高めるには、待機の多用ではなく状態の検知が鍵になります。上の例では、ボタンのロール名とテキストに基づく選択を使い、ダウンロードはイベントで待ち受けています。これにより、微細なDOM構造の変更や遅延が発生しても、フローが過剰にタイムアウトしにくくなります⁵。また、1分あたりの操作イベントをむやみに増やすとレート制限(サービス側のアクセス制御)を誘発します。一般に、外部サービスと対話するUI自動化のスループットはCPUよりも相手側のスロットリングに支配されやすく、ページ遷移やダウンロードの回数設計がボトルネックになりがちです。

整合性・スキーマ・再実行性の三本柱

APIレス連携の本質的な難しさは、送受の間に契約が明文化されにくいことです。この不在を補うのが、ファイル単位の契約です。具体的には、スキーマ定義(列名・型・NULL許容・値域)、完全性検証(件数・ハッシュ・主キー重複検出)、重複排除、そして再実行に耐える命名と配置の取り決めが契約の代替になります。スキーマはマシンリーダブルな宣言として持ち運び、処理の各段階で検証します。完全性は件数とハッシュ、行単位の主キー重複検出で担保し、再実行性(冪等性)は「同一内容は同一ファイル名」によって実現します。これにより、リトライや再取り込みが発生しても、結果整合を保ちながら副作用を避けられます⁸。

# validate_and_stage.py
import csv
from pathlib import Path

REQUIRED_COLUMNS = ["order_id", "customer_id", "amount", "currency", "created_at"]

def validate_schema(p: Path) -> None:
    with p.open(newline='') as f:
        reader = csv.reader(f)
        header = next(reader)
        missing = [c for c in REQUIRED_COLUMNS if c not in header]
        if missing:
            raise ValueError(f"missing columns: {missing}")

        idx = {c: header.index(c) for c in REQUIRED_COLUMNS}
        seen = set()
        for i, row in enumerate(reader, start=2):
            oid = row[idx["order_id"]]
            if not oid:
                raise ValueError(f"row {i}: order_id is empty")
            if oid in seen:
                raise ValueError(f"row {i}: duplicate order_id {oid}")
            seen.add(oid)
            amt = float(row[idx["amount"]])
            if amt < 0:
                raise ValueError(f"row {i}: amount negative")
            cur = row[idx["currency"]]
            if cur not in {"JPY", "USD", "EUR"}:
                raise ValueError(f"row {i}: invalid currency {cur}")

if __name__ == '__main__':
    p = Path("/data/inbound/orders_latest.csv")
    validate_schema(p)
    p.replace(Path("/data/staging/") / p.name)

取り込み側では、ファイル名にハッシュや日付を含めることで冪等性を担保します。さらに、取り込み済みのハッシュを記録しておき、重複を即座にスキップできるようにすると、再実行での副作用を避けられます。

# ingest_idempotent.py
import hashlib
import sqlite3
from pathlib import Path

DB = ":memory:"  # 実運用では永続DBを使用

def file_hash(p: Path) -> str:
    h = hashlib.sha256()
    with p.open('rb') as f:
        for chunk in iter(lambda: f.read(1024*1024), b''):
            h.update(chunk)
    return h.hexdigest()

def mark_and_should_process(p: Path) -> bool:
    conn = sqlite3.connect(DB)
    conn.execute("CREATE TABLE IF NOT EXISTS ingested(hash TEXT PRIMARY KEY)")
    h = file_hash(p)
    try:
        conn.execute("INSERT INTO ingested(hash) VALUES (?)", (h,))
        conn.commit()
        return True
    except sqlite3.IntegrityError:
        return False

if __name__ == '__main__':
    target = Path("/data/staging/orders_latest.csv")
    if mark_and_should_process(target):
        print("process")
    else:
        print("skip duplicate")

DB経由とCDC:APIの外側にある正攻法

読み取り専用でDBにアクセスできるなら、スナップショット抽出と差分抽出をファイルに落とす設計は有効です。特に、PostgreSQLのロジカルデコーディングや商用DBのCDC機能を利用し、変更イベントを行単位で書き出してSFTPに流すと、近似リアルタイムな連携が可能になります⁷。HTTP APIは使いませんが、結果的にイベント駆動のデータパイプラインを構築できます⁷。一般的な構成でも、毎秒数百行規模の変更を継続処理できるケースが広く報告されており、ボトルネックはディスクI/Oや圧縮処理に寄ることが多い印象です⁷。

# PostgreSQL: ロジカルデコーディングスロットの作成
psql "host=db.example user=replicator dbname=app" \
  -c "SELECT * FROM pg_create_logical_replication_slot('app_slot', 'pgoutput');"

# 変更イベントをファイルに書き出し
pg_recvlogical -d "host=db.example user=replicator dbname=app" \
  --slot=app_slot --start -f - | gzip > changes.ndjson.gz

変更データの整形は、後段でJSON Linesに正規化しておくと扱いやすくなります。アプリケーション側のスキーマ進化に追随するには、列の追加に寛容な設計と、欠損列に対するデフォルト値の付与をパイプライン内で実施します。将来的に公式APIが整備された時点で、ソースをAPIに差し替えても、後段のスキーマ契約と冪等性の仕組みが同一であれば、移行の影響は最小限に抑えられます。

フォルダ監視でバッチをイベント化する

時間起動のバッチは運用が単純ですが、遅延が読めない場合はフォルダ監視で準リアルタイムのイベント処理に寄せると、滞留が減ります。watchdogなど軽量な監視で、ファイル到着時に検証・取り込みを即座に走らせる構成は、非APIでも体感速度を大きく改善します。

# watch_and_ingest.py
import time
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

from validate_and_stage import validate_schema

class Handler(FileSystemEventHandler):
    def on_created(self, event):
        p = Path(event.src_path)
        if p.suffix == ".csv":
            try:
                validate_schema(p)
                (Path("/data/staging") / p.name).write_bytes(p.read_bytes())
                print(f"staged: {p.name}")
            except Exception as e:
                print(f"reject: {p.name}: {e}")

if __name__ == '__main__':
    obs = Observer()
    obs.schedule(Handler(), "/data/inbound", recursive=False)
    obs.start()
    try:
        while True:
            time.sleep(1)
    finally:
        obs.stop(); obs.join()

性能と運用:測って、観測して、やめ時を決める

APIレス連携は、性能が足りないという思い込みから過小評価されがちです。しかし、I/Oのパイプラインとして設計すれば、CPUよりもストレージとネットワークがボトルネックになります。圧縮とバッファリング、並列の使い方次第で、必要十分なスループットは確保できます。性能は推測でなく計測です。ローカルでCSVの読み書きや圧縮のレイテンシを計測し、どの段に最適化余地があるかを特定します。さらに、運用の可観測性を確保するために、各段の件数、サイズ、処理時間、ハッシュをメトリクスとして記録し、しきい値でアラートするだけでも、障害の検知速度は格段に上がります。API提供が始まったら移行する、というやめ時の定義も最初に決めておきます。SLA(合意されたサービス品質)や到達時間の期待値、監査要件が変わるタイミングで、段階的にソースを差し替える前提で設計しておくのが、投資回収の鍵です。

# quick_bench.py
import gzip, time
from pathlib import Path

src = Path("/data/staging/orders_latest.csv")
start = time.perf_counter()
raw = src.read_bytes()
read_s = time.perf_counter()
comp = gzip.compress(raw, compresslevel=6)
comp_s = time.perf_counter()
Path("/data/out/orders_latest.csv.gz").write_bytes(comp)
write_s = time.perf_counter()

print({
  "read_ms": int((read_s-start)*1000),
  "compress_ms": int((comp_s-read_s)*1000),
  "write_ms": int((write_s-comp_s)*1000),
  "bytes_in": len(raw),
  "bytes_out": len(comp)
})

この簡易計測でも、どの処理が支配的かが一目で分かります。ファイルI/Oの遅延が大きいならバッファサイズやストレージ階層の見直し、圧縮が支配的ならレベルを5〜6に下げる、あるいは並列化する、といった意思決定が可能になります。また、UI自動化の側は、1ジョブあたりの最長許容時間や日次の成功率といったSLO(目標とするサービスレベル)を明確にし、しきい値に基づくアラートで人が起きるべきかどうかを判断できます。ここまで観測できていれば、API提供への移行判断も定量化され、投資の正当化がしやすくなります。

セキュリティと監査の最小ライン

転送はSFTP/FTPS/AS2(いずれも暗号化や署名を備えた安全な転送方式)など暗号化された経路を使い、ファイル自体も公開鍵暗号で包むと取り扱いが楽になります³⁹。監査の観点では、誰がいつ何件のデータを入出力したか、ハッシュは何だったか、障害時に何をリトライしたかを追えるログとメタデータが要です。アプリログとは別に、連携パイプラインの操作ログを保存期間付きで保管し、個人情報マスキングの方針も含めて運用設計に織り込んでおくと、監査対応の工数が大きく減らせます⁹。

まとめ:API待ちをやめ、移行前提で出荷する

APIがなくても、業務改善は十分に実現できます。CSV+SFTPの堅実なパイプライン、壊れにくいUI自動化、DBのCDCといった現実解を、スキーマ契約、完全性検証、冪等設計という土台に載せるだけで、日次・時間次の連携は安定運用に乗ります。重要なのは、今の制約で最良の仕組みを動かしつつ、将来APIに差し替えられるように境界を設計しておくことです。あなたの現場では、どのデータから連携を始めれば最も早くビジネスインパクトが出るでしょうか。まずは最小のデータセットでパイプラインを作り、性能と運用コストを実測し、移行のやめ時を決めるためのダッシュボードを用意してみてください。最短で価値を出し、最適なタイミングで置き換えるという戦い方が、APIレス時代の正攻法です。

参考文献

  1. AWS Transfer Family – SFTP コネクタのパフォーマンス強化(2024/09)
  2. Integrate.io. SFTPのデータ統合プロセス(日本語)
  3. AWS Storage Blog. Architecting secure and compliant managed file transfers with AWS Transfer Family, SFTP connectors, and PGP encryption.
  4. JSCAPE Blog. What is HMAC and how does it secure file transfers?
  5. Playwright Documentation. Actionability.
  6. Devbookmarks. Playwright testing guide — Prefer User-facing selectors over XPath or CSS.
  7. Debezium Documentation. PostgreSQL connector and logical decoding.
  8. Skyvia Blog. Understanding SFTP Automation — ensuring integrity and reducing manual work.
  9. Progress(Japan)Blog. どのセキュアファイル転送(MFT)ソリューションが最適か.