-
-
Cash:
-
${formatCurrency($PORTFOLIO_DATA.baseCurrencyBalance)}
+
-
-
Coins:
-
${formatCurrency($PORTFOLIO_DATA.totalCoinValue)}
+ {:else}
+
+
+
+ Total Value
+
+
+ ${formatCurrency($PORTFOLIO_DATA.totalValue)}
+
-
+
+
+ Cash:
+ ${formatCurrency($PORTFOLIO_DATA.baseCurrencyBalance)}
+
+
+ Coins:
+ ${formatCurrency($PORTFOLIO_DATA.totalCoinValue)}
+
+
+ {/if}
diff --git a/website/src/lib/components/self/CommentSection.svelte b/website/src/lib/components/self/CommentSection.svelte
index d2c8327..f8272bf 100644
--- a/website/src/lib/components/self/CommentSection.svelte
+++ b/website/src/lib/components/self/CommentSection.svelte
@@ -12,7 +12,7 @@
import { formatTimeAgo, getPublicUrl } from '$lib/utils';
import SignInConfirmDialog from '$lib/components/self/SignInConfirmDialog.svelte';
import UserProfilePreview from '$lib/components/self/UserProfilePreview.svelte';
- import WebSocket, { type WebSocketHandle } from '$lib/components/self/WebSocket.svelte';
+ import { websocketController } from '$lib/stores/websocket';
const { coinSymbol } = $props<{ coinSymbol: string }>();
import type { Comment } from '$lib/types/comment';
@@ -21,7 +21,15 @@
let isSubmitting = $state(false);
let isLoading = $state(true);
let shouldSignIn = $state(false);
- let wsManager = $state
();
+
+ $effect(() => {
+ websocketController.setCoin(coinSymbol);
+ websocketController.subscribeToComments(coinSymbol, handleWebSocketMessage);
+
+ return () => {
+ websocketController.unsubscribeFromComments(coinSymbol);
+ };
+ });
function handleWebSocketMessage(message: { type: string; data?: any }) {
switch (message.type) {
@@ -48,13 +56,6 @@
}
}
- function handleWebSocketOpen() {
- wsManager?.send({
- type: 'set_coin',
- coinSymbol
- });
- }
-
async function loadComments() {
try {
const response = await fetch(`/api/coin/${coinSymbol}/comments`);
@@ -151,13 +152,6 @@
-
-
diff --git a/website/src/lib/components/self/DailyRewards.svelte b/website/src/lib/components/self/DailyRewards.svelte
index c5069b0..a186d94 100644
--- a/website/src/lib/components/self/DailyRewards.svelte
+++ b/website/src/lib/components/self/DailyRewards.svelte
@@ -141,26 +141,24 @@
}
-{#if $USER_DATA && rewardStatus}
-
-{/if}
+
diff --git a/website/src/lib/components/self/WebSocket.svelte b/website/src/lib/components/self/WebSocket.svelte
deleted file mode 100644
index 2de7c66..0000000
--- a/website/src/lib/components/self/WebSocket.svelte
+++ /dev/null
@@ -1,102 +0,0 @@
-
diff --git a/website/src/lib/stores/websocket.ts b/website/src/lib/stores/websocket.ts
new file mode 100644
index 0000000..4d220dc
--- /dev/null
+++ b/website/src/lib/stores/websocket.ts
@@ -0,0 +1,212 @@
+import { writable } from 'svelte/store';
+import { browser } from '$app/environment';
+import { PUBLIC_WEBSOCKET_URL } from '$env/static/public';
+
+export interface LiveTrade {
+ type: 'BUY' | 'SELL';
+ username: string;
+ amount: number;
+ coinSymbol: string;
+ coinName?: string;
+ coinIcon?: string;
+ totalValue: number;
+ price: number;
+ timestamp: number;
+ userId: string;
+ userImage?: string;
+}
+
+// Constants
+const WEBSOCKET_URL = PUBLIC_WEBSOCKET_URL;
+const RECONNECT_DELAY = 5000;
+const MAX_LIVE_TRADES = 5;
+const MAX_ALL_TRADES = 100;
+
+// WebSocket state
+let socket: WebSocket | null = null;
+let reconnectTimer: NodeJS.Timeout | null = null;
+let activeCoin: string = '@global';
+
+// Stores
+export const liveTradesStore = writable([]);
+export const allTradesStore = writable([]);
+export const isConnectedStore = writable(false);
+export const isLoadingTrades = writable(false);
+
+// Comment callbacks
+const commentSubscriptions = new Map void>();
+
+async function loadInitialTrades(): Promise {
+ if (!browser) return;
+
+ isLoadingTrades.set(true);
+
+ try {
+ const [largeTradesResponse, allTradesResponse] = await Promise.all([
+ fetch('/api/trades/recent?limit=5&minValue=1000'),
+ fetch('/api/trades/recent?limit=100')
+ ]);
+
+ if (largeTradesResponse.ok) {
+ const { trades } = await largeTradesResponse.json();
+ liveTradesStore.set(trades);
+ }
+
+ if (allTradesResponse.ok) {
+ const { trades } = await allTradesResponse.json();
+ allTradesStore.set(trades);
+ }
+ } catch (error) {
+ console.error('Failed to load initial trades:', error);
+ } finally {
+ isLoadingTrades.set(false);
+ }
+}
+
+function clearReconnectTimer(): void {
+ if (reconnectTimer) {
+ clearTimeout(reconnectTimer);
+ reconnectTimer = null;
+ }
+}
+
+function scheduleReconnect(): void {
+ clearReconnectTimer();
+ reconnectTimer = setTimeout(connect, RECONNECT_DELAY);
+}
+
+function isSocketConnected(): boolean {
+ return socket?.readyState === WebSocket.OPEN;
+}
+
+function isSocketConnecting(): boolean {
+ return socket?.readyState === WebSocket.CONNECTING;
+}
+
+function sendMessage(message: object): void {
+ if (isSocketConnected()) {
+ socket!.send(JSON.stringify(message));
+ }
+}
+
+function subscribeToChannels(): void {
+ sendMessage({ type: 'subscribe', channel: 'trades:all' });
+ sendMessage({ type: 'subscribe', channel: 'trades:large' });
+ sendMessage({ type: 'set_coin', coinSymbol: activeCoin });
+}
+
+function handleTradeMessage(message: any): void {
+ const trade: LiveTrade = {
+ ...message.data,
+ timestamp: Number(message.data.timestamp)
+ };
+
+ if (message.type === 'live-trade') {
+ liveTradesStore.update(trades => [trade, ...trades.slice(0, MAX_LIVE_TRADES - 1)]);
+ } else if (message.type === 'all-trades') {
+ allTradesStore.update(trades => [trade, ...trades.slice(0, MAX_ALL_TRADES - 1)]);
+ }
+}
+
+function handleCommentMessage(message: any): void {
+ const callback = commentSubscriptions.get(activeCoin);
+ if (callback) {
+ callback(message);
+ }
+}
+
+function handleWebSocketMessage(event: MessageEvent): void {
+ try {
+ const message = JSON.parse(event.data);
+
+ switch (message.type) {
+ case 'live-trade':
+ case 'all-trades':
+ handleTradeMessage(message);
+ break;
+
+ case 'ping':
+ sendMessage({ type: 'pong' });
+ break;
+
+ case 'new_comment':
+ case 'comment_liked':
+ handleCommentMessage(message);
+ break;
+
+ default:
+ console.log('Unhandled message type:', message.type, message);
+ }
+ } catch (error) {
+ console.error('Failed to process WebSocket message:', error);
+ }
+}
+
+function connect(): void {
+ if (!browser) return;
+
+ // Don't connect if already connected or connecting
+ if (isSocketConnected() || isSocketConnecting()) {
+ return;
+ }
+
+ clearReconnectTimer();
+
+ socket = new WebSocket(WEBSOCKET_URL);
+
+ loadInitialTrades();
+
+ socket.onopen = () => {
+ console.log('WebSocket connected');
+ isConnectedStore.set(true);
+ clearReconnectTimer();
+ subscribeToChannels();
+ };
+
+ socket.onmessage = handleWebSocketMessage;
+
+ socket.onclose = (event) => {
+ console.log(`WebSocket disconnected. Code: ${event.code}`);
+ isConnectedStore.set(false);
+ socket = null;
+ scheduleReconnect();
+ };
+
+ socket.onerror = (error) => {
+ console.error('WebSocket error:', error);
+ isConnectedStore.set(false);
+ };
+}
+
+function setCoin(coinSymbol: string): void {
+ activeCoin = coinSymbol;
+ sendMessage({ type: 'set_coin', coinSymbol });
+}
+
+function disconnect(): void {
+ clearReconnectTimer();
+
+ if (socket) {
+ socket.close();
+ socket = null;
+ }
+
+ isConnectedStore.set(false);
+}
+
+function subscribeToComments(coinSymbol: string, callback: (message: any) => void): void {
+ commentSubscriptions.set(coinSymbol, callback);
+}
+
+function unsubscribeFromComments(coinSymbol: string): void {
+ commentSubscriptions.delete(coinSymbol);
+}
+
+export const websocketController = {
+ connect,
+ disconnect,
+ setCoin,
+ subscribeToComments,
+ unsubscribeFromComments,
+ loadInitialTrades
+};
\ No newline at end of file
diff --git a/website/src/lib/utils/validation.ts b/website/src/lib/utils/validation.ts
new file mode 100644
index 0000000..8f8cb37
--- /dev/null
+++ b/website/src/lib/utils/validation.ts
@@ -0,0 +1,43 @@
+/**
+ * Validates and parses URL search parameters with proper fallbacks
+ */
+export function validateSearchParams(searchParams: URLSearchParams) {
+ return {
+ /**
+ * Gets a positive integer parameter with a default fallback
+ * @param key - The parameter key
+ * @param defaultValue - Default value if invalid or missing
+ * @returns Valid positive integer or default
+ */
+ getPositiveInt: (key: string, defaultValue: number): number => {
+ const param = searchParams.get(key);
+ const parsed = param ? parseInt(param, 10) : defaultValue;
+ return Number.isInteger(parsed) && parsed > 0 ? parsed : defaultValue;
+ },
+
+ /**
+ * Gets a non-negative float parameter with a default fallback
+ * @param key - The parameter key
+ * @param defaultValue - Default value if invalid or missing
+ * @returns Valid non-negative float or default
+ */
+ getNonNegativeFloat: (key: string, defaultValue: number): number => {
+ const param = searchParams.get(key);
+ const parsed = param ? parseFloat(param) : defaultValue;
+ return !isNaN(parsed) && parsed >= 0 ? parsed : defaultValue;
+ },
+
+ /**
+ * Gets a string parameter with optional validation
+ * @param key - The parameter key
+ * @param defaultValue - Default value if missing
+ * @param validator - Optional validation function
+ * @returns Valid string or default
+ */
+ getString: (key: string, defaultValue: string, validator?: (value: string) => boolean): string => {
+ const param = searchParams.get(key);
+ if (!param) return defaultValue;
+ return validator ? (validator(param) ? param : defaultValue) : param;
+ }
+ };
+}
diff --git a/website/src/routes/+layout.svelte b/website/src/routes/+layout.svelte
index 028cd6e..acc5698 100644
--- a/website/src/routes/+layout.svelte
+++ b/website/src/routes/+layout.svelte
@@ -11,6 +11,7 @@
import { invalidateAll } from '$app/navigation';
import { ModeWatcher } from 'mode-watcher';
import { page } from '$app/state';
+ import { websocketController } from '$lib/stores/websocket';
let { data, children } = $props<{
data: { userSession?: any };
@@ -26,6 +27,8 @@
});
onMount(() => {
+ websocketController.connect();
+
console.log(
`%c .--
.=--:
@@ -69,6 +72,10 @@
window.history.replaceState({}, '', url);
invalidateAll();
}
+
+ return () => {
+ websocketController.disconnect();
+ };
});
function getPageTitle(routeId: string | null): string {
diff --git a/website/src/routes/api/coin/[coinSymbol]/trade/+server.ts b/website/src/routes/api/coin/[coinSymbol]/trade/+server.ts
index f5e031e..d38bfc3 100644
--- a/website/src/routes/api/coin/[coinSymbol]/trade/+server.ts
+++ b/website/src/routes/api/coin/[coinSymbol]/trade/+server.ts
@@ -3,6 +3,7 @@ import { error, json } from '@sveltejs/kit';
import { db } from '$lib/server/db';
import { coin, userPortfolio, user, transaction, priceHistory } from '$lib/server/db/schema';
import { eq, and, gte } from 'drizzle-orm';
+import { redis } from '$lib/server/redis';
async function calculate24hMetrics(coinId: number, currentPrice: number) {
const twentyFourHoursAgo = new Date(Date.now() - 24 * 60 * 60 * 1000);
@@ -74,7 +75,11 @@ export async function POST({ params, request }) {
throw error(400, 'This coin is delisted and cannot be traded');
}
- const [userData] = await db.select({ baseCurrencyBalance: user.baseCurrencyBalance }).from(user).where(eq(user.id, userId)).limit(1);
+ const [userData] = await db.select({
+ baseCurrencyBalance: user.baseCurrencyBalance,
+ username: user.username,
+ image: user.image
+ }).from(user).where(eq(user.id, userId)).limit(1);
if (!userData) {
throw error(404, 'User not found');
@@ -177,6 +182,34 @@ export async function POST({ params, request }) {
.where(eq(coin.id, coinData.id));
});
+ // REDIS
+ const tradeData = {
+ type: 'BUY',
+ username: userData.username,
+ userImage: userData.image || '',
+ amount: coinsBought,
+ coinSymbol: normalizedSymbol,
+ coinName: coinData.name,
+ coinIcon: coinData.icon || '',
+ totalValue: totalCost,
+ price: newPrice,
+ timestamp: Date.now(),
+ userId: userId.toString()
+ };
+
+ await redis.publish('trades:all', JSON.stringify({
+ type: 'all-trades',
+ data: tradeData
+ }));
+
+ if (totalCost >= 1000) {
+ await redis.publish('trades:large', JSON.stringify({
+ type: 'live-trade',
+ data: tradeData
+ }));
+ }
+ // End REDIS
+
return json({
success: true,
type: 'BUY',
@@ -282,6 +315,34 @@ export async function POST({ params, request }) {
.where(eq(coin.id, coinData.id));
});
+ // REDIS
+ const tradeData = {
+ type: 'SELL',
+ username: userData.username,
+ userImage: userData.image || '',
+ amount: amount,
+ coinSymbol: normalizedSymbol,
+ coinName: coinData.name,
+ coinIcon: coinData.icon || '',
+ totalValue: totalCost,
+ price: newPrice,
+ timestamp: Date.now(),
+ userId: userId.toString()
+ };
+
+ await redis.publish('trades:all', JSON.stringify({
+ type: 'all-trades',
+ data: tradeData
+ }));
+
+ if (totalCost >= 1000) {
+ await redis.publish('trades:large', JSON.stringify({
+ type: 'live-trade',
+ data: tradeData
+ }));
+ }
+ // End REDIS
+
return json({
success: true,
type: 'SELL',
diff --git a/website/src/routes/api/trades/recent/+server.ts b/website/src/routes/api/trades/recent/+server.ts
new file mode 100644
index 0000000..d7c07b6
--- /dev/null
+++ b/website/src/routes/api/trades/recent/+server.ts
@@ -0,0 +1,57 @@
+import { json } from '@sveltejs/kit';
+import { db } from '$lib/server/db';
+import { transaction, user, coin } from '$lib/server/db/schema';
+import { desc, gte, eq } from 'drizzle-orm';
+import { validateSearchParams } from '$lib/utils/validation';
+
+export async function GET({ url }) {
+ const params = validateSearchParams(url.searchParams);
+ const limit = params.getPositiveInt('limit', 100);
+ const minValue = params.getNonNegativeFloat('minValue', 0);
+
+ try {
+ const trades = await db
+ .select({
+ type: transaction.type,
+ username: user.username,
+ userImage: user.image,
+ amount: transaction.quantity,
+ coinSymbol: coin.symbol,
+ coinName: coin.name,
+ coinIcon: coin.icon,
+ totalValue: transaction.totalBaseCurrencyAmount,
+ price: transaction.pricePerCoin,
+ timestamp: transaction.timestamp,
+ userId: transaction.userId
+ })
+ .from(transaction)
+ .innerJoin(user, eq(user.id, transaction.userId))
+ .innerJoin(coin, eq(coin.id, transaction.coinId))
+ .where(
+ minValue > 0
+ ? gte(transaction.totalBaseCurrencyAmount, minValue.toString())
+ : undefined
+ )
+ .orderBy(desc(transaction.timestamp))
+ .limit(limit);
+
+ const formattedTrades = trades.map(trade => ({
+ type: trade.type as 'BUY' | 'SELL',
+ username: trade.username,
+ userImage: trade.userImage,
+ amount: Number(trade.amount),
+ coinSymbol: trade.coinSymbol,
+ coinName: trade.coinName,
+ coinIcon: trade.coinIcon,
+ totalValue: Number(trade.totalValue),
+ price: Number(trade.price),
+ timestamp: trade.timestamp.getTime(),
+ userId: trade.userId.toString()
+ }));
+
+ return json({ trades: formattedTrades });
+ } catch (error) {
+ console.error('Error fetching recent trades:', error);
+ return json({ trades: [] });
+ }
+}
diff --git a/website/src/routes/coin/[coinSymbol]/+page.svelte b/website/src/routes/coin/[coinSymbol]/+page.svelte
index 939aa40..87e6912 100644
--- a/website/src/routes/coin/[coinSymbol]/+page.svelte
+++ b/website/src/routes/coin/[coinSymbol]/+page.svelte
@@ -22,6 +22,7 @@
import { USER_DATA } from '$lib/stores/user-data';
import { fetchPortfolioData } from '$lib/stores/portfolio-data';
import { getPublicUrl } from '$lib/utils.js';
+ import { websocketController } from '$lib/stores/websocket';
const { data } = $props();
const coinSymbol = data.coinSymbol;
@@ -38,6 +39,8 @@
onMount(async () => {
await loadCoinData();
await loadUserHolding();
+
+ websocketController.setCoin(coinSymbol.toUpperCase());
});
async function loadCoinData() {
diff --git a/website/src/routes/live/+page.svelte b/website/src/routes/live/+page.svelte
new file mode 100644
index 0000000..e77cfd2
--- /dev/null
+++ b/website/src/routes/live/+page.svelte
@@ -0,0 +1,167 @@
+
+
+
+ Live Trades - Rugplay
+
+
+
+
+
+
+
+
+
+
+ Stream
+ {#if $allTradesStore.length > 0}
+
+ {$allTradesStore.length} trade{$allTradesStore.length !== 1 ? 's' : ''}
+
+ {/if}
+
+
+
+ {#if $isLoadingTrades}
+
+ {#each Array(8) as _, i}
+
+ {/each}
+
+ {:else if $allTradesStore.length === 0}
+
+
+
Waiting for trades...
+
All trades will appear here in real-time.
+
+ {:else}
+
+ {#each $allTradesStore as trade (trade.timestamp)}
+
+
+
+ {#if trade.type === 'BUY'}
+
+
+
+
BUY
+ {:else}
+
+
+
+
SELL
+ {/if}
+
+
+
+
+
+
+ {trade.type === 'BUY' ? 'bought by' : 'sold by'}
+
+
+ handleUserClick(trade.username)}
+ >
+
+
+
+ {trade.username.charAt(0).toUpperCase()}
+
+
@{trade.username}
+
+
+
+
+
+
+
+
+ Trade value: {formatValue(trade.totalValue)}
+
+
+
+
+
+
+ {formatRelativeTime(new Date(trade.timestamp))}
+
+
+ {/each}
+
+ {/if}
+
+
+
diff --git a/website/websocket/src/main.ts b/website/websocket/src/main.ts
index d22c9f8..fd6a58f 100644
--- a/website/websocket/src/main.ts
+++ b/website/websocket/src/main.ts
@@ -15,23 +15,73 @@ if (!process.env.REDIS_URL) {
const redis = new Redis(process.env.REDIS_URL);
redis.on('error', (err) => console.error('Redis Client Error', err));
-redis.on('connect', () => console.log('Connected to Redis'));
-redis.psubscribe('comments:*');
+redis.on('connect', () => {
+ redis.psubscribe('comments:*', 'prices:*', (err, count) => {
+ if (err) console.error("Failed to psubscribe to patterns", err);
+ else console.log(`Successfully psubscribed to patterns. Active psubscriptions: ${count}`);
+ });
-redis.on('pmessage', (_pattern, channel, msg) => {
+ redis.subscribe('trades:all', 'trades:large', (err, count) => {
+ if (err) console.error("Failed to subscribe to 'trades:all' and 'trades:large'", err);
+ else console.log(`Successfully subscribed to 'trades:all', 'trades:large'. Active subscriptions: ${count}`);
+ });
+});
+
+redis.on('pmessage', (pattern, channel, msg) => {
+ console.log(`[Redis pmessage RECEIVED] Pattern: "${pattern}", Channel: "${channel}", Message: "${msg}"`);
try {
- const coinSymbol = channel.substring('comments:'.length);
- const sockets = coinSockets.get(coinSymbol);
- if (sockets) {
- for (const ws of sockets) {
- if (ws.readyState === WebSocket.OPEN) {
- ws.send(msg);
+ if (channel.startsWith('comments:')) {
+ const coinSymbol = channel.substring('comments:'.length);
+ const sockets = coinSockets.get(coinSymbol);
+ if (sockets) {
+ for (const ws of sockets) {
+ if (ws.readyState === WebSocket.OPEN) {
+ ws.send(msg);
+ }
+ }
+ }
+ } else if (channel.startsWith('prices:')) {
+ const coinSymbol = channel.substring('prices:'.length);
+ const sockets = coinSockets.get(coinSymbol);
+ console.log(`Received price update for ${coinSymbol}:`, msg);
+ if (sockets) {
+ const priceData = JSON.parse(msg);
+ const priceMessage = JSON.stringify({
+ type: 'price_update',
+ coinSymbol,
+ ...priceData
+ });
+
+ for (const ws of sockets) {
+ if (ws.readyState === WebSocket.OPEN) {
+ ws.send(priceMessage);
+ }
}
}
}
} catch (error) {
- console.error('Error processing Redis message:', error);
+ console.error('Error processing Redis pmessage:', error, `Pattern: ${pattern}, Channel: ${channel}, Raw message: ${msg}`);
+ }
+});
+
+redis.on('message', (channel, msg) => {
+ try {
+ if (channel === 'trades:all' || channel === 'trades:large') {
+ const tradeData = JSON.parse(msg);
+
+ const tradeMessage = JSON.stringify(tradeData);
+
+ for (const [, sockets] of coinSockets.entries()) {
+ for (const ws of sockets) {
+ if (ws.readyState === WebSocket.OPEN) {
+ ws.send(tradeMessage);
+ }
+ }
+ }
+ }
+ } catch (error) {
+ console.error('Error processing Redis message:', error, `Channel: ${channel}, Raw message: ${msg}`);
}
});
@@ -117,6 +167,8 @@ const server = Bun.serve({
if (data.type === 'set_coin' && data.coinSymbol) {
handleSetCoin(ws, data.coinSymbol);
+ } else if (data.type === 'pong') {
+ ws.data.lastActivity = Date.now();
}
} catch (error) {
console.error('Message parsing error:', error);