AioWebSocket实现python异步接收B站直播弹幕
0. 前言
本文是2022-01-13发表于CSDN上的文章,当时并未搭建好网站,故现将此文转移至此。
原文:https://blog.csdn.net/Sharp486/article/details/122466308
第一次写文章,若有不对的地方请多多包涵并指正。
1. AioWebSocket是什么
1.1 认识WebSocket
Websocket是一种在单个TCP连接上进行全双工通信的协议。
WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
python中常用的WebSocket库有websocket-client,websockets,aiowebsocket三个。
1.2 WebSocket工作原理
1.客户端向服务端发起建立请求,服务端响应后便在客户端与服务端之间建立起一个信道,这一步称之为握手。 2.建立好信道后,服务端就将新消息推送到信道,每当信道中有新的消息,客户端便可以拿到。 3.服务端为了不占用过多资源,需要定时检查与客户端是否还在连接,因此需要客户端定时向服务端发送心跳包(HeartBeat),确保连接不断开。
1.3 认识AioWebSocket
而 AioWebSocket 是python里一个遵循 WebSocket 规范异步 WebSocket 客户端,相对于其他库它更轻、更快。而且异步的特性使得它可以同时接收消息和发送心跳包,更为方便,因此本文选用AioWebSocket。
2. AioWebSocket相比于http/https的优势
跟传统的http/https协议相比,WebSocket是长连接,只需向客户端请求一次便可获得永久性连接;而http/https是短连接,向客户端请求一次后便断开连接。因此在获取直播弹幕这种场景下显然WebSocket更具优势。
2.1 http/https协议获取B站直播弹幕
我们打开任意一个直播间,打开F12,点击Network,搜索gethistory,可以看到这个链接就是历史弹幕,那么用代码对这个链接发起请求就能获取直播的弹幕。如果没有看见这个链接,可以尝试刷新网站。
代码如下
1 |
|
输出结果:
可以看到,http只能一次性获取历史弹幕,这种方法虽然方便、简短,但若需长时间接收新弹幕,就需要循环请求。而且请求间隔不能太长也不能太短:太短会占用网络资源,甚至被封IP;太长会导致丢失一部分弹幕,因为每次请求只返回最新10条弹幕。
而http/https的这些缺点,正是WebSocket的优点。
2.2 AioWebSocket实现接收弹幕功能
实际上,B站直播弹幕也是通过WebSocket协议来实现的。那上文中的gethistory是怎么回事呢?其实进入直播间时会先初始化,我们看到一些历史弹幕,就是由gethistory这个链接返回的数据,后面新的弹幕,都是由WebSocket协议来接收的了。 下面我们来看看如何实现用WebSocket协议来接收弹幕。
我们随便找个直播间,打开F12,点击Network,搜索sub,可以看到,这个就是我们客户端与服务端通信的WebSocket协议了。
知道了b站直播也是靠WebSocket实现的,那么下一步,我们怎么用python去模拟客户端跟服务端建立连接呢?
github上其实已经有b站的api了我们不需要自己研究,只需要移植过来。
我们只需要关注几部分:调用地址、数据包格式、消息类型
地址直接选择未加密的
按照操作类型分类
按消息类型分类
想要获得哪些信息只需要根据字段捕获就ok!详细可见下面代码,有DANMU_MSG和SEND_GIFT字段消息的捕获
3. 如何使用AioWebSocket
接下来看看如何在python中写代码
安装
1 |
|
导入模块
1 |
|
创建异步任务 1
2
3
4
5
6
7
8
9
10
11
12
13remote = 'ws://broadcastlv.chat.bilibili.com:2244/sub'
roomid = '21733344'
data_raw = '000000{headerLen}0010000100000007000000017b22726f6f6d6964223a{roomid}7d'
data_raw = data_raw.format(headerLen=hex(27 + len(roomid))[2:],
roomid=''.join(map(lambda x: hex(ord(x))[2:], list(roomid))))
async def startup():
async with AioWebSocket(remote) as aws:
converse = aws.manipulator
await converse.send(bytes.fromhex(data_raw))
tasks = [receDM(converse), sendHeartBeat(converse)]
await asyncio.wait(tasks)
1 |
|
接收消息 (这里的 if recv_text == None
整个if语句必须加上,否则会断开连接,不知道是阿b的问题还是代码的问题。)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53async def receDM(websocket):
while True:
recv_text = await websocket.receive()
if recv_text == None:
recv_text = b'\x00\x00\x00\x1a\x00\x10\x00\x01\x00\x00\x00\x08\x00\x00\x00\x01{"code":0}'
printDM(recv_text)
# 将数据包传入:
def printDM(data):
# 获取数据包的长度,版本和操作类型
packetLen = int(data[:4].hex(), 16)
ver = int(data[6:8].hex(), 16)
op = int(data[8:12].hex(), 16)
# 有的时候可能会两个数据包连在一起发过来,所以利用前面的数据包长度判断,
if (len(data) > packetLen):
printDM(data[packetLen:])
data = data[:packetLen]
# 有时会发送过来 zlib 压缩的数据包,这个时候要去解压。
if (ver == 2):
data = zlib.decompress(data[16:])
printDM(data)
return
# ver 为1的时候为进入房间后或心跳包服务器的回应。op 为3的时候为房间的人气值。
if (ver == 1):
if (op == 3):
print('[RENQI] {}'.format(int(data[16:].hex(), 16)))
return
# ver 不为2也不为1目前就只能是0了,也就是普通的 json 数据。
# op 为5意味着这是通知消息,cmd 基本就那几个了。
if (op == 5):
try:
jd = json.loads(data[16:].decode('utf-8', errors='ignore'))
if (jd['cmd'] == 'DANMU_MSG'):
print('[DANMU] ', jd['info'][2][1], ': ', jd['info'][1])
elif (jd['cmd'] == 'SEND_GIFT'):
print('[GITT]', jd['data']['uname'], ' ', jd['data']['action'], ' ', jd['data']['num'], 'x',
jd['data']['giftName'])
elif (jd['cmd'] == 'LIVE'):
print('[Notice] LIVE Start!')
elif (jd['cmd'] == 'PREPARING'):
print('[Notice] LIVE Ended!')
else:
print('[OTHER] ', jd['cmd'])
except Exception as e:
pass
1 |
|
输出结果:
[DANMU]开头的就是接收到的弹幕内容了。 开始还报了一个warning,是因为
1 |
|
这种写法已经过期了,在以后新版本的python解释器中将不在适用。不过问题不大,以后不能用再改罢。
4. 更新
4.1 2022-04-23更新
结尾warning解决:(小伙伴投稿)
1 |
|