实现一个简易但不简单的发布订阅插件,支持先发布再订阅的模式,支持单播和多播模式,支持历史回放和消息消费 ACK 回调函数。

/**
 * In-memory publish/subscribe hub.
 *
 * Features:
 * - publish-before-subscribe: a message published while a topic has no
 *   subscribers is stored in that topic's history and replayed to the first
 *   subscriber that joins (unless it opts out with `replayHistory: false`);
 * - unicast (`broadcastMode: 'single'`), multicast, and capped delivery
 *   (`maxConsumption`);
 * - per-message ACK callbacks, including a `status: 'timeout'` notification
 *   when a stored message expires before anyone acknowledged it.
 */
class PubSub {
  /**
   * @param {Object} [options] Overrides for the default global configuration
   *   (`maxHistory`, `messageTimeout`, `broadcastMode`, `maxConsumption`,
   *   `historyEnabled`, `publisherName`).
   */
  constructor(options = {}) {
    // Built-in defaults; every key can be overridden through `options`.
    const defaultConfig = {
      maxHistory: 5,
      messageTimeout: 15000,
      broadcastMode: 'multicast',
      maxConsumption: null,
      historyEnabled: true,
      publisherName: '系统发布者'
    };
    this.globalConfig = { ...defaultConfig, ...options };
    // topic name -> { subscribers: Map, history: Array, config: Object }
    this.topics = new Map();
    // message id -> ACK callback supplied by the publisher
    this.ackCallbacks = new Map();
    // subscriber id -> { topic, callback }
    this.subscribers = new Map();
    this.messageCounter = 0;
  }

  /**
   * Merges new values into the global configuration.
   * @param {Object} newConfig Keys to override; unspecified keys are kept.
   */
  updateGlobalConfig(newConfig) {
    this.globalConfig = { ...this.globalConfig, ...newConfig };
  }

