使用RabitMQ解决 论坛用户评论并发抢楼问题

前言:

本篇文章分享如何在用户评论高并发数据写入的情况下通过使用消息队列来保证评论服务。

问题引入:为何使用消息队列?

在我遇到的场景中是一个关于论坛在短时间内用户对同一个帖子进行评论遇到的并发楼层抢占问题,如果这里不做任何限制那么在同一时刻多个用户共同对帖子留言就会导致数据表Floor存储的数据不准确,进而导致数据不一致,而解决这个办法有三种选择:1.加锁控制、2.开始事务、3.使用消息队列。

1.加锁控制

使用互斥锁是解决并发问题的常用手段,但是相对来说,我们想要完美的解决问题 需要对 锁的粒度、锁的时机、以及放锁的时机。把握的非常精妙,大多数情况下复杂的业务场景会导致业务代码的层级较深,而互斥锁穿插在这种复杂的层级关系中一不留神就会导致诸如:琐失效、死锁……而且业务中引入了锁之后也会对我们的QPS造成影响。

2. 开启事务

使用Gorm 事务也能够保证数据的原子性,这个相对来说也是较为方便和容易的,但是在开启事务时我们应该考虑如何简化dao操作尽量缩小粒度,让事务内的过程尽量简化但又能达到效果即可。

3. 使用消息队列

它的优点是解耦、高可用、高并发;缺点是存在消息丢失、消息重复处理等问题,且实现较为复杂,需要考虑多种因素。这里使用RabbitMQ (如何保证消息队列消息不丢失这里不做说明)

业务代码分析

数据表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package model

// Comment 评论表
type Comment struct {
ID uint64 `gorm:"primaryKey;comment:评论ID"`
Content string `gorm:"type:longtext;comment:评论内容"`
UserID uint64 `gorm:"index;not null;comment:评论用户ID"`
ArticleID uint64 `gorm:"index;not null;comment:文章ID"`
ParentID uint64 `gorm:"index;not null;comment:父级评论ID"`
Floor uint32 `gorm:"index;not null;comment:评论楼层"`
State uint8 `gorm:"comment:该评论状态"`

Ctime int64 // 创建时间,毫秒作为单位
Utime int64 // 更新时间,毫秒作为单位
}

业务逻辑部分

daomain/comment.go

1
2
3
4
5
6
7
8
9
10
11
12
type Comment struct {
ID uint64 // 评论ID
Content string // 评论内容
UserID uint64 // 评论用户ID
ArticleID uint64 // 文章ID
ParentID uint64 // 父级评论ID
Floor uint32 // 评论楼层
State uint8 // 该评论状态 0:正常,1:删除

Ctime int64 // 创建时间,毫秒作为单位
Utime int64 // 更新时间,毫秒作为单位
}

dao/comment.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

type ArticleDAO interface {
// Insert 创建评论
Insert(ctx context.Context, comment model.Comment) (uint64, error)
// GetLatestFloorByArticleID 根据 文章ID 获取评论最新楼层
GetLatestFloorByArticleID(ctx context.Context, articleID uint64) (uint32, error)
}

type commentGORM struct {
db *gorm.DB
}

func (c *commentGORM) Insert(ctx context.Context, comment model.Comment) (uint64, error) {
err := c.db.WithContext(ctx).Create(&comment).Error
return comment.ID, err
}

func (c *commentGORM) GetLatestFloorByArticleID(ctx context.Context, articleID uint64) (uint32, error) {
var comment model.Comment
err := c.db.WithContext(ctx).Where("article_id=?", articleID).Last(&comment).Error
return comment.Floor, err
}

func NewCommentDao(db *gorm.DB) CommentDao {
return &commentGORM{db: db}
}

cache/comment.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
type CommentCache interface {
ZAdd(ctx context.Context, comment ...domain.Comment) error
ZGet(ctx context.Context, cid uint64, by domain.RangeBy) ([]domain.Comment, error)
}

type commentRedisCache struct {
client *redis.Client
}

func (c *commentRedisCache) ZAdd(ctx context.Context, comments ...domain.Comment) error {
var err error
for i, _ := range comments {
comment := comments[i]
data, err := json.Marshal(comment)
if err != nil {
return err
}
// 使用 ZSet 存储 某篇文章 的评论
_, err = c.client.WithContext(ctx).ZAdd(fmt.Sprintf("article_comments_%d", comment.ArticleID), redis.Z{
Score: float64(comment.Floor),
Member: data,
}).Result()
if err != nil {
break
}
}
return err
}

