建议先看一下上篇 观察者模式 ,发布订阅模式和观察者模式本质上还是一样的,并且发布订阅模式也没有在经典的设计模式书 GoF 中出现,很多地方也直接把两者看成一种设计模式了。

GoF 的名字也有个有趣的故事,这里 贴过来:

The authors of the DesignPatternsBook came to be known as the "Gang of Four." The name of the book ("Design Patterns: Elements of Reusable Object-Oriented Software") is too long for e-mail, so "book by the gang of four" became a shorthand name for it. After all, it isn't the ONLY book on patterns. That got shortened to "GOF book", which is pretty cryptic the first time you hear it.

场景

假设我们在开发一款外卖网站,进入网站的时候,第一步需要去请求后端接口得到用户的常用外卖地址。然后再去请求其他接口、渲染页面。如果使用了观察者模式可能会这样写:

1// 页面里有三个模块 A,B,C 需要拿到地址后再进行下一步
2// A、B、C 三个模块都是不同人写的,提供了不同的方法供我们调用
3const observers = []
4// 注册观察者
5observers.push(A.update)
6observers.push(B.next)
7obervers.push(C.change)
8
9// getAddress 异步请求
10getAddress().then((res) => {
11  const address = res.address
12  observers.forEach(update => update(address))
13})

getAddress 模块和其他 ABC 三个模块已经实现了解耦,但仍需要维护 observers 这个数组来注册观察者,同时还需要知道各个模块提供了什么方法用于回调。

我们可以使用发布订阅模式,让 getAddress 模块和其他 ABC 三个模块解耦的更加彻底。

发布订阅模式

回忆一下观察者模式:

The observer pattern is a software design pattern in which an object, named the subject, maintains a list of its dependents, called observers, and notifies them automatically of any state changes, usually by calling one of their methods.

观察者模式中,Subject 自己维护观察者列表进行注册和通知。

image-20220130170413954

发布订阅模式的话,引入一个中间平台进行注册和通知,相当于从 Subject 中解耦出来。

image-20220130171806687

观察者通过 onEventBus 注册事件,然后 Subject 通过 emitEventBus 发射事件,由 EventBus 来向观察者更新。

接下来实现一个简单的 EventBus

1// event.js
2const observersMap = {}
3const listen = function (key, fn) {
4  if (!observersMap[key])
5    observersMap[key] = []
6
7  observersMap[key].push(fn)
8}
9const trigger = function () {
10  const key = Array.prototype.shift.call(arguments)
11  const fns = observersMap[key]
12  if (!fns || fns.length === 0)
13    return false
14
15  for (let i = 0, fn; (fn = fns[i]); i++)
16    fn.apply(this, arguments)
17}
18const remove = function (key, fn) {
19  const fns = observersMap[key]
20  if (!fns)
21    return false
22
23  if (!fn) {
24    fns && (fns.length = 0) // 全部清空
25  }
26  else {
27    let findIndex = -1
28    for (let i = 0; i < fns.length; i++) {
29      if (fns[i] === fn) {
30        findIndex = i
31        break
32      }
33    }
34    if (findIndex !== -1)
35      fns.splice(findIndex, 1)
36  }
37}
38
39// 同一种功能可能会见到不同名字,这里都导出去
40export const EventBus = {
41  listen,
42  attach: listen,
43  on: listen,
44
45  remove,
46  detach: remove,
47
48  trigger,
49  emit: trigger,
50}

我们通过 observersMap 将不同的事件保存为不同的数组,emit 的时候得到对应的数组去调用即可。看下例子:

1import { EventBus } from './event.js'
2
3const WindLiang = {
4  writePost(p) {
5    EventBus.emit('windliang', p)
6  },
7}
8
9const XiaoMing = {
10  update(post) {
11    console.log(`我收到了${post} 并且点了个赞`)
12  },
13}
14
15const XiaoYang = {
16  update(post) {
17    console.log(`我收到了${post} 并且转发了`)
18  },
19}
20
21const XiaoGang = {
22  update(post) {
23    console.log(`我收到了${post} 并且收藏`)
24  },
25}
26
27EventBus.on('windliang', XiaoMing.update)
28EventBus.on('windliang', XiaoYang.update)
29EventBus.on('windliang', XiaoGang.update)
30
31WindLiang.writePost('新文章-观察者模式,balabala')

代码实现

让我们改造下开头写的观察者模式的代码:

地址模块:

1import { EventBus } from './event.js'
2
3// getAddress 异步请求
4getAddress().then((res) => {
5  const address = res.address
6  EventBus.emit('ADDRESS', address)
7})

A 模块

