feat: add SSE server with agent presence system

- Add SSE transport server for remote MCP connections
- Implement API key authentication
- Add agent presence system (register_agent, list_agents)
- Add list_services tool to discover local services
- Refactor messaging for dynamic agent names

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Claude (Lupul Augmentat)
2026-01-27 07:27:29 +01:00
parent 24e871ea01
commit 250420e9e2
6 changed files with 529 additions and 57 deletions

22
package-lock.json generated
View File

@@ -13,6 +13,7 @@
"@types/ws": "^8.18.1",
"cors": "^2.8.5",
"dotenv": "^16.4.5",
"eventsource": "^4.1.0",
"express": "^4.18.2",
"jsonwebtoken": "^9.0.2",
"nats": "^2.19.0",
@@ -3228,6 +3229,27 @@
"node": ">=0.8.x"
}
},
"node_modules/eventsource": {
"version": "4.1.0",
"resolved": "https://registry.npmjs.org/eventsource/-/eventsource-4.1.0.tgz",
"integrity": "sha512-2GuF51iuHX6A9xdTccMTsNb7VO0lHZihApxhvQzJB5A03DvHDd2FQepodbMaztPBmBcE/ox7o2gqaxGhYB9LhQ==",
"license": "MIT",
"dependencies": {
"eventsource-parser": "^3.0.1"
},
"engines": {
"node": ">=20.0.0"
}
},
"node_modules/eventsource-parser": {
"version": "3.0.6",
"resolved": "https://registry.npmjs.org/eventsource-parser/-/eventsource-parser-3.0.6.tgz",
"integrity": "sha512-Vo1ab+QXPzZ4tCa8SwIHJFaSzy4R6SHf7BY79rFBDf0idraZWAkYrDjDj8uWaSm3S2TK+hJ7/t1CEmZ7jXw+pg==",
"license": "MIT",
"engines": {
"node": ">=18.0.0"
}
},
"node_modules/execa": {
"version": "5.1.1",
"resolved": "https://registry.npmjs.org/execa/-/execa-5.1.1.tgz",

View File

@@ -29,6 +29,7 @@
"@types/ws": "^8.18.1",
"cors": "^2.8.5",
"dotenv": "^16.4.5",
"eventsource": "^4.1.0",
"express": "^4.18.2",
"jsonwebtoken": "^9.0.2",
"nats": "^2.19.0",

View File

@@ -57,8 +57,8 @@ export function authMiddleware(req: AuthRequest, res: Response, next: NextFuncti
const token = parts[1];
try {
const secret = getJwtSecret();
const decoded = jwt.verify(token, secret as Secret) as any;
const secret: string = getJwtSecret();
const decoded = jwt.verify(token, secret) as any;
if (typeof decoded === 'object' && decoded !== null && 'id' in decoded) {
req.user = {

212
src/sse-server.ts Normal file
View File

@@ -0,0 +1,212 @@
import express, { Request, Response, NextFunction } from 'express';
import cors from 'cors';
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';
import { z } from 'zod';
import { config } from './config';
import { createLogger } from './utils/logger';
import { ToolRegistry } from './registry/ToolRegistry';
import { NatsClient } from './nats/NatsClient';
const logger = createLogger('SSEServer');
// Store active transports by session ID
const transports = new Map<string, SSEServerTransport>();
// API Key from environment
const API_KEY = process.env.API_KEY;
// Auth middleware
function authMiddleware(req: Request, res: Response, next: NextFunction): void {
// Skip auth if no API_KEY configured
if (!API_KEY) {
return next();
}
// Check Authorization header
const authHeader = req.headers.authorization;
if (authHeader) {
const [type, key] = authHeader.split(' ');
if (type === 'Bearer' && key === API_KEY) {
return next();
}
}
// Check X-API-Key header
const apiKeyHeader = req.headers['x-api-key'];
if (apiKeyHeader === API_KEY) {
return next();
}
// Check query parameter
const apiKeyQuery = req.query.apiKey;
if (apiKeyQuery === API_KEY) {
return next();
}
logger.warn({ ip: req.ip }, 'Unauthorized connection attempt');
res.status(401).json({ error: 'Unauthorized: Invalid or missing API key' });
}
async function startSSEServer() {
const app = express();
app.use(cors());
app.use(express.json());
// Initialize dependencies
const natsClient = new NatsClient();
await natsClient.connect();
logger.info('Connected to NATS');
const toolRegistry = new ToolRegistry(natsClient);
await toolRegistry.initialize();
logger.info('Tool registry initialized');
// Health check
app.get('/health', (_req, res) => {
res.send('healthy\n');
});
// OAuth discovery endpoints - return empty/minimal responses
// Claude Code checks these before connecting
app.get('/.well-known/oauth-protected-resource', (_req, res) => {
res.status(404).json({ error: 'not_found', error_description: 'OAuth not supported' });
});
app.get('/.well-known/oauth-protected-resource/*', (_req, res) => {
res.status(404).json({ error: 'not_found', error_description: 'OAuth not supported' });
});
app.get('/.well-known/oauth-authorization-server', (_req, res) => {
res.status(404).json({ error: 'not_found', error_description: 'OAuth not supported' });
});
app.get('/.well-known/openid-configuration', (_req, res) => {
res.status(404).json({ error: 'not_found', error_description: 'OAuth not supported' });
});
app.post('/register', (_req, res) => {
res.status(404).json({ error: 'not_found', error_description: 'OAuth not supported' });
});
// SSE endpoint - establishes the SSE stream (protected)
app.get('/sse', authMiddleware, async (req, res) => {
logger.info('New SSE connection request');
// Create MCP server for this session
const server = new Server(
{
name: 'lupul-augmentat',
version: '0.1.0',
},
{
capabilities: {
tools: {},
},
}
);
// Setup tool handlers
const ListToolsSchema = z.object({
method: z.literal('tools/list'),
});
server.setRequestHandler(ListToolsSchema, async () => {
const tools = await toolRegistry.listTools();
return {
tools: tools.map((tool) => ({
name: tool.name,
description: tool.description,
inputSchema: tool.inputSchema,
})),
};
});
const CallToolSchema = z.object({
method: z.literal('tools/call'),
params: z.object({
name: z.string(),
arguments: z.unknown().optional(),
}),
});
server.setRequestHandler(CallToolSchema, async (request) => {
try {
const result = await toolRegistry.executeTool(
request.params.name,
request.params.arguments
);
return { content: [{ type: 'text', text: JSON.stringify(result) }] };
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
return {
content: [{ type: 'text', text: `Error: ${errorMessage}` }],
isError: true,
};
}
});
// Create SSE transport
const transport = new SSEServerTransport('/message', res);
transports.set(transport.sessionId, transport);
logger.info({ sessionId: transport.sessionId, activeSessions: transports.size }, 'SSE transport created');
console.log(`[SSE] New session: ${transport.sessionId}, total active: ${transports.size + 1}`);
// Clean up on close
transport.onclose = () => {
logger.info({ sessionId: transport.sessionId }, 'SSE connection closed');
transports.delete(transport.sessionId);
};
// Connect server to transport (this also starts the SSE stream)
await server.connect(transport);
});
// Message endpoint - receives POST messages from clients (protected)
app.post('/message', authMiddleware, async (req, res) => {
const sessionId = req.query.sessionId as string;
logger.info({ sessionId, activeSessions: Array.from(transports.keys()) }, 'POST /message received');
if (!sessionId) {
logger.warn('Missing sessionId');
res.status(400).json({ error: 'Missing sessionId query parameter' });
return;
}
const transport = transports.get(sessionId);
if (!transport) {
logger.warn({ sessionId, activeSessions: Array.from(transports.keys()) }, 'Session not found');
res.status(404).json({ error: 'Session not found' });
return;
}
logger.debug({ sessionId, body: req.body }, 'Received message');
await transport.handlePostMessage(req, res);
});
const host = config.mcp.host;
const port = config.mcp.port;
app.listen(port, host, () => {
logger.info({ host, port }, 'SSE Server started');
console.log(`SSE Server running at http://${host}:${port}`);
console.log(`Connect via: http://${host}:${port}/sse`);
});
// Graceful shutdown
process.on('SIGTERM', async () => {
logger.info('Shutting down SSE server');
for (const transport of transports.values()) {
await transport.close();
}
await natsClient.disconnect();
process.exit(0);
});
}
startSSEServer().catch((error) => {
logger.error({ error }, 'Failed to start SSE server');
process.exit(1);
});

View File

@@ -1,72 +1,289 @@
import { ToolHandler, ToolContext } from '../base/ToolHandler';
import { NatsClient } from '../../nats/NatsClient';
import { z } from 'zod';
import * as os from 'os';
import { execSync } from 'child_process';
// Detect current developer role from workspace
function detectRole(): string {
const cwd = process.cwd();
// Match patterns - check most specific first
if (cwd.includes('joylo-admin') || cwd.includes('joylo/admin')) return 'admin-dev';
if (cwd.includes('joylo-api') || cwd.includes('joylo/api')) return 'api-dev';
if (cwd.includes('joylo-web') || cwd.includes('joylo/web')) return 'web-dev';
// Fallback regex
if (/admin/i.test(cwd)) return 'admin-dev';
if (/\bapi\b/i.test(cwd)) return 'api-dev';
if (/\bweb\b/i.test(cwd)) return 'web-dev';
return 'unknown';
// Agent presence info
interface AgentInfo {
name: string;
role: string;
hostname: string;
capabilities: string[];
services: string[];
connectedAt: string;
lastSeen: string;
sessionId: string;
}
// Global presence tracking
const onlineAgents = new Map<string, AgentInfo>();
const PRESENCE_TIMEOUT = 120000; // 2 minutes
// Schemas
const RegisterAgentSchema = z.object({
name: z.string().min(1).max(50),
role: z.string().optional(),
capabilities: z.array(z.string()).optional(),
});
const ListAgentsSchema = z.object({});
const ListServicesSchema = z.object({});
const SendMessageSchema = z.object({
to: z.enum(['admin-dev', 'api-dev', 'web-dev']),
from: z.enum(['admin-dev', 'api-dev', 'web-dev']).optional(),
subject: z.string(),
message: z.string(),
data: z.record(z.any()).optional(),
to: z.string().min(1),
message: z.string().min(1),
subject: z.string().optional(),
});
const ReceiveMessagesSchema = z.object({
limit: z.number().optional(),
});
type RegisterAgentInput = z.infer<typeof RegisterAgentSchema>;
type ListAgentsInput = z.infer<typeof ListAgentsSchema>;
type ListServicesInput = z.infer<typeof ListServicesSchema>;
type SendMessageInput = z.infer<typeof SendMessageSchema>;
type ReceiveMessagesInput = z.infer<typeof ReceiveMessagesSchema>;
// Clean up stale presence entries
function cleanupStalePresence() {
const now = Date.now();
for (const [sessionId, info] of onlineAgents.entries()) {
if (now - new Date(info.lastSeen).getTime() > PRESENCE_TIMEOUT) {
onlineAgents.delete(sessionId);
}
}
}
// Detect local services
function detectLocalServices(): string[] {
const services: string[] = [];
try {
// Get running systemd services
const output = execSync('systemctl list-units --type=service --state=running --no-pager --no-legend 2>/dev/null || true', { encoding: 'utf8' });
const lines = output.split('\n').filter(l => l.trim());
for (const line of lines) {
const match = line.match(/^(\S+\.service)/);
if (match) {
const serviceName = match[1].replace('.service', '');
// Filter interesting services
if (!serviceName.startsWith('sys-') &&
!serviceName.startsWith('user@') &&
!serviceName.includes('getty') &&
!serviceName.includes('systemd-')) {
services.push(serviceName);
}
}
}
} catch (e) {
// Ignore errors
}
// Also detect by checking common ports/processes
try {
const ports = execSync('ss -tlnp 2>/dev/null | grep LISTEN || true', { encoding: 'utf8' });
if (ports.includes(':5432')) services.push('postgresql');
if (ports.includes(':3306')) services.push('mysql');
if (ports.includes(':27017')) services.push('mongodb');
if (ports.includes(':6379')) services.push('redis');
if (ports.includes(':80') || ports.includes(':443')) services.push('nginx');
if (ports.includes(':4222')) services.push('nats');
} catch (e) {
// Ignore
}
return [...new Set(services)].sort();
}
export class RegisterAgentTool extends ToolHandler<RegisterAgentInput, any> {
constructor(private natsClient: NatsClient) {
super(
{
name: 'register_agent',
description: 'Register this agent with a name and role. Makes you visible to other agents.',
},
RegisterAgentSchema,
);
this.setupPresenceListener();
}
private setupPresenceListener() {
try {
this.natsClient.subscribe('agents.announce', (msg: any) => {
if (msg.name && msg.sessionId) {
onlineAgents.set(msg.sessionId, {
name: msg.name,
role: msg.role || 'assistant',
hostname: msg.hostname || 'unknown',
capabilities: msg.capabilities || [],
services: msg.services || [],
connectedAt: msg.connectedAt || new Date().toISOString(),
lastSeen: new Date().toISOString(),
sessionId: msg.sessionId,
});
}
});
this.natsClient.subscribe('agents.heartbeat', (msg: any) => {
if (msg.sessionId && onlineAgents.has(msg.sessionId)) {
const info = onlineAgents.get(msg.sessionId)!;
info.lastSeen = new Date().toISOString();
}
});
this.natsClient.subscribe('agents.leave', (msg: any) => {
if (msg.sessionId) {
onlineAgents.delete(msg.sessionId);
}
});
} catch (error) {
// NATS not connected yet
}
}
protected async handle(input: RegisterAgentInput, context: ToolContext): Promise<any> {
cleanupStalePresence();
const sessionId = context.requestId || `session-${Date.now()}`;
const services = detectLocalServices();
const agentInfo: AgentInfo = {
name: input.name,
role: input.role || 'assistant',
hostname: os.hostname(),
capabilities: input.capabilities || ['chat', 'file_operations', 'system_commands'],
services,
connectedAt: new Date().toISOString(),
lastSeen: new Date().toISOString(),
sessionId,
};
onlineAgents.set(sessionId, agentInfo);
await this.natsClient.publish('agents.announce', agentInfo);
return {
success: true,
message: `Registered as "${input.name}" (${input.role || 'assistant'})`,
agent: agentInfo,
};
}
}
export class ListAgentsTool extends ToolHandler<ListAgentsInput, any> {
constructor(private natsClient: NatsClient) {
super(
{
name: 'list_agents',
description: 'List all agents currently online',
},
ListAgentsSchema,
);
}
protected async handle(_input: ListAgentsInput, _context: ToolContext): Promise<any> {
cleanupStalePresence();
const agents = Array.from(onlineAgents.values()).map(a => ({
name: a.name,
role: a.role,
hostname: a.hostname,
capabilities: a.capabilities,
services: a.services,
connectedAt: a.connectedAt,
lastSeen: a.lastSeen,
}));
return {
success: true,
count: agents.length,
agents,
message: agents.length === 0
? 'No agents online. Use register_agent to go online.'
: `${agents.length} agent(s) online`,
};
}
}
export class ListServicesTool extends ToolHandler<ListServicesInput, any> {
constructor(private natsClient: NatsClient) {
super(
{
name: 'list_services',
description: 'List services running on this machine',
},
ListServicesSchema,
);
}
protected async handle(_input: ListServicesInput, _context: ToolContext): Promise<any> {
const services = detectLocalServices();
// Get more details for each service
const serviceDetails: any[] = [];
for (const svc of services.slice(0, 30)) { // Limit to 30
try {
const status = execSync(`systemctl is-active ${svc} 2>/dev/null || echo unknown`, { encoding: 'utf8' }).trim();
serviceDetails.push({
name: svc,
status,
});
} catch {
serviceDetails.push({ name: svc, status: 'unknown' });
}
}
return {
success: true,
hostname: os.hostname(),
count: serviceDetails.length,
services: serviceDetails,
};
}
}
export class SendMessageTool extends ToolHandler<SendMessageInput, any> {
constructor(private natsClient: NatsClient) {
super(
{
name: 'send_message',
description: 'Send real-time message to another developer via NATS',
description: 'Send a message to another agent by name',
},
SendMessageSchema,
);
}
protected async handle(input: SendMessageInput, _context: ToolContext): Promise<any> {
const { to, from: explicitFrom, subject, message, data } = input;
const from = explicitFrom || detectRole();
protected async handle(input: SendMessageInput, context: ToolContext): Promise<any> {
const { to, message, subject } = input;
// Find sender info
let fromName = 'anonymous';
for (const [sid, info] of onlineAgents.entries()) {
if (sid === context.requestId || sid.includes(context.requestId)) {
fromName = info.name;
break;
}
}
const payload = {
from,
from: fromName,
to,
subject,
subject: subject || 'Message',
message,
data,
timestamp: new Date().toISOString(),
};
// Publish to NATS channel for recipient
const channel = `dev.messages.${to}`;
await this.natsClient.publish(channel, payload);
// Send to agent's channel
await this.natsClient.publish(`messages.${to}`, payload);
await this.natsClient.publish('messages.broadcast', payload);
return {
success: true,
message: `Message sent from ${from} to ${to}`,
channel,
message: `Message sent to ${to}`,
payload,
};
}
@@ -75,54 +292,71 @@ export class SendMessageTool extends ToolHandler<SendMessageInput, any> {
export class ReceiveMessagesTool extends ToolHandler<ReceiveMessagesInput, any> {
private messages: any[] = [];
private subscribed = false;
private myName: string = '';
constructor(private natsClient: NatsClient) {
super(
{
name: 'receive_messages',
description: 'Receive real-time messages from other developers via NATS',
description: 'Receive messages. First register_agent with a name.',
},
ReceiveMessagesSchema,
);
}
private setupSubscription() {
if (this.subscribed) return;
private setupSubscription(name: string) {
if (this.subscribed && this.myName === name) return;
const myRole = detectRole();
const channel = `dev.messages.${myRole}`;
// Subscribe to messages for this role (only if NATS is connected)
try {
this.natsClient.subscribe(channel, (msg) => {
this.natsClient.subscribe(`messages.${name}`, (msg) => {
this.messages.push(msg);
// Send notification to Claude Code UI
this.notifyNewMessage(msg);
console.log(`\n📬 MESSAGE from ${msg.from}: ${msg.subject}\n ${msg.message.substring(0, 200)}\n`);
});
this.natsClient.subscribe('messages.broadcast', (msg) => {
if (msg.to === name || msg.to === 'all') {
this.messages.push(msg);
}
});
this.subscribed = true;
this.myName = name;
} catch (error) {
// NATS not connected yet, will be called later
// NATS not connected
}
}
private notifyNewMessage(msg: any) {
const preview = msg.message?.substring(0, 100) || '';
console.log(`\n📬 NEW MESSAGE from ${msg.from}\n Subject: ${msg.subject}\n Preview: ${preview}...\n`);
protected async handle(input: ReceiveMessagesInput, context: ToolContext): Promise<any> {
let myName = '';
for (const [sid, info] of onlineAgents.entries()) {
if (sid === context.requestId || sid.includes(context.requestId)) {
myName = info.name;
break;
}
}
protected async handle(input: ReceiveMessagesInput, _context: ToolContext): Promise<any> {
// Setup subscription on first use (after NATS is connected)
this.setupSubscription();
if (!myName) {
return {
success: false,
message: 'Please register_agent first',
messages: [],
};
}
const limit = input.limit || 10;
this.setupSubscription(myName);
const limit = input.limit || 20;
const recentMessages = this.messages.slice(-limit);
return {
success: true,
role: detectRole(),
name: myName,
count: recentMessages.length,
messages: recentMessages,
};
}
}
// Keep old exports for compatibility
export { RegisterAgentTool as RegisterPresenceTool };
export { ListAgentsTool as ListOnlineTool };

View File

@@ -4,7 +4,7 @@ import { FileWriteTool } from './builtin/FileWriteTool';
import { FileListTool } from './builtin/FileListTool';
import { SystemCommandTool } from './builtin/SystemCommandTool';
import { HttpRequestTool } from './builtin/HttpRequestTool';
import { SendMessageTool, ReceiveMessagesTool } from './builtin/messaging';
import { RegisterAgentTool, ListAgentsTool, ListServicesTool, SendMessageTool, ReceiveMessagesTool } from './builtin/messaging';
import { NatsClient } from '../nats/NatsClient';
export * from './base/ToolHandler';
@@ -21,7 +21,10 @@ export function createBuiltinTools(natsClient: NatsClient): Map<string, ToolHand
tools.set('system_command', new SystemCommandTool());
tools.set('http_request', new HttpRequestTool());
// Messaging tools (require NATS client)
// Agent communication tools (require NATS client)
tools.set('register_agent', new RegisterAgentTool(natsClient));
tools.set('list_agents', new ListAgentsTool(natsClient));
tools.set('list_services', new ListServicesTool(natsClient));
tools.set('send_message', new SendMessageTool(natsClient));
tools.set('receive_messages', new ReceiveMessagesTool(natsClient));