很久没有维护过 yuna 的底层架构了,最近准备接入 MCP 了。这次准备接入 Google Calendar MCP Server 来替代之前的 function call 形式的日历调用。

什么是 MCP?
MCP 全名是 Model Context Protocol(模型上下文协议) ,这是 Anthropic 公司在 2024 年 11 月 26 日推出的技术协议。MCP 旨在定义一套通用通信规则,借助这套规则,AI 助手就能连接外部资源,让给出的回应更准确、更好用。换句话说,MCP 允许第三方工具与数据源构建插件,并将其添加到你使用的各类智能助手(如 Claude、ChatGPT、Cursor 等)中。这些基于文本的大模型依靠一些“工具”来实现非文本操作功能。MCP 的出现让用户可以“自带工具”接入这些系统。
在今年的前半年,我的开发计划中其实并没有接入 MCP 的打算,其中最重要的一方面是因为懒我认为 MCP 技术在当时的生态不够丰富,前景尚未可知。并且当时整个工程的复杂度并没有很高,我自己设计的 function call 的注册机制也相对健全,对 MCP 这种新东西并没有太大需求。随着时间推移,我发现情况有所变化,一方面工具的数量发生了质的变化,另一方面上层功能的丰富开始对工具调用产生了越来越高的要求。最终决定重写底层,接入 MCP 服务。对我来说,接入 MCP 服务的好处主要有两个:1. 生态丰富,接口标准。我可以顺畅的从工具中获取上下文信息而不需要在我自设的模板中传递。2. 在模型能力更加强大的当下,我认为有必要为模型拥有更强的自主性,而不需要之前那么多的框架规则环绕。MCP 的接入可以让我的模型变得更加自由。
但是很遗憾,MCP 的中文社区生态看起来并不理想,我并没有搜到多少清晰的 Google Calendar MCP Server 使用 or 配置的一些中文博客。因此在这里记述一下,以便日后查阅避坑。整体项目是按照 google-calendar-mcp 的配置来走的,如果熟悉的话可以直接去按照他的指示进行配置。
配置 google Cloud
首先打开 google Cloud。
点击控制台顶部的“选择项目”按钮,然后点击“新建项目”。填写项目名称、组织和位置等信息,点击“创建”按钮。或者进入一个已经有的项目即可。
启用 Google 日历 API。
进入 API 和服务,点击凭证(Credentials)选项卡

创建 OAuth 凭证 id


在受众窗口下,将自己添加为测试用户。
添加测试用户可能需要几分钟时间。在测试用户生效之前,OAuth 授权将不允许您继续操作。
关于测试模式的说明:当应用程序处于测试模式时,身份验证令牌将在 1 周后过期,需要刷新(请参阅下面的重新身份验证部分)。

