2025-08-16
JavaScript
0
请注意,本文编写于 169 天前,最后修改于 161 天前,其中某些信息可能已经过时。

目录

效果展示
代码
pubsub.js
功能展示html
style.css
app.js
ts升级版(ack支持异步)

实现一个简易但不简单的发布订阅插件,支持先发布再订阅的模式,支持单播和多播模式,支持历史回放,以及消息消费的 ack 回调函数

效果展示

1755327732917.gif

代码

pubsub.js

js
/**
 * In-memory publish/subscribe hub.
 *
 * Features implemented by this class:
 *  - publish-before-subscribe: messages published while a topic has no
 *    subscribers are buffered in a bounded per-topic history and replayed to
 *    the first subscriber that joins;
 *  - delivery modes: 'single' (first subscriber only) or any other mode, which
 *    fans out to every subscriber, optionally capped by `maxConsumption`;
 *  - ACK callbacks: a publisher may pass a callback that is invoked whenever a
 *    subscriber acknowledges the message, or when a buffered message times out.
 */
class PubSub {
  /**
   * @param {Object} [options] Overrides merged on top of the default global
   *   configuration (maxHistory, messageTimeout, broadcastMode,
   *   maxConsumption, historyEnabled, publisherName).
   */
  constructor(options = {}) {
    const defaultConfig = {
      maxHistory: 5,
      messageTimeout: 15000,
      broadcastMode: 'multicast',
      maxConsumption: null,
      historyEnabled: true,
      publisherName: '系统发布者',
    };

    this.globalConfig = { ...defaultConfig, ...options };
    // topic name -> { subscribers: Map, history: Array, config: Object }
    this.topics = new Map();
    // message id -> publisher-supplied ACK callback
    this.ackCallbacks = new Map();
    // subscriber id -> { topic, callback } (flat index across all topics)
    this.subscribers = new Map();
    this.messageCounter = 0;
  }

  /**
   * Merge new values into the global configuration.
   * @param {Object} newConfig Partial configuration; later keys win.
   */
  updateGlobalConfig(newConfig) {
    this.globalConfig = { ...this.globalConfig, ...newConfig };
  }

  /**
   * Return the bookkeeping record for a topic, creating it on first use.
   * @param {string} topic Topic name.
   * @returns {{subscribers: Map, history: Array, config: Object}}
   */
  _ensureTopic(topic) {
    let topicInfo = this.topics.get(topic);
    if (!topicInfo) {
      topicInfo = {
        subscribers: new Map(),
        history: [],
        config: { ...this.globalConfig },
      };
      this.topics.set(topic, topicInfo);
    }
    return topicInfo;
  }