  /**
   * Subscribes to a topic.
   * @param {string} topic Topic name.
   * @param {Function} callback Invoked as `callback(message, ack, meta)`.
   * @param {Object} [options] `subscriberId` (custom id) and `replayHistory`
   *   (defaults to the global `historyEnabled` flag).
   * @returns {string} The subscriber id (custom or generated).
   */
  subscribe(topic, callback, options = {}) {
    const topicInfo = this._ensureTopic(topic);
    const subscriberOptions = options ?? {};
    const subscriberId = subscriberOptions.subscriberId ||
      `sub-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
    // `??` so an explicit `undefined` still falls back to the global default.
    const replayHistory = subscriberOptions.replayHistory ?? this.globalConfig.historyEnabled;

    topicInfo.subscribers.set(subscriberId, {
      callback,
      options: { ...subscriberOptions, replayHistory }
    });
    this.subscribers.set(subscriberId, { topic, callback });

    // Only the first subscriber of a topic drains the stored messages, and
    // only when it did not opt out of replay.
    if (replayHistory && topicInfo.subscribers.size === 1 && topicInfo.history.length > 0) {
      this._replayHistory(topic, subscriberId);
    }
    return subscriberId;
  }

  /**
   * Publishes a message. It is delivered immediately when the topic has
   * subscribers; otherwise it is stored in the topic history.
   * @param {string} topic Topic name.
   * @param {*} message Message payload.
   * @param {Object} [localOptions] Per-message overrides of the global config.
   * @param {Function} [ackCallback] Receives ack info objects for this message.
   * @returns {string} The generated message id (`msg-<counter>`).
   */
  publish(topic, message, localOptions = {}, ackCallback) {
    const topicInfo = this._ensureTopic(topic);
    const overrides = localOptions ?? {};
    const messageId = `msg-${this.messageCounter++}`;

    // Effective options: global config overlaid with the per-message options.
    const mergedOptions = {
      ...this.globalConfig,
      ...overrides,
      publisherName: overrides.publisherName || this.globalConfig.publisherName
    };
    const messageData = {
      id: messageId,
      topic,
      message,
      timestamp: Date.now(),
      publisher: mergedOptions.publisherName,
      options: mergedOptions,
      consumedCount: 0,
      consumers: [],
      consumed: false
    };

    if (typeof ackCallback === 'function') {
      this.ackCallbacks.set(messageId, ackCallback);
    }

    if (topicInfo.subscribers.size > 0) {
      this._deliverMessage(topic, messageData);
    } else {
      this._addToHistory(topicInfo, messageData);
    }
    return messageId;
  }

  /**
   * Returns the topic record, creating an empty one on first use.
   * @param {string} topic Topic name.
   * @returns {{subscribers: Map, history: Array, config: Object}}
   */
  _ensureTopic(topic) {
    let topicInfo = this.topics.get(topic);
    if (!topicInfo) {
      topicInfo = {
        subscribers: new Map(),
        history: [],
        config: { ...this.globalConfig }
      };
      this.topics.set(topic, topicInfo);
    }
    return topicInfo;
  }

  /**
   * Delivers a message to the topic's current subscribers.
   * `broadcastMode: 'single'` delivers to the first subscriber only; any
   * other mode delivers to every subscriber, capped by `maxConsumption`
   * (null/undefined means unlimited).
   * @param {string} topic Topic name.
   * @param {Object} messageData Message record created by `publish`.
   */
  _deliverMessage(topic, messageData) {
    const topicInfo = this.topics.get(topic);
    if (!topicInfo) return;

    const { broadcastMode, maxConsumption } = messageData.options;
    let limit = maxConsumption == null ? Infinity : maxConsumption;
    if (broadcastMode === 'single') limit = 1;

    for (const [subscriberId, subscriber] of topicInfo.subscribers) {
      // Compare the running total against the cap itself so previously
      // recorded consumption is not subtracted twice.
      if (messageData.consumedCount >= limit) break;
      if (messageData.consumers.includes(subscriberId)) continue;
      this._deliverToSubscriber(subscriberId, subscriber, messageData);
      this._markConsumed(messageData, subscriberId);
    }
  }

  /**
   * Records that a subscriber received the message.
   * @param {Object} messageData Message record.
   * @param {string} subscriberId Subscriber id.
   */
  _markConsumed(messageData, subscriberId) {
    messageData.consumers.push(subscriberId);
    messageData.consumedCount++;
    messageData.consumed = true;
  }

  /**
   * Builds the `ack(result)` function handed to a subscriber callback.
   * Acks are forwarded to the publisher's ACK callback if one is registered.
   * For replayed messages the callback is released on the first ack.
   * @param {string} subscriberId Subscriber id.
   * @param {Object} messageData Message record.
   * @param {boolean} isHistory True when the delivery is a history replay.
   * @returns {Function} The ack function.
   */
  _createAck(subscriberId, messageData, isHistory) {
    return (result) => {
      const onAck = this.ackCallbacks.get(messageData.id);
      if (!onAck) return;
      const ackInfo = {
        subscriberId,
        messageId: messageData.id,
        topic: messageData.topic,
        message: messageData.message,
        publishTime: messageData.timestamp,
        consumeTime: Date.now(),
        result
      };
      if (isHistory) {
        ackInfo.isHistory = true;
        // Release before notifying so a throwing callback cannot be re-entered.
        this.ackCallbacks.delete(messageData.id);
      }
      onAck(ackInfo);
    };
  }

  /**
   * Invokes one subscriber callback; a thrown error is reported to the
   * publisher as `ack({ error: message })`.
   * @param {string} subscriberId Subscriber id.
   * @param {Object} subscriber Subscriber record (`callback`, `options`).
   * @param {Object} messageData Message record.
   */
  _deliverToSubscriber(subscriberId, subscriber, messageData) {
    const ack = this._createAck(subscriberId, messageData, false);
    try {
      subscriber.callback(messageData.message, ack, messageData);
    } catch (error) {
      ack({ error: error.message });
    }
  }

  /**
   * Stores an undelivered message in the topic history, arms its expiry
   * timer, and evicts the oldest entries beyond `maxHistory`.
   * @param {Object} topicInfo Topic record.
   * @param {Object} messageData Message record.
   */
  _addToHistory(topicInfo, messageData) {
    const { history } = topicInfo;
    const { maxHistory: configuredMax, messageTimeout } = messageData.options;
    // 0 is a valid limit ("keep nothing"); only invalid values fall back to 5.
    const maxHistory = typeof configuredMax === 'number' && configuredMax >= 0 ? configuredMax : 5;

    if (messageTimeout > 0) {
      messageData.timeoutId = setTimeout(
        () => this._expire(history, messageData),
        messageTimeout
      );
    }
    history.push(messageData);
    // Loop so that a limit lower than the current size is actually enforced.
    while (history.length > maxHistory) {
      this._release(history.shift());
    }
  }

  /**
   * Expiry handler: drops the message from history and, when the publisher
   * is still waiting for an ack, notifies it with `status: 'timeout'`.
   * @param {Array} history History array the message was stored in.
   * @param {Object} messageData Message record.
   */
  _expire(history, messageData) {
    const index = history.indexOf(messageData);
    if (index !== -1) history.splice(index, 1);

    const onAck = this.ackCallbacks.get(messageData.id);
    if (!onAck) return;
    this.ackCallbacks.delete(messageData.id);
    onAck({
      status: 'timeout',
      messageId: messageData.id,
      message: messageData.message,
      topic: messageData.topic,
      publishTime: messageData.timestamp,
      timeoutTime: Date.now()
    });
  }

  /**
   * Releases the resources of a message leaving the history: cancels its
   * timer and, if it was never delivered, forgets its ACK callback (nobody
   * could ever acknowledge it).
   * @param {Object} messageData Message record (may be undefined).
   */
  _release(messageData) {
    if (!messageData) return;
    if (messageData.timeoutId !== undefined) clearTimeout(messageData.timeoutId);
    if (!messageData.consumed) this.ackCallbacks.delete(messageData.id);
  }

  /**
   * Replays every stored message of a topic to one subscriber. The callback
   * receives a copy of the message record flagged with `isHistory: true`.
   * @param {string} topic Topic name.
   * @param {string} subscriberId Subscriber id.
   */
  _replayHistory(topic, subscriberId) {
    const topicInfo = this.topics.get(topic);
    if (!topicInfo) return;
    const subscriber = topicInfo.subscribers.get(subscriberId);
    if (!subscriber) return;

    // Iterate a snapshot: callbacks may publish or clear history re-entrantly.
    for (const messageData of [...topicInfo.history]) {
      const ack = this._createAck(subscriberId, messageData, true);
      try {
        subscriber.callback(messageData.message, ack, { ...messageData, isHistory: true });
      } catch (error) {
        ack({ error: error.message });
      }
      this._markConsumed(messageData, subscriberId);
    }
  }

  /**
   * Removes a subscriber from a topic.
   * @param {string} topic Topic name.
   * @param {string} subscriberId Subscriber id.
   */
  unsubscribe(topic, subscriberId) {
    const topicInfo = this.topics.get(topic);
    if (!topicInfo) return;
    if (!topicInfo.subscribers.delete(subscriberId)) return;
    // The global index is keyed by id only; do not drop an entry that
    // belongs to the same id subscribed on another topic.
    if (this.subscribers.get(subscriberId)?.topic === topic) {
      this.subscribers.delete(subscriberId);
    }
  }

  /**
   * Drops all stored messages of a topic and cancels their expiry timers.
   * @param {string} topic Topic name.
   */
  clearHistory(topic) {
    const topicInfo = this.topics.get(topic);
    if (!topicInfo) return;
    topicInfo.history.forEach((messageData) => this._release(messageData));
    topicInfo.history = [];
  }

  /**
   * Returns counters for a topic.
   * @param {string} topic Topic name.
   * @returns {{subscribers: number, pendingMessages: number}|null} `null`
   *   when the topic does not exist.
   */
  getTopicInfo(topic) {
    const topicInfo = this.topics.get(topic);
    if (!topicInfo) return null;
    return {
      subscribers: topicInfo.subscribers.size,
      pendingMessages: topicInfo.history.length
    };
  }

  /**
   * Resets the hub to its initial empty state (configuration is kept).
   */
  reset() {
    // Cancel pending expiry timers first so nothing fires after the reset.
    for (const { history } of this.topics.values()) {
      for (const messageData of history) {
        if (messageData.timeoutId !== undefined) clearTimeout(messageData.timeoutId);
      }
    }
    this.topics.clear();
    this.ackCallbacks.clear();
    this.subscribers.clear();
    this.messageCounter = 0;
  }
}
<!DOCTYPE html>
<html lang="zh-CN">
<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>PubSub消息系统</title>
  <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.4.0/css/all.min.css">
  <link rel="stylesheet" href="style.css">
</head>
<body>
  <div class="container">
    <!-- Page header with feature badges -->
    <header>
      <h1><i class="fas fa-exchange-alt"></i> 高级PubSub消息系统</h1>
      <p class="subtitle">支持全局配置、配置合并、消息广播、ACK确认和历史回放的完整消息系统</p>
      <div class="feature-badges">
        <div class="badge"><i class="fas fa-cog"></i> 全局配置</div>
        <div class="badge"><i class="fas fa-layer-group"></i> 配置合并</div>
        <div class="badge"><i class="fas fa-broadcast-tower"></i> 单播/多播</div>
        <div class="badge"><i class="fas fa-history"></i> 历史回放</div>
        <div class="badge"><i class="fas fa-check-circle"></i> ACK确认</div>
        <div class="badge"><i class="fas fa-clock"></i> 超时处理</div>
      </div>
    </header>

    <!-- Global configuration panel (read by the #updateGlobalConfig handler) -->
    <div class="config-section">
      <div class="card-header">
        <h2><i class="fas fa-sliders-h"></i> 全局配置控制台</h2>
      </div>
      <div class="config-grid">
        <div class="config-box">
          <div class="form-group">
            <label for="globalMaxHistory"><i class="fas fa-database"></i> 最大历史消息数</label>
            <input type="number" id="globalMaxHistory" min="1" max="50" value="5">
          </div>
          <div class="form-group">
            <label for="globalTimeout"><i class="fas fa-stopwatch"></i> 消息超时时间(ms)</label>
            <input type="number" id="globalTimeout" min="0" value="15000">
          </div>
        </div>
        <div class="config-box">
          <div class="form-group">
            <label for="globalBroadcastMode"><i class="fas fa-bullhorn"></i> 默认广播模式</label>
            <select id="globalBroadcastMode">
              <option value="multicast">多播 (不限消费次数)</option>
              <option value="single">单播 (仅第一个订阅者消费)</option>
              <option value="limited">限制消费次数</option>
            </select>
          </div>
          <div class="form-group">
            <label for="globalMaxConsumption"><i class="fas fa-user-friends"></i> 默认最大消费次数</label>
            <input type="number" id="globalMaxConsumption" min="1" value="3">
          </div>
        </div>
        <div class="config-box">
          <div class="form-group">
            <label for="globalHistoryEnabled"><i class="fas fa-history"></i> <input type="checkbox" id="globalHistoryEnabled" checked> 历史回放默认启用 </label>
          </div>
          <div class="form-group">
            <label for="globalPublisherName"><i class="fas fa-user-tag"></i> 发布者默认名称</label>
            <input type="text" id="globalPublisherName" value="系统发布者">
          </div>
          <button id="updateGlobalConfig" class="info"><i class="fas fa-sync-alt"></i> 更新全局配置</button>
        </div>
      </div>
    </div>

    <div class="dashboard">
      <!-- Publisher console -->
      <div class="card">
        <div class="card-header">
          <h2><i class="fas fa-paper-plane"></i> 消息发布控制台</h2>
        </div>
        <div class="form-group">
          <label for="topic"><i class="fas fa-hashtag"></i> 主题名称</label>
          <input type="text" id="topic" placeholder="例如:news.sports" value="news.sports">
        </div>
        <div class="form-group">
          <label for="message"><i class="fas fa-envelope"></i> 消息内容</label>
          <textarea id="message" placeholder="输入要发布的消息内容...">NBA总决赛正在火热进行中,敬请关注!</textarea>
        </div>
        <div class="form-group">
          <label for="broadcastMode"><i class="fas fa-bullhorn"></i> 广播模式</label>
          <select id="broadcastMode">
            <option value="">使用全局默认</option>
            <option value="multicast">多播 (不限消费次数)</option>
            <option value="single">单播 (仅第一个订阅者消费)</option>
            <option value="limited">限制消费次数</option>
          </select>
        </div>
        <!-- Shown by the script only while the "limited" mode is selected -->
        <div id="maxConsumptionContainer" style="display: none;">
          <label for="maxConsumption"><i class="fas fa-user-friends"></i> 最大消费次数</label>
          <input type="number" id="maxConsumption" min="1" value="3">
        </div>
        <div class="form-group">
          <!-- `for` targets the first override input; the previous id did not exist -->
          <label for="historySize"><i class="fas fa-history"></i> 历史记录配置 (覆盖全局)</label>
          <div class="actions">
            <input type="number" id="historySize" min="0" max="50" placeholder="历史记录大小">
            <input type="number" id="messageTimeout" min="0" placeholder="消息超时时间(ms)">
          </div>
        </div>
        <!-- Purely decorative animation; hidden from assistive technology -->
        <div class="message-visual" aria-hidden="true">
          <div class="message-flow"></div>
          <div class="message-ball ball1">MSG</div>
          <div class="message-ball ball2">ACK</div>
          <div class="message-ball ball3">HIST</div>
        </div>
        <div class="actions">
          <button id="publishBtn"><i class="fas fa-paper-plane"></i> 发布消息</button>
          <button id="publishNoSubBtn" class="warning"><i class="fas fa-user-slash"></i> 发布无订阅者消息</button>
        </div>
        <h3 style="margin-top: 25px; display: flex; align-items: center; gap: 10px;">
          <i class="fas fa-file-alt"></i> 发布日志
        </h3>
        <div class="logs" id="publishLog">
          <div class="log-entry system">
            <span class="log-time">[系统启动]</span> 发布系统准备就绪,请配置全局设置并发布消息
          </div>
        </div>
      </div>

      <!-- Subscriber console -->
      <div class="card">
        <div class="card-header">
          <h2><i class="fas fa-inbox"></i> 消息订阅控制台</h2>
        </div>
        <div class="form-group">
          <label for="subTopic"><i class="fas fa-hashtag"></i> 订阅主题</label>
          <input type="text" id="subTopic" placeholder="例如:news.sports" value="news.sports">
        </div>
        <div class="form-group">
          <label for="subscriberId"><i class="fas fa-id-card"></i> 订阅者ID</label>
          <input type="text" id="subscriberId" placeholder="输入自定义订阅者ID (可选)">
        </div>
        <div class="form-group">
          <label><i class="fas fa-history"></i> <input type="checkbox" id="replayHistory" checked> 接收历史消息 </label>
        </div>
        <button id="subscribeBtn" class="secondary"><i class="fas fa-plus-circle"></i> 创建订阅者</button>
        <div class="topic-info">
          <div class="topic-card">
            <h3><i class="fas fa-basketball-ball"></i> news.sports</h3>
            <div class="metrics">
              <div class="metric-box">
                <div class="metric-value sub-count" id="sportsSubCount">0</div>
                <div class="metric-label">活跃订阅者</div>
              </div>
              <div class="metric-box">
                <div class="metric-value pending-count" id="sportsPendingCount">0</div>
                <div class="metric-label">等待消息</div>
              </div>
            </div>
            <button class="info" id="viewSportsDetails"><i class="fas fa-chart-bar"></i> 查看详情</button>
          </div>
          <div class="topic-card">
            <h3><i class="fas fa-cloud-sun"></i> weather.update</h3>
            <div class="metrics">
              <div class="metric-box">
                <div class="metric-value sub-count" id="weatherSubCount">0</div>
                <div class="metric-label">活跃订阅者</div>
              </div>
              <div class="metric-box">
                <div class="metric-value pending-count" id="weatherPendingCount">0</div>
                <div class="metric-label">等待消息</div>
              </div>
            </div>
            <button class="info" id="viewWeatherDetails"><i class="fas fa-chart-bar"></i> 查看详情</button>
          </div>
        </div>
        <h3 style="margin-top: 25px; display: flex; align-items: center; gap: 10px;">
          <i class="fas fa-file-alt"></i> 订阅日志
        </h3>
        <div class="logs" id="subscribeLog">
          <div class="log-entry system">
            <span class="log-time">[系统启动]</span> 订阅系统准备就绪,请创建订阅者
          </div>
        </div>
      </div>
    </div>

    <!-- Configuration-merge demos -->
    <div class="card">
      <div class="card-header">
        <h2><i class="fas fa-tasks"></i> 配置合并演示</h2>
      </div>
      <div class="demo-buttons">
        <div class="demo-button" id="demoGlobalConfig">
          <i class="fas fa-cogs"></i>
          <p>全局配置优先级</p>
          <small>无发布配置时使用全局配置</small>
        </div>
        <div class="demo-button" id="demoConfigMerge">
          <i class="fas fa-layer-group"></i>
          <p>配置合并示例</p>
          <small>全局+发布配置=最终配置</small>
        </div>
        <div class="demo-button" id="demoLocalOverride">
          <i class="fas fa-code-merge"></i>
          <p>局部覆盖示例</p>
          <small>发布配置覆盖全局配置</small>
        </div>
        <div class="demo-button" id="demoMessageLifetime">
          <i class="fas fa-hourglass-half"></i>
          <p>消息生命周期</p>
          <small>从发布到消费全过程</small>
        </div>
      </div>
    </div>

    <!-- System log -->
    <div class="system-log">
      <div class="system-log-header">
        <i class="fas fa-terminal"></i>
        <h2>系统日志</h2>
      </div>
      <div class="logs" id="systemLog">
        <div class="log-entry system">
          <span class="log-time">[系统启动]</span> PubSub系统初始化完成,使用默认全局配置
        </div>
        <div class="log-entry system">
          <span class="log-time">[系统启动]</span> 全局配置: maxHistory=5, messageTimeout=15000ms, broadcastMode=multicast
        </div>
      </div>
    </div>

    <footer>
      <p>高级PubSub消息系统 | 支持全局配置和局部覆盖的强大消息中间件</p>
      <p>© 2023 消息系统实验室 | 设计:前端架构师团队</p>
    </footer>
  </div>
  <script src="pubsub.js"></script>
  <script src="app.js"></script>
</body>
</html>
/* Global reset: border-box sizing and no default margins/paddings.
   The selector must be the bare universal selector; a stray "css" prefix
   in front of it makes the whole rule invalid. */
* {
  box-sizing: border-box;
  margin: 0;
  padding: 0;
}
/* Design tokens shared by the whole stylesheet */
:root {
  /* Brand colours */
  --primary-color: #3498db;
  --primary-light: #5dade2;
  --primary-dark: #2980b9;
  --secondary-color: #2ecc71;
  --secondary-light: #58d68d;
  --warning-color: #e74c3c;
  --warning-light: #ec7063;

  /* Neutrals */
  --dark-color: #2c3e50;
  --light-color: #ecf0f1;
  --gray-light: #bdc3c7;

  /* Surfaces and elevation */
  --background-gradient: linear-gradient(135deg, #1e3c72 0%, #2a5298 100%);
  --card-bg: rgba(255, 255, 255, 0.97);
  --shadow: 0 10px 30px rgba(0, 0, 0, 0.15);
}
/* Page canvas */
body {
  position: relative;
  min-height: 100vh;
  padding: 20px;
  font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
  line-height: 1.6;
  color: #333;
  background: var(--background-gradient);
}

/* Vignette layered behind the content */
body::before {
  content: '';
  position: absolute;
  top: 0;
  right: 0;
  bottom: 0;
  left: 0;
  z-index: -1;
  background: radial-gradient(circle at center, transparent, rgba(0, 0, 0, 0.4));
}
/* Centered page wrapper */
.container {
  position: relative;
  max-width: 1400px;
  margin: 0 auto;
}

/* Frosted-glass page header */
header {
  position: relative;
  overflow: hidden;
  margin-bottom: 30px;
  padding: 30px 0;
  text-align: center;
  color: white;
  background: rgba(255, 255, 255, 0.1);
  border-radius: 15px;
  backdrop-filter: blur(10px);
  box-shadow: var(--shadow);
}

/* Accent stripe along the top edge of the header */
header::after {
  content: '';
  position: absolute;
  top: 0;
  right: 0;
  left: 0;
  height: 5px;
  background: linear-gradient(90deg, var(--primary-color), var(--secondary-color));
}
/* Main title rendered as gradient-filled text */
h1 {
  position: relative;
  margin-bottom: 15px;
  padding-bottom: 15px;
  font-size: 3.2rem;
  /* `background` must precede the clip declarations: the shorthand resets them */
  background: linear-gradient(to right, var(--primary-light), var(--secondary-light));
  -webkit-background-clip: text;
  /* Standard property alongside the prefixed one */
  background-clip: text;
  -webkit-text-fill-color: transparent;
  text-shadow: 0 2px 4px rgba(0, 0, 0, 0.2);
}

/* Fading underline centred below the title */
h1::after {
  content: '';
  position: absolute;
  bottom: 0;
  left: 35%;
  width: 30%;
  height: 3px;
  background: linear-gradient(90deg, transparent, var(--primary-light), transparent);
}
/* Tagline under the title */
.subtitle {
  max-width: 900px;
  margin: 0 auto;
  color: var(--light-color);
  font-size: 1.2rem;
  font-weight: 300;
  line-height: 1.8;
}

/* Row of feature pills */
.feature-badges {
  display: flex;
  flex-wrap: wrap;
  justify-content: center;
  gap: 15px;
  margin-top: 25px;
}

.badge {
  display: flex;
  align-items: center;
  gap: 8px;
  padding: 10px 20px;
  font-size: 0.95rem;
  font-weight: 500;
  background: rgba(255, 255, 255, 0.15);
  border-radius: 30px;
  backdrop-filter: blur(5px);
  transition: transform 0.3s, background 0.3s;
}

/* Lift and brighten on hover */
.badge:hover {
  background: rgba(255, 255, 255, 0.25);
  transform: translateY(-3px);
}
/* Two-column console layout */
.dashboard {
  display: grid;
  grid-template-columns: 1fr 1fr;
  gap: 25px;
  margin-bottom: 30px;
}

/* Collapse to a single column on narrow viewports */
@media (max-width: 1000px) {
  .dashboard {
    grid-template-columns: 1fr;
  }
}
/* Elevated content panel */
.card {
  position: relative;
  padding: 30px;
  background: var(--card-bg);
  border-radius: 15px;
  box-shadow: var(--shadow);
  transition: all 0.4s ease;
}

.card:hover {
  box-shadow: 0 20px 40px rgba(0, 0, 0, 0.25);
  transform: translateY(-8px);
}

/* Panel title bar */
.card-header {
  display: flex;
  align-items: center;
  justify-content: space-between;
  margin-bottom: 25px;
  padding-bottom: 20px;
  border-bottom: 2px solid var(--primary-color);
}

.card-header h2 {
  display: flex;
  align-items: center;
  gap: 15px;
  font-size: 1.8rem;
  color: var(--dark-color);
}

/* Circular icon chip inside the title bar */
.card-header i {
  display: flex;
  align-items: center;
  justify-content: center;
  width: 60px;
  height: 60px;
  font-size: 2rem;
  color: var(--primary-color);
  background: rgba(52, 152, 219, 0.1);
  border-radius: 50%;
}
/* Vertical rhythm between form rows */
.form-group {
  margin-bottom: 22px;
}

/* Field label: icon + text laid out in a row.
   (A former `display: block` was dead code, overridden by `display: flex`
   later in the same rule.) */
.form-group label {
  display: flex;
  align-items: center;
  gap: 10px;
  margin-bottom: 12px;
  font-size: 1.1rem;
  font-weight: 600;
  color: var(--dark-color);
}
/* Shared look for all form controls */
input,
select,
button,
textarea {
  width: 100%;
  padding: 15px 20px;
  font-size: 16px;
  background: #f9fafb;
  border: 1px solid var(--gray-light);
  border-radius: 10px;
  transition: all 0.3s;
}

/* Checkboxes sit inline inside flex labels; the full-width/padded control
   style above would stretch them and squeeze the label text. */
input[type="checkbox"] {
  flex-shrink: 0;
  width: auto;
  padding: 0;
}

/* Focus ring */
input:focus,
select:focus,
textarea:focus {
  background: white;
  border-color: var(--primary-color);
  box-shadow: 0 0 0 4px rgba(52, 152, 219, 0.25);
  outline: none;
}

textarea {
  min-height: 120px;
  resize: vertical;
}
/* Primary button (overrides the shared control style above) */
button {
  display: flex;
  align-items: center;
  justify-content: center;
  gap: 12px;
  padding: 18px;
  font-size: 1.1rem;
  font-weight: 600;
  letter-spacing: 0.5px;
  color: white;
  background: var(--primary-color);
  border: none;
  border-radius: 12px;
  cursor: pointer;
  transition: all 0.3s;
}

button:hover {
  background: var(--primary-dark);
  box-shadow: 0 7px 15px rgba(52, 152, 219, 0.3);
  transform: translateY(-3px);
}

/* Colour variants */
button.secondary {
  background: var(--secondary-color);
}

button.secondary:hover {
  background: #27ae60;
  box-shadow: 0 7px 15px rgba(46, 204, 113, 0.3);
}

button.warning {
  background: var(--warning-color);
}

button.warning:hover {
  background: #c0392b;
  box-shadow: 0 7px 15px rgba(231, 76, 60, 0.3);
}

button.info {
  background: #9b59b6;
}

button.info:hover {
  background: #8e44ad;
  box-shadow: 0 7px 15px rgba(155, 89, 182, 0.3);
}
/* Two equal columns for paired controls */
.actions {
  display: grid;
  grid-template-columns: 1fr 1fr;
  gap: 18px;
}

/* Scrollable log viewport */
.logs {
  height: 300px;
  margin-top: 20px;
  padding: 20px;
  overflow-y: auto;
  font-family: 'Fira Code', 'Menlo', monospace;
  font-size: 0.95rem;
  background: #f8fafc;
  border: 1px solid #e1e4e8;
  border-radius: 12px;
  box-shadow: inset 0 2px 10px rgba(0, 0, 0, 0.05);
}

/* One log line */
.log-entry {
  position: relative;
  display: flex;
  margin-bottom: 12px;
  padding: 14px 18px;
  font-size: 0.9rem;
  border-radius: 10px;
  animation: fadeIn 0.4s;
}

.log-entry:last-child {
  margin-bottom: 0;
}

/* Timestamp column */
.log-time {
  flex-shrink: 0;
  min-width: 90px;
  margin-right: 15px;
  font-size: 0.9rem;
  font-weight: 600;
  color: var(--dark-color);
}

/* Colour coding per entry type */
.log-entry.realtime {
  background: rgba(52, 152, 219, 0.08);
  border-left: 4px solid var(--primary-color);
}

.log-entry.history {
  background: rgba(46, 204, 113, 0.08);
  border-left: 4px solid var(--secondary-color);
}

.log-entry.warning {
  background: rgba(231, 76, 60, 0.08);
  border-left: 4px solid var(--warning-color);
}

.log-entry.ack {
  background: rgba(155, 89, 182, 0.08);
  border-left: 4px solid #9b59b6;
}

.log-entry.system {
  background: rgba(41, 128, 185, 0.08);
  border-left: 4px solid #2980b9;
}
/* Responsive grid of topic summary cards */
.topic-info {
  display: grid;
  grid-template-columns: repeat(auto-fit, minmax(250px, 1fr));
  gap: 20px;
  margin-top: 25px;
}

.topic-card {
  position: relative;
  overflow: hidden;
  padding: 22px;
  text-align: center;
  background: linear-gradient(135deg, rgba(255, 255, 255, 0.95), rgba(245, 247, 250, 0.95));
  border-radius: 12px;
  box-shadow: 0 6px 18px rgba(0, 0, 0, 0.08);
  transition: transform 0.4s;
}

/* Accent stripe along the top edge of the card */
.topic-card::before {
  content: '';
  position: absolute;
  top: 0;
  right: 0;
  left: 0;
  height: 5px;
  background: linear-gradient(90deg, var(--primary-color), var(--secondary-color));
}

.topic-card:hover {
  transform: scale(1.03) rotate(1deg);
}

.topic-card h3 {
  display: flex;
  align-items: center;
  justify-content: center;
  gap: 12px;
  margin-bottom: 20px;
  font-size: 1.4rem;
  color: var(--dark-color);
}

/* Subscriber / pending counters */
.metrics {
  display: grid;
  grid-template-columns: 1fr 1fr;
  gap: 18px;
  margin: 20px 0;
}

.metric-box {
  padding: 15px;
  background: rgba(255, 255, 255, 0.8);
  border-radius: 10px;
  box-shadow: 0 4px 10px rgba(0, 0, 0, 0.05);
}

.metric-value {
  margin: 8px 0;
  font-size: 1.8rem;
  font-weight: bold;
}

.sub-count {
  color: var(--primary-dark);
}

.pending-count {
  color: var(--secondary-color);
}

.metric-label {
  font-size: 0.92rem;
  font-weight: 600;
  color: var(--dark-color);
}
/* System log panel */
.system-log {
  margin-top: 40px;
  padding: 30px;
  background: var(--card-bg);
  border-radius: 15px;
  box-shadow: var(--shadow);
}

.system-log-header {
  display: flex;
  align-items: center;
  gap: 15px;
  margin-bottom: 25px;
  padding-bottom: 15px;
  border-bottom: 2px solid var(--primary-color);
}

.system-log-header i {
  font-size: 2.2rem;
  color: var(--primary-color);
}

.system-log-header h2 {
  font-size: 1.8rem;
  color: var(--dark-color);
}

/* Page footer */
footer {
  margin-top: 50px;
  padding: 30px 0;
  text-align: center;
  font-size: 0.95rem;
  color: rgba(255, 255, 255, 0.85);
  border-top: 1px solid rgba(255, 255, 255, 0.15);
}

footer p {
  margin-bottom: 10px;
}
/* Entrance animation for new log entries: slide up while fading in */
@keyframes fadeIn {
  from {
    transform: translateY(15px);
    opacity: 0;
  }

  to {
    transform: translateY(0);
    opacity: 1;
  }
}
/* Decorative message-flow animation strip */
.message-visual {
  position: relative;
  overflow: hidden;
  height: 160px;
  margin: 25px 0;
  background: rgba(255, 255, 255, 0.1);
  border-radius: 15px;
  box-shadow: inset 0 0 20px rgba(0, 0, 0, 0.1);
}

/* Dashed curve tiled horizontally and scrolled by `flowMove` */
.message-flow {
  position: absolute;
  top: 60px;
  width: 100%;
  height: 40px;
  /* The shorthand must stay before `background-repeat`, which it would reset */
  background: url('data:image/svg+xml;utf8,<svg xmlns="http://www.w3.org/2000/svg" width="200" height="40" viewBox="0 0 200 40"><path d="M10,20 Q100,0 190,20" fill="none" stroke="rgba(255,255,255,0.3)" stroke-width="3" stroke-dasharray="5,4"/></svg>');
  background-repeat: repeat-x;
  animation: flowMove 20s linear infinite;
}

/* Shifts the tile by exactly one tile width for a seamless loop */
@keyframes flowMove {
  0% {
    background-position: 0 0;
  }

  100% {
    background-position: 200px 0;
  }
}

/* Travelling "message" dots */
.message-ball {
  position: absolute;
  top: 47px;
  z-index: 10;
  display: flex;
  align-items: center;
  justify-content: center;
  width: 45px;
  height: 45px;
  font-size: 0.9rem;
  font-weight: bold;
  color: white;
  border-radius: 50%;
  box-shadow: 0 8px 20px rgba(0, 0, 0, 0.2);
  animation: moveBall 5s ease-in-out infinite;
}

/* Per-ball start offset, colour and stagger */
.ball1 {
  left: 0;
  background: linear-gradient(135deg, var(--primary-color), var(--primary-dark));
  animation-delay: 0s;
}

.ball2 {
  left: 70px;
  background: linear-gradient(135deg, var(--secondary-color), #27ae60);
  animation-delay: 1s;
}

.ball3 {
  left: 140px;
  background: linear-gradient(135deg, var(--warning-light), var(--warning-color));
  animation-delay: 2s;
}

/* Left-to-right travel with a gentle vertical wave */
@keyframes moveBall {
  0% {
    transform: translateX(0) translateY(0);
  }

  25% {
    transform: translateX(250px) translateY(-20px);
  }

  50% {
    transform: translateX(500px) translateY(0);
  }

  75% {
    transform: translateX(750px) translateY(20px);
  }

  100% {
    transform: translateX(1000px) translateY(0);
  }
}
/* Responsive grid of demo tiles */
.demo-buttons {
  display: grid;
  grid-template-columns: repeat(auto-fit, minmax(220px, 1fr));
  gap: 22px;
  margin-top: 35px;
}

/* Clickable demo tile */
.demo-button {
  position: relative;
  overflow: hidden;
  padding: 25px 20px;
  text-align: center;
  font-weight: 500;
  color: white;
  background: linear-gradient(135deg, #1e3c72, #2a5298);
  border-radius: 12px;
  box-shadow: 0 8px 20px rgba(0, 0, 0, 0.2);
  cursor: pointer;
  transition: all 0.4s;
}

/* Diagonal sheen overlay */
.demo-button::before {
  content: '';
  position: absolute;
  top: -50%;
  left: -50%;
  width: 200%;
  height: 200%;
  background: linear-gradient(rgba(255, 255, 255, 0), rgba(255, 255, 255, 0.3), rgba(255, 255, 255, 0));
  transform: rotate(45deg);
}

.demo-button:hover {
  box-shadow: 0 15px 30px rgba(0, 0, 0, 0.3);
  transform: translateY(-8px);
}

.demo-button i {
  display: block;
  margin-bottom: 12px;
  font-size: 2.4rem;
}

.demo-button p {
  margin-bottom: 8px;
  font-size: 1.3rem;
}

.demo-button small {
  font-size: 0.9rem;
  opacity: 0.8;
}
/* Global configuration panel */
.config-section {
  margin-bottom: 30px;
  padding: 30px;
  background: var(--card-bg);
  border-radius: 15px;
  box-shadow: var(--shadow);
}

.config-grid {
  display: grid;
  grid-template-columns: repeat(auto-fit, minmax(250px, 1fr));
  gap: 20px;
}

.config-box {
  padding: 20px;
  background: rgba(52, 152, 219, 0.08);
  border-radius: 10px;
}

/* Per-message consumption limit row (toggled by the script) */
#maxConsumptionContainer {
  margin: 15px 0;
}
// Shared PubSub instance used by every handler on the page.
// (The comment must start the line: a stray "js" token in front of it is
// evaluated as an identifier and throws a ReferenceError at load time.)
const pubsub = new PubSub();
// Logging helpers

/**
 * Formats a time of day as "[HH:MM:SS]" using the local clock.
 * @param {Date} date Moment to format.
 * @returns {string} Zero-padded timestamp wrapped in brackets.
 */
function formatLogTime (date) {
  const pad = (value) => value.toString().padStart(2, '0');
  return `[${pad(date.getHours())}:${pad(date.getMinutes())}:${pad(date.getSeconds())}]`;
}

/**
 * Appends a timestamped entry to a log container and scrolls to it.
 * The message is inserted as a text node, never parsed as HTML, because log
 * messages contain user-supplied content (published message text, ids).
 * @param {string} containerId Id of the log container element.
 * @param {string} message Text to display.
 * @param {string} type Entry type, used as the CSS modifier class.
 */
function appendLogEntry (containerId, message, type) {
  const timeLabel = document.createElement('span');
  timeLabel.className = 'log-time';
  timeLabel.textContent = formatLogTime(new Date());

  const logEntry = document.createElement('div');
  logEntry.className = `log-entry ${type}`;
  logEntry.appendChild(timeLabel);
  logEntry.appendChild(document.createTextNode(` ${message}`));

  const container = document.getElementById(containerId);
  container.appendChild(logEntry);
  container.scrollTop = container.scrollHeight;
}

/**
 * Writes an entry to the publish log.
 * @param {string} message Text to display.
 * @param {string} [type='realtime'] Entry type (CSS modifier class).
 */
function logPublish (message, type = 'realtime') {
  appendLogEntry('publishLog', message, type);
}

/**
 * Writes an entry to the subscribe log.
 * @param {string} message Text to display.
 * @param {string} [type='history'] Entry type (CSS modifier class).
 */
function logSubscribe (message, type = 'history') {
  appendLogEntry('subscribeLog', message, type);
}

/**
 * Writes an entry to the system log.
 * @param {string} message Text to display.
 */
function logSystem (message) {
  appendLogEntry('systemLog', message, 'system');
}
/**
 * Refreshes the subscriber / pending-message counters of the two topic
 * cards ("news.sports" and "weather.update"). A topic unknown to the
 * PubSub instance is shown with zero counters.
 */
function updateTopicInfo () {
  const cards = [
    { topic: 'news.sports', prefix: 'sports' },
    { topic: 'weather.update', prefix: 'weather' }
  ];
  // Read every topic first, then write to the DOM.
  const snapshots = cards.map(({ topic, prefix }) => ({
    prefix,
    info: pubsub.getTopicInfo(topic) || { subscribers: 0, pendingMessages: 0 }
  }));
  for (const { prefix, info } of snapshots) {
    document.getElementById(`${prefix}SubCount`).textContent = info.subscribers;
    document.getElementById(`${prefix}PendingCount`).textContent = info.pendingMessages;
  }
}
// Cached DOM references shared by the handlers below
const broadcastMode = document.getElementById('broadcastMode');
const maxConsumptionContainer = document.getElementById('maxConsumptionContainer');
const updateGlobalConfigBtn = document.getElementById('updateGlobalConfig');

// Show the consumption-limit field only while "limited" mode is selected
broadcastMode.addEventListener('change', () => {
  const isLimited = broadcastMode.value === 'limited';
  maxConsumptionContainer.style.display = isLimited ? 'block' : 'none';
});
// Apply the values of the global-configuration panel
updateGlobalConfigBtn.addEventListener('click', () => {
  /**
   * Reads an integer field. Falls back only when the field is empty,
   * non-numeric or below `min`, so a legitimate 0 (e.g. "no timeout")
   * is kept instead of being replaced by the default.
   */
  const readInt = (id, fallback, min) => {
    const parsed = Number.parseInt(document.getElementById(id).value, 10);
    return Number.isNaN(parsed) || parsed < min ? fallback : parsed;
  };

  const mode = document.getElementById('globalBroadcastMode').value;
  const newConfig = {
    maxHistory: readInt('globalMaxHistory', 5, 1),
    messageTimeout: readInt('globalTimeout', 15000, 0),
    broadcastMode: mode,
    // The consumption cap is only meaningful in "limited" mode
    maxConsumption: mode === 'limited' ? readInt('globalMaxConsumption', 3, 1) : null,
    historyEnabled: document.getElementById('globalHistoryEnabled').checked,
    publisherName: document.getElementById('globalPublisherName').value || '系统发布者'
  };
  pubsub.updateGlobalConfig(newConfig);
  logSystem([
    '全局配置已更新:',
    `maxHistory=${newConfig.maxHistory},`,
    `timeout=${newConfig.messageTimeout}ms,`,
    `broadcastMode=${newConfig.broadcastMode}`
  ].join('\n'));
});
// Publish a message from the publisher console
document.getElementById('publishBtn').addEventListener('click', () => {
  const topic = document.getElementById('topic').value;
  const message = document.getElementById('message').value;

  // Returns the integer typed into a field, or undefined when it is empty
  // or not a number (so the global value stays in effect).
  const readOptionalInt = (id) => {
    const raw = document.getElementById(id).value;
    if (raw === '') return undefined;
    const parsed = Number.parseInt(raw, 10);
    return Number.isNaN(parsed) ? undefined : parsed;
  };

  // Per-message configuration; an empty mode means "use the global default"
  const localConfig = {
    broadcastMode: broadcastMode.value === '' ?
      pubsub.globalConfig.broadcastMode : broadcastMode.value
  };
  // "limited" mode is only effective when the cap is actually passed along
  if (broadcastMode.value === 'limited') {
    const maxConsumption = readOptionalInt('maxConsumption');
    if (maxConsumption !== undefined && maxConsumption >= 1) {
      localConfig.maxConsumption = maxConsumption;
    }
  }
  // Optional overrides of the global history settings
  const historySize = readOptionalInt('historySize');
  if (historySize !== undefined) {
    localConfig.maxHistory = historySize;
  }
  const messageTimeout = readOptionalInt('messageTimeout');
  if (messageTimeout !== undefined) {
    localConfig.messageTimeout = messageTimeout;
  }

  const messageId = pubsub.publish(topic, message, localConfig, (ackInfo) => {
    // The publisher is notified either of a consumption ack or of a timeout
    const type = ackInfo.status === 'timeout' ? 'warning' : 'ack';
    logPublish([
      `${type === 'warning' ? '⏰ 消息超时' : '✅ 已消费'}:`,
      `${ackInfo.message.substring(0, 30)}...`,
      `(订阅者: ${ackInfo.subscriberId || '无'})`
    ].join('\n'), type);
  });
  logPublish(`发布消息成功: ${message.substring(0, 30)}... (ID: ${messageId})`);
  updateTopicInfo();
});
// Publish a fixed weather message to a topic that normally has no subscriber
document.getElementById('publishNoSubBtn').addEventListener('click', () => {
  const sample = { topic: 'weather.update', message: '今日天气: 晴天, 气温25℃' };

  // Mirror the sample in the publisher form
  document.getElementById('topic').value = sample.topic;
  document.getElementById('message').value = sample.message;

  const messageId = pubsub.publish(sample.topic, sample.message);
  const preview = sample.message.substring(0, 30);
  logPublish(`发布消息到无订阅者主题: ${preview}... (ID: ${messageId})`, 'warning');
  updateTopicInfo();
});
// Create a subscriber from the subscription form
document.getElementById('subscribeBtn').addEventListener('click', () => {
  const topic = document.getElementById('subTopic').value;
  const subscriberId = document.getElementById('subscriberId').value || undefined;
  const replayHistory = document.getElementById('replayHistory').checked;

  // Logs every delivery (replayed or live) and acknowledges it
  const onMessage = (message, ack, meta) => {
    const type = meta.isHistory ? 'history' : 'realtime';
    const label = meta.isHistory ? '🕰️ 历史消息' : '⚡ 实时消息';
    logSubscribe(
      `${label}:\n${message.substring(0, 30)}...\n(来自: ${meta.publisher}, ID: ${meta.id})`,
      type
    );
    ack('成功处理');
  };

  const subscriptionId = pubsub.subscribe(topic, onMessage, { subscriberId, replayHistory });
  logSubscribe(`订阅者创建成功: ${subscriberId || subscriptionId} (主题: ${topic})`, 'system');
  updateTopicInfo();
});
// Demo handlers
// Demo 1: a message published without local options uses the global config
document.getElementById('demoGlobalConfig').addEventListener('click', () => {
  const globalOverrides = { maxHistory: 3, messageTimeout: 10000 };
  pubsub.updateGlobalConfig(globalOverrides);

  // Empty local options: everything comes from the global configuration
  pubsub.publish('demo.topic', '使用全局配置的消息', {});

  [
    '演示: 已设置全局配置 maxHistory=3, timeout=10000ms',
    '发布消息: 使用全局配置的消息'
  ].forEach((line) => logSystem(line));
});
// Demo 2: global configuration merged with per-message options
document.getElementById('demoConfigMerge').addEventListener('click', () => {
  const localOptions = { maxHistory: 2, publisherName: '特殊发布者' };
  pubsub.publish('demo.topic', '合并配置的消息', localOptions);

  [
    '演示: 全局配置 + 发布配置 = 最终配置',
    '发布消息: 合并配置的消息 (覆盖maxHistory=2)'
  ].forEach((line) => logSystem(line));
});
// Demo 3: per-message options take precedence over the global configuration
document.getElementById('demoLocalOverride').addEventListener('click', () => {
  // Global default becomes unicast ...
  const globalOverrides = { broadcastMode: 'single', maxHistory: 5 };
  pubsub.updateGlobalConfig(globalOverrides);

  // ... but this message explicitly asks for multicast
  const localOptions = { broadcastMode: 'multicast', publisherName: '测试发布者' };
  pubsub.publish('demo.topic', '局部覆盖配置的消息', localOptions);

  [
    '演示: 局部配置覆盖全局配置',
    '发布消息: 局部覆盖配置的消息 (覆盖broadcastMode=multicast)'
  ].forEach((line) => logSystem(line));
});
// Demo 4: message life cycle - publish first, subscribe afterwards
document.getElementById('demoMessageLifetime').addEventListener('click', () => {
  pubsub.updateGlobalConfig({
    maxHistory: 3,
    messageTimeout: 15000,
    broadcastMode: 'multicast'
  });

  // Publish three messages in order
  const lifecycleTopic = 'demo.lifecycle';
  for (const text of ['生命周期消息1', '生命周期消息2', '生命周期消息3']) {
    pubsub.publish(lifecycleTopic, text);
  }

  // Then attach a subscriber that logs and acknowledges what it receives
  const onLifecycleMessage = (msg, ack) => {
    logSubscribe(`消费消息: ${msg.substring(0, 20)}...`, 'realtime');
    ack('ACK确认');
  };
  pubsub.subscribe(lifecycleTopic, onLifecycleMessage);

  logSystem('演示: 消息生命周期演示 (发布3条消息)');
  logSystem('创建订阅者消费历史消息');
});
// Initialise the page once the DOM is ready: log the startup banner, refresh the
// topic panel and pre-fill the global configuration form with the defaults.
document.addEventListener('DOMContentLoaded', () => {
    logSystem('系统启动: PubSub消息系统已就绪');
    logSystem('默认全局配置: maxHistory=5, messageTimeout=15000, broadcastMode=multicast');
    updateTopicInfo();

    // [element id, initial value] pairs for the global configuration inputs.
    const initialFormValues = [
        ['globalMaxHistory', 5],
        ['globalTimeout', 15000],
        ['globalPublisherName', '系统发布者']
    ];
    for (const [elementId, initialValue] of initialFormValues) {
        document.getElementById(elementId).value = initialValue;
    }
});
import { AckParamProp, DeepPartial, GlobalOPtionProp, MessageMetaProp, PublishOptionProp, SubscribeOptionProp, TopicMapSubscriberMapValue, TopicMapValue } from "./type";
/** `status` reported to the publisher when a queued message expired before any subscriber arrived. */
const TIMEOUT_STATUS = "timeout";
/** `status` reported to the publisher when a subscriber finished handling a message. */
const CONSUMED_STATUS = "consumed";

/**
 * In-memory publish/subscribe hub.
 *
 * - Messages published while a topic has subscribers are delivered immediately
 *   ("single": first subscriber only; "multicast": every subscriber, optionally
 *   capped by `maxConsumption`).
 * - Messages published while a topic has no subscribers are queued in the topic's
 *   history. The first subscriber to arrive receives the whole queue exactly once;
 *   a queued message that waits longer than `messageTimeout` is dropped and the
 *   publisher's `ackCallback` (if any) is invoked with status "timeout".
 * - Whatever a subscriber callback returns (or resolves to) is forwarded to the
 *   publisher's `ackCallback` as `result` with status "consumed".
 *
 * Timers are created with `window.setTimeout`, so queueing with a positive
 * `messageTimeout` requires a browser-like global `window`.
 */
export class PubSub {
    /** Monotonic counter used to build message ids (`msg-<n>`). */
    messageCounter: number = 0;
    private globalConfig: GlobalOPtionProp;
    /** Topic name -> subscribers, queued (undelivered) messages and a config snapshot. */
    private topics: Map<string, TopicMapValue>;
    /** Message id -> publisher's acknowledgement callback. */
    private ackCallbacks: Map<string, (params: AckParamProp) => void>;
    /** Subscriber id -> `{ topic, callback }` for every active subscriber. */
    private subscribers: Map<string, any>;

    /**
     * @param options Overrides merged on top of the built-in default configuration.
     */
    constructor(options: Partial<GlobalOPtionProp> = {}) {
        const defaultConfig: GlobalOPtionProp = {
            maxHistory: 5,
            messageTimeout: 15000,
            broadcastMode: 'multicast',
            maxConsumption: null,
            historyEnabled: true,
            publisherName: '系统发布者'
        };
        this.globalConfig = { ...defaultConfig, ...options };
        this.topics = new Map();
        this.ackCallbacks = new Map();
        this.subscribers = new Map();
        this.messageCounter = 0;
    }

    /**
     * Merge new values into the global configuration.
     * Only messages published afterwards see the new values.
     * @param newConfig Partial configuration to merge.
     */
    updateGlobalConfig(newConfig: Partial<GlobalOPtionProp>) {
        this.globalConfig = { ...this.globalConfig, ...newConfig };
    }

    /**
     * Subscribe to a topic.
     *
     * If this is the topic's only subscriber and messages are queued, the queue is
     * replayed to it immediately (with `meta.isHistory === true`).
     *
     * @param topic Topic name.
     * @param callback Invoked as `callback(message, meta)`; its return value (or the
     *   value its promise resolves to) is forwarded to the publisher's ack callback.
     * @param options `subscriberId` may be supplied; otherwise one is generated.
     * @returns The subscriber id.
     */
    subscribe(topic: string, callback, options: Partial<SubscribeOptionProp> = {}) {
        const topicInfo = this._ensureTopic(topic);
        const subscriberId = options.subscriberId || `sub-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
        topicInfo.subscribers.set(subscriberId, {
            callback,
            options: {
                replayHistory: this.globalConfig.historyEnabled,
                ...options
            }
        });
        // Global index of subscribers, independent of topic.
        this.subscribers.set(subscriberId, { topic, callback });
        if (topicInfo.subscribers.size === 1 && topicInfo.history.length > 0) {
            this._replayHistory(topic, subscriberId);
        }
        return subscriberId;
    }

