feat: live trades (/live & sidebar) + sidebar skeleton
This commit is contained in:
parent
37d76b243b
commit
0ddb431536
12 changed files with 785 additions and 175 deletions
|
|
@ -15,23 +15,73 @@ if (!process.env.REDIS_URL) {
|
|||
const redis = new Redis(process.env.REDIS_URL);
|
||||
|
||||
redis.on('error', (err) => console.error('Redis Client Error', err));
|
||||
redis.on('connect', () => console.log('Connected to Redis'));
|
||||
|
||||
redis.psubscribe('comments:*');
|
||||
redis.on('connect', () => {
|
||||
redis.psubscribe('comments:*', 'prices:*', (err, count) => {
|
||||
if (err) console.error("Failed to psubscribe to patterns", err);
|
||||
else console.log(`Successfully psubscribed to patterns. Active psubscriptions: ${count}`);
|
||||
});
|
||||
|
||||
redis.on('pmessage', (_pattern, channel, msg) => {
|
||||
redis.subscribe('trades:all', 'trades:large', (err, count) => {
|
||||
if (err) console.error("Failed to subscribe to 'trades:all' and 'trades:large'", err);
|
||||
else console.log(`Successfully subscribed to 'trades:all', 'trades:large'. Active subscriptions: ${count}`);
|
||||
});
|
||||
});
|
||||
|
||||
redis.on('pmessage', (pattern, channel, msg) => {
|
||||
console.log(`[Redis pmessage RECEIVED] Pattern: "${pattern}", Channel: "${channel}", Message: "${msg}"`);
|
||||
try {
|
||||
const coinSymbol = channel.substring('comments:'.length);
|
||||
const sockets = coinSockets.get(coinSymbol);
|
||||
if (sockets) {
|
||||
for (const ws of sockets) {
|
||||
if (ws.readyState === WebSocket.OPEN) {
|
||||
ws.send(msg);
|
||||
if (channel.startsWith('comments:')) {
|
||||
const coinSymbol = channel.substring('comments:'.length);
|
||||
const sockets = coinSockets.get(coinSymbol);
|
||||
if (sockets) {
|
||||
for (const ws of sockets) {
|
||||
if (ws.readyState === WebSocket.OPEN) {
|
||||
ws.send(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (channel.startsWith('prices:')) {
|
||||
const coinSymbol = channel.substring('prices:'.length);
|
||||
const sockets = coinSockets.get(coinSymbol);
|
||||
console.log(`Received price update for ${coinSymbol}:`, msg);
|
||||
if (sockets) {
|
||||
const priceData = JSON.parse(msg);
|
||||
const priceMessage = JSON.stringify({
|
||||
type: 'price_update',
|
||||
coinSymbol,
|
||||
...priceData
|
||||
});
|
||||
|
||||
for (const ws of sockets) {
|
||||
if (ws.readyState === WebSocket.OPEN) {
|
||||
ws.send(priceMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Error processing Redis message:', error);
|
||||
console.error('Error processing Redis pmessage:', error, `Pattern: ${pattern}, Channel: ${channel}, Raw message: ${msg}`);
|
||||
}
|
||||
});
|
||||
|
||||
redis.on('message', (channel, msg) => {
|
||||
try {
|
||||
if (channel === 'trades:all' || channel === 'trades:large') {
|
||||
const tradeData = JSON.parse(msg);
|
||||
|
||||
const tradeMessage = JSON.stringify(tradeData);
|
||||
|
||||
for (const [, sockets] of coinSockets.entries()) {
|
||||
for (const ws of sockets) {
|
||||
if (ws.readyState === WebSocket.OPEN) {
|
||||
ws.send(tradeMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Error processing Redis message:', error, `Channel: ${channel}, Raw message: ${msg}`);
|
||||
}
|
||||
});
|
||||
|
||||
|
|
@ -117,6 +167,8 @@ const server = Bun.serve<WebSocketData, undefined>({
|
|||
|
||||
if (data.type === 'set_coin' && data.coinSymbol) {
|
||||
handleSetCoin(ws, data.coinSymbol);
|
||||
} else if (data.type === 'pong') {
|
||||
ws.data.lastActivity = Date.now();
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Message parsing error:', error);
|
||||
|
|
|
|||
Reference in a new issue