  /**
   * Subscribe to a topic.
   * @param {string} topic Topic name.
   * @param {Function} callback Invoked as `callback(message, ack, messageData)`.
   * @param {Object} [options] Subscriber options.
   * @param {string} [options.subscriberId] Custom id; generated when omitted.
   * @param {boolean} [options.replayHistory] Whether buffered messages should
   *   be replayed; defaults to the global `historyEnabled` setting.
   * @returns {string} The subscriber id.
   */
  subscribe(topic, callback, options = {}) {
    const topicInfo = this._ensureTopic(topic);
    const subscriberId =
      options.subscriberId ||
      `sub-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;

    topicInfo.subscribers.set(subscriberId, {
      callback,
      options: { replayHistory: this.globalConfig.historyEnabled, ...options },
    });
    this.subscribers.set(subscriberId, { topic, callback });

    // Honour the subscriber's replay preference; an absent/undefined value
    // falls back to the global default instead of silently disabling replay.
    const wantsReplay = options.replayHistory ?? this.globalConfig.historyEnabled;

    // History only accumulates while a topic has no subscribers, so it is
    // replayed to the subscriber that makes the count go to one.
    if (wantsReplay && topicInfo.subscribers.size === 1 && topicInfo.history.length > 0) {
      this._replayHistory(topic, subscriberId);
    }

    return subscriberId;
  }

  /**
   * Publish a message.
   * @param {string} topic Topic name.
   * @param {*} message Message payload.
   * @param {Object} [localOptions] Per-message overrides of the global config.
   * @param {Function} [ackCallback] Called with an ack-info object whenever a
   *   subscriber acknowledges the message or a buffered message times out.
   * @returns {string} The generated message id.
   */
  publish(topic, message, localOptions = {}, ackCallback) {
    const topicInfo = this._ensureTopic(topic);
    const messageId = `msg-${this.messageCounter++}`;
    const timestamp = Date.now();

    // Effective options: global config, overridden by per-message options.
    // An empty publisherName falls back to the global one.
    const mergedOptions = {
      ...this.globalConfig,
      ...localOptions,
      publisherName: localOptions.publisherName || this.globalConfig.publisherName,
    };

    const messageData = {
      id: messageId,
      topic,
      message,
      timestamp,
      publisher: mergedOptions.publisherName,
      options: mergedOptions,
      consumedCount: 0,
      consumers: [],
      consumed: false,
    };

    if (typeof ackCallback === 'function') {
      this.ackCallbacks.set(messageId, ackCallback);
    }

    // Deliver immediately when someone is listening; otherwise buffer.
    if (topicInfo.subscribers.size > 0) {
      this._deliverMessage(topic, messageData);
    } else {
      this._addToHistory(topicInfo, messageData);
    }

    return messageId;
  }

  /**
   * Deliver a message to the subscribers of a topic according to the
   * message's `broadcastMode` and `maxConsumption` options.
   * @param {string} topic Topic name.
   * @param {Object} messageData Message record created by `publish`.
   */
  _deliverMessage(topic, messageData) {
    const topicInfo = this.topics.get(topic);
    if (!topicInfo) return;

    const { subscribers } = topicInfo;
    const { broadcastMode, maxConsumption } = messageData.options;

    if (broadcastMode === 'single') {
      // Map iteration follows insertion order, so this is the oldest subscriber.
      const firstEntry = subscribers.entries().next().value;
      if (firstEntry) {
        const [subscriberId, subscriber] = firstEntry;
        this._deliverToSubscriber(subscriberId, subscriber, messageData);
      }
      return;
    }

    // Compare the cumulative consumption count against the absolute cap so a
    // partially consumed message still receives its full remaining budget.
    const limit = maxConsumption == null ? Infinity : maxConsumption;

    for (const [subscriberId, subscriber] of subscribers) {
      if (messageData.consumedCount >= limit) break;
      if (messageData.consumers.includes(subscriberId)) continue;

      this._deliverToSubscriber(subscriberId, subscriber, messageData);
      messageData.consumers.push(subscriberId);
      messageData.consumedCount++;
    }
  }

  /**
   * Invoke one subscriber's callback with the message and an `ack` function.
   * If the callback throws, the error is reported through `ack`.
   * @param {string} subscriberId Subscriber id.
   * @param {Object} subscriber Subscriber record ({ callback, options }).
   * @param {Object} messageData Message record.
   */
  _deliverToSubscriber(subscriberId, subscriber, messageData) {
    const ack = (result) => {
      const ackInfo = {
        subscriberId,
        messageId: messageData.id,
        topic: messageData.topic,
        message: messageData.message,
        publishTime: messageData.timestamp,
        consumeTime: Date.now(),
        result,
      };
      // The callback is kept registered: several subscribers may ack the
      // same message.
      const onAck = this.ackCallbacks.get(messageData.id);
      if (onAck) onAck(ackInfo);
    };

    try {
      subscriber.callback(messageData.message, ack, messageData);
    } catch (error) {
      // Tolerate non-Error throws (e.g. `throw null`) when reporting.
      ack({ error: error?.message ?? String(error) });
    }
  }

  /**
   * Buffer a message in the topic history, evicting the oldest entry when the
   * history is full, and arm its timeout timer when `messageTimeout > 0`.
   * @param {Object} topicInfo Topic record.
   * @param {Object} messageData Message record.
   */
  _addToHistory(topicInfo, messageData) {
    const { history } = topicInfo;
    const maxHistory = messageData.options.maxHistory || 5;

    if (messageData.options.messageTimeout > 0) {
      messageData.timeoutId = setTimeout(() => {
        // Only messages whose ACK callback is still registered are timed out;
        // the publisher is notified and the message leaves the history.
        const onAck = this.ackCallbacks.get(messageData.id);
        if (!onAck) return;

        onAck({
          status: 'timeout',
          messageId: messageData.id,
          message: messageData.message,
          topic: messageData.topic,
          publishTime: messageData.timestamp,
          timeoutTime: Date.now(),
        });

        const index = history.findIndex((msg) => msg.id === messageData.id);
        if (index !== -1) {
          history.splice(index, 1);
        }
        this.ackCallbacks.delete(messageData.id);
      }, messageData.options.messageTimeout);
    }

    if (history.length >= maxHistory) {
      const removed = history.shift();
      if (removed?.timeoutId) clearTimeout(removed.timeoutId);
    }

    history.push(messageData);
  }

  /**
   * Replay every buffered message of a topic to one subscriber. The callback
   * receives a copy of the message record flagged with `isHistory: true`.
   * @param {string} topic Topic name.
   * @param {string} subscriberId Subscriber id.
   */
  _replayHistory(topic, subscriberId) {
    const topicInfo = this.topics.get(topic);
    if (!topicInfo) return;

    const subscriber = topicInfo.subscribers.get(subscriberId);
    if (!subscriber) return;

    topicInfo.history.forEach((messageData) => {
      const ack = (result) => {
        const ackInfo = {
          subscriberId,
          messageId: messageData.id,
          message: messageData.message,
          topic,
          publishTime: messageData.timestamp,
          consumeTime: Date.now(),
          result,
          isHistory: true,
        };
        // A replayed message is acknowledged at most once: the callback is
        // unregistered after the first ack.
        const onAck = this.ackCallbacks.get(messageData.id);
        if (onAck) {
          onAck(ackInfo);
          this.ackCallbacks.delete(messageData.id);
        }
      };

      try {
        subscriber.callback(messageData.message, ack, { ...messageData, isHistory: true });
      } catch (error) {
        ack({ error: error?.message ?? String(error) });
      }
    });
  }

  /**
   * Remove a subscriber from a topic. Unknown topics or ids are ignored.
   * @param {string} topic Topic name.
   * @param {string} subscriberId Subscriber id.
   */
  unsubscribe(topic, subscriberId) {
    const topicInfo = this.topics.get(topic);
    if (!topicInfo) return;

    if (topicInfo.subscribers.has(subscriberId)) {
      topicInfo.subscribers.delete(subscriberId);
      this.subscribers.delete(subscriberId);
    }
  }

  /**
   * Drop all buffered messages of a topic and cancel their timeout timers.
   * @param {string} topic Topic name.
   */
  clearHistory(topic) {
    const topicInfo = this.topics.get(topic);
    if (!topicInfo) return;

    for (const msg of topicInfo.history) {
      if (msg.timeoutId) clearTimeout(msg.timeoutId);
    }
    topicInfo.history = [];
  }

  /**
   * Summarise a topic.
   * @param {string} topic Topic name.
   * @returns {{subscribers: number, pendingMessages: number}|null} Counts of
   *   subscribers and buffered messages, or null for an unknown topic.
   */
  getTopicInfo(topic) {
    const topicInfo = this.topics.get(topic);
    if (!topicInfo) return null;

    return {
      subscribers: topicInfo.subscribers.size,
      pendingMessages: topicInfo.history.length,
    };
  }

  /**
   * Reset the hub to its initial empty state (the global configuration is
   * kept). Pending history timers are cancelled so they do not stay
   * scheduled after their messages are gone.
   */
  reset() {
    for (const { history } of this.topics.values()) {
      for (const msg of history) {
        if (msg.timeoutId) clearTimeout(msg.timeoutId);
      }
    }

    this.topics.clear();
    this.ackCallbacks.clear();
    this.subscribers.clear();
    this.messageCounter = 0;
  }
}

功能展示html

<!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>PubSub消息系统</title> <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.4.0/css/all.min.css"> <link rel="stylesheet" href="style.css"> </head> <body> <div class="container"> <header> <h1><i class="fas fa-exchange-alt"></i> 高级PubSub消息系统</h1> <p class="subtitle">支持全局配置、配置合并、消息广播、ACK确认和历史回放的完整消息系统</p> <div class="feature-badges"> <div class="badge"><i class="fas fa-cog"></i> 全局配置</div> <div class="badge"><i class="fas fa-layer-group"></i> 配置合并</div> <div class="badge"><i class="fas fa-broadcast-tower"></i> 单播/多播</div> <div class="badge"><i class="fas fa-history"></i> 历史回放</div> <div class="badge"><i class="fas fa-check-circle"></i> ACK确认</div> <div class="badge"><i class="fas fa-clock"></i> 超时处理</div> </div> </header> <div class="config-section"> <div class="card-header"> <h2><i class="fas fa-sliders-h"></i> 全局配置控制台</h2> </div> <div class="config-grid"> <div class="config-box"> <div class="form-group"> <label for="globalMaxHistory"><i class="fas fa-database"></i> 最大历史消息数</label> <input type="number" id="globalMaxHistory" min="1" max="50" value="5"> </div> <div class="form-group"> <label for="globalTimeout"><i class="fas fa-stopwatch"></i> 消息超时时间(ms)</label> <input type="number" id="globalTimeout" min="0" value="15000"> </div> </div> <div class="config-box"> <div class="form-group"> <label for="globalBroadcastMode"><i class="fas fa-bullhorn"></i> 默认广播模式</label> <select id="globalBroadcastMode"> <option value="multicast">多播 (不限消费次数)</option> <option value="single">单播 (仅第一个订阅者消费)</option> <option value="limited">限制消费次数</option> </select> </div> <div class="form-group"> <label for="globalMaxConsumption"><i class="fas fa-user-friends"></i> 默认最大消费次数</label> <input type="number" id="globalMaxConsumption" min="1" value="3"> </div> </div> <div class="config-box"> <div class="form-group"> <label 
for="globalHistoryEnabled"><i class="fas fa-history"></i> <input type="checkbox" id="globalHistoryEnabled" checked> 历史回放默认启用 </label> </div> <div class="form-group"> <label for="globalPublisherName"><i class="fas fa-user-tag"></i> 发布者默认名称</label> <input type="text" id="globalPublisherName" value="系统发布者"> </div> <button id="updateGlobalConfig" class="info"><i class="fas fa-sync-alt"></i> 更新全局配置</button> </div> </div> </div> <div class="dashboard"> <div class="card"> <div class="card-header"> <h2><i class="fas fa-paper-plane"></i> 消息发布控制台</h2> </div> <div class="form-group"> <label for="topic"><i class="fas fa-hashtag"></i> 主题名称</label> <input type="text" id="topic" placeholder="例如:news.sports" value="news.sports"> </div> <div class="form-group"> <label for="message"><i class="fas fa-envelope"></i> 消息内容</label> <textarea id="message" placeholder="输入要发布的消息内容...">NBA总决赛正在火热进行中,敬请关注!</textarea> </div> <div class="form-group"> <label for="broadcastMode"><i class="fas fa-bullhorn"></i> 广播模式</label> <select id="broadcastMode"> <option value="">使用全局默认</option> <option value="multicast">多播 (不限消费次数)</option> <option value="single">单播 (仅第一个订阅者消费)</option> <option value="limited">限制消费次数</option> </select> </div> <div id="maxConsumptionContainer" style="display: none;"> <label for="maxConsumption"><i class="fas fa-user-friends"></i> 最大消费次数</label> <input type="number" id="maxConsumption" min="1" value="3"> </div> <div class="form-group"> <label for="historyConfig"><i class="fas fa-history"></i> 历史记录配置 (覆盖全局)</label> <div class="actions"> <input type="number" id="historySize" min="0" max="50" placeholder="历史记录大小"> <input type="number" id="messageTimeout" min="0" placeholder="消息超时时间(ms)"> </div> </div> <div class="message-visual"> <div class="message-flow"></div> <div class="message-ball ball1">MSG</div> <div class="message-ball ball2">ACK</div> <div class="message-ball ball3">HIST</div> </div> <div class="actions"> <button id="publishBtn"><i class="fas fa-paper-plane"></i> 
发布消息</button> <button id="publishNoSubBtn" class="warning"><i class="fas fa-user-slash"></i> 发布无订阅者消息</button> </div> <h3 style="margin-top: 25px; display: flex; align-items: center; gap: 10px;"> <i class="fas fa-file-alt"></i> 发布日志 </h3> <div class="logs" id="publishLog"> <div class="log-entry system"> <span class="log-time">[系统启动]</span> 发布系统准备就绪,请配置全局设置并发布消息 </div> </div> </div> <div class="card"> <div class="card-header"> <h2><i class="fas fa-inbox"></i> 消息订阅控制台</h2> </div> <div class="form-group"> <label for="subTopic"><i class="fas fa-hashtag"></i> 订阅主题</label> <input type="text" id="subTopic" placeholder="例如:news.sports" value="news.sports"> </div> <div class="form-group"> <label for="subscriberId"><i class="fas fa-id-card"></i> 订阅者ID</label> <input type="text" id="subscriberId" placeholder="输入自定义订阅者ID (可选)"> </div> <div class="form-group"> <label><i class="fas fa-history"></i> <input type="checkbox" id="replayHistory" checked> 接收历史消息 </label> </div> <button id="subscribeBtn" class="secondary"><i class="fas fa-plus-circle"></i> 创建订阅者</button> <div class="topic-info"> <div class="topic-card"> <h3><i class="fas fa-basketball-ball"></i> news.sports</h3> <div class="metrics"> <div class="metric-box"> <div class="metric-value sub-count" id="sportsSubCount">0</div> <div class="metric-label">活跃订阅者</div> </div> <div class="metric-box"> <div class="metric-value pending-count" id="sportsPendingCount">0</div> <div class="metric-label">等待消息</div> </div> </div> <button class="info" id="viewSportsDetails"><i class="fas fa-chart-bar"></i> 查看详情</button> </div> <div class="topic-card"> <h3><i class="fas fa-cloud-sun"></i> weather.update</h3> <div class="metrics"> <div class="metric-box"> <div class="metric-value sub-count" id="weatherSubCount">0</div> <div class="metric-label">活跃订阅者</div> </div> <div class="metric-box"> <div class="metric-value pending-count" id="weatherPendingCount">0</div> <div class="metric-label">等待消息</div> </div> </div> <button class="info" 
id="viewWeatherDetails"><i class="fas fa-chart-bar"></i> 查看详情</button> </div> </div> <h3 style="margin-top: 25px; display: flex; align-items: center; gap: 10px;"> <i class="fas fa-file-alt"></i> 订阅日志 </h3> <div class="logs" id="subscribeLog"> <div class="log-entry system"> <span class="log-time">[系统启动]</span> 订阅系统准备就绪,请创建订阅者 </div> </div> </div> </div> <div class="card"> <div class="card-header"> <h2><i class="fas fa-tasks"></i> 配置合并演示</h2> </div> <div class="demo-buttons"> <div class="demo-button" id="demoGlobalConfig"> <i class="fas fa-cogs"></i> <p>全局配置优先级</p> <small>无发布配置时使用全局配置</small> </div> <div class="demo-button" id="demoConfigMerge"> <i class="fas fa-layer-group"></i> <p>配置合并示例</p> <small>全局+发布配置=最终配置</small> </div> <div class="demo-button" id="demoLocalOverride"> <i class="fas fa-code-merge"></i> <p>局部覆盖示例</p> <small>发布配置覆盖全局配置</small> </div> <div class="demo-button" id="demoMessageLifetime"> <i class="fas fa-hourglass-half"></i> <p>消息生命周期</p> <small>从发布到消费全过程</small> </div> </div> </div> <div class="system-log"> <div class="system-log-header"> <i class="fas fa-terminal"></i> <h2>系统日志</h2> </div> <div class="logs" id="systemLog"> <div class="log-entry system"> <span class="log-time">[系统启动]</span> PubSub系统初始化完成,使用默认全局配置 </div> <div class="log-entry system"> <span class="log-time">[系统启动]</span> 全局配置: maxHistory=5, messageTimeout=15000ms, broadcastMode=multicast </div> </div> </div> <footer> <p>高级PubSub消息系统 | 支持全局配置和局部覆盖的强大消息中间件</p> <p>© 2023 消息系统实验室 | 设计:前端架构师团队</p> </footer> </div> <script src="pubsub.js"></script> <script src="app.js"></script> </body> </html>

style.css

css
* { box-sizing: border-box; margin: 0; padding: 0; } :root { --primary-color: #3498db; --primary-light: #5dade2; --primary-dark: #2980b9; --secondary-color: #2ecc71; --secondary-light: #58d68d; --warning-color: #e74c3c; --warning-light: #ec7063; --dark-color: #2c3e50; --light-color: #ecf0f1; --gray-light: #bdc3c7; --background-gradient: linear-gradient(135deg, #1e3c72 0%, #2a5298 100%); --card-bg: rgba(255, 255, 255, 0.97); --shadow: 0 10px 30px rgba(0, 0, 0, 0.15); } body { font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; line-height: 1.6; color: #333; background: var(--background-gradient); padding: 20px; min-height: 100vh; position: relative; } body::before { content: ''; position: absolute; top: 0; left: 0; right: 0; bottom: 0; background: radial-gradient(circle at center, transparent, rgba(0, 0, 0, 0.4)); z-index: -1; } .container { max-width: 1400px; margin: 0 auto; position: relative; } header { text-align: center; padding: 30px 0; margin-bottom: 30px; color: white; background: rgba(255, 255, 255, 0.1); border-radius: 15px; backdrop-filter: blur(10px); box-shadow: var(--shadow); position: relative; overflow: hidden; } header::after { content: ''; position: absolute; top: 0; left: 0; right: 0; height: 5px; background: linear-gradient(90deg, var(--primary-color), var(--secondary-color)); } h1 { font-size: 3.2rem; margin-bottom: 15px; background: linear-gradient(to right, var(--primary-light), var(--secondary-light)); -webkit-background-clip: text; -webkit-text-fill-color: transparent; text-shadow: 0 2px 4px rgba(0, 0, 0, 0.2); position: relative; padding-bottom: 15px; } h1::after { content: ''; position: absolute; bottom: 0; left: 35%; width: 30%; height: 3px; background: linear-gradient(90deg, transparent, var(--primary-light), transparent); } .subtitle { color: var(--light-color); font-size: 1.2rem; max-width: 900px; margin: 0 auto; font-weight: 300; line-height: 1.8; } .feature-badges { display: flex; justify-content: center; flex-wrap: wrap; 
gap: 15px; margin-top: 25px; } .badge { background: rgba(255, 255, 255, 0.15); backdrop-filter: blur(5px); padding: 10px 20px; border-radius: 30px; font-size: 0.95rem; font-weight: 500; display: flex; align-items: center; gap: 8px; transition: transform 0.3s, background 0.3s; } .badge:hover { transform: translateY(-3px); background: rgba(255, 255, 255, 0.25); } .dashboard { display: grid; grid-template-columns: 1fr 1fr; gap: 25px; margin-bottom: 30px; } @media (max-width: 1000px) { .dashboard { grid-template-columns: 1fr; } } .card { background: var(--card-bg); border-radius: 15px; padding: 30px; box-shadow: var(--shadow); transition: all 0.4s ease; position: relative; } .card:hover { transform: translateY(-8px); box-shadow: 0 20px 40px rgba(0, 0, 0, 0.25); } .card-header { display: flex; align-items: center; justify-content: space-between; margin-bottom: 25px; padding-bottom: 20px; border-bottom: 2px solid var(--primary-color); } .card-header h2 { display: flex; align-items: center; gap: 15px; color: var(--dark-color); font-size: 1.8rem; } .card-header i { font-size: 2rem; color: var(--primary-color); background: rgba(52, 152, 219, 0.1); width: 60px; height: 60px; display: flex; align-items: center; justify-content: center; border-radius: 50%; } .form-group { margin-bottom: 22px; } .form-group label { display: block; margin-bottom: 12px; font-weight: 600; color: var(--dark-color); font-size: 1.1rem; display: flex; align-items: center; gap: 10px; } input, select, button, textarea { width: 100%; padding: 15px 20px; border: 1px solid var(--gray-light); border-radius: 10px; font-size: 16px; background: #f9fafb; transition: all 0.3s; } input:focus, select:focus, textarea:focus { border-color: var(--primary-color); box-shadow: 0 0 0 4px rgba(52, 152, 219, 0.25); outline: none; background: white; } textarea { min-height: 120px; resize: vertical; } button { background: var(--primary-color); color: white; border: none; cursor: pointer; font-weight: 600; letter-spacing: 
0.5px; transition: all 0.3s; font-size: 1.1rem; display: flex; align-items: center; justify-content: center; gap: 12px; padding: 18px; border-radius: 12px; } button:hover { background: var(--primary-dark); transform: translateY(-3px); box-shadow: 0 7px 15px rgba(52, 152, 219, 0.3); } button.secondary { background: var(--secondary-color); } button.secondary:hover { background: #27ae60; box-shadow: 0 7px 15px rgba(46, 204, 113, 0.3); } button.warning { background: var(--warning-color); } button.warning:hover { background: #c0392b; box-shadow: 0 7px 15px rgba(231, 76, 60, 0.3); } button.info { background: #9b59b6; } button.info:hover { background: #8e44ad; box-shadow: 0 7px 15px rgba(155, 89, 182, 0.3); } .actions { display: grid; grid-template-columns: 1fr 1fr; gap: 18px; } .logs { background: #f8fafc; border: 1px solid #e1e4e8; border-radius: 12px; padding: 20px; margin-top: 20px; height: 300px; overflow-y: auto; font-family: 'Fira Code', 'Menlo', monospace; font-size: 0.95rem; box-shadow: inset 0 2px 10px rgba(0, 0, 0, 0.05); } .log-entry { padding: 14px 18px; border-radius: 10px; margin-bottom: 12px; display: flex; animation: fadeIn 0.4s; position: relative; font-size: 0.9rem; } .log-entry:last-child { margin-bottom: 0; } .log-time { color: var(--dark-color); margin-right: 15px; font-size: 0.9rem; min-width: 90px; font-weight: 600; flex-shrink: 0; } .log-entry.realtime { background: rgba(52, 152, 219, 0.08); border-left: 4px solid var(--primary-color); } .log-entry.history { background: rgba(46, 204, 113, 0.08); border-left: 4px solid var(--secondary-color); } .log-entry.warning { background: rgba(231, 76, 60, 0.08); border-left: 4px solid var(--warning-color); } .log-entry.ack { background: rgba(155, 89, 182, 0.08); border-left: 4px solid #9b59b6; } .log-entry.system { background: rgba(41, 128, 185, 0.08); border-left: 4px solid #2980b9; } .topic-info { display: grid; grid-template-columns: repeat(auto-fit, minmax(250px, 1fr)); gap: 20px; margin-top: 25px; } 
.topic-card { background: linear-gradient(135deg, rgba(255, 255, 255, 0.95), rgba(245, 247, 250, 0.95)); border-radius: 12px; padding: 22px; text-align: center; box-shadow: 0 6px 18px rgba(0, 0, 0, 0.08); transition: transform 0.4s; position: relative; overflow: hidden; } .topic-card::before { content: ''; position: absolute; top: 0; left: 0; right: 0; height: 5px; background: linear-gradient(90deg, var(--primary-color), var(--secondary-color)); } .topic-card:hover { transform: scale(1.03) rotate(1deg); } .topic-card h3 { color: var(--dark-color); margin-bottom: 20px; font-size: 1.4rem; display: flex; align-items: center; justify-content: center; gap: 12px; } .metrics { display: grid; grid-template-columns: 1fr 1fr; gap: 18px; margin: 20px 0; } .metric-box { padding: 15px; border-radius: 10px; background: rgba(255, 255, 255, 0.8); box-shadow: 0 4px 10px rgba(0, 0, 0, 0.05); } .metric-value { font-size: 1.8rem; font-weight: bold; margin: 8px 0; } .sub-count { color: var(--primary-dark); } .pending-count { color: var(--secondary-color); } .metric-label { font-size: 0.92rem; color: var(--dark-color); font-weight: 600; } .system-log { margin-top: 40px; padding: 30px; background: var(--card-bg); border-radius: 15px; box-shadow: var(--shadow); } .system-log-header { display: flex; align-items: center; gap: 15px; margin-bottom: 25px; padding-bottom: 15px; border-bottom: 2px solid var(--primary-color); } .system-log-header i { font-size: 2.2rem; color: var(--primary-color); } .system-log-header h2 { font-size: 1.8rem; color: var(--dark-color); } footer { text-align: center; padding: 30px 0; margin-top: 50px; color: rgba(255, 255, 255, 0.85); font-size: 0.95rem; border-top: 1px solid rgba(255, 255, 255, 0.15); } footer p { margin-bottom: 10px; } @keyframes fadeIn { from { opacity: 0; transform: translateY(15px); } to { opacity: 1; transform: translateY(0); } } .message-visual { height: 160px; background: rgba(255, 255, 255, 0.1); border-radius: 15px; margin: 25px 0; 
position: relative; overflow: hidden; box-shadow: inset 0 0 20px rgba(0, 0, 0, 0.1); } .message-flow { position: absolute; width: 100%; height: 40px; top: 60px; background: url('data:image/svg+xml;utf8,<svg xmlns="http://www.w3.org/2000/svg" width="200" height="40" viewBox="0 0 200 40"><path d="M10,20 Q100,0 190,20" fill="none" stroke="rgba(255,255,255,0.3)" stroke-width="3" stroke-dasharray="5,4"/></svg>'); background-repeat: repeat-x; animation: flowMove 20s linear infinite; } @keyframes flowMove { 0% { background-position: 0 0; } 100% { background-position: 200px 0; } } .message-ball { position: absolute; width: 45px; height: 45px; border-radius: 50%; display: flex; align-items: center; justify-content: center; color: white; font-weight: bold; font-size: 0.9rem; top: 47px; animation: moveBall 5s ease-in-out infinite; box-shadow: 0 8px 20px rgba(0, 0, 0, 0.2); z-index: 10; } .ball1 { left: 0; background: linear-gradient(135deg, var(--primary-color), var(--primary-dark)); animation-delay: 0s; } .ball2 { left: 70px; background: linear-gradient(135deg, var(--secondary-color), #27ae60); animation-delay: 1s; } .ball3 { left: 140px; background: linear-gradient(135deg, var(--warning-light), var(--warning-color)); animation-delay: 2s; } @keyframes moveBall { 0% { transform: translateX(0) translateY(0); } 25% { transform: translateX(250px) translateY(-20px); } 50% { transform: translateX(500px) translateY(0); } 75% { transform: translateX(750px) translateY(20px); } 100% { transform: translateX(1000px) translateY(0); } } .demo-buttons { display: grid; grid-template-columns: repeat(auto-fit, minmax(220px, 1fr)); gap: 22px; margin-top: 35px; } .demo-button { padding: 25px 20px; text-align: center; background: linear-gradient(135deg, #1e3c72, #2a5298); color: white; border-radius: 12px; font-weight: 500; cursor: pointer; transition: all 0.4s; box-shadow: 0 8px 20px rgba(0, 0, 0, 0.2); position: relative; overflow: hidden; } .demo-button::before { content: ''; position: 
absolute; top: -50%; left: -50%; width: 200%; height: 200%; background: linear-gradient(rgba(255, 255, 255, 0), rgba(255, 255, 255, 0.3), rgba(255, 255, 255, 0)); transform: rotate(45deg); } .demo-button:hover { transform: translateY(-8px); box-shadow: 0 15px 30px rgba(0, 0, 0, 0.3); } .demo-button i { font-size: 2.4rem; display: block; margin-bottom: 12px; } .demo-button p { font-size: 1.3rem; margin-bottom: 8px; } .demo-button small { font-size: 0.9rem; opacity: 0.8; } .config-section { background: var(--card-bg); border-radius: 15px; padding: 30px; margin-bottom: 30px; box-shadow: var(--shadow); } .config-grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(250px, 1fr)); gap: 20px; } .config-box { padding: 20px; background: rgba(52, 152, 219, 0.08); border-radius: 10px; } #maxConsumptionContainer { margin: 15px 0; }

app.js

js
// Demo page wiring for the PubSub plugin: one shared instance, three log panes,
// the publish/subscribe forms and a handful of canned demo buttons.
const pubsub = new PubSub();

/**
 * Formats a Date as the bracketed `[HH:MM:SS]` stamp shown in every log row.
 * @param {Date} date
 * @returns {string}
 */
function formatLogTime (date) {
  const pad = (value) => value.toString().padStart(2, '0');
  return `[${pad(date.getHours())}:${pad(date.getMinutes())}:${pad(date.getSeconds())}]`;
}

/**
 * Returns at most `length` leading characters of `value`, coerced to a string
 * so that non-string payloads cannot crash the log helpers.
 * @param {*} value
 * @param {number} [length=30]
 * @returns {string}
 */
function previewText (value, length = 30) {
  return String(value).substring(0, length);
}

/**
 * Appends one timestamped row to the log pane with the given element id and
 * scrolls the pane to the bottom.
 *
 * The row is assembled from DOM nodes instead of `innerHTML` because the
 * message text contains user-typed values (topic, message, subscriber id);
 * inserting those as HTML would let them inject markup into the page.
 * @param {string} containerId id of the log pane element
 * @param {string} message text of the row (treated as plain text)
 * @param {string} className full class attribute for the row
 */
function appendLogEntry (containerId, message, className) {
  const logEntry = document.createElement('div');
  logEntry.className = className;

  const timeLabel = document.createElement('span');
  timeLabel.className = 'log-time';
  timeLabel.textContent = formatLogTime(new Date());

  logEntry.appendChild(timeLabel);
  logEntry.appendChild(document.createTextNode(` ${message}`));

  const container = document.getElementById(containerId);
  container.appendChild(logEntry);
  container.scrollTop = container.scrollHeight;
}

/**
 * Logs a row in the publisher pane.
 * @param {string} message
 * @param {string} [type='realtime'] extra CSS class for the row
 */
function logPublish (message, type = 'realtime') {
  appendLogEntry('publishLog', message, `log-entry ${type}`);
}

/**
 * Logs a row in the subscriber pane.
 * @param {string} message
 * @param {string} [type='history'] extra CSS class for the row
 */
function logSubscribe (message, type = 'history') {
  appendLogEntry('subscribeLog', message, `log-entry ${type}`);
}

/**
 * Logs a row in the system pane.
 * @param {string} message
 */
function logSystem (message) {
  appendLogEntry('systemLog', message, 'log-entry system');
}

/**
 * Refreshes the subscriber / pending-message counters of the two demo topics.
 * Topics that do not exist yet are shown as zero.
 */
function updateTopicInfo () {
  const emptyInfo = { subscribers: 0, pendingMessages: 0 };
  const sportsInfo = pubsub.getTopicInfo('news.sports') || emptyInfo;
  const weatherInfo = pubsub.getTopicInfo('weather.update') || emptyInfo;

  document.getElementById('sportsSubCount').textContent = sportsInfo.subscribers;
  document.getElementById('sportsPendingCount').textContent = sportsInfo.pendingMessages;
  document.getElementById('weatherSubCount').textContent = weatherInfo.subscribers;
  document.getElementById('weatherPendingCount').textContent = weatherInfo.pendingMessages;
}

// DOM element references
const broadcastMode = document.getElementById('broadcastMode');
const maxConsumptionContainer = document.getElementById('maxConsumptionContainer');
const updateGlobalConfigBtn = document.getElementById('updateGlobalConfig');

// Only show the max-consumption input while the "limited" mode is selected.
broadcastMode.addEventListener('change', () => {
  maxConsumptionContainer.style.display = broadcastMode.value === 'limited' ? 'block' : 'none';
});

// Update the global configuration from the form.
updateGlobalConfigBtn.addEventListener('click', () => {
  const globalBroadcastMode = document.getElementById('globalBroadcastMode').value;
  // `||` (not `??`) is deliberate: an empty or invalid field parses to NaN and
  // must fall back to the default.
  const newConfig = {
    maxHistory: Number.parseInt(document.getElementById('globalMaxHistory').value, 10) || 5,
    messageTimeout: Number.parseInt(document.getElementById('globalTimeout').value, 10) || 15000,
    broadcastMode: globalBroadcastMode,
    maxConsumption: globalBroadcastMode === 'limited'
      ? Number.parseInt(document.getElementById('globalMaxConsumption').value, 10) || 3
      : null,
    historyEnabled: document.getElementById('globalHistoryEnabled').checked,
    publisherName: document.getElementById('globalPublisherName').value || '系统发布者'
  };

  pubsub.updateGlobalConfig(newConfig);
  logSystem(`全局配置已更新: maxHistory=${newConfig.maxHistory}, timeout=${newConfig.messageTimeout}ms, broadcastMode=${newConfig.broadcastMode}`);
});

// Publish a message from the form.
document.getElementById('publishBtn').addEventListener('click', () => {
  const topic = document.getElementById('topic').value;
  const message = document.getElementById('message').value;

  // Per-publish configuration; an empty selection falls back to the global mode.
  const localConfig = {
    broadcastMode: broadcastMode.value === '' ? pubsub.globalConfig.broadcastMode : broadcastMode.value
  };

  // Optional numeric overrides. Values that do not parse are ignored so that
  // NaN never ends up in the publish options.
  const historySize = Number.parseInt(document.getElementById('historySize').value, 10);
  if (!Number.isNaN(historySize)) {
    localConfig.maxHistory = historySize;
  }
  const messageTimeout = Number.parseInt(document.getElementById('messageTimeout').value, 10);
  if (!Number.isNaN(messageTimeout)) {
    localConfig.messageTimeout = messageTimeout;
  }

  const messageId = pubsub.publish(topic, message, localConfig, (ackInfo) => {
    const type = ackInfo.status === 'timeout' ? 'warning' : 'ack';
    logPublish(`${type === 'warning' ? '⏰ 消息超时' : '✅ 已消费'}: ${previewText(ackInfo.message)}... (订阅者: ${ackInfo.subscriberId || '无'})`, type);
  });

  logPublish(`发布消息成功: ${previewText(message)}... (ID: ${messageId})`);
  updateTopicInfo();
});

// Publish to a topic that has no subscriber yet.
document.getElementById('publishNoSubBtn').addEventListener('click', () => {
  const topic = 'weather.update';
  const message = '今日天气: 晴天, 气温25℃';

  document.getElementById('topic').value = topic;
  document.getElementById('message').value = message;

  const messageId = pubsub.publish(topic, message);
  logPublish(`发布消息到无订阅者主题: ${previewText(message)}... (ID: ${messageId})`, 'warning');
  updateTopicInfo();
});

// Create a subscriber from the form.
document.getElementById('subscribeBtn').addEventListener('click', () => {
  const topic = document.getElementById('subTopic').value;
  const subscriberId = document.getElementById('subscriberId').value || undefined;
  const replayHistory = document.getElementById('replayHistory').checked;

  const subscriptionId = pubsub.subscribe(
    topic,
    (message, ack, meta) => {
      const type = meta.isHistory ? 'history' : 'realtime';
      logSubscribe(
        `${meta.isHistory ? '🕰️ 历史消息' : '⚡ 实时消息'}: ${previewText(message)}... (来自: ${meta.publisher}, ID: ${meta.id})`,
        type
      );
      ack('成功处理');
    },
    { subscriberId, replayHistory }
  );

  logSubscribe(`订阅者创建成功: ${subscriberId || subscriptionId} (主题: ${topic})`, 'system');
  updateTopicInfo();
});

// Demo: publish using only the global configuration.
document.getElementById('demoGlobalConfig').addEventListener('click', () => {
  pubsub.updateGlobalConfig({
    maxHistory: 3,
    messageTimeout: 10000
  });

  pubsub.publish('demo.topic', '使用全局配置的消息', {});

  logSystem('演示: 已设置全局配置 maxHistory=3, timeout=10000ms');
  logSystem('发布消息: 使用全局配置的消息');
});

// Demo: global configuration merged with per-publish options.
document.getElementById('demoConfigMerge').addEventListener('click', () => {
  pubsub.publish(
    'demo.topic',
    '合并配置的消息',
    { maxHistory: 2, publisherName: '特殊发布者' }
  );

  logSystem('演示: 全局配置 + 发布配置 = 最终配置');
  logSystem('发布消息: 合并配置的消息 (覆盖maxHistory=2)');
});

// Demo: per-publish options override the global configuration.
document.getElementById('demoLocalOverride').addEventListener('click', () => {
  pubsub.updateGlobalConfig({
    broadcastMode: 'single',
    maxHistory: 5
  });

  pubsub.publish(
    'demo.topic',
    '局部覆盖配置的消息',
    { broadcastMode: 'multicast', publisherName: '测试发布者' }
  );

  logSystem('演示: 局部配置覆盖全局配置');
  logSystem('发布消息: 局部覆盖配置的消息 (覆盖broadcastMode=multicast)');
});

// Demo: publish three messages first, then subscribe.
document.getElementById('demoMessageLifetime').addEventListener('click', () => {
  pubsub.updateGlobalConfig({
    maxHistory: 3,
    messageTimeout: 15000,
    broadcastMode: 'multicast'
  });

  pubsub.publish('demo.lifecycle', '生命周期消息1');
  pubsub.publish('demo.lifecycle', '生命周期消息2');
  pubsub.publish('demo.lifecycle', '生命周期消息3');

  pubsub.subscribe('demo.lifecycle', (msg, ack) => {
    logSubscribe(`消费消息: ${previewText(msg, 20)}...`, 'realtime');
    ack('ACK确认');
  });

  logSystem('演示: 消息生命周期演示 (发布3条消息)');
  logSystem('创建订阅者消费历史消息');
});

// Initialise the page once the DOM is ready.
document.addEventListener('DOMContentLoaded', () => {
  logSystem('系统启动: PubSub消息系统已就绪');
  logSystem('默认全局配置: maxHistory=5, messageTimeout=15000, broadcastMode=multicast');
  updateTopicInfo();

  // Seed the global-configuration form with the defaults.
  document.getElementById('globalMaxHistory').value = 5;
  document.getElementById('globalTimeout').value = 15000;
  document.getElementById('globalPublisherName').value = '系统发布者';
});

ts升级版(ack支持异步)

ts
import { AckParamProp, DeepPartial, GlobalOPtionProp, MessageMetaProp, PublishOptionProp, SubscribeOptionProp, TopicMapSubscriberMapValue, TopicMapValue } from "./type";

const TIMEOUT_STATUS = "timeout";
const CONSUMED_STATUS = "consumed";

/**
 * In-memory publish/subscribe hub.
 *
 * - Messages published to a topic without subscribers are parked in that
 *   topic's history and replayed to the first subscriber that joins.
 * - Messages published to a topic with subscribers are delivered immediately,
 *   to the first subscriber only (`broadcastMode: 'single'`) or to up to
 *   `maxConsumption` subscribers (any other mode).
 * - Subscriber callbacks may be synchronous or return a promise; the value
 *   they produce is forwarded to the publisher's `ackCallback` as `result`.
 */
export class PubSub {
    messageCounter: number = 0;
    private globalConfig: GlobalOPtionProp;
    /** Topic name -> subscribers, parked messages and config snapshot. */
    private topics: Map<string, TopicMapValue>;
    /** Message id -> publisher ack callback. */
    private ackCallbacks: Map<string, (params: AckParamProp) => void>;
    /** Subscriber id -> { topic, callback }. */
    private subscribers: Map<string, any>;

    /**
     * @param options Overrides for the default global configuration.
     */
    constructor(options: Partial<GlobalOPtionProp> = {}) {
        const defaultConfig: GlobalOPtionProp = {
            maxHistory: 5,
            messageTimeout: 15000,
            broadcastMode: 'multicast',
            maxConsumption: null,
            historyEnabled: true,
            publisherName: '系统发布者'
        };

        this.globalConfig = { ...defaultConfig, ...options };
        this.topics = new Map();
        this.ackCallbacks = new Map();
        this.subscribers = new Map();
        this.messageCounter = 0;
    }

    /**
     * Merges `newConfig` into the global configuration.
     * @param newConfig Fields to override.
     */
    updateGlobalConfig(newConfig: Partial<GlobalOPtionProp>) {
        this.globalConfig = { ...this.globalConfig, ...newConfig };
    }

    /**
     * Subscribes to a topic.
     *
     * The first subscriber of a topic receives the parked history unless
     * replay is disabled, either per subscription (`options.replayHistory`)
     * or globally (`historyEnabled`).
     *
     * @param topic Topic name.
     * @param callback Invoked as `callback(message, meta)`; may return a promise.
     * @param options `subscriberId` (generated when omitted) and `replayHistory`.
     * @returns The subscriber id.
     */
    subscribe(topic, callback, options: Partial<SubscribeOptionProp> & { replayHistory?: boolean } = {}) {
        const topicInfo = this._ensureTopic(topic);
        const subscriberId = options.subscriberId || `sub-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
        const subscriberOptions = {
            replayHistory: this.globalConfig.historyEnabled,
            ...options
        };

        topicInfo.subscribers.set(subscriberId, { callback, options: subscriberOptions });
        this.subscribers.set(subscriberId, { topic, callback });

        // Only an explicit `false` disables replay, so an option object that
        // carries `replayHistory: undefined` keeps the replay-by-default behaviour.
        const replayEnabled = subscriberOptions.replayHistory !== false;
        if (replayEnabled && topicInfo.subscribers.size === 1 && topicInfo.history.length > 0) {
            this._replayHistory(topic, subscriberId);
        }

        return subscriberId;
    }