    /**
     * Publish a message.
     *
     * @param topic Topic name.
     * @param message Message payload.
     * @param localOptions Per-message overrides of the global configuration, plus an
     *   optional `ackCallback` notified on consumption or timeout.
     * @returns The generated message id.
     */
    publish(topic: string, message: unknown, localOptions: Partial<PublishOptionProp> = {}) {
        const { ackCallback } = localOptions;
        const topicInfo = this._ensureTopic(topic);
        const messageId = `msg-${this.messageCounter++}`;
        const timestamp = Date.now();
        // Effective options: global configuration overridden by the per-message ones.
        const mergedOptions = {
            ...this.globalConfig,
            ...localOptions,
            publisherName: localOptions.publisherName || this.globalConfig.publisherName
        };
        const messageData: MessageMetaProp = {
            id: messageId,
            topic,
            message,
            timestamp,
            publisher: mergedOptions.publisherName,
            options: mergedOptions,
            consumedCount: 0,
            consumers: [],
            consumed: false
        };
        if (typeof ackCallback === 'function') {
            this.ackCallbacks.set(messageId, ackCallback);
        }
        if (topicInfo.subscribers.size > 0) {
            this._deliverMessage(topic, messageData);
        } else {
            // Nobody is listening yet: queue the message for the first subscriber.
            this._addToHistory(topicInfo, messageData);
        }
        return messageId;
    }

