MCP コンセプト : トランスポート – 標準入出力 & SSE

MCP の通信メカニズムについて学習します。Model Context Protocol (MCP) のトランスポートはクライアントとサーバ間の通信の基盤を提供します。トランスポートはメッセージの送受信の方法の基礎的なメカニズムを処理します。

Model Context Protocol (MCP) : コンセプト : トランスポート – 標準入出力 & SSE

作成 : クラスキャット・セールスインフォメーション
作成日時 : 05/28/2025

* 本記事は github modelcontextprotocol の以下のページを独自に翻訳した上でまとめ直し、補足説明を加えています :

* サンプルコードの動作確認はしておりますが、必要な場合には適宜、追加改変しています。
* ご自由にリンクを張って頂いてかまいませんが、sales-info@classcat.com までご一報いただけると嬉しいです。

 

クラスキャット 人工知能 研究開発支援サービス ⭐️ リニューアルしました 😉

クラスキャット は人工知能に関する各種サービスを提供しています。お気軽にご相談ください :

  • 人工知能導入個別相談会(無償)実施中! [詳細]

  • 人工知能研究開発支援 [詳細]
    1. 自社特有情報を含むチャットボット構築支援
    2. 画像認識 (医療系含む) / 画像生成

  • PoC(概念実証)を失敗させないための支援 [詳細]

お問合せ : 下記までお願いします。

  • クラスキャット セールス・インフォメーション
  • sales-info@classcat.com
  • ClassCatJP

 

 

Model Context Protocol (MCP) : コンセプト : トランスポート – 標準入出力 & SSE

MCP の通信メカニズムについて学習します

Model Context Protocol (MCP) のトランスポートはクライアントとサーバ間の通信の基盤を提供します。トランスポートはメッセージの送受信の方法の基礎的なメカニズムを処理します。

 

メッセージ形式

MCP はその伝送形式 (wire format) として JSON-RPC 2.0 を使用します。トランスポート層は、送信用に MCP プロトコルメッセージを JSON-RPC 形式に変換し、受信した JSON-RPC メッセージを MCP プロトコルメッセージに再変換する役割を担います。

使用される JSON-RPC メッセージには 3 つの種類があります :

 

リクエスト

{
  jsonrpc: "2.0",
  id: number | string,
  method: string,
  params?: object
}

 

レスポンス

{
  jsonrpc: "2.0",
  id: number | string,
  result?: object,
  error?: {
    code: number,
    message: string,
    data?: unknown
  }
}

 

通知 (Notifications)

{
  jsonrpc: "2.0",
  method: string,
  params?: object
}

 

組み込みトランスポート・タイプ

MCP は 2 つの標準トランスポート実装を含みます :

 

標準入出力 (stdio)

stdio トランスポートは標準入出力ストリーム経由の通信を可能にします。これはローカル統合やコマンドライン・ツールに特に有用です。

以下の場合に stdio を使用します :

  • コマンドライン・ツールの構築
  • ローカル統合の実装
  • 単純なプロセス通信が必要な場合
  • シェルスクリプトで操作


TypeScript (サーバ)

const server = new Server({
  name: "example-server",
  version: "1.0.0"
}, {
  capabilities: {}
});

const transport = new StdioServerTransport();
await server.connect(transport);

TypeScript (クライアント)

const client = new Client({
  name: "example-client",
  version: "1.0.0"
}, {
  capabilities: {}
});

const transport = new StdioClientTransport({
  command: "./server",
  args: ["--option", "value"]
});
await client.connect(transport);

Python (サーバ)

app = Server("example-server")

async with stdio_server() as streams:
    await app.run(
        streams[0],
        streams[1],
        app.create_initialization_options()
    )

Python (クライアント)

params = StdioServerParameters(
    command="./server",
    args=["--option", "value"]
)

async with stdio_client(params) as streams:
    async with ClientSession(streams[0], streams[1]) as session:
        await session.initialize()

 

Server-Sent Events (SSE)

SSE トランスポートは、クライアント-to-サーバ通信において、HTTP POST リクエストを使用してサーバ-to-クライアント・ストリーミングを可能にします。

以下の場合に SSE を使用します :

  • サーバ-to-クライアント・ストリーミングだけが必要な場合
  • 制限されたネットワークで作業
  • 単純なアップデートの実装

 

セキュリティ警告: DNS リバインディング攻撃

SSE トランスポートは、適切に保護されていない場合、DNS リバインディング攻撃への脆弱性になり得ます。これを防ぐには :

  1. 受信する SSE 接続の Origin ヘッダを常に検証 し、想定されるソースからのものであることを確認してください。

  2. ローカルで実装している場合 サーバをすべてのネットワーク・インターフェイス (0.0.0.0) にバインドすることを避けてください – 代わりに (127.0.0.1) にのみバインドします。

  3. すべての SSE 接続に対して 適切な認証を実装してください