    /**
     * Publishes a message.
     *
     * @param topic Topic name.
     * @param message Payload.
     * @param localOptions Per-message overrides of the global configuration,
     *   plus an optional `ackCallback`.
     * @returns The generated message id.
     */
    publish(topic: string, message: unknown, localOptions: Partial<PublishOptionProp> = {}) {
        const { ackCallback } = localOptions;
        const topicInfo = this._ensureTopic(topic);
        const messageId = `msg-${this.messageCounter++}`;
        const timestamp = Date.now();

        // Global configuration first, per-publish options on top.
        const mergedOptions = {
            ...this.globalConfig,
            ...localOptions,
            publisherName: localOptions.publisherName || this.globalConfig.publisherName
        };

        const messageData: MessageMetaProp = {
            id: messageId,
            topic,
            message,
            timestamp,
            publisher: mergedOptions.publisherName,
            options: mergedOptions,
            consumedCount: 0,
            consumers: [],
            consumed: false,
        };

        if (typeof ackCallback === 'function') {
            this.ackCallbacks.set(messageId, ackCallback);
        }

        if (topicInfo.subscribers.size > 0) {
            this._deliverMessage(topic, messageData);
        } else {
            this._addToHistory(topicInfo, messageData);
        }

        return messageId;
    }

    /**
     * Removes a subscriber from a topic. Unknown topics or ids are ignored.
     * @param topic Topic name.
     * @param subscriberId Subscriber id.
     */
    unsubscribe(topic, subscriberId) {
        const topicInfo = this.topics.get(topic);
        if (!topicInfo) return;

        if (topicInfo.subscribers.has(subscriberId)) {
            topicInfo.subscribers.delete(subscriberId);
            this.subscribers.delete(subscriberId);
        }
    }

