feat: websockets (comment posting + liking)
This commit is contained in:
parent
251609d7b8
commit
3f137e5c3c
15 changed files with 2200 additions and 5 deletions
1681
website/package-lock.json
generated
1681
website/package-lock.json
generated
File diff suppressed because it is too large
Load diff
|
|
@ -20,6 +20,7 @@
|
||||||
"@sveltejs/adapter-auto": "^3.0.0",
|
"@sveltejs/adapter-auto": "^3.0.0",
|
||||||
"@sveltejs/kit": "^2.0.0",
|
"@sveltejs/kit": "^2.0.0",
|
||||||
"@sveltejs/vite-plugin-svelte": "^4.0.0",
|
"@sveltejs/vite-plugin-svelte": "^4.0.0",
|
||||||
|
"@types/node": "^22.15.21",
|
||||||
"autoprefixer": "^10.4.20",
|
"autoprefixer": "^10.4.20",
|
||||||
"bits-ui": "^1.5.0",
|
"bits-ui": "^1.5.0",
|
||||||
"clsx": "^2.1.1",
|
"clsx": "^2.1.1",
|
||||||
|
|
@ -45,6 +46,7 @@
|
||||||
"@visx/scale": "^3.12.0",
|
"@visx/scale": "^3.12.0",
|
||||||
"better-auth": "^1.2.8",
|
"better-auth": "^1.2.8",
|
||||||
"drizzle-orm": "^0.33.0",
|
"drizzle-orm": "^0.33.0",
|
||||||
|
"ioredis": "^5.6.1",
|
||||||
"lightweight-charts": "^5.0.7",
|
"lightweight-charts": "^5.0.7",
|
||||||
"lucide-svelte": "^0.511.0",
|
"lucide-svelte": "^0.511.0",
|
||||||
"mode-watcher": "^1.0.7",
|
"mode-watcher": "^1.0.7",
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@
|
||||||
import { goto } from '$app/navigation';
|
import { goto } from '$app/navigation';
|
||||||
import { formatTimeAgo, getPublicUrl } from '$lib/utils';
|
import { formatTimeAgo, getPublicUrl } from '$lib/utils';
|
||||||
import SignInConfirmDialog from '$lib/components/self/SignInConfirmDialog.svelte';
|
import SignInConfirmDialog from '$lib/components/self/SignInConfirmDialog.svelte';
|
||||||
|
import WebSocket, { type WebSocketHandle } from '$lib/components/self/WebSocket.svelte';
|
||||||
|
|
||||||
const { coinSymbol } = $props<{ coinSymbol: string }>();
|
const { coinSymbol } = $props<{ coinSymbol: string }>();
|
||||||
import type { Comment } from '$lib/types/comment';
|
import type { Comment } from '$lib/types/comment';
|
||||||
|
|
@ -19,6 +20,39 @@
|
||||||
let isSubmitting = $state(false);
|
let isSubmitting = $state(false);
|
||||||
let isLoading = $state(true);
|
let isLoading = $state(true);
|
||||||
let shouldSignIn = $state(false);
|
let shouldSignIn = $state(false);
|
||||||
|
let wsManager = $state<WebSocketHandle | undefined>();
|
||||||
|
|
||||||
|
function handleWebSocketMessage(message: { type: string; data?: any }) {
|
||||||
|
switch (message.type) {
|
||||||
|
case 'new_comment':
|
||||||
|
// check if comment already exists
|
||||||
|
const commentExists = comments.some((c) => c.id === message.data.id);
|
||||||
|
if (!commentExists) {
|
||||||
|
comments = [message.data, ...comments];
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case 'comment_liked':
|
||||||
|
const commentIndex = comments.findIndex((c) => c.id === message.data.commentId);
|
||||||
|
if (commentIndex !== -1) {
|
||||||
|
comments[commentIndex] = {
|
||||||
|
...comments[commentIndex],
|
||||||
|
likesCount: message.data.likesCount,
|
||||||
|
isLikedByUser:
|
||||||
|
message.data.userId === Number($USER_DATA?.id)
|
||||||
|
? message.data.isLikedByUser
|
||||||
|
: comments[commentIndex].isLikedByUser
|
||||||
|
};
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function handleWebSocketOpen() {
|
||||||
|
wsManager?.send({
|
||||||
|
type: 'set_coin',
|
||||||
|
coinSymbol
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
async function loadComments() {
|
async function loadComments() {
|
||||||
try {
|
try {
|
||||||
|
|
@ -52,7 +86,11 @@
|
||||||
|
|
||||||
if (response.ok) {
|
if (response.ok) {
|
||||||
const result = await response.json();
|
const result = await response.json();
|
||||||
comments = [result.comment, ...comments];
|
// check if comment already exists (from ws) before adding
|
||||||
|
const commentExists = comments.some((c) => c.id === result.comment.id);
|
||||||
|
if (!commentExists) {
|
||||||
|
comments = [result.comment, ...comments];
|
||||||
|
}
|
||||||
newComment = '';
|
newComment = '';
|
||||||
} else {
|
} else {
|
||||||
const error = await response.json();
|
const error = await response.json();
|
||||||
|
|
@ -112,6 +150,13 @@
|
||||||
|
|
||||||
<SignInConfirmDialog bind:open={shouldSignIn} />
|
<SignInConfirmDialog bind:open={shouldSignIn} />
|
||||||
|
|
||||||
|
<WebSocket
|
||||||
|
bind:this={wsManager}
|
||||||
|
onMessage={handleWebSocketMessage}
|
||||||
|
onOpen={handleWebSocketOpen}
|
||||||
|
disableReconnect={true}
|
||||||
|
/>
|
||||||
|
|
||||||
<Card.Root>
|
<Card.Root>
|
||||||
<Card.Header>
|
<Card.Header>
|
||||||
<Card.Title class="flex items-center gap-2">
|
<Card.Title class="flex items-center gap-2">
|
||||||
|
|
|
||||||
102
website/src/lib/components/self/WebSocket.svelte
Normal file
102
website/src/lib/components/self/WebSocket.svelte
Normal file
|
|
@ -0,0 +1,102 @@
|
||||||
|
<script lang="ts">
|
||||||
|
import { PUBLIC_WEBSOCKET_URL } from '$env/static/public';
|
||||||
|
import { onMount } from 'svelte';
|
||||||
|
|
||||||
|
export interface WebSocketHandle {
|
||||||
|
send: (data: any) => void;
|
||||||
|
ws: WebSocket | null;
|
||||||
|
}
|
||||||
|
|
||||||
|
type WebSocketMessage = {
|
||||||
|
type: string;
|
||||||
|
data?: any;
|
||||||
|
coinSymbol?: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
let {
|
||||||
|
onMessage,
|
||||||
|
onOpen = undefined,
|
||||||
|
onClose = undefined,
|
||||||
|
disableReconnect = false
|
||||||
|
} = $props<{
|
||||||
|
onMessage: (message: WebSocketMessage) => void;
|
||||||
|
onOpen?: () => void;
|
||||||
|
onClose?: (event: CloseEvent) => void;
|
||||||
|
disableReconnect?: boolean;
|
||||||
|
}>();
|
||||||
|
|
||||||
|
let ws = $state<WebSocket | null>(null);
|
||||||
|
let reconnectAttempts = $state(0);
|
||||||
|
const MAX_RECONNECT_ATTEMPTS = 5;
|
||||||
|
const BASE_DELAY = 1000;
|
||||||
|
|
||||||
|
async function initializeWebSocket() {
|
||||||
|
ws = new WebSocket(PUBLIC_WEBSOCKET_URL);
|
||||||
|
|
||||||
|
ws.addEventListener('open', () => {
|
||||||
|
reconnectAttempts = 0;
|
||||||
|
onOpen?.();
|
||||||
|
});
|
||||||
|
|
||||||
|
ws.addEventListener('message', (event) => {
|
||||||
|
try {
|
||||||
|
const message = JSON.parse(event.data) as WebSocketMessage;
|
||||||
|
|
||||||
|
if (message.type === 'ping') {
|
||||||
|
ws?.send(JSON.stringify({ type: 'pong' }));
|
||||||
|
} else {
|
||||||
|
onMessage(message);
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
console.error('WebSocket message parse error:', e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
ws.addEventListener('close', async (event) => {
|
||||||
|
if (onClose) {
|
||||||
|
onClose(event);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (disableReconnect || reconnectAttempts >= MAX_RECONNECT_ATTEMPTS) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const delay = BASE_DELAY * Math.pow(2, reconnectAttempts);
|
||||||
|
reconnectAttempts++;
|
||||||
|
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||||
|
handleReconnect();
|
||||||
|
});
|
||||||
|
|
||||||
|
ws.addEventListener('error', (event) => {
|
||||||
|
console.error('WebSocket error:', event);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
async function handleReconnect() {
|
||||||
|
try {
|
||||||
|
await initializeWebSocket();
|
||||||
|
} catch (error) {
|
||||||
|
console.error('Reconnect failed:', error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
onMount(() => {
|
||||||
|
initializeWebSocket().catch((error) => {
|
||||||
|
console.error(`Connection failed: ${error.message}`);
|
||||||
|
});
|
||||||
|
|
||||||
|
return () => {
|
||||||
|
ws?.close();
|
||||||
|
ws = null;
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
|
export { ws as ws };
|
||||||
|
|
||||||
|
export const send: WebSocketHandle['send'] = (data) => {
|
||||||
|
if (ws?.readyState === WebSocket.OPEN) {
|
||||||
|
ws.send(JSON.stringify(data));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
</script>
|
||||||
19
website/src/lib/server/redis.ts
Normal file
19
website/src/lib/server/redis.ts
Normal file
|
|
@ -0,0 +1,19 @@
|
||||||
|
import Redis from 'ioredis';
|
||||||
|
import { building } from '$app/environment';
|
||||||
|
import { REDIS_URL } from '$env/static/private';
|
||||||
|
|
||||||
|
if (building) {
|
||||||
|
throw new Error('Redis cannot be used during build');
|
||||||
|
}
|
||||||
|
|
||||||
|
const redis = new Redis(REDIS_URL);
|
||||||
|
|
||||||
|
redis.on('error', (err) => {
|
||||||
|
console.error('Redis connection error:', err);
|
||||||
|
});
|
||||||
|
|
||||||
|
redis.on('connect', () => {
|
||||||
|
console.log('Redis connected successfully');
|
||||||
|
});
|
||||||
|
|
||||||
|
export { redis };
|
||||||
|
|
@ -33,7 +33,7 @@ export function getPublicUrl(key: string | null): string | null {
|
||||||
}
|
}
|
||||||
|
|
||||||
export function debounce(func: (...args: any[]) => void, wait: number) {
|
export function debounce(func: (...args: any[]) => void, wait: number) {
|
||||||
let timeout: number | undefined;
|
let timeout: ReturnType<typeof setTimeout> | undefined;
|
||||||
return function executedFunction(...args: any[]) {
|
return function executedFunction(...args: any[]) {
|
||||||
const later = () => {
|
const later = () => {
|
||||||
clearTimeout(timeout);
|
clearTimeout(timeout);
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ import { error, json } from '@sveltejs/kit';
|
||||||
import { db } from '$lib/server/db';
|
import { db } from '$lib/server/db';
|
||||||
import { comment, coin, user, commentLike } from '$lib/server/db/schema';
|
import { comment, coin, user, commentLike } from '$lib/server/db/schema';
|
||||||
import { eq, and, desc, sql } from 'drizzle-orm';
|
import { eq, and, desc, sql } from 'drizzle-orm';
|
||||||
|
import { redis } from '$lib/server/redis';
|
||||||
|
|
||||||
export async function GET({ params, request }) {
|
export async function GET({ params, request }) {
|
||||||
const session = await auth.api.getSession({
|
const session = await auth.api.getSession({
|
||||||
|
|
@ -117,6 +118,14 @@ export async function POST({ request, params }) {
|
||||||
.where(eq(comment.id, newComment.id))
|
.where(eq(comment.id, newComment.id))
|
||||||
.limit(1);
|
.limit(1);
|
||||||
|
|
||||||
|
await redis.publish(
|
||||||
|
`comments:${normalizedSymbol}`,
|
||||||
|
JSON.stringify({
|
||||||
|
type: 'new_comment',
|
||||||
|
data: commentWithUser
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
return json({ comment: commentWithUser });
|
return json({ comment: commentWithUser });
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
console.error('Error creating comment:', e);
|
console.error('Error creating comment:', e);
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@ import { comment, commentLike, coin } from '$lib/server/db/schema';
|
||||||
import { eq, and, sql } from 'drizzle-orm';
|
import { eq, and, sql } from 'drizzle-orm';
|
||||||
import type { RequestHandler } from './$types';
|
import type { RequestHandler } from './$types';
|
||||||
import { auth } from '$lib/auth';
|
import { auth } from '$lib/auth';
|
||||||
|
import { redis } from '$lib/server/redis';
|
||||||
|
|
||||||
export const POST: RequestHandler = async ({ request, params }) => {
|
export const POST: RequestHandler = async ({ request, params }) => {
|
||||||
const session = await auth.api.getSession({
|
const session = await auth.api.getSession({
|
||||||
|
|
@ -54,6 +55,24 @@ export const POST: RequestHandler = async ({ request, params }) => {
|
||||||
.where(eq(comment.id, commentId));
|
.where(eq(comment.id, commentId));
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const [updatedComment] = await db
|
||||||
|
.select({ likesCount: comment.likesCount })
|
||||||
|
.from(comment)
|
||||||
|
.where(eq(comment.id, commentId));
|
||||||
|
|
||||||
|
await redis.publish(
|
||||||
|
`comments:${coinSymbol!.toUpperCase()}`,
|
||||||
|
JSON.stringify({
|
||||||
|
type: 'comment_liked',
|
||||||
|
data: {
|
||||||
|
commentId: Number(commentId),
|
||||||
|
likesCount: updatedComment.likesCount,
|
||||||
|
isLikedByUser: true,
|
||||||
|
userId
|
||||||
|
}
|
||||||
|
})
|
||||||
|
)
|
||||||
|
|
||||||
return json({ success: true });
|
return json({ success: true });
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Failed to like comment:', error);
|
console.error('Failed to like comment:', error);
|
||||||
|
|
@ -112,6 +131,24 @@ export const DELETE: RequestHandler = async ({ request, params }) => {
|
||||||
.where(eq(comment.id, commentId));
|
.where(eq(comment.id, commentId));
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const [updatedComment] = await db
|
||||||
|
.select({ likesCount: comment.likesCount })
|
||||||
|
.from(comment)
|
||||||
|
.where(eq(comment.id, commentId));
|
||||||
|
|
||||||
|
await redis.publish(
|
||||||
|
`comments:${coinSymbol.toUpperCase()}`,
|
||||||
|
JSON.stringify({
|
||||||
|
type: 'comment_liked',
|
||||||
|
data: {
|
||||||
|
commentId: Number(commentId),
|
||||||
|
likesCount: updatedComment.likesCount,
|
||||||
|
isLikedByUser: false,
|
||||||
|
userId
|
||||||
|
}
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
return json({ success: true });
|
return json({ success: true });
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Failed to unlike comment:', error);
|
console.error('Failed to unlike comment:', error);
|
||||||
|
|
|
||||||
|
|
@ -164,7 +164,7 @@
|
||||||
{#if leaderboardData.biggestLosers.length === 0}
|
{#if leaderboardData.biggestLosers.length === 0}
|
||||||
<div class="py-8 text-center">
|
<div class="py-8 text-center">
|
||||||
<p class="text-muted-foreground">
|
<p class="text-muted-foreground">
|
||||||
Everyone's in profit today! 📈 (This won't last...)
|
No major losses recorded today
|
||||||
</p>
|
</p>
|
||||||
</div>
|
</div>
|
||||||
{:else}
|
{:else}
|
||||||
|
|
|
||||||
34
website/websocket/.gitignore
vendored
Normal file
34
website/websocket/.gitignore
vendored
Normal file
|
|
@ -0,0 +1,34 @@
|
||||||
|
# dependencies (bun install)
|
||||||
|
node_modules
|
||||||
|
|
||||||
|
# output
|
||||||
|
out
|
||||||
|
dist
|
||||||
|
*.tgz
|
||||||
|
|
||||||
|
# code coverage
|
||||||
|
coverage
|
||||||
|
*.lcov
|
||||||
|
|
||||||
|
# logs
|
||||||
|
logs
|
||||||
|
_.log
|
||||||
|
report.[0-9]_.[0-9]_.[0-9]_.[0-9]_.json
|
||||||
|
|
||||||
|
# dotenv environment variable files
|
||||||
|
.env
|
||||||
|
.env.development.local
|
||||||
|
.env.test.local
|
||||||
|
.env.production.local
|
||||||
|
.env.local
|
||||||
|
|
||||||
|
# caches
|
||||||
|
.eslintcache
|
||||||
|
.cache
|
||||||
|
*.tsbuildinfo
|
||||||
|
|
||||||
|
# IntelliJ based IDEs
|
||||||
|
.idea
|
||||||
|
|
||||||
|
# Finder (MacOS) folder config
|
||||||
|
.DS_Store
|
||||||
15
website/websocket/README.md
Normal file
15
website/websocket/README.md
Normal file
|
|
@ -0,0 +1,15 @@
|
||||||
|
# websocket
|
||||||
|
|
||||||
|
To install dependencies:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
bun install
|
||||||
|
```
|
||||||
|
|
||||||
|
To run:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
bun run src/main.ts
|
||||||
|
```
|
||||||
|
|
||||||
|
This project was created using `bun init` in bun v1.2.11. [Bun](https://bun.sh) is a fast all-in-one JavaScript runtime.
|
||||||
53
website/websocket/bun.lock
Normal file
53
website/websocket/bun.lock
Normal file
|
|
@ -0,0 +1,53 @@
|
||||||
|
{
|
||||||
|
"lockfileVersion": 1,
|
||||||
|
"workspaces": {
|
||||||
|
"": {
|
||||||
|
"name": "websocket",
|
||||||
|
"dependencies": {
|
||||||
|
"dotenv": "^16.5.0",
|
||||||
|
"ioredis": "^5.6.1",
|
||||||
|
},
|
||||||
|
"devDependencies": {
|
||||||
|
"@types/bun": "latest",
|
||||||
|
},
|
||||||
|
"peerDependencies": {
|
||||||
|
"typescript": "^5",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"packages": {
|
||||||
|
"@ioredis/commands": ["@ioredis/commands@1.2.0", "", {}, "sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg=="],
|
||||||
|
|
||||||
|
"@types/bun": ["@types/bun@1.2.14", "", { "dependencies": { "bun-types": "1.2.14" } }, "sha512-VsFZKs8oKHzI7zwvECiAJ5oSorWndIWEVhfbYqZd4HI/45kzW7PN2Rr5biAzvGvRuNmYLSANY+H59ubHq8xw7Q=="],
|
||||||
|
|
||||||
|
"@types/node": ["@types/node@22.15.21", "", { "dependencies": { "undici-types": "~6.21.0" } }, "sha512-EV/37Td6c+MgKAbkcLG6vqZ2zEYHD7bvSrzqqs2RIhbA6w3x+Dqz8MZM3sP6kGTeLrdoOgKZe+Xja7tUB2DNkQ=="],
|
||||||
|
|
||||||
|
"bun-types": ["bun-types@1.2.14", "", { "dependencies": { "@types/node": "*" } }, "sha512-Kuh4Ub28ucMRWeiUUWMHsT9Wcbr4H3kLIO72RZZElSDxSu7vpetRvxIUDUaW6QtaIeixIpm7OXtNnZPf82EzwA=="],
|
||||||
|
|
||||||
|
"cluster-key-slot": ["cluster-key-slot@1.1.2", "", {}, "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA=="],
|
||||||
|
|
||||||
|
"debug": ["debug@4.4.1", "", { "dependencies": { "ms": "^2.1.3" } }, "sha512-KcKCqiftBJcZr++7ykoDIEwSa3XWowTfNPo92BYxjXiyYEVrUQh2aLyhxBCwww+heortUFxEJYcRzosstTEBYQ=="],
|
||||||
|
|
||||||
|
"denque": ["denque@2.1.0", "", {}, "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw=="],
|
||||||
|
|
||||||
|
"dotenv": ["dotenv@16.5.0", "", {}, "sha512-m/C+AwOAr9/W1UOIZUo232ejMNnJAJtYQjUbHoNTBNTJSvqzzDh7vnrei3o3r3m9blf6ZoDkvcw0VmozNRFJxg=="],
|
||||||
|
|
||||||
|
"ioredis": ["ioredis@5.6.1", "", { "dependencies": { "@ioredis/commands": "^1.1.1", "cluster-key-slot": "^1.1.0", "debug": "^4.3.4", "denque": "^2.1.0", "lodash.defaults": "^4.2.0", "lodash.isarguments": "^3.1.0", "redis-errors": "^1.2.0", "redis-parser": "^3.0.0", "standard-as-callback": "^2.1.0" } }, "sha512-UxC0Yv1Y4WRJiGQxQkP0hfdL0/5/6YvdfOOClRgJ0qppSarkhneSa6UvkMkms0AkdGimSH3Ikqm+6mkMmX7vGA=="],
|
||||||
|
|
||||||
|
"lodash.defaults": ["lodash.defaults@4.2.0", "", {}, "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ=="],
|
||||||
|
|
||||||
|
"lodash.isarguments": ["lodash.isarguments@3.1.0", "", {}, "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg=="],
|
||||||
|
|
||||||
|
"ms": ["ms@2.1.3", "", {}, "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA=="],
|
||||||
|
|
||||||
|
"redis-errors": ["redis-errors@1.2.0", "", {}, "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w=="],
|
||||||
|
|
||||||
|
"redis-parser": ["redis-parser@3.0.0", "", { "dependencies": { "redis-errors": "^1.0.0" } }, "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A=="],
|
||||||
|
|
||||||
|
"standard-as-callback": ["standard-as-callback@2.1.0", "", {}, "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A=="],
|
||||||
|
|
||||||
|
"typescript": ["typescript@5.8.3", "", { "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" } }, "sha512-p1diW6TqL9L07nNxvRMM7hMMw4c5XOo/1ibL4aAIGmSAt9slTE1Xgw5KWuof2uTOvCg9BY7ZRi+GaF+7sfgPeQ=="],
|
||||||
|
|
||||||
|
"undici-types": ["undici-types@6.21.0", "", {}, "sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ=="],
|
||||||
|
}
|
||||||
|
}
|
||||||
16
website/websocket/package.json
Normal file
16
website/websocket/package.json
Normal file
|
|
@ -0,0 +1,16 @@
|
||||||
|
{
|
||||||
|
"name": "websocket",
|
||||||
|
"module": "src/main.ts",
|
||||||
|
"type": "module",
|
||||||
|
"private": true,
|
||||||
|
"devDependencies": {
|
||||||
|
"@types/bun": "latest"
|
||||||
|
},
|
||||||
|
"peerDependencies": {
|
||||||
|
"typescript": "^5"
|
||||||
|
},
|
||||||
|
"dependencies": {
|
||||||
|
"dotenv": "^16.5.0",
|
||||||
|
"ioredis": "^5.6.1"
|
||||||
|
}
|
||||||
|
}
|
||||||
158
website/websocket/src/main.ts
Normal file
158
website/websocket/src/main.ts
Normal file
|
|
@ -0,0 +1,158 @@
|
||||||
|
import type { ServerWebSocket } from 'bun';
|
||||||
|
import Redis from 'ioredis';
|
||||||
|
import { config } from 'dotenv';
|
||||||
|
import { fileURLToPath } from 'url';
|
||||||
|
import { dirname, join } from 'path';
|
||||||
|
|
||||||
|
const __dirname = dirname(fileURLToPath(import.meta.url));
|
||||||
|
config({ path: join(__dirname, '../../.env') });
|
||||||
|
|
||||||
|
if (!process.env.REDIS_URL) {
|
||||||
|
console.error("REDIS_URL is not defined in environment variables.");
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
const redis = new Redis(process.env.REDIS_URL);
|
||||||
|
|
||||||
|
redis.on('error', (err) => console.error('Redis Client Error', err));
|
||||||
|
redis.on('connect', () => console.log('Connected to Redis'));
|
||||||
|
|
||||||
|
redis.psubscribe('comments:*');
|
||||||
|
|
||||||
|
redis.on('pmessage', (_pattern, channel, msg) => {
|
||||||
|
try {
|
||||||
|
const coinSymbol = channel.substring('comments:'.length);
|
||||||
|
const sockets = coinSockets.get(coinSymbol);
|
||||||
|
if (sockets) {
|
||||||
|
for (const ws of sockets) {
|
||||||
|
if (ws.readyState === WebSocket.OPEN) {
|
||||||
|
ws.send(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
console.error('Error processing Redis message:', error);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
const HEARTBEAT_INTERVAL = 30_000;
|
||||||
|
|
||||||
|
type WebSocketData = {
|
||||||
|
coinSymbol?: string;
|
||||||
|
lastActivity: number;
|
||||||
|
};
|
||||||
|
|
||||||
|
const coinSockets = new Map<string, Set<ServerWebSocket<WebSocketData>>>();
|
||||||
|
const pingIntervals = new WeakMap<ServerWebSocket<WebSocketData>, NodeJS.Timeout>();
|
||||||
|
|
||||||
|
function handleSetCoin(ws: ServerWebSocket<WebSocketData>, coinSymbol: string) {
|
||||||
|
if (ws.data.coinSymbol) {
|
||||||
|
const prev = coinSockets.get(ws.data.coinSymbol);
|
||||||
|
if (prev) {
|
||||||
|
prev.delete(ws);
|
||||||
|
if (prev.size === 0) {
|
||||||
|
coinSockets.delete(ws.data.coinSymbol);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ws.data.coinSymbol = coinSymbol;
|
||||||
|
|
||||||
|
if (!coinSockets.has(coinSymbol)) {
|
||||||
|
coinSockets.set(coinSymbol, new Set([ws]));
|
||||||
|
} else {
|
||||||
|
coinSockets.get(coinSymbol)!.add(ws);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function checkConnections() {
|
||||||
|
const now = Date.now();
|
||||||
|
for (const [coinSymbol, sockets] of coinSockets.entries()) {
|
||||||
|
const staleSockets = Array.from(sockets).filter(ws => now - ws.data.lastActivity > HEARTBEAT_INTERVAL * 2);
|
||||||
|
for (const socket of staleSockets) {
|
||||||
|
socket.terminate();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
setInterval(checkConnections, HEARTBEAT_INTERVAL);
|
||||||
|
|
||||||
|
const server = Bun.serve<WebSocketData, undefined>({
|
||||||
|
port: Number(process.env.PORT) || 8080,
|
||||||
|
|
||||||
|
fetch(request, server) {
|
||||||
|
const url = new URL(request.url);
|
||||||
|
|
||||||
|
if (url.pathname === '/health') {
|
||||||
|
return new Response(JSON.stringify({
|
||||||
|
status: 'ok',
|
||||||
|
timestamp: new Date().toISOString(),
|
||||||
|
activeConnections: Array.from(coinSockets.values()).reduce((total, set) => total + set.size, 0)
|
||||||
|
}), {
|
||||||
|
headers: { 'Content-Type': 'application/json' }
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const upgraded = server.upgrade(request, {
|
||||||
|
data: {
|
||||||
|
coinSymbol: undefined,
|
||||||
|
lastActivity: Date.now()
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return upgraded ? undefined : new Response('Upgrade failed', { status: 500 });
|
||||||
|
},
|
||||||
|
|
||||||
|
websocket: {
|
||||||
|
message(ws, msg) {
|
||||||
|
ws.data.lastActivity = Date.now();
|
||||||
|
|
||||||
|
if (typeof msg !== 'string') return;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const data = JSON.parse(msg) as {
|
||||||
|
type: string;
|
||||||
|
coinSymbol?: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
if (data.type === 'set_coin' && data.coinSymbol) {
|
||||||
|
handleSetCoin(ws, data.coinSymbol);
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
console.error('Message parsing error:', error);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
open(ws) {
|
||||||
|
const interval = setInterval(() => {
|
||||||
|
if (ws.readyState === 1) {
|
||||||
|
ws.data.lastActivity = Date.now();
|
||||||
|
ws.send(JSON.stringify({ type: 'ping' }));
|
||||||
|
} else {
|
||||||
|
clearInterval(interval);
|
||||||
|
}
|
||||||
|
}, HEARTBEAT_INTERVAL);
|
||||||
|
|
||||||
|
pingIntervals.set(ws, interval);
|
||||||
|
}, close(ws) {
|
||||||
|
const interval = pingIntervals.get(ws);
|
||||||
|
if (interval) {
|
||||||
|
clearInterval(interval);
|
||||||
|
pingIntervals.delete(ws);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ws.data.coinSymbol) {
|
||||||
|
const sockets = coinSockets.get(ws.data.coinSymbol);
|
||||||
|
if (sockets) {
|
||||||
|
sockets.delete(ws);
|
||||||
|
if (sockets.size === 0) {
|
||||||
|
coinSockets.delete(ws.data.coinSymbol);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
console.log(`WebSocket server running on port ${server.port}`);
|
||||||
|
console.log('Server listening for connections...');
|
||||||
|
console.log('Health check available at: http://localhost:8080/health');
|
||||||
28
website/websocket/tsconfig.json
Normal file
28
website/websocket/tsconfig.json
Normal file
|
|
@ -0,0 +1,28 @@
|
||||||
|
{
|
||||||
|
"compilerOptions": {
|
||||||
|
// Environment setup & latest features
|
||||||
|
"lib": ["ESNext"],
|
||||||
|
"target": "ESNext",
|
||||||
|
"module": "ESNext",
|
||||||
|
"moduleDetection": "force",
|
||||||
|
"jsx": "react-jsx",
|
||||||
|
"allowJs": true,
|
||||||
|
|
||||||
|
// Bundler mode
|
||||||
|
"moduleResolution": "bundler",
|
||||||
|
"allowImportingTsExtensions": true,
|
||||||
|
"verbatimModuleSyntax": true,
|
||||||
|
"noEmit": true,
|
||||||
|
|
||||||
|
// Best practices
|
||||||
|
"strict": true,
|
||||||
|
"skipLibCheck": true,
|
||||||
|
"noFallthroughCasesInSwitch": true,
|
||||||
|
"noUncheckedIndexedAccess": true,
|
||||||
|
|
||||||
|
// Some stricter flags (disabled by default)
|
||||||
|
"noUnusedLocals": false,
|
||||||
|
"noUnusedParameters": false,
|
||||||
|
"noPropertyAccessFromIndexSignature": false
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in a new issue