これらの保護なしでは、攻撃者は DNS リバインディングを使用して、リモート web サイトからローカル MCP サーバを操作できる可能性があります。

TypeScript (サーバ)

import express from "express";

const app = express();

const server = new Server({
  name: "example-server",
  version: "1.0.0"
}, {
  capabilities: {}
});

let transport: SSEServerTransport | null = null;

app.get("/sse", (req, res) => {
  transport = new SSEServerTransport("/messages", res);
  server.connect(transport);
});

app.post("/messages", (req, res) => {
  if (transport) {
    transport.handlePostMessage(req, res);
  }
});

app.listen(3000);

TypeScript (クライアント)

const client = new Client({
  name: "example-client",
  version: "1.0.0"
}, {
  capabilities: {}
});

const transport = new SSEClientTransport(
  new URL("http://localhost:3000/sse")
);
await client.connect(transport);

Python (サーバ)

from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Route

app = Server("example-server")
sse = SseServerTransport("/messages")

async def handle_sse(scope, receive, send):
    async with sse.connect_sse(scope, receive, send) as streams:
        await app.run(streams[0], streams[1], app.create_initialization_options())

async def handle_messages(scope, receive, send):
    await sse.handle_post_message(scope, receive, send)

starlette_app = Starlette(
    routes=[
        Route("/sse", endpoint=handle_sse),
        Route("/messages", endpoint=handle_messages, methods=["POST"]),
    ]
)

Python (クライアント)

async with sse_client("http://localhost:8000/sse") as streams:
    async with ClientSession(streams[0], streams[1]) as session:
        await session.initialize()

 

カスタム・トランスポート

MCP は特定のニーズに合わせたカスタムトランスポートを実装することを容易にします。任意のトランスポート実装は Transport インターフェイスに準拠する必要があるだけです :

以下のためにカスタムトランスポートを実装できます :

  • カスタム・ネットワーク・プロトコル
  • 専用通信チャネル
  • 既存のシステムとの統合
  • パフォーマンス最適化

TypeScript

interface Transport {
  // Start processing messages
  start(): Promise;

  // Send a JSON-RPC message
  send(message: JSONRPCMessage): Promise;

  // Close the connection
  close(): Promise;

  // Callbacks
  onclose?: () => void;
  onerror?: (error: Error) => void;
  onmessage?: (message: JSONRPCMessage) => void;
}

Python

@contextmanager
async def create_transport(
    read_stream: MemoryObjectReceiveStream[JSONRPCMessage | Exception],
    write_stream: MemoryObjectSendStream[JSONRPCMessage]
):
    """
    Transport interface for MCP.

    Args:
        read_stream: Stream to read incoming messages from
        write_stream: Stream to write outgoing messages to
    """
    async with anyio.create_task_group() as tg:
        try:
            # Start processing messages
            tg.start_soon(lambda: process_messages(read_stream))

            # Send messages
            async with write_stream:
                yield write_stream

        except Exception as exc:
            # Handle errors
            raise exc
        finally:
            # Clean up
            tg.cancel_scope.cancel()
            await write_stream.aclose()
            await read_stream.aclose()

Note : While MCP Servers are often implemented with asyncio, we recommend implementing low-level interfaces like transports with anyio for wider compatibility.

 

エラー処理

トランスポート実装は様々なエラーのシナリオを処理する必要があります :

  • 接続エラー
  • メッセージ解析エラー
  • プロトコル・エラー
  • ネットワーク・タイムアウト
  • リソース・クリーンアップ

Example error handling:

TypeScript

class ExampleTransport implements Transport {
  async start() {
    try {
      // Connection logic
    } catch (error) {
      this.onerror?.(new Error(`Failed to connect: ${error}`));
      throw error;
    }
  }

  async send(message: JSONRPCMessage) {
    try {
      // Sending logic
    } catch (error) {
      this.onerror?.(new Error(`Failed to send message: ${error}`));
      throw error;
    }
  }
}

Python

@contextmanager
async def example_transport(scope: Scope, receive: Receive, send: Send):
    try:
        # Create streams for bidirectional communication
        read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
        write_stream, write_stream_reader = anyio.create_memory_object_stream(0)

        async def message_handler():
            try:
                async with read_stream_writer:
                    # Message handling logic
                    pass
            except Exception as exc:
                logger.error(f"Failed to handle message: {exc}")
                raise exc

        async with anyio.create_task_group() as tg:
            tg.start_soon(message_handler)
            try:
                # Yield streams for communication
                yield read_stream, write_stream
            except Exception as exc:
                logger.error(f"Transport error: {exc}")
                raise exc
            finally:
                tg.cancel_scope.cancel()
                await write_stream.aclose()
                await read_stream.aclose()
    except Exception as exc:
        logger.error(f"Failed to initialize transport: {exc}")
        raise exc

Note : While MCP Servers are often implemented with asyncio, we recommend implementing low-level interfaces like transports with anyio for wider compatibility.

 

以上