    /**
     * Drops every parked message of a topic, cancelling its timeout timer and
     * releasing its ack callback.
     * @param topic Topic name.
     */
    clearHistory(topic) {
        const topicInfo = this.topics.get(topic);
        if (!topicInfo) return;

        topicInfo.history.forEach(msg => this._discardMessage(msg));
        topicInfo.history = [];
    }

    /**
     * @param topic Topic name.
     * @returns Subscriber and parked-message counts, or `null` for an unknown topic.
     */
    getTopicInfo(topic) {
        if (!this.topics.has(topic)) return null;

        const topicInfo = this.topics.get(topic);
        return {
            subscribers: topicInfo.subscribers.size,
            pendingMessages: topicInfo.history.length
        };
    }

    /**
     * Clears all topics, subscribers and ack callbacks, and restarts the
     * message counter.
     */
    reset() {
        // Cancel pending timeout timers first; otherwise they stay scheduled
        // (and keep the dropped messages alive) until they expire.
        this.topics.forEach(topicInfo => {
            topicInfo.history.forEach(msg => {
                if (msg.timeoutId) clearTimeout(msg.timeoutId);
            });
        });

        this.topics.clear();
        this.ackCallbacks.clear();
        this.subscribers.clear();
        this.messageCounter = 0;
    }

