WebRTC DataChannel と OPFS と sqlite-wasm を組み合わせた P2P SNS 試作 その1
概要
P2P で動くSNS Webアプリが作ってみたくなって、ChatGPT と相談しながら作ってみた。
WebRTC DataChannel で P2P っぽい部分を実現し、データは完全にローカル環境の OPFS にしかない。
ストレージには SQLite を使った。WASM 版があるのでそれを OPFS に組み込んだ。
Cloudflare の Realtime に無料枠が1000GB/月の TURN サーバーがあるのでそれを使っている。
作ってみたらリアルタイムのチャットみたいな雰囲気になったが、ルームは一つしか無く投稿したら接続中の人全員に問答無用で公開されるし、
「再送」ボタンを押すと自分が持ってるポストを接続中の人にブロードキャストされる(はず)。
作ったもの
同時に接続してて投稿している人がいないと、他人の投稿は見えない仕組み。
やったこと
WebSocket でシグナリングサーバーを作る
まずやらないといけなかったのは、WebSocket でシグナリングサーバーを作るところだった。 調べたら Cloudflare Workers でも WebSocket が使えるようなので無料で構築。
簡単だろうと思っていたら案外ハマりどころが多かった。主に Cloudflare Workers 関連で。
ハマりどころ1: WebSocket の API が Node.js と違う
Workers は JS のランタイムとして Workerd という Node.js とも Deno とも Bun とも違う処理系を独自に開発して使っている。
そのためところどころ Node.js の API が使えなかったり違ってたりする。
WebSocket 関連もそうで、巷にある Node.js 用の WebSocket サーバーのサンプルコードは使えない。
代わりに以下のようなコードを書く必要がある。
const webSocketPair = new WebSocketPair(); const [client, server] = Object.values(webSocketPair);
ここで返ってくる server は ws パッケージの WebSocket みたいに使える。
……のだが、ws では wss.clients みたいに取得できていた、接続中のクライアント一覧はそのままでは取得できないので、工夫する必要がある(後述)。
また、ここで client が取得できるから、 ws みたいに client.send() を呼んでデータが送れるかと思ったら送れない。
Workers の場合は server.send() で送れるようになる。ややこしい。
ハマりどころ2: 接続中のクライアント一覧を取得できない
Workerd の API にある WebSocketPair() で取得できるのはクライアントとサーバーの WebSocket オブジェクト2つのみである。
現在接続中のクライアント一覧はどうやって取得するのか? どうやってイベントをブロードキャストするのか?
これを実現するためには、公式サンプルにあるように接続中のクライアントを保持する変数を宣言して保持しておくというやり方で実装することになる。

