0%

深入剖析WebRTC之事件机制Slot

我最早了解到 sigslot 大概是在 2007年 左右,当时在QT中大量使用了 sigslot 的概念。 现在 WebRTC 中也大量使用了 sigslot 这种机制来处理底层的事件。它对我们阅读WebRTC代码至关重要。本篇文章就详细介绍一下 sigslot。

Sigslot作用

Sigslot 的作用一句话表式就是为了解耦。例如,有两个类 A 和 B,如果 B 使用 A, 就必须在 B 类中写入与 A 类有关的代码。看下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class A {
public:
void funcA();
}

class B {
public:
B(A& a){
m_a = a;
}

void funcB(){
m_a.funcA(); //这里调用了A类的方法
}

private:
A m_a; //引用 A 类型成员变量。
}

void main(int argc, char *argv[]){
A a;
B b(a);
b.funcB();
}

这里的弊端是 B 中必须要声名使用 A。如果我们的项目特别复杂,这样的使用方式在后期维护时很容易让我们掉入“陷阱”。有没有一种通用的办法可以做到在 B 中不用使用 A 也可以调用 A 中的方法呢?答案就是使用 sigslot。我们看下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class A : public sigslot::has_slot<>
{
public:
void funcA();
};

class B
{
public:
sigslot::signal0<> sender;
};

void main(int argc, char *argv[]){

A a;
B b;

//在运行时才将 a 和 b 绑定到一起
b.sender.connect(&a, &A::funcA);
b.sender();

}

通过上面的代码我们可以看到 B 中没有一行与 A 相关的代码。只在 main 函数中(也就是在运行时)才知道 A 与 B 有关联关系。是不是觉得很神奇呢?下面我们就看一下它的实现原理。

实现原理

sigslot的原理其实非常简单,它就是一个变化的观察者模式。观察者模式如下所示:

观察者模式,首先让 Observer(“观察者”)对象 注册到 Subject(“被观察者”) 对象中。当 Subject 状态发生变化时,遍历所有注册到自己的 Observer 对象,并调用它们的 notify方法。

sigslot与观察者模式类似,它使用signal(“信号”)和slot(“槽”),区别在于 signal 主动连接自己感兴趣的类及其方法,将它们保存到自己的列表中。当发射信号时,它遍历所有的连接,调用 slot(“槽”) 方法。

如何使用

下面我们看一下 WebRTC 中是如何使用 sigslot 的。

  • 首先,定义 slot(“槽”),也就是事件处理函数。在WebRTC中定义槽必须继承 has_slots<>。如下图所示:

  • 其次,定义 signal (“信号”) ,也就是发送的信号。

    1
    2
    sigslot::signal1<AsyncSocket*,
    sigslot::multi_threaded_local> SignalWriteEvent;
  • 然后,将 signal 与 slot 连接到一起。在这里就是将 AsyncUDPSocket和 OnWriteEvent方法与signal绑定到一起。

    1
    2
    socket_->SignalWriteEvent.connect(this,
    &AsyncUDPSocket::OnWriteEvent);
  • 最后,发送信号。在 WebRTC中根据参数的不同定义了许多 signal,如 signal1 说明带一个参数,signal2说明带两个参数。

    1
    SignalWriteEvent(this);

关键代码

下面是对 sigslog 的类关系图及关键代码与其详细注释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
...

// On our copy of sigslot.h, we set single threading as default.
#define SIGSLOT_DEFAULT_MT_POLICY single_threaded

#if defined(SIGSLOT_PURE_ISO) || \
(!defined(WEBRTC_WIN) && !defined(__GNUG__) && \
!defined(SIGSLOT_USE_POSIX_THREADS))
#define _SIGSLOT_SINGLE_THREADED
#elif defined(WEBRTC_WIN)
#define _SIGSLOT_HAS_WIN32_THREADS
#if !defined(WIN32_LEAN_AND_MEAN)
#define WIN32_LEAN_AND_MEAN
#endif
#include "webrtc/rtc_base/win32.h"
#elif defined(__GNUG__) || defined(SIGSLOT_USE_POSIX_THREADS)
#define _SIGSLOT_HAS_POSIX_THREADS
#include <pthread.h>
#else
#define _SIGSLOT_SINGLE_THREADED
#endif

#ifndef SIGSLOT_DEFAULT_MT_POLICY
#ifdef _SIGSLOT_SINGLE_THREADED
#define SIGSLOT_DEFAULT_MT_POLICY single_threaded
#else
#define SIGSLOT_DEFAULT_MT_POLICY multi_threaded_local
#endif
#endif