    /** Returns the record for `topic`, creating it on first use. */
    private _ensureTopic(topic: string): TopicMapValue {
        if (!this.topics.has(topic)) {
            this.topics.set(topic, {
                subscribers: new Map(),
                history: [],
                config: { ...this.globalConfig }
            });
        }
        return this.topics.get(topic);
    }

    /** Cancels the timeout timer of a parked message and forgets its ack callback. */
    private _discardMessage(messageData: MessageMetaProp) {
        if (messageData.timeoutId) {
            clearTimeout(messageData.timeoutId);
        }
        this.ackCallbacks.delete(messageData.id);
    }

    /**
     * Invokes the publisher's ack callback for a message, if one is registered.
     *
     * Errors thrown by the ack callback are reported on the console instead of
     * propagating: letting them escape would turn into a second ack (when
     * raised inside a promise chain) or an uncaught exception in a timer.
     *
     * @param messageId Message whose ack callback should be invoked.
     * @param ackInfo Payload handed to the callback.
     * @param release When true the callback is unregistered afterwards.
     */
    private _notifyAck(messageId: string, ackInfo: AckParamProp, release: boolean = false) {
        const ackCallback = this.ackCallbacks.get(messageId);
        if (typeof ackCallback !== 'function') return;

        try {
            ackCallback(ackInfo);
        } catch (error) {
            console.error(`[PubSub] ack callback for ${messageId} threw:`, error);
        } finally {
            if (release) {
                this.ackCallbacks.delete(messageId);
            }
        }
    }