下記の sessions で接続中のクライアントを保持できる。
export class WebSocketServer extends DurableObject { // Keeps track of all WebSocket connections sessions: Map<WebSocket, { [key: string]: string }>; constructor(ctx: DurableObjectState, env: Env) { super(ctx, env); this.sessions = new Map(); } async fetch(request: Request): Promise<Response> { // Creates two ends of a WebSocket connection. const webSocketPair = new WebSocketPair(); const [client, server] = Object.values(webSocketPair); // Calling `accept()` tells the runtime that this WebSocket is to begin terminating // request within the Durable Object. It has the effect of "accepting" the connection, // and allowing the WebSocket to send and receive messages. server.accept(); // Generate a random UUID for the session. const id = crypto.randomUUID(); // Add the WebSocket connection to the map of active sessions. this.sessions.set(server, { id }); server.addEventListener('message', (event) => { this.handleWebSocketMessage(server, event.data); }); // If the client closes the connection, the runtime will close the connection too. server.addEventListener('close', () => { this.handleConnectionClose(server); }); return new Response(null, { status: 101, webSocket: client, }); } async handleWebSocketMessage(ws: WebSocket, message: string | ArrayBuffer) { const connection = this.sessions.get(ws)!; // Reply back with the same message to the connection ws.send(`[Durable Object] message: ${message}, from: ${connection.id}, to: the initiating client. Total connections: ${this.sessions.size}`); // Broadcast the message to all the connections, // except the one that sent the message. this.sessions.forEach((_, session) => { if (session !== ws) { session.send(`[Durable Object] message: ${message}, from: ${connection.id}, to: all clients except the initiating client. Total connections: ${this.sessions.size}`); } }); // Broadcast the message to all the connections, // including the one that sent the message. this.sessions.forEach((_, session) => { session.send(`[Durable Object] message: ${message}, from: ${connection.id}, to: all clients. Total connections: ${this.sessions.size}`); }); } async handleConnectionClose(ws: WebSocket) { this.sessions.delete(ws); ws.close(1000, 'Durable Object is closing WebSocket'); } }
WebSocketServer が継承している DurableObject については次で説明する。
ハマりどころ3: WebSocket が繋がらない
考えてみれば当たり前なのだが、Workers はサーバーレス実行環境であるため、複数人がアクセスしたら別々のサーバー上で動いている可能性が高い。 サーバーとクライアントがリアルタイムでイベントを送り合うだけならそのまま Workers 上で実装しても問題ないのだが、 今回やりたいのは WebRTC のシグナリングである。クライアントは最低2人以上いる。このままだと2人のクライアントを相互に通信させられない。
これを解決するのが、 Durable Objects という仕組みだった。

Durable には「丈夫な」とかの他に「恒久の」とか「永続性のある」という意味があるらしい。多分この場合は「恒久」「永続性」のほうだろう。
軽くドキュメントを読んだ感じだと、オブジェクトをグローバルに単一のものとして扱うことができるようになる仕組みのようだ。 普通の VM にデプロイしたサーバーサイドアプリがメモリ内で保持している static オブジェクトみたいなものだろう。 各クライアントのリクエストをまたいで同じ状態を持てるもの、と考えれば良さそうだ。
Use Durable Objects to build applications that need coordination among multiple clients, like collaborative editing tools, interactive chat, multiplayer games, live notifications, and deep distributed systems, without requiring you to build serialization and coordination primitives on your own.
(Google翻訳)Durable Objects を使用すると、共同編集ツール、インタラクティブ チャット、マルチプレイヤー ゲーム、ライブ通知、ディープ分散システムなど、複数のクライアント間の調整を必要とするアプリケーションを構築できます。シリアル化や調整のプリミティブを独自に構築する必要はありません。
と説明されているので、まさに今回のような目的で使えるように作られた仕組みだと言える。
具体的な WebSocket サーバーのコードはハマりどころ2で貼ったような感じになるのだが、Cloudflare 用の設定ファイルにも以下を追加する必要がある。
{ "$schema": "./node_modules/wrangler/config-schema.json", "name": "websocket-server", "main": "src/index.ts", "durable_objects": { "bindings": [ { "name": "WEBSOCKET_SERVER", "class_name": "WebSocketServer" } ] }, "migrations": [ { "tag": "v1", "new_sqlite_classes": [ "WebSocketServer" ] } ] }
durable_objects.bindings.name に WEBSOCKET_SERVER と設定することで、 Workers コード内で env.WEBSOCKET_SERVER で Durable Object を使えるようになる。
migrations.new_sqlite_classes でも DurableObject を継承したクラス名を書いているが、多分 Workers は内部的に Durable Objects 用の SQLite DB を持っているのだろう。 Storage API というものもあって、 Durable Object 内で使える SQLite 用の API もある。

だが今回は使う予定は無い。
実際のシグナリングサーバーのコード
試行錯誤しながら作ったのであまりよくないコードだが、同じことをやろうと思っている人の参考のために貼る。
import { createRequestHandler } from "react-router"; import { DurableObject } from "cloudflare:workers"; export interface Env { ASSETS: Fetcher; WEBSOCKET_HIBERNATION_SERVER: DurableObjectNamespace<WebSocketHibernationServer>; CLOUDFLARE_TURN_REQUEST_URL: string; CLOUDFLARE_TURN_API_TOKEN: string; } const coopCoepHeaders = { "Cross-Origin-Opener-Policy": "same-origin", "Cross-Origin-Embedder-Policy": "require-corp", "Cross-Origin-Resource-Policy": "same-origin", } as const; function withCoopCoep(response: Response): Response { const headers = new Headers(response.headers); for (const [key, value] of Object.entries(coopCoepHeaders)) { if (!headers.has(key)) { headers.set(key, value); } } return new Response(response.body, { status: response.status, statusText: response.statusText, headers, }); } async function maybeServeAsset( request: Request, env: Env ): Promise<Response | null> { if (request.method !== "GET" && request.method !== "HEAD") return null; if (!env.ASSETS?.fetch) return null; try { const assetResponse = await env.ASSETS.fetch(request); if (assetResponse.status === 404) return null; return withCoopCoep(assetResponse); } catch (error) { console.warn("Asset fetch failed", error); return null; } } export class WebSocketHibernationServer extends DurableObject { sessions: Map<WebSocket, { id: string }> = new Map(); socketsById: Map<string, WebSocket> = new Map(); constructor(ctx: DurableObjectState, env: Cloudflare.Env) { super(ctx, env); this.sessions = new Map(); this.socketsById = new Map(); } private broadcastPeers() { const connected = Array.from(this.socketsById.keys()); this.sessions.forEach((_, sessionSocket) => { sessionSocket.send( JSON.stringify({ type: "peers", connected, }) ); }); } async fetch(request: Request) { const upgradeHeader = request.headers.get("upgrade"); if (!upgradeHeader || upgradeHeader !== "websocket") { return new Response("Expected Upgrade: websocket", { status: 426 }); } const webSocketPair = new WebSocketPair(); const [client, server] = Object.values(webSocketPair); const id = crypto.randomUUID(); this.sessions.set(server, { id }); this.socketsById.set(id, server); this.ctx.acceptWebSocket(server); // Tell the newly connected client their ID and currently connected peers server.send( JSON.stringify({ type: "init", id, connected: Array.from(this.socketsById.keys()).filter((peerId) => peerId !== id), }) ); // Notify existing peers about the new participant so they can connect in real time. this.sessions.forEach((session, sessionSocket) => { if (sessionSocket === server) return; sessionSocket.send( JSON.stringify({ type: "peer-joined", id, }) ); }); this.broadcastPeers(); return new Response(null, { status: 101, webSocket: client, }); } async webSocketMessage(ws: WebSocket, message: string) { console.log("Server received:", message); const data = JSON.parse(message as string); const session = this.sessions.get(ws); if (!session) { ws.send(JSON.stringify({ type: "error", reason: "unknown session" })); return; } // Always stamp sender id data.from = session.id; if (data.type === "ping") { console.log("Sending pong"); this.sessions.forEach((_, sessionSocket) => { if (sessionSocket !== ws) { sessionSocket.send(JSON.stringify({ type: "another devices pong" })); } else { sessionSocket.send(JSON.stringify({ type: "pong" })); } }); return; } if (data.type === "get-peers") { console.log("Sending peer list"); this.broadcastPeers(); return; } if (data.type === "offer" || data.type === "answer" || data.type === "ice") { const targetId = data.to; if (!targetId || typeof targetId !== "string") { ws.send(JSON.stringify({ type: "error", reason: "missing target" })); return; } const targetSocket = this.socketsById.get(targetId); if (!targetSocket) { ws.send(JSON.stringify({ type: "error", reason: "target not connected", to: targetId })); return; } targetSocket.send(JSON.stringify(data)); return; } ws.send(JSON.stringify({ type: "error", reason: "unknown message type" })); } async webSocketClose(ws: WebSocket, code: number, reason: string, wasClean: boolean) { console.log("Server WebSocket closed:", code, reason, wasClean); const session = this.sessions.get(ws); if (session) { this.socketsById.delete(session.id); } this.sessions.delete(ws); this.broadcastPeers(); ws.close(code, "Durable Object is closing WebSocket"); } } declare module "react-router" { export interface AppLoadContext { cloudflare: { env: Env; ctx: ExecutionContext; }; } } const requestHandler = createRequestHandler( () => import("virtual:react-router/server-build"), import.meta.env.MODE ); async function getTurnCredentials(env: Env) { if (!env.CLOUDFLARE_TURN_REQUEST_URL || !env.CLOUDFLARE_TURN_API_TOKEN) { return new Response( JSON.stringify({ error: "Missing TURN configuration: CLOUDFLARE_TURN_REQUEST_URL or CLOUDFLARE_TURN_API_TOKEN is not set.", }), { status: 500, headers: { "Content-Type": "application/json", ...coopCoepHeaders, }, } ); } const response = await fetch(env.CLOUDFLARE_TURN_REQUEST_URL, { method: "POST", headers: { "Authorization": `Bearer ${env.CLOUDFLARE_TURN_API_TOKEN}`, "Content-Type": "application/json", }, body: JSON.stringify({ ttl: 86400, }), }); return response.json(); } export default { async fetch(request, env, ctx) { if (request.url.includes("/signaling")) { const stub = env.WEBSOCKET_HIBERNATION_SERVER.getByName("foo") return await stub.fetch(request); } if (request.url.includes("/turn")) { const credentials = await getTurnCredentials(env); return new Response(JSON.stringify(credentials), { headers: { "Content-Type": "application/json", }, }); } const assetResponse = await maybeServeAsset(request, env); if (assetResponse) return assetResponse; const response = await requestHandler(request, { cloudflare: { env, ctx }, }); return withCoopCoep(response); }, } satisfies ExportedHandler<Env>;
ちょっと長くなったので続きは次回。