1import { EventBus } from './event.js'
2
3function update(address) {
4  // 自己的逻辑
5}
6
7EventBus.on('ADDRESS', address => update(address))

B 模块

1import { EventBus } from './event.js'
2
3function next(address) {
4  // 自己的逻辑
5}
6
7EventBus.on('ADDRESS', address => next(address))

C 模块

1import { EventBus } from './event.js'
2
3function change(address) {
4  // 自己的逻辑
5}
6
7EventBus.on('ADDRESS', address => change(address))

可以看到 getAddress 模块不再需要关心观察者有谁,它只需要向 EventBus 发射更新事件即可。

每个模块内部自己如果需要地址信息,只需要订阅相关事件,然后传入回调函数即可。

特殊情况

实际工程中可能遇到一些特殊场景,由于 emit 一般在一个异步事件中执行,如果这个异步事件突然执行的变快了,就可能造成某个事件先 emit 了,然后某个模块才进行了 on

此时我们可以对 EventBus 进行一定的改写,使得先订阅事件,后触发事件成为可能。

为了不改动原有逻辑,我们可以通过 代理模式 进行改写。

1// eventProxy.js
2
3import { EventBus as EventBusOriginal } from './event.js'
4
5let offlineStack = [] // listen 之前的 emit 事件进行缓存
6
7const triggerProxy = function () {
8  const _self = this
9  const args = arguments
10  const fn = function () {
11    return EventBusOriginal.trigger.apply(_self, args)
12  }
13  if (offlineStack)
14    return offlineStack.push(fn)
15
16  return fn()
17}
18const listenProxy = function (key, fn) {
19  EventBusOriginal.listen(key, fn)
20  if (!offlineStack)
21    return
22
23  for (let i = 0, fn; (fn = offlineStack[i]); i++)
24    fn()
25
26  offlineStack = null
27}
28
29const listen = listenProxy || EventBus.listen
30const trigger = triggerProxy || EventBus.trigger
31
32export const EventBus = {
33  ...EventBusOriginal,
34  listen,
35  attach: listen,
36  on: listen,
37
38  trigger,
39  emit: trigger,
40}

trigger 的时候,如果 offlineStack 不为 null,说明还没有调用过 listen,此时将当前事件保存起来。

listen 的时候遍历之前保存的事件,并且将 offlineStack 置为 null,表示已经调用过 listen 了。

看一下效果:

1import { EventBus } from './eventProxy.mjs'
2
3const WindLiang = {
4  writePost(p) {
5    EventBus.emit('windliang', p)
6  },
7}
8
9const XiaoMing = {
10  update(post) {
11    console.log(`我收到了${post} 并且点了个赞`)
12  },
13}
14
15const XiaoYang = {
16  update(post) {
17    console.log(`我收到了${post} 并且转发了`)
18  },
19}
20
21const XiaoGang = {
22  update(post) {
23    console.log(`我收到了${post} 并且收藏`)
24  },
25}
26
27WindLiang.writePost('新文章-观察者模式,balabala')
28
29EventBus.on('windliang', XiaoYang.update) // 我收到了新文章-观察者模式,balabala 并且转发了

虽然是先进行的 emit 后进行的 on 的,但依旧会正常执行。

上边的解决方案很粗略,只适用于有一个事件并且只有一个 on 的场景,不然的话比如下边的情况:

1WindLiang.writePost('新文章-观察者模式,balabala')
2
3EventBus.on('windliang', XiaoMing.update)
4EventBus.on('windliang', XiaoYang.update)
5EventBus.on('windliang', XiaoGang.update)

只有 XiaoMing.update 会执行,后边两句就会错过第一次的 emit ,因为执行一次 listen 就把缓存清空了。

或者在 writePost 之前有了一次 on 了:

1EventBus.on('windliang', XiaoMing.update)
2WindLiang.writePost('新文章-观察者模式,balabala')
3
4EventBus.on('windliang', XiaoYang.update)
5EventBus.on('windliang', XiaoGang.update)

同样只有 XiaoMing.update 会执行,后边两句就会错过第一次的 emit 了,因为执行一次 listen 就把缓存清空了。

对于实际场景,我们还需要根据情况继续进行调整。

发布订阅模式相对于最原始的观察者模式将 SubjectObservers 进行了彻底解耦,Subject 不再需要关心谁订阅了它,Observer 只需要在自己内部订阅它所关心的事件即可。

通过封装好的 EventBus 也实现了更好的复用,不需要每个模块都去维护自己的观察者列表。

但同时也带来了坏处,所有的事件订阅分散在各个模块,没有一个全局视角知道某个事件被哪些模块订阅了,可能会导致程序难以理解和调试。