    /**
     * Parks a message in the topic history.
     *
     * When the message has a positive `messageTimeout` and an ack callback,
     * the callback receives a `timeout` ack once the timeout elapses and the
     * message is removed from the history. The history is capped at
     * `maxHistory` entries (default 5); the oldest entry is evicted first.
     *
     * @param topicInfo Topic record.
     * @param messageData Message to park.
     */
    private _addToHistory(topicInfo: TopicMapValue, messageData: MessageMetaProp) {
        const { history } = topicInfo;
        const maxHistory = messageData.options.maxHistory || 5;

        if (messageData.options.messageTimeout > 0) {
            messageData.timeoutId = window.setTimeout(() => {
                if (!this.ackCallbacks.has(messageData.id)) return;

                const ackInfo: AckParamProp = {
                    status: TIMEOUT_STATUS,
                    messageId: messageData.id,
                    message: messageData.message,
                    topic: messageData.topic,
                    publishTime: messageData.timestamp,
                    timeoutTime: Date.now()
                };
                this._notifyAck(messageData.id, ackInfo, true);

                const index = history.findIndex(msg => msg.id === messageData.id);
                if (index !== -1) {
                    history.splice(index, 1);
                }
            }, messageData.options.messageTimeout);
        }

        if (history.length >= maxHistory) {
            // `shift()` yields undefined on an empty history (possible with a
            // negative maxHistory), hence the guard.
            const removed = history.shift();
            if (removed) {
                this._discardMessage(removed);
            }
        }

        history.push(messageData);
    }