配置到工程
这一部分就因人而异了。我是 nodejs 用户,因此我可以直接安装:
git clone https://github.com/nspady/google-calendar-mcp.git
cd google-calendar-mcp
npm install
npm run build然后设定你的文件路径并运行:
export GOOGLE_OAUTH_CREDENTIALS="/path/to/your/gcp-oauth.keys.json"
# 如果你是windows:
$env:GOOGLE_OAUTH_CREDENTIALS="/path/to/your/gcp-oauth.keys.json"
npx @cocal/google-calendar-mcp auth
然后即可认证成功。
参考 MCP 代码
JSON-RPC 初始化(含 protocolVersion/capabilities)、env 变量展开、stdio 子进程启动、超时处理。
import { Logger } from "../../../shared/logger/logger.js";
import { JsonRpcClient } from "./jsonrpc/JsonRpcClient.js";
import { StdioTransport } from "./jsonrpc/JsonLineRpcTransport.js";
import { McpCallToolResult, McpListToolsResult, McpTool } from "./McpTypes.js";
import fs from "node:fs";
import path from "node:path";
function expandEnvVars(value: string): string {
return value.replace(/\$\{([^}]+)\}/g, (_, name) => process.env[name] ?? "");
}
export type McpServerConfig = {
name: string;
required: boolean;
toolPrefix: string;
timeoutMs: number;
transport: "stdio" | "http";
command?: string;
args?: string[];
env?: Record<string, string>;
cwd?: string;
baseUrl?: string;
};
export class McpClient {
private readonly logger: Logger;
readonly server: McpServerConfig;
private rpc: JsonRpcClient | null = null;
private initialized = false;
constructor(opts: { logger: Logger; server: McpServerConfig }) {
this.logger = opts.logger;
this.server = opts.server;
}
async start(): Promise<void> {
if (this.rpc) return;
if (this.server.transport === "http") {
throw new Error("HTTP transport not implemented in MVP (use stdio).");
}
if (!this.server.command) throw new Error(`MCP server ${this.server.name} missing command`);
// Preflight: if command is "node" and args[0] looks like a script path, resolve and verify.
const args = [...(this.server.args ?? [])];
if (this.server.command === "node" && args.length > 0 && typeof args[0] === "string") {
const candidate = args[0];
const resolved = path.isAbsolute(candidate)
? candidate
: path.resolve(this.server.cwd ?? process.cwd(), candidate);
if (!fs.existsSync(resolved)) {
throw new Error(
`MCP server ${this.server.name} script not found: ${candidate} (resolved: ${resolved}). ` +
"Please set the correct path in mcp.servers[].args[0].",
);
}
args[0] = resolved;
}
// Merge env with process.env, with ${VAR} expansion for convenience.
const expandedEnv: Record<string, string> = {};
for (const [k, v] of Object.entries(this.server.env ?? {})) {
if (typeof v === "string") {
const expanded = expandEnvVars(v);
if (expanded.length === 0) {
this.logger.warn({ key: k }, "mcp env variable expansion produced empty string; check environment");
}
expandedEnv[k] = expanded;
}
}
const transport = new StdioTransport({
logger: this.logger,
command: this.server.command,
args,
cwd: this.server.cwd,
env: { ...process.env, ...expandedEnv },
});
this.rpc = new JsonRpcClient({ logger: this.logger, transport });
await this.rpc.start();
}
async stop(): Promise<void> {
await this.rpc?.stop();
this.rpc = null;
this.initialized = false;
}
async initialize(): Promise<void> {
await this.start();
if (!this.rpc) throw new Error("rpc not started");
if (this.initialized) return;
// MCP initialization: include protocolVersion + capabilities for servers that validate schema strictly.
await this.rpc.request(
"initialize",
{
protocolVersion: "2024-11-05",
capabilities: {},
clientInfo: { name: "yuna", version: "0.1.0" },
},
this.server.timeoutMs,
);
// Some servers expect "initialized" notification; send as request for simplicity if supported.
try {
await this.rpc.request("initialized", {}, this.server.timeoutMs);
} catch {
// ignore if not supported
}
this.initialized = true;
}
async listTools(): Promise<McpTool[]> {
await this.initialize();
if (!this.rpc) throw new Error("rpc not started");
const res = (await this.rpc.request<McpListToolsResult>("tools/list", {}, this.server.timeoutMs)) as any;
const tools = Array.isArray(res?.tools) ? (res.tools as McpTool[]) : [];
return tools;
}
async callTool(name: string, args: unknown): Promise<McpCallToolResult> {
await this.initialize();
if (!this.rpc) throw new Error("rpc not started");
const res = await this.rpc.request<McpCallToolResult>("tools/call", { name, arguments: args }, this.server.timeoutMs);
return res as any;
}
}
JSON-RPC 2.0 基础(请求、pending map、超时、错误兜底)。
import { Logger } from "../../../../shared/logger/logger.js";
import { JsonLineRpcTransport } from "./JsonLineRpcTransport.js";
import { JsonRpcRequest, JsonRpcResponse } from "./JsonRpcTypes.js";
export class JsonRpcClient {
private readonly logger: Logger;
private readonly transport: JsonLineRpcTransport;
private nextId = 1;
private pending = new Map<number, { resolve: (v: any) => void; reject: (e: any) => void; timer?: any }>();
constructor(opts: { logger: Logger; transport: JsonLineRpcTransport }) {
this.logger = opts.logger;
this.transport = opts.transport;
}
async start(): Promise<void> {
await this.transport.start((line) => this.onLine(line));
}
async stop(): Promise<void> {
for (const [, p] of this.pending) {
if (p.timer) clearTimeout(p.timer);
p.reject(new Error("transport stopped"));
}
this.pending.clear();
await this.transport.stop();
}
async request<T = unknown>(method: string, params: unknown, timeoutMs: number): Promise<T> {
const id = this.nextId++;
const req: JsonRpcRequest = { jsonrpc: "2.0", id, method, params };
const payload = JSON.stringify(req);
const promise = new Promise<T>((resolve, reject) => {
const timer = setTimeout(() => {
this.pending.delete(id);
reject(new Error(`JSON-RPC timeout: ${method}`));
}, timeoutMs);
this.pending.set(id, { resolve, reject, timer });
});
await this.transport.sendLine(payload);
return await promise;
}
private onLine(line: string) {
const s = line.trim();
if (!s) return;
let msg: any;
try {
msg = JSON.parse(s);
} catch (err) {
this.logger.warn({ line: s }, "mcp received non-json line");
return;
}
// notifications: ignore for MVP (but log for traceability)
if (msg && msg.jsonrpc === "2.0" && msg.method && msg.id === undefined) {
this.logger.debug({ method: msg.method }, "mcp notification received");
return;
}
const resp = msg as JsonRpcResponse;
const id = (resp as any)?.id;
if (typeof id !== "number") return;
const pending = this.pending.get(id);
if (!pending) return;
this.pending.delete(id);
if (pending.timer) clearTimeout(pending.timer);
if ((resp as any).error) {
const e = (resp as any).error;
const err = new Error(`JSON-RPC error ${e.code}: ${e.message}`);
(err as any).data = e.data;
pending.reject(err);
return;
}
pending.resolve((resp as any).result);
}
}
stdio 行级传输
import { ChildProcessWithoutNullStreams, spawn } from "node:child_process";
import readline from "node:readline";
import { Logger } from "../../../../shared/logger/logger.js";
export type TransportMessageHandler = (line: string) => void;
export interface JsonLineRpcTransport {
start(onLine: TransportMessageHandler): Promise<void>;
sendLine(line: string): Promise<void>;
stop(): Promise<void>;
}
export class StdioTransport implements JsonLineRpcTransport {
private readonly logger: Logger;
private readonly command: string;
private readonly args: string[];
private readonly cwd?: string;
private readonly env?: Record<string, string>;
private proc: ChildProcessWithoutNullStreams | null = null;
private rl: readline.Interface | null = null;
constructor(opts: { logger: Logger; command: string; args?: string[]; cwd?: string; env?: Record<string, string> }) {
this.logger = opts.logger;
this.command = opts.command;
this.args = opts.args ?? [];
this.cwd = opts.cwd;
this.env = opts.env;
}
async start(onLine: TransportMessageHandler): Promise<void> {
if (this.proc) return;
this.proc = spawn(this.command, this.args, {
cwd: this.cwd,
env: { ...process.env, ...(this.env ?? {}) },
stdio: ["pipe", "pipe", "pipe"],
});
this.proc.on("exit", (code, signal) => {
this.logger.warn({ code, signal }, "mcp stdio process exited");
});
this.proc.stderr.on("data", (buf) => {
const s = buf.toString("utf-8").trim();
if (s) this.logger.warn({ stderr: s }, "mcp stdio stderr");
});
this.rl = readline.createInterface({ input: this.proc.stdout });
this.rl.on("line", (line) => onLine(line));
}
async sendLine(line: string): Promise<void> {
if (!this.proc) throw new Error("transport not started");
this.proc.stdin.write(line + "\n");
}
async stop(): Promise<void> {
try {
this.rl?.close();
} catch {
// ignore
}
this.rl = null;
const p = this.proc;
this.proc = null;
if (!p) return;
p.kill();
}
}