    /**
     * Remove a subscriber from a topic. Unknown topics or ids are ignored.
     * @param topic Topic name.
     * @param subscriberId Id returned by `subscribe`.
     */
    unsubscribe(topic: string, subscriberId: string) {
        const topicInfo = this.topics.get(topic);
        if (!topicInfo) return;
        if (topicInfo.subscribers.has(subscriberId)) {
            topicInfo.subscribers.delete(subscriberId);
            this.subscribers.delete(subscriberId);
        }
    }

    /**
     * Drop every queued message of a topic, cancelling their expiry timers.
     * Ack callbacks of the dropped messages are discarded without being invoked.
     * @param topic Topic name.
     */
    clearHistory(topic: string) {
        const topicInfo = this.topics.get(topic);
        if (!topicInfo) return;
        topicInfo.history.forEach(msg => {
            if (msg.timeoutId) clearTimeout(msg.timeoutId);
            // The message can no longer be delivered or time out, so its callback is unreachable.
            this.ackCallbacks.delete(msg.id);
        });
        topicInfo.history = [];
    }

    /**
     * Summarise a topic.
     * @param topic Topic name.
     * @returns `{ subscribers, pendingMessages }`, or `null` for an unknown topic.
     */
    getTopicInfo(topic: string) {
        const topicInfo = this.topics.get(topic);
        if (!topicInfo) return null;
        return {
            subscribers: topicInfo.subscribers.size,
            pendingMessages: topicInfo.history.length
        };
    }