    /**
     * Delivers a freshly published message to the current subscribers.
     *
     * @param topic Topic name.
     * @param messageData Message to deliver.
     */
    private _deliverMessage(topic: string, messageData: MessageMetaProp) {
        const topicInfo = this.topics.get(topic);
        if (!topicInfo) return;

        const { subscribers } = topicInfo;
        const { broadcastMode, maxConsumption } = messageData.options;

        // Single mode: only the first subscriber (in subscription order) gets it.
        if (broadcastMode === 'single') {
            const firstSubscriber = subscribers.entries().next().value;
            if (firstSubscriber) {
                const [subscriberId, subscriber] = firstSubscriber;
                this._deliverToSubscriber(subscriberId, subscriber, messageData);
            }
            return;
        }

        // How many more subscribers may consume this message.
        const unlimited = maxConsumption === null || maxConsumption === undefined;
        const remainingConsumption = unlimited ? Infinity : maxConsumption - messageData.consumedCount;

        let delivered = 0;
        for (const [subscriberId, subscriber] of subscribers) {
            if (delivered >= remainingConsumption) break;
            if (messageData.consumers.includes(subscriberId)) continue;

            this._deliverToSubscriber(subscriberId, subscriber, messageData);
            messageData.consumers.push(subscriberId);
            messageData.consumedCount++;
            delivered++;
        }
    }