// TODO: change this namespace to rtc?
namespace sigslot {

...

//这面这大段代码是为了实现智能锁使用的。
//它会根据不同的平台初始化不同的互斥量,并调用不同的锁函数。

//如果是 Window 平台
#ifdef _SIGSLOT_HAS_WIN32_THREADS
// The multi threading policies only get compiled in if they are enabled.

//如果是全局线程
class multi_threaded_global {
public:
multi_threaded_global() {
static bool isinitialised = false;

if (!isinitialised) {
InitializeCriticalSection(get_critsec());
isinitialised = true;
}
}

void lock() { EnterCriticalSection(get_critsec()); }

void unlock() { LeaveCriticalSection(get_critsec()); }

private:
CRITICAL_SECTION* get_critsec() {
static CRITICAL_SECTION g_critsec;
return &g_critsec;
}
};

//如果是本地线程
class multi_threaded_local {
public:
multi_threaded_local() { InitializeCriticalSection(&m_critsec); }

multi_threaded_local(const multi_threaded_local&) {
InitializeCriticalSection(&m_critsec);
}

~multi_threaded_local() { DeleteCriticalSection(&m_critsec); }

void lock() { EnterCriticalSection(&m_critsec); }

void unlock() { LeaveCriticalSection(&m_critsec); }

private:
CRITICAL_SECTION m_critsec;
};
#endif // _SIGSLOT_HAS_WIN32_THREADS

//非window平台
#ifdef _SIGSLOT_HAS_POSIX_THREADS
// The multi threading policies only get compiled in if they are enabled.

//如果是全局线程
class multi_threaded_global {
public:
void lock() { pthread_mutex_lock(get_mutex()); }
void unlock() { pthread_mutex_unlock(get_mutex()); }

private:
static pthread_mutex_t* get_mutex();
};

//如果是本地线程
class multi_threaded_local {
public:
multi_threaded_local() { pthread_mutex_init(&m_mutex, nullptr); }
multi_threaded_local(const multi_threaded_local&) {
pthread_mutex_init(&m_mutex, nullptr);
}
~multi_threaded_local() { pthread_mutex_destroy(&m_mutex); }
void lock() { pthread_mutex_lock(&m_mutex); }
void unlock() { pthread_mutex_unlock(&m_mutex); }

private:
pthread_mutex_t m_mutex;
};
#endif // _SIGSLOT_HAS_POSIX_THREADS

//根据不同的策略调用不同的锁。
//这里的策略就是不同的平台
template <class mt_policy>
class lock_block {
public:
mt_policy* m_mutex;

lock_block(mt_policy* mtx) : m_mutex(mtx) { m_mutex->lock(); }

~lock_block() { m_mutex->unlock(); }
};

class _signal_base_interface;

class has_slots_interface {

...

public:
void signal_connect(_signal_base_interface* sender) {
...
}

void signal_disconnect(_signal_base_interface* sender) {
...
}

void disconnect_all() { ... }
};

class _signal_base_interface {
...

public:
void slot_disconnect(has_slots_interface* pslot) {
...
}

void slot_duplicate(const has_slots_interface* poldslot,
has_slots_interface* pnewslot) {
...
}
};

// 该类是一个特别重要的类
// signal与slot绑定之前,必须先将槽对象与槽方法组成 connection
//
class _opaque_connection {
private:
typedef void (*emit_t)(const _opaque_connection*);

//联合结构体,用于函数转换
template <typename FromT, typename ToT>
union union_caster {
FromT from;
ToT to;
};

//信号发射函数指针
emit_t pemit;
//存放“槽”对象
has_slots_interface* pdest;
// Pointers to member functions may be up to 16 bytes for virtual classes,
// so make sure we have enough space to store it.
unsigned char pmethod[16];

public:
//构造函数
//在构造connect时,要传入槽对象和槽类方法指针
template <typename DestT, typename... Args>
_opaque_connection(DestT* pd, void (DestT::*pm)(Args...)) : pdest(pd) {
//定义成员函数指针,与C语言中的函数指针是类似的
typedef void (DestT::*pm_t)(Args...);
static_assert(sizeof(pm_t) <= sizeof(pmethod),
"Size of slot function pointer too large.");

std::memcpy(pmethod, &pm, sizeof(pm_t));

//定义了一个函数指针
typedef void (*em_t)(const _opaque_connection* self, Args...);

//通过下面的方法,将 pemit 函数变理指向了 emitter 函数。
union_caster<em_t, emit_t> caster2;
//注意 emitter后面的是模版参数,不是函数参数,这里不要弄混了。
caster2.from = &_opaque_connection::emitter<DestT, Args...>;
pemit = caster2.to;
}

//返回"槽"对象
has_slots_interface* getdest() const { return pdest; }

...

//因为在构造函数里已经将 pemit 设置为 emitter 了,
//所以下面的代码就是调用 emitter 函数。为里只不过做了一次函数指针类型转换。
//也就是说调用 connect 的 emit 方法,实际调的是 emitter。
template <typename... Args>
void emit(Args... args) const {
typedef void (*em_t)(const _opaque_connection*, Args...);
union_caster<emit_t, em_t> caster;
caster.from = pemit;
(caster.to)(this, args...);
}

private:
template <typename DestT, typename... Args>
static void emitter(const _opaque_connection* self, Args... args) {
//pm_t是一个成员函数指针,它指向的是传进来的成员方法
typedef void (DestT::*pm_t)(Args...);
pm_t pm;
std::memcpy(&pm, self->pmethod, sizeof(pm_t));
//调用成员方法
(static_cast<DestT*>(self->pdest)->*(pm))(args...);
}
};

//signal_with_thread_policy类的父类。
//该类最主要的作用是存有一个conn list。
//在 signal_with_thread_policy中的connect方法就是对该成员变量的操作。

template <class mt_policy>
class _signal_base : public _signal_base_interface, public mt_policy {
protected:
typedef std::list<_opaque_connection> connections_list;

public:
...

protected:
//在 _signal_base 中定义了一个connection list,用于绑定的 slots.
connections_list m_connected_slots;
...

};

//该类是"槽"的实现
template <class mt_policy = SIGSLOT_DEFAULT_MT_POLICY>
class has_slots : public has_slots_interface, public mt_policy {
private:
typedef std::set<_signal_base_interface*> sender_set;
typedef sender_set::const_iterator const_iterator;

public:
has_slots()
: has_slots_interface(&has_slots::do_signal_connect,
&has_slots::do_signal_disconnect,
&has_slots::do_disconnect_all) {}

...

private:
has_slots& operator=(has_slots const&);

//静态函数,用于与signal绑定,由父类调用
//它是在构造函数时传给父类的
static void do_signal_connect(has_slots_interface* p,
_signal_base_interface* sender) {
has_slots* const self = static_cast<has_slots*>(p);
lock_block<mt_policy> lock(self);
self->m_senders.insert(sender);
}

//静态函数,用于解绑signal,由父类调用
//它是在构造函数时传给父类的
static void do_signal_disconnect(has_slots_interface* p,
_signal_base_interface* sender) {
has_slots* const self = static_cast<has_slots*>(p);
lock_block<mt_policy> lock(self);
self->m_senders.erase(sender);
}

...

private:
//该集合中存放的是与slog绑定的 signal
sender_set m_senders;
};

//该类是信号的具体实现
//为了保证信号可以在不同的平台是线程安全的,所以这里使用了策略模式
//mt_policy参数表式的是,不同的平台使用不同的策略
//该类中有两个重要的函数,一个是connect用于与槽进行绑定;另一个是 emit用于发射信号

template <class mt_policy, typename... Args>
class signal_with_thread_policy : public _signal_base<mt_policy> {
public:

...

template <class desttype>
void connect(desttype* pclass, void (desttype::*pmemfun)(Args...)) {
//这是一个智能锁,当函数结束时,自动释放锁。
lock_block<mt_policy> lock(this);
//先将对象与"槽"组成一个conn,然后存放到 signal的 conn list里
//当发射信号时,调用 conn list中的每个conn的 emit方法。
this->m_connected_slots.push_back(_opaque_connection(pclass, pmemfun));

//在槽对象中也要保存 signal 对象。
pclass->signal_connect(static_cast<_signal_base_interface*>(this));
}

//遍历所有的连接,并调用 conn 的emit方法。最终调用的是绑定"槽"的方法
void emit(Args... args) {
lock_block<mt_policy> lock(this);
this->m_current_iterator = this->m_connected_slots.begin();
while (this->m_current_iterator != this->m_connected_slots.end()) {
_opaque_connection const& conn = *this->m_current_iterator;
++(this->m_current_iterator);

//调 conn 的 emit 方法,最终会调用绑定的 "槽" 方法。
conn.emit<Args...>(args...);
}
}

//重载()操作符,这样就从直接调用emit方法变成隐含调用emit方法。
void operator()(Args... args) { emit(args...); }
};


//下面的对不同参数信号的定义
template <typename... Args>
using signal = signal_with_thread_policy<SIGSLOT_DEFAULT_MT_POLICY, Args...>;


template <typename mt_policy = SIGSLOT_DEFAULT_MT_POLICY>
using signal0 = signal_with_thread_policy<mt_policy>;

template <typename A1, typename mt_policy = SIGSLOT_DEFAULT_MT_POLICY>
using signal1 = signal_with_thread_policy<mt_policy, A1>;

template <typename A1,
typename A2,
typename mt_policy = SIGSLOT_DEFAULT_MT_POLICY>
using signal2 = signal_with_thread_policy<mt_policy, A1, A2>;

...

} // namespace sigslot

小结

本文通过 sigslot作用、实现原理、如何使用以及详细的代码注释四个部分剖析了 WebRTC 中的 sigslot。sigslot是 WebRTC中非常底性的基础代码,它对 WebRTC 事件机制起着关键性的作用。熟悉sigslot,对我们阅读 WebRTC 代码会有非常大的帮助。

希望本文能对你有所帮助。谢谢!

欢迎关注我的其它发布渠道