    /**
     * Forget all topics, subscribers, queued messages and ack callbacks, and restart
     * message numbering at zero.
     */
    reset() {
        // Cancel pending expiry timers first: message ids restart at msg-0, so a stale
        // timer could otherwise report a timeout for a new message that reuses its id.
        this.topics.forEach(topicInfo => {
            topicInfo.history.forEach(msg => {
                if (msg.timeoutId) clearTimeout(msg.timeoutId);
            });
        });
        this.topics.clear();
        this.ackCallbacks.clear();
        this.subscribers.clear();
        this.messageCounter = 0;
    }

    /** Return the record for `topic`, creating an empty one on first use. */
    private _ensureTopic(topic: string): TopicMapValue {
        let topicInfo = this.topics.get(topic);
        if (!topicInfo) {
            topicInfo = {
                subscribers: new Map(),
                history: [],
                config: { ...this.globalConfig }
            };
            this.topics.set(topic, topicInfo);
        }
        return topicInfo;
    }

    /**
     * Queue an undelivered message in the topic history and arm its expiry timer.
     * @param topicInfo Topic record that owns the queue.
     * @param messageData Message to queue.
     */
    private _addToHistory(topicInfo: TopicMapValue, messageData: MessageMetaProp) {
        const maxHistory = messageData.options.maxHistory || 5;
        const timeout = messageData.options.messageTimeout;
        if (typeof timeout === 'number' && timeout > 0) {
            messageData.timeoutId = window.setTimeout(() => {
                // The message expires whether or not the publisher asked for an ack.
                const index = topicInfo.history.findIndex(msg => msg.id === messageData.id);
                if (index !== -1) {
                    topicInfo.history.splice(index, 1);
                }
                const ackCallback = this.ackCallbacks.get(messageData.id);
                if (!ackCallback) return;
                this.ackCallbacks.delete(messageData.id);
                const ackInfo: AckParamProp = {
                    status: TIMEOUT_STATUS,
                    messageId: messageData.id,
                    message: messageData.message,
                    topic: messageData.topic,
                    publishTime: messageData.timestamp,
                    timeoutTime: Date.now()
                };
                ackCallback(ackInfo);
            }, timeout);
        }
        // Evict oldest entries until there is room, so a lowered cap is actually enforced.
        while (topicInfo.history.length > 0 && topicInfo.history.length >= maxHistory) {
            const removed = topicInfo.history.shift();
            if (removed) {
                if (removed.timeoutId) clearTimeout(removed.timeoutId);
                // An evicted message can never be delivered or time out.
                this.ackCallbacks.delete(removed.id);
            }
        }
        topicInfo.history.push(messageData);
    }