    /**
     * Delivers a live message to one subscriber.
     *
     * @param subscriberId Subscriber id.
     * @param subscriber Subscriber record.
     * @param messageData Message to deliver.
     */
    private _deliverToSubscriber(subscriberId: string, subscriber: DeepPartial<TopicMapSubscriberMapValue>, messageData: MessageMetaProp) {
        this._invokeSubscriber(subscriberId, subscriber, messageData, false);
    }

    /**
     * Replays every parked message of a topic to one subscriber. Each replayed
     * message's ack callback is released after its first ack.
     *
     * @param topic Topic name.
     * @param subscriberId Subscriber id.
     */
    private _replayHistory(topic: string, subscriberId: string) {
        const topicInfo = this.topics.get(topic);
        if (!topicInfo) return;

        const subscriber = topicInfo.subscribers.get(subscriberId);
        if (!subscriber) return;

        topicInfo.history.forEach(messageData => {
            this._invokeSubscriber(subscriberId, subscriber, messageData, true);
        });
    }

    /**
     * Runs a subscriber callback and acks the publisher with its outcome.
     *
     * The callback is invoked synchronously. A synchronous throw is acked with
     * `{ error: <message> }`, a rejected promise with `{ error: <reason> }`,
     * anything else with the (resolved) return value.
     *
     * @param subscriberId Subscriber id.
     * @param subscriber Subscriber record.
     * @param messageData Message to hand over.
     * @param isHistory True when replaying a parked message.
     */
    private _invokeSubscriber(subscriberId: string, subscriber: DeepPartial<TopicMapSubscriberMapValue>, messageData: MessageMetaProp, isHistory: boolean) {
        const ack = (result?: unknown) => {
            const liveAck = {
                subscriberId,
                status: CONSUMED_STATUS,
                messageId: messageData.id,
                topic: messageData.topic,
                message: messageData.message,
                publishTime: messageData.timestamp,
                consumeTime: Date.now(),
                // Replay acks report an empty object when the subscriber
                // produced no value; live acks pass the value through untouched.
                result: isHistory && result === undefined ? {} : result
            };
            const ackInfo = isHistory ? { ...liveAck, isHistory: true } : liveAck;
            this._notifyAck(messageData.id, ackInfo, isHistory);
        };

        const meta = isHistory ? { ...messageData, isHistory: true } : messageData;

        let outcome;
        try {
            outcome = subscriber.callback(messageData.message, meta);
        } catch (error) {
            ack({ error: error instanceof Error ? error.message : error });
            return;
        }

        // Two-argument `then` rather than `.then().catch()`: the rejection
        // handler must only see subscriber failures, never a failure of the
        // fulfilment handler, otherwise the publisher would be acked twice.
        Promise.resolve(outcome).then(
            (res: any) => { ack(res); },
            (e) => { ack({ error: e }); }
        );
    }
}

type.ts

ts
/**
 * Recursively optional version of `T`. Functions, dates and arrays are kept
 * whole (only made optional); plain objects are made optional field by field.
 */
export type DeepPartial<T> = T extends Function
    ? T | undefined
    : T extends Date
    ? T | undefined
    : T extends (infer U)[]
    ? U[] | undefined
    : T extends object
    ? { [K in keyof T]?: DeepPartial<T[K]> }
    : T | undefined;

export type EventType = string | symbol;

/** Global configuration of a PubSub instance. */
export interface GlobalOPtionProp {
    /** Maximum number of messages parked per topic. */
    maxHistory: number,
    /** Message lifetime in milliseconds. */
    messageTimeout: number,
    /** Deliver to one subscriber ("single") or to several ("multicast"). */
    broadcastMode: "multicast" | "single",
    /** Maximum number of subscribers that may consume a message; `null` means no limit. */
    maxConsumption: null | number,
    /** Whether parked messages are replayed to a new subscriber. */
    historyEnabled: boolean
    /** Name of the message publisher. */
    publisherName: string
}

/** Per-publish options: every global option plus the publisher's ack callback. */
export interface PublishOptionProp extends GlobalOPtionProp {
    ackCallback: (params: AckParamProp) => void
}

export interface SubscribeOptionProp {
    /** Subscriber id. */
    subscriberId: string,
    /** Per-subscription override of `historyEnabled`. */
    replayHistory?: boolean
}

/**
 * Subscriber callback. It may return a value synchronously or a promise; the
 * (resolved) value is forwarded to the publisher's ack callback as `result`.
 */
export type SubscribeCallback = (message: unknown, meta: MessageMetaProp) => unknown

export interface TopicMapSubscriberMapValue {
    callback: SubscribeCallback,
    options: {
        replayHistory: GlobalOPtionProp["historyEnabled"],
        [K: string]: any,
    }
}

export interface TopicMapValue {
    subscribers: Map<string, DeepPartial<TopicMapSubscriberMapValue>>,
    history: Array<MessageMetaProp>,
    config: GlobalOPtionProp,
}

/** Message metadata: the published payload plus bookkeeping fields. */
export interface MessageMetaProp {
    id: string,
    topic: string,
    message: unknown,
    /** Publish timestamp (milliseconds since the epoch). */
    timestamp: number,
    publisher: string,
    options: DeepPartial<PublishOptionProp & GlobalOPtionProp>,
    /** How many subscribers have consumed the message. */
    consumedCount: number,
    consumers: Array<string>,
    /** Whether the message has been consumed. */
    consumed: boolean,
    timeoutId?: number,
    isHistory?: boolean,
    [T: string]: any
}

/** Payload handed to a publisher's ack callback. */
export interface AckParamProp {
    status: string,
    messageId: string,
    message: unknown,
    topic: string,
    publishTime: number,
    timeoutTime?: number,
    consumeTime?: number,
    subscriberId?: string,
    result?: unknown,
    /** Set on acks produced while replaying parked messages. */
    isHistory?: boolean
}

测试

ts
/**
 * Manual demo of the PubSub class. Only the publish-then-subscribe scenario
 * runs by default; the subscribe-then-publish scenario is kept for manual use.
 */
function PubSubTest() {
    const ps = new PubSub({ messageTimeout: 150000 })

    /** Builds a subscriber callback that resolves with `reply` after `delayMs`. */
    const replyAfter = (reply, delayMs) => (msg, meta) =>
        new Promise((resolve) => {
            setTimeout(() => resolve(reply), delayMs)
        })

    // Publish first, subscribe later: a parked message is consumed by a single
    // consumer only, namely whoever subscribes first.
    function publishThenSubscribe() {
        const onAck = (data) => {
            console.log("topic-1 消息-1 ackCallback:", data, ps)
        }
        ps.publish("topic-1", "消息-1", { ackCallback: onAck })

        // Two subscribers join three seconds later.
        setTimeout(() => {
            console.log("订阅者1,2订阅")
            ps.subscribe("topic-1", replyAfter("我是订阅者1", 2000), { subscriberId: "sid-1" })
            ps.subscribe("topic-1", replyAfter("我是订阅者2", 4000), { subscriberId: "sid-2" })
        }, 3000)
    }

    // Subscribe first, publish later.
    function subscribeThenPublish() {
        console.log("订阅者1订阅 topic-2 ")
        ps.subscribe("topic-2", replyAfter("我是订阅者1", 2000))

        console.log("订阅者2订阅 topic-2 ")
        ps.subscribe("topic-2", replyAfter("我是订阅者2", 4000))

        setTimeout(() => {
            console.log("topic-2 发布消息:")
            const onAck = (data) => {
                console.log("topic-2 消息-1 ackCallback:", data)
            }
            ps.publish("topic-2", "消息-1", { ackCallback: onAck })
        }, 3000)
    }

    publishThenSubscribe()
    // subscribeThenPublish() is intentionally not invoked; call it by hand to
    // try the second scenario.
}
如果对你有用的话,可以打赏哦
打赏
ali pay
wechat pay

本文作者:繁星

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!