func (c *commentRedisCache) ZGet(ctx context.Context, articleID uint64, by domain.RangeBy) ([]domain.Comment, error) {
var comments []domain.Comment
var result []string
var err error
if by.Order == enum.Positive {
result, err = c.client.WithContext(ctx).ZRange(
fmt.Sprintf("article_comments_%d", articleID), by.Start, by.Stop,
).Result()
if err != nil {
return comments, err
}
} else {
result, err = c.client.WithContext(ctx).ZRevRange(
fmt.Sprintf("article_comments_%d", articleID), by.Start, by.Stop,
).Result()
if err != nil {
return comments, err
}
}

for i, _ := range result {
// 获取对应数据
data := result[i]
// 构建对象
entity := domain.Comment{}
err = json.Unmarshal([]byte(data), &entity)
if err != nil {
return comments, err
}
comments = append(comments, entity)
}
return comments, nil
}

func NewCommentRedisCache(client *redis.Client) CommentCache {
return &commentRedisCache{client: client}
}

repo/comment.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
type CommentRepo interface {
// CreateAndCached 创建评论,并写入缓存
CreateAndCached(ctx context.Context, comment domain.Comment) error
// ConsumerMQ
ConsumerMQ(ctx context.Context) error
}

type commentRepo struct {
dao dao.CommentDao
cache cache.CommentCache
}

func (c *commentRepo) CreateAndCached(ctx context.Context, comment domain.Comment) error {
comment.Ctime = utils.GetTimeMilli()
entity := model.Comment{
Content: comment.Content,
UserID: comment.UserID,
ArticleID: comment.ArticleID,
ParentID: comment.ParentID,
Floor: comment.Floor,
Ctime: comment.Ctime,
}
// 存入消息队列
data, err := json.Marshal(entity)
if err != nil {
return err
}
err = global.RabbitMQ.PublishOnQueue(ctx, data)
if err != nil {
return err
}

return err
}

func (c *commentRepo) ConsumerMQ(ctx context.Context) error {
// 从消息队列获取
msgs, err := global.RabbitMQ.Ch.Consume(global.RabbitMQ.QueueName, "评论消费", true, false, false, false, nil)
if err != nil {
return err
}
go func() error {
for msg := range msgs {
endData := model.Comment{}
err = json.Unmarshal(msg.Body, &endData)
if err != nil {
return err
}
floor, err := c.GetLatestFloor(ctx, endData.ArticleID)
if err != nil && err != gorm.ErrRecordNotFound {
return err
}
endData.Floor = floor + 1
_, err = c.dao.Insert(ctx, endData)
if err != nil {
return err
}
// 写入缓存
err = c.cache.Set(ctx, domain.Comment(endData))
err = c.cache.ZAdd(ctx, domain.Comment(endData))
if err != nil {
global.Log.Warn(err.Error())
}
}
return err
}()
return err
}

func NewCommentRepo(dao dao.CommentDao, cache cache.CommentCache) CommentRepo {
return &commentRepo{dao: dao, cache: cache}
}

功能详解:

上述代码基本代表了有关评论业务操作的repository层操作,我们从最上层接口开始逐层拆分介绍。

  1. repo/comment.go 下的 ConsumerMQ(ctx context.context) 方法在我们路由注册的时候进行依赖注入,使用协程启动评论消费端,等待接收生产者的信号,当用户发表评论的时候就会走 CreateAndCached(ctx,comment) 接口方法将用户的评论信息写入消费队列,经过消费者消费成功后,把本篇文章的评论信息追加到 ZSet中进行评论缓存。
  2. ConsumerMQ 方法中每次从消费队列获取用户评论后对该评论的所属文章进行最新楼层查询,这样保证评论的顺序性,同时也保证了写入数据库的评论楼层数也是具备原子性的。
  3. 在ApiPost 提供的并发测试接口中 并发数 5000* 5 轮的情况下,整体API接口的平均延迟在 300MS左右,可见效果还是非常不错的。

使用RabitMQ解决 论坛用户评论并发抢楼问题
http://yoursite.com/2023/07/11/Golang-使用RabitMQ解决-论坛用户评论并发抢楼问题/
作者
Meng-Xin
发布于
2023年7月11日
许可协议