    /**
     * Deliver a freshly published message to the topic's current subscribers.
     * @param topic Topic name.
     * @param messageData Message to deliver.
     */
    private _deliverMessage(topic: string, messageData: MessageMetaProp) {
        const topicInfo = this.topics.get(topic);
        if (!topicInfo) return;
        const { subscribers } = topicInfo;
        const { broadcastMode, maxConsumption } = messageData.options;
        // Single-cast: only the first registered subscriber receives the message.
        if (broadcastMode === 'single') {
            const firstSubscriber = subscribers.entries().next().value;
            if (firstSubscriber) {
                const [subscriberId, subscriber] = firstSubscriber;
                this._deliverToSubscriber(subscriberId, subscriber, messageData);
            }
            return;
        }
        // Multicast: every subscriber, up to the remaining consumption budget.
        const budget = maxConsumption == null ? Infinity : maxConsumption - messageData.consumedCount;
        let delivered = 0;
        for (const [subscriberId, subscriber] of subscribers) {
            if (delivered >= budget) break;
            if (messageData.consumers.includes(subscriberId)) continue;
            this._deliverToSubscriber(subscriberId, subscriber, messageData);
            messageData.consumers.push(subscriberId);
            delivered++;
            messageData.consumedCount++;
        }
    }

    /**
     * Deliver one live message to one subscriber and relay its result to the
     * publisher's ack callback. The ack callback stays registered because other
     * subscribers may still acknowledge the same message.
     * @param subscriberId Subscriber id.
     * @param subscriber Subscriber record.
     * @param messageData Message to deliver.
     */
    private _deliverToSubscriber(subscriberId: string, subscriber: DeepPartial<TopicMapSubscriberMapValue>, messageData: MessageMetaProp) {
        const ack = (result?: unknown) => {
            const ackCallback = this.ackCallbacks.get(messageData.id);
            if (!ackCallback) return;
            ackCallback({
                subscriberId,
                status: CONSUMED_STATUS,
                messageId: messageData.id,
                topic: messageData.topic,
                message: messageData.message,
                publishTime: messageData.timestamp,
                consumeTime: Date.now(),
                result
            });
        };
        this._invokeSubscriber(subscriber, messageData, ack);
    }

