feat: implement notification system

This commit is contained in:
Face 2025-06-11 18:37:03 +03:00
parent de3f8a4929
commit e61c41e414
19 changed files with 883 additions and 3196 deletions

View file

@ -14,10 +14,22 @@ if (!process.env.REDIS_URL) {
const redis = new Redis(process.env.REDIS_URL);
const HEARTBEAT_INTERVAL = 30_000;
type WebSocketData = {
coinSymbol?: string;
userId?: string;
lastActivity: number;
};
const coinSockets = new Map<string, Set<ServerWebSocket<WebSocketData>>>();
const userSockets = new Map<string, Set<ServerWebSocket<WebSocketData>>>();
const pingIntervals = new WeakMap<ServerWebSocket<WebSocketData>, NodeJS.Timeout>();
redis.on('error', (err) => console.error('Redis Client Error', err));
redis.on('connect', () => {
redis.psubscribe('comments:*', 'prices:*', (err, count) => {
redis.psubscribe('comments:*', 'prices:*', 'notifications:*', (err, count) => {
if (err) console.error("Failed to psubscribe to patterns", err);
else console.log(`Successfully psubscribed to patterns. Active psubscriptions: ${count}`);
});
@ -59,6 +71,17 @@ redis.on('pmessage', (pattern, channel, msg) => {
}
}
}
} else if (channel.startsWith('notifications:')) {
const userId = channel.substring('notifications:'.length);
const sockets = userSockets.get(userId);
console.log(`Received notification for user ${userId}:`, msg);
if (sockets) {
for (const ws of sockets) {
if (ws.readyState === WebSocket.OPEN) {
ws.send(msg);
}
}
}
}
} catch (error) {
console.error('Error processing Redis pmessage:', error, `Pattern: ${pattern}, Channel: ${channel}, Raw message: ${msg}`);
@ -85,16 +108,6 @@ redis.on('message', (channel, msg) => {
}
});
const HEARTBEAT_INTERVAL = 30_000;
type WebSocketData = {
coinSymbol?: string;
lastActivity: number;
};
const coinSockets = new Map<string, Set<ServerWebSocket<WebSocketData>>>();
const pingIntervals = new WeakMap<ServerWebSocket<WebSocketData>, NodeJS.Timeout>();
function handleSetCoin(ws: ServerWebSocket<WebSocketData>, coinSymbol: string) {
if (ws.data.coinSymbol) {
const prev = coinSockets.get(ws.data.coinSymbol);
@ -115,6 +128,26 @@ function handleSetCoin(ws: ServerWebSocket<WebSocketData>, coinSymbol: string) {
}
}
function handleSetUser(ws: ServerWebSocket<WebSocketData>, userId: string) {
if (ws.data.userId) {
const prev = userSockets.get(ws.data.userId);
if (prev) {
prev.delete(ws);
if (prev.size === 0) {
userSockets.delete(ws.data.userId);
}
}
}
ws.data.userId = userId;
if (!userSockets.has(userId)) {
userSockets.set(userId, new Set([ws]));
} else {
userSockets.get(userId)!.add(ws);
}
}
function checkConnections() {
const now = Date.now();
for (const [coinSymbol, sockets] of coinSockets.entries()) {
@ -163,10 +196,13 @@ const server = Bun.serve<WebSocketData, undefined>({
const data = JSON.parse(msg) as {
type: string;
coinSymbol?: string;
userId?: string;
};
if (data.type === 'set_coin' && data.coinSymbol) {
handleSetCoin(ws, data.coinSymbol);
} else if (data.type === 'set_user' && data.userId) {
handleSetUser(ws, data.userId);
} else if (data.type === 'pong') {
ws.data.lastActivity = Date.now();
}
@ -201,6 +237,16 @@ const server = Bun.serve<WebSocketData, undefined>({
}
}
}
if (ws.data.userId) {
const sockets = userSockets.get(ws.data.userId);
if (sockets) {
sockets.delete(ws);
if (sockets.size === 0) {
userSockets.delete(ws.data.userId);
}
}
}
}
}
});