diff --git a/website/src/lib/components/self/AppSidebar.svelte b/website/src/lib/components/self/AppSidebar.svelte index c4edc75..e412e9a 100644 --- a/website/src/lib/components/self/AppSidebar.svelte +++ b/website/src/lib/components/self/AppSidebar.svelte @@ -3,6 +3,7 @@ import * as DropdownMenu from '$lib/components/ui/dropdown-menu'; import * as Avatar from '$lib/components/ui/avatar'; import { Badge } from '$lib/components/ui/badge'; + import { Skeleton } from '$lib/components/ui/skeleton'; import { Moon, Sun, @@ -18,7 +19,10 @@ BellIcon, LogOutIcon, Wallet, - Trophy + Trophy, + Activity, + TrendingUp, + TrendingDown } from 'lucide-svelte'; import { mode, setMode } from 'mode-watcher'; import type { HTMLAttributes } from 'svelte/elements'; @@ -28,8 +32,9 @@ import SignInConfirmDialog from './SignInConfirmDialog.svelte'; import DailyRewards from './DailyRewards.svelte'; import { signOut } from '$lib/auth-client'; - import { getPublicUrl } from '$lib/utils'; + import { formatValue, getPublicUrl } from '$lib/utils'; import { goto } from '$app/navigation'; + import { liveTradesStore, isLoadingTrades, type LiveTrade } from '$lib/stores/websocket'; const data = { navMain: [ @@ -46,7 +51,6 @@ const { setOpenMobile, isMobile } = useSidebar(); let shouldSignIn = $state(false); - // Fetch portfolio data when user is authenticated $effect(() => { if ($USER_DATA) { fetchPortfolioData(); @@ -70,6 +74,16 @@ maximumFractionDigits: 2 }); } + + function handleLiveTradesClick() { + goto('/live'); + setOpenMobile(false); + } + + function handleTradeClick(coinSymbol: string) { + goto(`/coin/${coinSymbol.toLowerCase()}`); + setOpenMobile(false); + } @@ -142,44 +156,148 @@ - + + {#if $USER_DATA}
- + {#if !$PORTFOLIO_DATA} +
+ +
+ {:else} + + {/if}
{/if} + + + +
+ + Live Trades +
+ +
+ +
+ {#if $isLoadingTrades} + {#each Array(5) as _, i} +
+
+ + +
+
+
+ + +
+
+
+ {/each} + {:else if $liveTradesStore.length === 0} +
No big trades yet...
+ {:else} + {#each $liveTradesStore.slice(0, 5) as trade, index (`${trade.timestamp}-${trade.username}-${trade.coinSymbol}-${index}`)} + + {/each} + {/if} +
+
+
+ - {#if $USER_DATA && $PORTFOLIO_DATA} + {#if $USER_DATA} Portfolio -
-
-
- - Total Value +
+ {#if !$PORTFOLIO_DATA} +
+
+ + +
+
- - ${formatCurrency($PORTFOLIO_DATA.totalValue)} - -
-
-
- Cash: - ${formatCurrency($PORTFOLIO_DATA.baseCurrencyBalance)} +
+
+ + +
+
+ + +
-
- Coins: - ${formatCurrency($PORTFOLIO_DATA.totalCoinValue)} + {:else} +
+
+ + Total Value +
+ + ${formatCurrency($PORTFOLIO_DATA.totalValue)} +
-
+
+
+ Cash: + ${formatCurrency($PORTFOLIO_DATA.baseCurrencyBalance)} +
+
+ Coins: + ${formatCurrency($PORTFOLIO_DATA.totalCoinValue)} +
+
+ {/if}
diff --git a/website/src/lib/components/self/CommentSection.svelte b/website/src/lib/components/self/CommentSection.svelte index d2c8327..f8272bf 100644 --- a/website/src/lib/components/self/CommentSection.svelte +++ b/website/src/lib/components/self/CommentSection.svelte @@ -12,7 +12,7 @@ import { formatTimeAgo, getPublicUrl } from '$lib/utils'; import SignInConfirmDialog from '$lib/components/self/SignInConfirmDialog.svelte'; import UserProfilePreview from '$lib/components/self/UserProfilePreview.svelte'; - import WebSocket, { type WebSocketHandle } from '$lib/components/self/WebSocket.svelte'; + import { websocketController } from '$lib/stores/websocket'; const { coinSymbol } = $props<{ coinSymbol: string }>(); import type { Comment } from '$lib/types/comment'; @@ -21,7 +21,15 @@ let isSubmitting = $state(false); let isLoading = $state(true); let shouldSignIn = $state(false); - let wsManager = $state(); + + $effect(() => { + websocketController.setCoin(coinSymbol); + websocketController.subscribeToComments(coinSymbol, handleWebSocketMessage); + + return () => { + websocketController.unsubscribeFromComments(coinSymbol); + }; + }); function handleWebSocketMessage(message: { type: string; data?: any }) { switch (message.type) { @@ -48,13 +56,6 @@ } } - function handleWebSocketOpen() { - wsManager?.send({ - type: 'set_coin', - coinSymbol - }); - } - async function loadComments() { try { const response = await fetch(`/api/coin/${coinSymbol}/comments`); @@ -151,13 +152,6 @@ - - diff --git a/website/src/lib/components/self/DailyRewards.svelte b/website/src/lib/components/self/DailyRewards.svelte index c5069b0..a186d94 100644 --- a/website/src/lib/components/self/DailyRewards.svelte +++ b/website/src/lib/components/self/DailyRewards.svelte @@ -141,26 +141,24 @@ } -{#if $USER_DATA && rewardStatus} - -{/if} + diff --git a/website/src/lib/components/self/WebSocket.svelte b/website/src/lib/components/self/WebSocket.svelte deleted file mode 100644 index 2de7c66..0000000 --- a/website/src/lib/components/self/WebSocket.svelte +++ /dev/null @@ -1,102 +0,0 @@ - diff --git a/website/src/lib/stores/websocket.ts b/website/src/lib/stores/websocket.ts new file mode 100644 index 0000000..4d220dc --- /dev/null +++ b/website/src/lib/stores/websocket.ts @@ -0,0 +1,212 @@ +import { writable } from 'svelte/store'; +import { browser } from '$app/environment'; +import { PUBLIC_WEBSOCKET_URL } from '$env/static/public'; + +export interface LiveTrade { + type: 'BUY' | 'SELL'; + username: string; + amount: number; + coinSymbol: string; + coinName?: string; + coinIcon?: string; + totalValue: number; + price: number; + timestamp: number; + userId: string; + userImage?: string; +} + +// Constants +const WEBSOCKET_URL = PUBLIC_WEBSOCKET_URL; +const RECONNECT_DELAY = 5000; +const MAX_LIVE_TRADES = 5; +const MAX_ALL_TRADES = 100; + +// WebSocket state +let socket: WebSocket | null = null; +let reconnectTimer: NodeJS.Timeout | null = null; +let activeCoin: string = '@global'; + +// Stores +export const liveTradesStore = writable([]); +export const allTradesStore = writable([]); +export const isConnectedStore = writable(false); +export const isLoadingTrades = writable(false); + +// Comment callbacks +const commentSubscriptions = new Map void>(); + +async function loadInitialTrades(): Promise { + if (!browser) return; + + isLoadingTrades.set(true); + + try { + const [largeTradesResponse, allTradesResponse] = await Promise.all([ + fetch('/api/trades/recent?limit=5&minValue=1000'), + fetch('/api/trades/recent?limit=100') + ]); + + if (largeTradesResponse.ok) { + const { trades } = await largeTradesResponse.json(); + liveTradesStore.set(trades); + } + + if (allTradesResponse.ok) { + const { trades } = await allTradesResponse.json(); + allTradesStore.set(trades); + } + } catch (error) { + console.error('Failed to load initial trades:', error); + } finally { + isLoadingTrades.set(false); + } +} + +function clearReconnectTimer(): void { + if (reconnectTimer) { + clearTimeout(reconnectTimer); + reconnectTimer = null; + } +} + +function scheduleReconnect(): void { + clearReconnectTimer(); + reconnectTimer = setTimeout(connect, RECONNECT_DELAY); +} + +function isSocketConnected(): boolean { + return socket?.readyState === WebSocket.OPEN; +} + +function isSocketConnecting(): boolean { + return socket?.readyState === WebSocket.CONNECTING; +} + +function sendMessage(message: object): void { + if (isSocketConnected()) { + socket!.send(JSON.stringify(message)); + } +} + +function subscribeToChannels(): void { + sendMessage({ type: 'subscribe', channel: 'trades:all' }); + sendMessage({ type: 'subscribe', channel: 'trades:large' }); + sendMessage({ type: 'set_coin', coinSymbol: activeCoin }); +} + +function handleTradeMessage(message: any): void { + const trade: LiveTrade = { + ...message.data, + timestamp: Number(message.data.timestamp) + }; + + if (message.type === 'live-trade') { + liveTradesStore.update(trades => [trade, ...trades.slice(0, MAX_LIVE_TRADES - 1)]); + } else if (message.type === 'all-trades') { + allTradesStore.update(trades => [trade, ...trades.slice(0, MAX_ALL_TRADES - 1)]); + } +} + +function handleCommentMessage(message: any): void { + const callback = commentSubscriptions.get(activeCoin); + if (callback) { + callback(message); + } +} + +function handleWebSocketMessage(event: MessageEvent): void { + try { + const message = JSON.parse(event.data); + + switch (message.type) { + case 'live-trade': + case 'all-trades': + handleTradeMessage(message); + break; + + case 'ping': + sendMessage({ type: 'pong' }); + break; + + case 'new_comment': + case 'comment_liked': + handleCommentMessage(message); + break; + + default: + console.log('Unhandled message type:', message.type, message); + } + } catch (error) { + console.error('Failed to process WebSocket message:', error); + } +} + +function connect(): void { + if (!browser) return; + + // Don't connect if already connected or connecting + if (isSocketConnected() || isSocketConnecting()) { + return; + } + + clearReconnectTimer(); + + socket = new WebSocket(WEBSOCKET_URL); + + loadInitialTrades(); + + socket.onopen = () => { + console.log('WebSocket connected'); + isConnectedStore.set(true); + clearReconnectTimer(); + subscribeToChannels(); + }; + + socket.onmessage = handleWebSocketMessage; + + socket.onclose = (event) => { + console.log(`WebSocket disconnected. Code: ${event.code}`); + isConnectedStore.set(false); + socket = null; + scheduleReconnect(); + }; + + socket.onerror = (error) => { + console.error('WebSocket error:', error); + isConnectedStore.set(false); + }; +} + +function setCoin(coinSymbol: string): void { + activeCoin = coinSymbol; + sendMessage({ type: 'set_coin', coinSymbol }); +} + +function disconnect(): void { + clearReconnectTimer(); + + if (socket) { + socket.close(); + socket = null; + } + + isConnectedStore.set(false); +} + +function subscribeToComments(coinSymbol: string, callback: (message: any) => void): void { + commentSubscriptions.set(coinSymbol, callback); +} + +function unsubscribeFromComments(coinSymbol: string): void { + commentSubscriptions.delete(coinSymbol); +} + +export const websocketController = { + connect, + disconnect, + setCoin, + subscribeToComments, + unsubscribeFromComments, + loadInitialTrades +}; \ No newline at end of file diff --git a/website/src/lib/utils/validation.ts b/website/src/lib/utils/validation.ts new file mode 100644 index 0000000..8f8cb37 --- /dev/null +++ b/website/src/lib/utils/validation.ts @@ -0,0 +1,43 @@ +/** + * Validates and parses URL search parameters with proper fallbacks + */ +export function validateSearchParams(searchParams: URLSearchParams) { + return { + /** + * Gets a positive integer parameter with a default fallback + * @param key - The parameter key + * @param defaultValue - Default value if invalid or missing + * @returns Valid positive integer or default + */ + getPositiveInt: (key: string, defaultValue: number): number => { + const param = searchParams.get(key); + const parsed = param ? parseInt(param, 10) : defaultValue; + return Number.isInteger(parsed) && parsed > 0 ? parsed : defaultValue; + }, + + /** + * Gets a non-negative float parameter with a default fallback + * @param key - The parameter key + * @param defaultValue - Default value if invalid or missing + * @returns Valid non-negative float or default + */ + getNonNegativeFloat: (key: string, defaultValue: number): number => { + const param = searchParams.get(key); + const parsed = param ? parseFloat(param) : defaultValue; + return !isNaN(parsed) && parsed >= 0 ? parsed : defaultValue; + }, + + /** + * Gets a string parameter with optional validation + * @param key - The parameter key + * @param defaultValue - Default value if missing + * @param validator - Optional validation function + * @returns Valid string or default + */ + getString: (key: string, defaultValue: string, validator?: (value: string) => boolean): string => { + const param = searchParams.get(key); + if (!param) return defaultValue; + return validator ? (validator(param) ? param : defaultValue) : param; + } + }; +} diff --git a/website/src/routes/+layout.svelte b/website/src/routes/+layout.svelte index 028cd6e..acc5698 100644 --- a/website/src/routes/+layout.svelte +++ b/website/src/routes/+layout.svelte @@ -11,6 +11,7 @@ import { invalidateAll } from '$app/navigation'; import { ModeWatcher } from 'mode-watcher'; import { page } from '$app/state'; + import { websocketController } from '$lib/stores/websocket'; let { data, children } = $props<{ data: { userSession?: any }; @@ -26,6 +27,8 @@ }); onMount(() => { + websocketController.connect(); + console.log( `%c .-- .=--: @@ -69,6 +72,10 @@ window.history.replaceState({}, '', url); invalidateAll(); } + + return () => { + websocketController.disconnect(); + }; }); function getPageTitle(routeId: string | null): string { diff --git a/website/src/routes/api/coin/[coinSymbol]/trade/+server.ts b/website/src/routes/api/coin/[coinSymbol]/trade/+server.ts index f5e031e..d38bfc3 100644 --- a/website/src/routes/api/coin/[coinSymbol]/trade/+server.ts +++ b/website/src/routes/api/coin/[coinSymbol]/trade/+server.ts @@ -3,6 +3,7 @@ import { error, json } from '@sveltejs/kit'; import { db } from '$lib/server/db'; import { coin, userPortfolio, user, transaction, priceHistory } from '$lib/server/db/schema'; import { eq, and, gte } from 'drizzle-orm'; +import { redis } from '$lib/server/redis'; async function calculate24hMetrics(coinId: number, currentPrice: number) { const twentyFourHoursAgo = new Date(Date.now() - 24 * 60 * 60 * 1000); @@ -74,7 +75,11 @@ export async function POST({ params, request }) { throw error(400, 'This coin is delisted and cannot be traded'); } - const [userData] = await db.select({ baseCurrencyBalance: user.baseCurrencyBalance }).from(user).where(eq(user.id, userId)).limit(1); + const [userData] = await db.select({ + baseCurrencyBalance: user.baseCurrencyBalance, + username: user.username, + image: user.image + }).from(user).where(eq(user.id, userId)).limit(1); if (!userData) { throw error(404, 'User not found'); @@ -177,6 +182,34 @@ export async function POST({ params, request }) { .where(eq(coin.id, coinData.id)); }); + // REDIS + const tradeData = { + type: 'BUY', + username: userData.username, + userImage: userData.image || '', + amount: coinsBought, + coinSymbol: normalizedSymbol, + coinName: coinData.name, + coinIcon: coinData.icon || '', + totalValue: totalCost, + price: newPrice, + timestamp: Date.now(), + userId: userId.toString() + }; + + await redis.publish('trades:all', JSON.stringify({ + type: 'all-trades', + data: tradeData + })); + + if (totalCost >= 1000) { + await redis.publish('trades:large', JSON.stringify({ + type: 'live-trade', + data: tradeData + })); + } + // End REDIS + return json({ success: true, type: 'BUY', @@ -282,6 +315,34 @@ export async function POST({ params, request }) { .where(eq(coin.id, coinData.id)); }); + // REDIS + const tradeData = { + type: 'SELL', + username: userData.username, + userImage: userData.image || '', + amount: amount, + coinSymbol: normalizedSymbol, + coinName: coinData.name, + coinIcon: coinData.icon || '', + totalValue: totalCost, + price: newPrice, + timestamp: Date.now(), + userId: userId.toString() + }; + + await redis.publish('trades:all', JSON.stringify({ + type: 'all-trades', + data: tradeData + })); + + if (totalCost >= 1000) { + await redis.publish('trades:large', JSON.stringify({ + type: 'live-trade', + data: tradeData + })); + } + // End REDIS + return json({ success: true, type: 'SELL', diff --git a/website/src/routes/api/trades/recent/+server.ts b/website/src/routes/api/trades/recent/+server.ts new file mode 100644 index 0000000..d7c07b6 --- /dev/null +++ b/website/src/routes/api/trades/recent/+server.ts @@ -0,0 +1,57 @@ +import { json } from '@sveltejs/kit'; +import { db } from '$lib/server/db'; +import { transaction, user, coin } from '$lib/server/db/schema'; +import { desc, gte, eq } from 'drizzle-orm'; +import { validateSearchParams } from '$lib/utils/validation'; + +export async function GET({ url }) { + const params = validateSearchParams(url.searchParams); + const limit = params.getPositiveInt('limit', 100); + const minValue = params.getNonNegativeFloat('minValue', 0); + + try { + const trades = await db + .select({ + type: transaction.type, + username: user.username, + userImage: user.image, + amount: transaction.quantity, + coinSymbol: coin.symbol, + coinName: coin.name, + coinIcon: coin.icon, + totalValue: transaction.totalBaseCurrencyAmount, + price: transaction.pricePerCoin, + timestamp: transaction.timestamp, + userId: transaction.userId + }) + .from(transaction) + .innerJoin(user, eq(user.id, transaction.userId)) + .innerJoin(coin, eq(coin.id, transaction.coinId)) + .where( + minValue > 0 + ? gte(transaction.totalBaseCurrencyAmount, minValue.toString()) + : undefined + ) + .orderBy(desc(transaction.timestamp)) + .limit(limit); + + const formattedTrades = trades.map(trade => ({ + type: trade.type as 'BUY' | 'SELL', + username: trade.username, + userImage: trade.userImage, + amount: Number(trade.amount), + coinSymbol: trade.coinSymbol, + coinName: trade.coinName, + coinIcon: trade.coinIcon, + totalValue: Number(trade.totalValue), + price: Number(trade.price), + timestamp: trade.timestamp.getTime(), + userId: trade.userId.toString() + })); + + return json({ trades: formattedTrades }); + } catch (error) { + console.error('Error fetching recent trades:', error); + return json({ trades: [] }); + } +} diff --git a/website/src/routes/coin/[coinSymbol]/+page.svelte b/website/src/routes/coin/[coinSymbol]/+page.svelte index 939aa40..87e6912 100644 --- a/website/src/routes/coin/[coinSymbol]/+page.svelte +++ b/website/src/routes/coin/[coinSymbol]/+page.svelte @@ -22,6 +22,7 @@ import { USER_DATA } from '$lib/stores/user-data'; import { fetchPortfolioData } from '$lib/stores/portfolio-data'; import { getPublicUrl } from '$lib/utils.js'; + import { websocketController } from '$lib/stores/websocket'; const { data } = $props(); const coinSymbol = data.coinSymbol; @@ -38,6 +39,8 @@ onMount(async () => { await loadCoinData(); await loadUserHolding(); + + websocketController.setCoin(coinSymbol.toUpperCase()); }); async function loadCoinData() { diff --git a/website/src/routes/live/+page.svelte b/website/src/routes/live/+page.svelte new file mode 100644 index 0000000..e77cfd2 --- /dev/null +++ b/website/src/routes/live/+page.svelte @@ -0,0 +1,167 @@ + + + + Live Trades - Rugplay + + + +
+
+
+

Live Trades

+

Real-time trading activity for all trades

+
+
+ + + + + + Stream + {#if $allTradesStore.length > 0} + + {$allTradesStore.length} trade{$allTradesStore.length !== 1 ? 's' : ''} + + {/if} + + + + {#if $isLoadingTrades} +
+ {#each Array(8) as _, i} +
+
+
+ + +
+ +
+
+ + + + + +
+ +
+
+ +
+ + +
+
+ {/each} +
+ {:else if $allTradesStore.length === 0} +
+ +

Waiting for trades...

+

All trades will appear here in real-time.

+
+ {:else} +
+ {#each $allTradesStore as trade (trade.timestamp)} +
+
+
+ {#if trade.type === 'BUY'} +
+ +
+ BUY + {:else} +
+ +
+ SELL + {/if} +
+ +
+
+ + + {trade.type === 'BUY' ? 'bought by' : 'sold by'} + + + handleUserClick(trade.username)} + > +
+ + + {trade.username.charAt(0).toUpperCase()} + + @{trade.username} +
+
+ + + +
+
+
+ Trade value: {formatValue(trade.totalValue)} +
+
+
+ +
+ + {formatRelativeTime(new Date(trade.timestamp))} +
+
+ {/each} +
+ {/if} +
+
+
diff --git a/website/websocket/src/main.ts b/website/websocket/src/main.ts index d22c9f8..fd6a58f 100644 --- a/website/websocket/src/main.ts +++ b/website/websocket/src/main.ts @@ -15,23 +15,73 @@ if (!process.env.REDIS_URL) { const redis = new Redis(process.env.REDIS_URL); redis.on('error', (err) => console.error('Redis Client Error', err)); -redis.on('connect', () => console.log('Connected to Redis')); -redis.psubscribe('comments:*'); +redis.on('connect', () => { + redis.psubscribe('comments:*', 'prices:*', (err, count) => { + if (err) console.error("Failed to psubscribe to patterns", err); + else console.log(`Successfully psubscribed to patterns. Active psubscriptions: ${count}`); + }); -redis.on('pmessage', (_pattern, channel, msg) => { + redis.subscribe('trades:all', 'trades:large', (err, count) => { + if (err) console.error("Failed to subscribe to 'trades:all' and 'trades:large'", err); + else console.log(`Successfully subscribed to 'trades:all', 'trades:large'. Active subscriptions: ${count}`); + }); +}); + +redis.on('pmessage', (pattern, channel, msg) => { + console.log(`[Redis pmessage RECEIVED] Pattern: "${pattern}", Channel: "${channel}", Message: "${msg}"`); try { - const coinSymbol = channel.substring('comments:'.length); - const sockets = coinSockets.get(coinSymbol); - if (sockets) { - for (const ws of sockets) { - if (ws.readyState === WebSocket.OPEN) { - ws.send(msg); + if (channel.startsWith('comments:')) { + const coinSymbol = channel.substring('comments:'.length); + const sockets = coinSockets.get(coinSymbol); + if (sockets) { + for (const ws of sockets) { + if (ws.readyState === WebSocket.OPEN) { + ws.send(msg); + } + } + } + } else if (channel.startsWith('prices:')) { + const coinSymbol = channel.substring('prices:'.length); + const sockets = coinSockets.get(coinSymbol); + console.log(`Received price update for ${coinSymbol}:`, msg); + if (sockets) { + const priceData = JSON.parse(msg); + const priceMessage = JSON.stringify({ + type: 'price_update', + coinSymbol, + ...priceData + }); + + for (const ws of sockets) { + if (ws.readyState === WebSocket.OPEN) { + ws.send(priceMessage); + } } } } } catch (error) { - console.error('Error processing Redis message:', error); + console.error('Error processing Redis pmessage:', error, `Pattern: ${pattern}, Channel: ${channel}, Raw message: ${msg}`); + } +}); + +redis.on('message', (channel, msg) => { + try { + if (channel === 'trades:all' || channel === 'trades:large') { + const tradeData = JSON.parse(msg); + + const tradeMessage = JSON.stringify(tradeData); + + for (const [, sockets] of coinSockets.entries()) { + for (const ws of sockets) { + if (ws.readyState === WebSocket.OPEN) { + ws.send(tradeMessage); + } + } + } + } + } catch (error) { + console.error('Error processing Redis message:', error, `Channel: ${channel}, Raw message: ${msg}`); } }); @@ -117,6 +167,8 @@ const server = Bun.serve({ if (data.type === 'set_coin' && data.coinSymbol) { handleSetCoin(ws, data.coinSymbol); + } else if (data.type === 'pong') { + ws.data.lastActivity = Date.now(); } } catch (error) { console.error('Message parsing error:', error);