    /**
     * Hand every queued message of a topic to one subscriber, exactly once.
     * Replayed messages leave the queue and their expiry timers are cancelled.
     * @param topic Topic name.
     * @param subscriberId Subscriber receiving the replay.
     */
    private _replayHistory(topic: string, subscriberId: string) {
        const topicInfo = this.topics.get(topic);
        if (!topicInfo) return;
        const subscriber = topicInfo.subscribers.get(subscriberId);
        if (!subscriber) return;
        // Drain in place so the messages cannot be replayed to a later subscriber.
        const pending = topicInfo.history.splice(0);
        pending.forEach(messageData => {
            // The expiry timer only covers the time spent waiting for a subscriber.
            if (messageData.timeoutId) clearTimeout(messageData.timeoutId);
            const ack = (result: unknown = {}) => {
                const ackCallback = this.ackCallbacks.get(messageData.id);
                if (!ackCallback) return;
                // A replayed message has exactly one consumer, so the callback is single-use.
                this.ackCallbacks.delete(messageData.id);
                const ackInfo = {
                    subscriberId,
                    status: CONSUMED_STATUS,
                    messageId: messageData.id,
                    message: messageData.message,
                    topic,
                    publishTime: messageData.timestamp,
                    consumeTime: Date.now(),
                    result,
                    isHistory: true
                };
                ackCallback(ackInfo);
            };
            this._invokeSubscriber(subscriber, { ...messageData, isHistory: true }, ack);
        });
    }

