From 250420e9e24849bdcd2a0021d741404dbce23713 Mon Sep 17 00:00:00 2001 From: "Claude (Lupul Augmentat)" Date: Tue, 27 Jan 2026 07:27:29 +0100 Subject: [PATCH] feat: add SSE server with agent presence system - Add SSE transport server for remote MCP connections - Implement API key authentication - Add agent presence system (register_agent, list_agents) - Add list_services tool to discover local services - Refactor messaging for dynamic agent names Co-Authored-By: Claude Opus 4.5 --- package-lock.json | 22 +++ package.json | 1 + src/middleware/auth.ts | 4 +- src/sse-server.ts | 212 ++++++++++++++++++++ src/tools/builtin/messaging.ts | 340 ++++++++++++++++++++++++++++----- src/tools/index.ts | 7 +- 6 files changed, 529 insertions(+), 57 deletions(-) create mode 100644 src/sse-server.ts diff --git a/package-lock.json b/package-lock.json index 34fefd4..28818b2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,6 +13,7 @@ "@types/ws": "^8.18.1", "cors": "^2.8.5", "dotenv": "^16.4.5", + "eventsource": "^4.1.0", "express": "^4.18.2", "jsonwebtoken": "^9.0.2", "nats": "^2.19.0", @@ -3228,6 +3229,27 @@ "node": ">=0.8.x" } }, + "node_modules/eventsource": { + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/eventsource/-/eventsource-4.1.0.tgz", + "integrity": "sha512-2GuF51iuHX6A9xdTccMTsNb7VO0lHZihApxhvQzJB5A03DvHDd2FQepodbMaztPBmBcE/ox7o2gqaxGhYB9LhQ==", + "license": "MIT", + "dependencies": { + "eventsource-parser": "^3.0.1" + }, + "engines": { + "node": ">=20.0.0" + } + }, + "node_modules/eventsource-parser": { + "version": "3.0.6", + "resolved": "https://registry.npmjs.org/eventsource-parser/-/eventsource-parser-3.0.6.tgz", + "integrity": "sha512-Vo1ab+QXPzZ4tCa8SwIHJFaSzy4R6SHf7BY79rFBDf0idraZWAkYrDjDj8uWaSm3S2TK+hJ7/t1CEmZ7jXw+pg==", + "license": "MIT", + "engines": { + "node": ">=18.0.0" + } + }, "node_modules/execa": { "version": "5.1.1", "resolved": "https://registry.npmjs.org/execa/-/execa-5.1.1.tgz", diff --git a/package.json b/package.json index 583e6c0..283ab92 100644 --- a/package.json +++ b/package.json @@ -29,6 +29,7 @@ "@types/ws": "^8.18.1", "cors": "^2.8.5", "dotenv": "^16.4.5", + "eventsource": "^4.1.0", "express": "^4.18.2", "jsonwebtoken": "^9.0.2", "nats": "^2.19.0", diff --git a/src/middleware/auth.ts b/src/middleware/auth.ts index 411100c..0d8658e 100644 --- a/src/middleware/auth.ts +++ b/src/middleware/auth.ts @@ -57,8 +57,8 @@ export function authMiddleware(req: AuthRequest, res: Response, next: NextFuncti const token = parts[1]; try { - const secret = getJwtSecret(); - const decoded = jwt.verify(token, secret as Secret) as any; + const secret: string = getJwtSecret(); + const decoded = jwt.verify(token, secret) as any; if (typeof decoded === 'object' && decoded !== null && 'id' in decoded) { req.user = { diff --git a/src/sse-server.ts b/src/sse-server.ts new file mode 100644 index 0000000..e280eca --- /dev/null +++ b/src/sse-server.ts @@ -0,0 +1,212 @@ +import express, { Request, Response, NextFunction } from 'express'; +import cors from 'cors'; +import { Server } from '@modelcontextprotocol/sdk/server/index.js'; +import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js'; +import { z } from 'zod'; +import { config } from './config'; +import { createLogger } from './utils/logger'; +import { ToolRegistry } from './registry/ToolRegistry'; +import { NatsClient } from './nats/NatsClient'; + +const logger = createLogger('SSEServer'); + +// Store active transports by session ID +const transports = new Map(); + +// API Key from environment +const API_KEY = process.env.API_KEY; + +// Auth middleware +function authMiddleware(req: Request, res: Response, next: NextFunction): void { + // Skip auth if no API_KEY configured + if (!API_KEY) { + return next(); + } + + // Check Authorization header + const authHeader = req.headers.authorization; + if (authHeader) { + const [type, key] = authHeader.split(' '); + if (type === 'Bearer' && key === API_KEY) { + return next(); + } + } + + // Check X-API-Key header + const apiKeyHeader = req.headers['x-api-key']; + if (apiKeyHeader === API_KEY) { + return next(); + } + + // Check query parameter + const apiKeyQuery = req.query.apiKey; + if (apiKeyQuery === API_KEY) { + return next(); + } + + logger.warn({ ip: req.ip }, 'Unauthorized connection attempt'); + res.status(401).json({ error: 'Unauthorized: Invalid or missing API key' }); +} + +async function startSSEServer() { + const app = express(); + app.use(cors()); + app.use(express.json()); + + // Initialize dependencies + const natsClient = new NatsClient(); + await natsClient.connect(); + logger.info('Connected to NATS'); + + const toolRegistry = new ToolRegistry(natsClient); + await toolRegistry.initialize(); + logger.info('Tool registry initialized'); + + // Health check + app.get('/health', (_req, res) => { + res.send('healthy\n'); + }); + + // OAuth discovery endpoints - return empty/minimal responses + // Claude Code checks these before connecting + app.get('/.well-known/oauth-protected-resource', (_req, res) => { + res.status(404).json({ error: 'not_found', error_description: 'OAuth not supported' }); + }); + + app.get('/.well-known/oauth-protected-resource/*', (_req, res) => { + res.status(404).json({ error: 'not_found', error_description: 'OAuth not supported' }); + }); + + app.get('/.well-known/oauth-authorization-server', (_req, res) => { + res.status(404).json({ error: 'not_found', error_description: 'OAuth not supported' }); + }); + + app.get('/.well-known/openid-configuration', (_req, res) => { + res.status(404).json({ error: 'not_found', error_description: 'OAuth not supported' }); + }); + + app.post('/register', (_req, res) => { + res.status(404).json({ error: 'not_found', error_description: 'OAuth not supported' }); + }); + + // SSE endpoint - establishes the SSE stream (protected) + app.get('/sse', authMiddleware, async (req, res) => { + logger.info('New SSE connection request'); + + // Create MCP server for this session + const server = new Server( + { + name: 'lupul-augmentat', + version: '0.1.0', + }, + { + capabilities: { + tools: {}, + }, + } + ); + + // Setup tool handlers + const ListToolsSchema = z.object({ + method: z.literal('tools/list'), + }); + + server.setRequestHandler(ListToolsSchema, async () => { + const tools = await toolRegistry.listTools(); + return { + tools: tools.map((tool) => ({ + name: tool.name, + description: tool.description, + inputSchema: tool.inputSchema, + })), + }; + }); + + const CallToolSchema = z.object({ + method: z.literal('tools/call'), + params: z.object({ + name: z.string(), + arguments: z.unknown().optional(), + }), + }); + + server.setRequestHandler(CallToolSchema, async (request) => { + try { + const result = await toolRegistry.executeTool( + request.params.name, + request.params.arguments + ); + return { content: [{ type: 'text', text: JSON.stringify(result) }] }; + } catch (error) { + const errorMessage = error instanceof Error ? error.message : 'Unknown error'; + return { + content: [{ type: 'text', text: `Error: ${errorMessage}` }], + isError: true, + }; + } + }); + + // Create SSE transport + const transport = new SSEServerTransport('/message', res); + transports.set(transport.sessionId, transport); + + logger.info({ sessionId: transport.sessionId, activeSessions: transports.size }, 'SSE transport created'); + console.log(`[SSE] New session: ${transport.sessionId}, total active: ${transports.size + 1}`); + + // Clean up on close + transport.onclose = () => { + logger.info({ sessionId: transport.sessionId }, 'SSE connection closed'); + transports.delete(transport.sessionId); + }; + + // Connect server to transport (this also starts the SSE stream) + await server.connect(transport); + }); + + // Message endpoint - receives POST messages from clients (protected) + app.post('/message', authMiddleware, async (req, res) => { + const sessionId = req.query.sessionId as string; + logger.info({ sessionId, activeSessions: Array.from(transports.keys()) }, 'POST /message received'); + + if (!sessionId) { + logger.warn('Missing sessionId'); + res.status(400).json({ error: 'Missing sessionId query parameter' }); + return; + } + + const transport = transports.get(sessionId); + if (!transport) { + logger.warn({ sessionId, activeSessions: Array.from(transports.keys()) }, 'Session not found'); + res.status(404).json({ error: 'Session not found' }); + return; + } + + logger.debug({ sessionId, body: req.body }, 'Received message'); + + await transport.handlePostMessage(req, res); + }); + + const host = config.mcp.host; + const port = config.mcp.port; + + app.listen(port, host, () => { + logger.info({ host, port }, 'SSE Server started'); + console.log(`SSE Server running at http://${host}:${port}`); + console.log(`Connect via: http://${host}:${port}/sse`); + }); + + // Graceful shutdown + process.on('SIGTERM', async () => { + logger.info('Shutting down SSE server'); + for (const transport of transports.values()) { + await transport.close(); + } + await natsClient.disconnect(); + process.exit(0); + }); +} + +startSSEServer().catch((error) => { + logger.error({ error }, 'Failed to start SSE server'); + process.exit(1); +}); diff --git a/src/tools/builtin/messaging.ts b/src/tools/builtin/messaging.ts index cc75314..5c761a5 100644 --- a/src/tools/builtin/messaging.ts +++ b/src/tools/builtin/messaging.ts @@ -1,72 +1,289 @@ import { ToolHandler, ToolContext } from '../base/ToolHandler'; import { NatsClient } from '../../nats/NatsClient'; import { z } from 'zod'; +import * as os from 'os'; +import { execSync } from 'child_process'; -// Detect current developer role from workspace -function detectRole(): string { - const cwd = process.cwd(); - - // Match patterns - check most specific first - if (cwd.includes('joylo-admin') || cwd.includes('joylo/admin')) return 'admin-dev'; - if (cwd.includes('joylo-api') || cwd.includes('joylo/api')) return 'api-dev'; - if (cwd.includes('joylo-web') || cwd.includes('joylo/web')) return 'web-dev'; - - // Fallback regex - if (/admin/i.test(cwd)) return 'admin-dev'; - if (/\bapi\b/i.test(cwd)) return 'api-dev'; - if (/\bweb\b/i.test(cwd)) return 'web-dev'; - - return 'unknown'; +// Agent presence info +interface AgentInfo { + name: string; + role: string; + hostname: string; + capabilities: string[]; + services: string[]; + connectedAt: string; + lastSeen: string; + sessionId: string; } +// Global presence tracking +const onlineAgents = new Map(); +const PRESENCE_TIMEOUT = 120000; // 2 minutes + // Schemas +const RegisterAgentSchema = z.object({ + name: z.string().min(1).max(50), + role: z.string().optional(), + capabilities: z.array(z.string()).optional(), +}); + +const ListAgentsSchema = z.object({}); + +const ListServicesSchema = z.object({}); + const SendMessageSchema = z.object({ - to: z.enum(['admin-dev', 'api-dev', 'web-dev']), - from: z.enum(['admin-dev', 'api-dev', 'web-dev']).optional(), - subject: z.string(), - message: z.string(), - data: z.record(z.any()).optional(), + to: z.string().min(1), + message: z.string().min(1), + subject: z.string().optional(), }); const ReceiveMessagesSchema = z.object({ limit: z.number().optional(), }); +type RegisterAgentInput = z.infer; +type ListAgentsInput = z.infer; +type ListServicesInput = z.infer; type SendMessageInput = z.infer; type ReceiveMessagesInput = z.infer; +// Clean up stale presence entries +function cleanupStalePresence() { + const now = Date.now(); + for (const [sessionId, info] of onlineAgents.entries()) { + if (now - new Date(info.lastSeen).getTime() > PRESENCE_TIMEOUT) { + onlineAgents.delete(sessionId); + } + } +} + +// Detect local services +function detectLocalServices(): string[] { + const services: string[] = []; + + try { + // Get running systemd services + const output = execSync('systemctl list-units --type=service --state=running --no-pager --no-legend 2>/dev/null || true', { encoding: 'utf8' }); + const lines = output.split('\n').filter(l => l.trim()); + + for (const line of lines) { + const match = line.match(/^(\S+\.service)/); + if (match) { + const serviceName = match[1].replace('.service', ''); + // Filter interesting services + if (!serviceName.startsWith('sys-') && + !serviceName.startsWith('user@') && + !serviceName.includes('getty') && + !serviceName.includes('systemd-')) { + services.push(serviceName); + } + } + } + } catch (e) { + // Ignore errors + } + + // Also detect by checking common ports/processes + try { + const ports = execSync('ss -tlnp 2>/dev/null | grep LISTEN || true', { encoding: 'utf8' }); + if (ports.includes(':5432')) services.push('postgresql'); + if (ports.includes(':3306')) services.push('mysql'); + if (ports.includes(':27017')) services.push('mongodb'); + if (ports.includes(':6379')) services.push('redis'); + if (ports.includes(':80') || ports.includes(':443')) services.push('nginx'); + if (ports.includes(':4222')) services.push('nats'); + } catch (e) { + // Ignore + } + + return [...new Set(services)].sort(); +} + +export class RegisterAgentTool extends ToolHandler { + constructor(private natsClient: NatsClient) { + super( + { + name: 'register_agent', + description: 'Register this agent with a name and role. Makes you visible to other agents.', + }, + RegisterAgentSchema, + ); + + this.setupPresenceListener(); + } + + private setupPresenceListener() { + try { + this.natsClient.subscribe('agents.announce', (msg: any) => { + if (msg.name && msg.sessionId) { + onlineAgents.set(msg.sessionId, { + name: msg.name, + role: msg.role || 'assistant', + hostname: msg.hostname || 'unknown', + capabilities: msg.capabilities || [], + services: msg.services || [], + connectedAt: msg.connectedAt || new Date().toISOString(), + lastSeen: new Date().toISOString(), + sessionId: msg.sessionId, + }); + } + }); + + this.natsClient.subscribe('agents.heartbeat', (msg: any) => { + if (msg.sessionId && onlineAgents.has(msg.sessionId)) { + const info = onlineAgents.get(msg.sessionId)!; + info.lastSeen = new Date().toISOString(); + } + }); + + this.natsClient.subscribe('agents.leave', (msg: any) => { + if (msg.sessionId) { + onlineAgents.delete(msg.sessionId); + } + }); + } catch (error) { + // NATS not connected yet + } + } + + protected async handle(input: RegisterAgentInput, context: ToolContext): Promise { + cleanupStalePresence(); + + const sessionId = context.requestId || `session-${Date.now()}`; + const services = detectLocalServices(); + + const agentInfo: AgentInfo = { + name: input.name, + role: input.role || 'assistant', + hostname: os.hostname(), + capabilities: input.capabilities || ['chat', 'file_operations', 'system_commands'], + services, + connectedAt: new Date().toISOString(), + lastSeen: new Date().toISOString(), + sessionId, + }; + + onlineAgents.set(sessionId, agentInfo); + await this.natsClient.publish('agents.announce', agentInfo); + + return { + success: true, + message: `Registered as "${input.name}" (${input.role || 'assistant'})`, + agent: agentInfo, + }; + } +} + +export class ListAgentsTool extends ToolHandler { + constructor(private natsClient: NatsClient) { + super( + { + name: 'list_agents', + description: 'List all agents currently online', + }, + ListAgentsSchema, + ); + } + + protected async handle(_input: ListAgentsInput, _context: ToolContext): Promise { + cleanupStalePresence(); + + const agents = Array.from(onlineAgents.values()).map(a => ({ + name: a.name, + role: a.role, + hostname: a.hostname, + capabilities: a.capabilities, + services: a.services, + connectedAt: a.connectedAt, + lastSeen: a.lastSeen, + })); + + return { + success: true, + count: agents.length, + agents, + message: agents.length === 0 + ? 'No agents online. Use register_agent to go online.' + : `${agents.length} agent(s) online`, + }; + } +} + +export class ListServicesTool extends ToolHandler { + constructor(private natsClient: NatsClient) { + super( + { + name: 'list_services', + description: 'List services running on this machine', + }, + ListServicesSchema, + ); + } + + protected async handle(_input: ListServicesInput, _context: ToolContext): Promise { + const services = detectLocalServices(); + + // Get more details for each service + const serviceDetails: any[] = []; + for (const svc of services.slice(0, 30)) { // Limit to 30 + try { + const status = execSync(`systemctl is-active ${svc} 2>/dev/null || echo unknown`, { encoding: 'utf8' }).trim(); + serviceDetails.push({ + name: svc, + status, + }); + } catch { + serviceDetails.push({ name: svc, status: 'unknown' }); + } + } + + return { + success: true, + hostname: os.hostname(), + count: serviceDetails.length, + services: serviceDetails, + }; + } +} + export class SendMessageTool extends ToolHandler { constructor(private natsClient: NatsClient) { super( { name: 'send_message', - description: 'Send real-time message to another developer via NATS', + description: 'Send a message to another agent by name', }, SendMessageSchema, ); } - protected async handle(input: SendMessageInput, _context: ToolContext): Promise { - const { to, from: explicitFrom, subject, message, data } = input; - const from = explicitFrom || detectRole(); + protected async handle(input: SendMessageInput, context: ToolContext): Promise { + const { to, message, subject } = input; + + // Find sender info + let fromName = 'anonymous'; + for (const [sid, info] of onlineAgents.entries()) { + if (sid === context.requestId || sid.includes(context.requestId)) { + fromName = info.name; + break; + } + } const payload = { - from, + from: fromName, to, - subject, + subject: subject || 'Message', message, - data, timestamp: new Date().toISOString(), }; - // Publish to NATS channel for recipient - const channel = `dev.messages.${to}`; - await this.natsClient.publish(channel, payload); + // Send to agent's channel + await this.natsClient.publish(`messages.${to}`, payload); + await this.natsClient.publish('messages.broadcast', payload); return { success: true, - message: `Message sent from ${from} to ${to}`, - channel, + message: `Message sent to ${to}`, payload, }; } @@ -75,54 +292,71 @@ export class SendMessageTool extends ToolHandler { export class ReceiveMessagesTool extends ToolHandler { private messages: any[] = []; private subscribed = false; + private myName: string = ''; constructor(private natsClient: NatsClient) { super( { name: 'receive_messages', - description: 'Receive real-time messages from other developers via NATS', + description: 'Receive messages. First register_agent with a name.', }, ReceiveMessagesSchema, ); } - private setupSubscription() { - if (this.subscribed) return; + private setupSubscription(name: string) { + if (this.subscribed && this.myName === name) return; - const myRole = detectRole(); - const channel = `dev.messages.${myRole}`; - - // Subscribe to messages for this role (only if NATS is connected) try { - this.natsClient.subscribe(channel, (msg) => { + this.natsClient.subscribe(`messages.${name}`, (msg) => { this.messages.push(msg); - - // Send notification to Claude Code UI - this.notifyNewMessage(msg); + console.log(`\n📬 MESSAGE from ${msg.from}: ${msg.subject}\n ${msg.message.substring(0, 200)}\n`); }); + + this.natsClient.subscribe('messages.broadcast', (msg) => { + if (msg.to === name || msg.to === 'all') { + this.messages.push(msg); + } + }); + this.subscribed = true; + this.myName = name; } catch (error) { - // NATS not connected yet, will be called later + // NATS not connected } } - private notifyNewMessage(msg: any) { - const preview = msg.message?.substring(0, 100) || ''; - console.log(`\n📬 NEW MESSAGE from ${msg.from}\n Subject: ${msg.subject}\n Preview: ${preview}...\n`); - } + protected async handle(input: ReceiveMessagesInput, context: ToolContext): Promise { + let myName = ''; + for (const [sid, info] of onlineAgents.entries()) { + if (sid === context.requestId || sid.includes(context.requestId)) { + myName = info.name; + break; + } + } - protected async handle(input: ReceiveMessagesInput, _context: ToolContext): Promise { - // Setup subscription on first use (after NATS is connected) - this.setupSubscription(); + if (!myName) { + return { + success: false, + message: 'Please register_agent first', + messages: [], + }; + } - const limit = input.limit || 10; + this.setupSubscription(myName); + + const limit = input.limit || 20; const recentMessages = this.messages.slice(-limit); return { success: true, - role: detectRole(), + name: myName, count: recentMessages.length, messages: recentMessages, }; } } + +// Keep old exports for compatibility +export { RegisterAgentTool as RegisterPresenceTool }; +export { ListAgentsTool as ListOnlineTool }; diff --git a/src/tools/index.ts b/src/tools/index.ts index b35ab7f..6ef9f5e 100644 --- a/src/tools/index.ts +++ b/src/tools/index.ts @@ -4,7 +4,7 @@ import { FileWriteTool } from './builtin/FileWriteTool'; import { FileListTool } from './builtin/FileListTool'; import { SystemCommandTool } from './builtin/SystemCommandTool'; import { HttpRequestTool } from './builtin/HttpRequestTool'; -import { SendMessageTool, ReceiveMessagesTool } from './builtin/messaging'; +import { RegisterAgentTool, ListAgentsTool, ListServicesTool, SendMessageTool, ReceiveMessagesTool } from './builtin/messaging'; import { NatsClient } from '../nats/NatsClient'; export * from './base/ToolHandler'; @@ -21,7 +21,10 @@ export function createBuiltinTools(natsClient: NatsClient): Map