    /**
     * Call a subscriber callback and report its outcome through `ack` exactly once:
     * the resolved value on success, `{ error }` on a synchronous throw or rejection.
     */
    private _invokeSubscriber(subscriber: DeepPartial<TopicMapSubscriberMapValue>, meta: MessageMetaProp, ack: (result?: unknown) => void) {
        let outcome;
        try {
            outcome = subscriber.callback(meta.message, meta);
        } catch (error) {
            ack({ error: error instanceof Error ? error.message : error });
            return;
        }
        // Two-argument `then`: an exception thrown by the publisher's ack callback
        // must not be routed into the rejection handler and trigger a second ack.
        Promise.resolve(outcome).then(
            (res: any) => ack(res),
            e => ack({ error: e })
        );
    }
}
type.ts
/**
 * Recursively makes every property of `T` optional.
 *
 * Functions and `Date`s are kept as-is, and arrays keep their original element
 * type (they are not descended into); each of those only gains `| undefined`.
 * Plain object types have every key made optional and are processed recursively.
 */
export type DeepPartial<T> = T extends Function
    ? T | undefined
    : T extends Date
    ? T | undefined
    : T extends (infer U)[]
    ? U[] | undefined
    : T extends object
    ? { [K in keyof T]?: DeepPartial<T[K]> }
    : T | undefined;
/** Key type for an event name: a string or a symbol. */
export type EventType = string | symbol;
/** Hub-wide configuration; every field can be overridden per message at publish time. */
export interface GlobalOPtionProp {
/** Cap on the number of queued (not yet delivered) messages kept in a topic's history. */
maxHistory: number,
/** Message timeout, i.e. how long a queued message stays valid, in milliseconds. */
messageTimeout: number,
/** Delivery mode: "single" (first subscriber only) or "multicast" (all subscribers). */
broadcastMode: "multicast" | "single",
/** How many subscribers may consume one message; `null` means no limit. */
maxConsumption: null | number,
/** Whether history replay is enabled (used as the default for a subscriber's `replayHistory` option). */
historyEnabled: boolean
/** Name of the message publisher. */
publisherName: string
}
/** Options accepted by `publish`: the global options plus an acknowledgement callback. */
export interface PublishOptionProp extends GlobalOPtionProp {
/** Invoked with the acknowledgement details when the message is consumed or times out. */
ackCallback: (params:AckParamProp) => void
}
/** Options accepted by `subscribe`. */
export interface SubscribeOptionProp {
/** Subscriber id. */
subscriberId: string
}
/**
 * Handler registered through `subscribe`.
 *
 * It may return a plain value or a promise: the hub wraps the result in
 * `Promise.resolve` and forwards the resolved value to the publisher's ack callback
 * as `result`, so the return type is deliberately not restricted to `Promise<void>`.
 *
 * @param message Payload passed to `publish`.
 * @param meta Message metadata (id, topic, timestamp, publisher, options, ...).
 */
export type SubscribeCallback = (message: unknown, meta: MessageMetaProp) => unknown
/** Record stored for each subscriber of a topic: its callback and its subscribe-time options. */
export interface TopicMapSubscriberMapValue {
callback: SubscribeCallback,
options: {
/** Replay preference; typed like the global `historyEnabled` flag. */
replayHistory: GlobalOPtionProp["historyEnabled"],
/** Any additional options supplied at subscribe time. */
[K: string]: any,
}
}
/** Per-topic state held by the hub. */
export interface TopicMapValue {
/** Subscriber id -> subscriber record. */
subscribers: Map<string, DeepPartial<TopicMapSubscriberMapValue>>,
/** Messages stored as history for the topic. */
history: Array<MessageMetaProp>,
/** Configuration snapshot associated with the topic. */
config: GlobalOPtionProp,
}
// Message metadata: the payload passed to publish plus additional bookkeeping fields.
export interface MessageMetaProp {
/** Message id. */
id: string,
/** Topic the message was published to. */
topic: string,
/** Payload passed to publish. */
message: unknown,
/** Publish timestamp. */
timestamp: number,
/** Name of the publisher. */
publisher: string,
/** Options attached to the message (publish options and global options, all optional). */
options: DeepPartial<PublishOptionProp & GlobalOPtionProp>,
/** Number of times the message has been consumed. */
consumedCount: number,
/** Ids of the subscribers that consumed the message. */
consumers: Array<string>,
consumed: boolean,// whether the message has been consumed
/** Timer handle of the message timeout, if one was set. */
timeoutId?:number,
/** Marks a message delivered through history replay. */
isHistory?: boolean,
/** Any additional fields. */
[T:string]:any
}
/** Details handed to a publisher's `ackCallback`. */
export interface AckParamProp {
    /** Outcome of the message: the hub reports "consumed" or "timeout". */
    status: string,
    /** Id of the acknowledged message. */
    messageId: string,
    /** Payload that was published. */
    message: unknown,
    /** Topic the message was published to. */
    topic: string,
    /** Publish timestamp. */
    publishTime: number,
    /** When the timeout fired (timeout acknowledgements only). */
    timeoutTime?: number,
    /** When the subscriber finished (consumption acknowledgements only). */
    consumeTime?: number,
    /** Id of the consuming subscriber (consumption acknowledgements only). */
    subscriberId?: string,
    /** Value returned or resolved by the subscriber callback, or an `{ error }` object. */
    result?: unknown,
    /** Set to `true` when the message was delivered through history replay. */
    isHistory?: boolean
}
测试
/**
 * Manual smoke test for PubSub; results are printed to the console.
 *
 * Scenario f1 (run by default): publish first, subscribe three seconds later.
 * Scenario f2 (disabled): subscribe first, publish three seconds later.
 */
function PubSubTest() {
    const ps = new PubSub({
        messageTimeout: 150000
    })
    // Publish first, subscribe later. A message published before anyone subscribed
    // is consumed by a single subscriber only: whoever subscribes first.
    const f1 = () => {
        ps.publish("topic-1", "消息-1", {
            ackCallback: (data) => {
                console.log("topic-1 消息-1 ackCallback:", data,ps)
            }
        })
        // Two subscribers arrive after three seconds.
        setTimeout(() => {
            console.log("订阅者1,2订阅")
            ps.subscribe("topic-1", (msg, meta) => {
                return new Promise((resolve) => {
                    setTimeout(() => {
                        resolve("我是订阅者1")
                    }, 2000);
                })
            },{
                subscriberId: "sid-1"
            })
            ps.subscribe("topic-1", (msg, meta) => {
                return new Promise((resolve) => {
                    setTimeout(() => {
                        resolve("我是订阅者2")
                    }, 4000);
                })
            },{
                subscriberId: "sid-2"
            })
        }, 3000);
    }
    // Subscribe first, publish later.
    const f2 = () => {
        console.log("订阅者1订阅 topic-2 ")
        ps.subscribe("topic-2", (msg, meta) => {
            return new Promise((resolve) => {
                setTimeout(() => {
                    resolve("我是订阅者1")
                }, 2000);
            })
        })
        console.log("订阅者2订阅 topic-2 ")
        ps.subscribe("topic-2", (msg, meta) => {
            return new Promise((resolve) => {
                setTimeout(() => {
                    resolve("我是订阅者2")
                }, 4000);
            })
        })
        setTimeout(() => {
            console.log("topic-2 发布消息:")
            ps.publish("topic-2", "消息-1", {
                ackCallback: (data) => {
                    console.log("topic-2 消息-1 ackCallback:", data)
                    //console.log(ps)
                }
            })
        }, 3000);
    }
    f1()
    //f2()
}


本文作者:繁星
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!