红包雨

TL 厂竞赛项目记录 20230905-20230916

借此机会记录红包雨项目后端开发的全流程

项目需求

通过参与活动获取随机红包金额,获得的红包金额将直接存入用户的账户余额,用户可以用于消费或提现。

功能要求

  1. 用户、企业注册、登录、查看账号余额等;
  2. 支持活动的创建,要求包含活动名称、时间、描述等基本内容;
  3. 发放红包、领取红包,支持互动聊天弹幕(聊天室的信息发送及滚动展示)、排行榜等即时互动功能;
  4. 支持从后端查看活动及活动的红包的领取记录和情况;
  5. 注意用户数据的保护,用户登录需要加密;

设计要求

  1. 提供简洁直观的用户界面,过程中应考虑用户参与活动的便利性和趣味性;
  2. 可以探讨一些吸引用户参与的营销策略,如推送消息提醒、分享红包活动获取额外奖励等;
  3. 界面端呈现形式(限定在 H5、PC Web,小程序、Android、iOS、Flutter等)。

整体设计

预期采用前后端分离架构,技术栈如下:

框架名称功能
Spring Boot后端框架
Spring Security安全及权限校验
MyBatis-Plus持久层框架
MySQL数据库
Redis 集群缓存
Slf4j日志
Quartz定时任务
Netty网络应用程序框架

以上就是本项目使用的技术栈,这些技术都是各个领域内主流的框架,安全性、稳定性以及可扩展性都有足够的保障。通过这些框架,本项目团队设计了功能全面的后端接口。按功能分可以分为四大模块:定时任务模块、即时互动模块、红包活动模块、系统管理模块。各个模块下有若干接口实现相应功能。本项目功能架构如下图所示。

技术可行性分析

本节对红包雨系统的技术可行性进行了详细分析。该系统面临着并发处理、强一致性要求和防恶意行为的挑战。以下是针对这些挑战的可行性分析。

  1. 针对系统的并发处理挑战,我们在技术上采用了Redis集群,Redis集群是一种高性能的内存数据库,特别适合处理高并发请求。在业务上选择了发布活动时直接拆分红包这一策略,来减轻服务器在高峰期的瞬时负载压力,提高系统的可伸缩性。经验证,以上手段能够有效提升系统的并发能力。
  2. 系统中的强一致性主要涉及金额的增减操作。我们采用了Redis的list和 hash结构来分发红包,同时通过数据库事务来确保金额的准确增加或减少。首先,红包金额会在Redis的List中进行分发,确保每个红包只能被领取一次。然后,一旦用户领取了红包,系统会触发数据库事务,将对应金额从企业账户扣除,并将金额存入用户账户,从而保证了红包的金额分发和企业账户的准确增减。通过这一流程,我们确保了每个红包要么被领取,要么被归还到企业账户,从而实现了强一致性,防止了金额的错误分发或丢失问题。这一策略在确保红包雨系统的财务一致性方面表现出可行性。
  3. 系统面临来自恶意用户的潜在攻击,为了应对这一挑战,我们采取了多层次的安全措施。在技术层面,我们引入了Spring Security的过滤器机制,这个强大的安全框架可以检测和拦截潜在的恶意请求。Spring Security的配置允许我们定义访问规则、身份验证流程以及对敏感操作的保护,从而增强了系统的安全性。在业务层面,我们引入了用户黑名单和登录失败次数限制的功能。具体来说,当用户多次登录失败达到五次时,系统会自动锁定该用户的账户半小时,以防止暴力破解。此外,我们维护一个黑名单,用于标识已知的恶意用户。黑名单中的用户将被系统屏蔽,并不允许其进行进一步的操作。

通过结合技术和业务层面的措施,我们有效地减少了潜在的攻击风险,提高了系统的整体安全性。这一综合策略有助于确保红包雨系统不受到恶意行为的威胁,保护了用户的资金和数据安全。

综上所述,红包雨系统在处理高并发、维护财务一致性和防范恶意行为方面采用了一系列有效的技术和策略。通过使用Redis集群和智能红包拆分策略,系统成功提高了并发处理能力。此外,通过Redis和数据库的协同工作,确保了金额的强一致性,防止了错误分发。在安全性方面,引入了Spring Security的过滤器机制、用户黑名单和登录失败次数限制,有力地防御了恶意攻击。这些措施的综合应用为红包雨系统带来了可行性,保障了系统的性能、一致性和安全性。

然而,系统仍需持续监测和改进,以确保系统适应不断变化的需求和威胁,为用户和企业提供可靠的服务。然而,我们仍需持续监测系统的性能、安全性,并进行容量规划,以确保系统在高并发场景下能有效地运行。这些措施将有助于保持系统的可行性,并确保用户的满意度。

数据库设计

MySQL 设计

红包活动表 tbl_red_envelope

字段名数据类型默认值允许空值备注
idbigint(16)红包id
total_amountdecimal(10,2)红包总金额
red_countint红包总个数
contentvarchar(255)活动详情
sign_start_timedatetime报名开始时间
sign_end_timedatetime报名截止时间
start_timedatetime抢红包开始时间
end_timedatetime抢红包截止时间
head_count_limitint活动人数限制
activity_namevarchar活动名称
split_envelopesvarchar红包拆分情况,金额用英文逗号分隔
same_groupint0是否是组内红包,1是,0否
picturevarchar图片链接
statusint进行中0、已满1、已结束2
create_timedatetime创建时间
update_timedatetime创建用户
create_byvarchar修改时间
update_byvarchar修改用户
deletedint0逻辑删除标识:0->否;1->是

报名表 tbl_sign

字段名数据类型默认值允许空值备注
idbigint(16)报名 id
red_idbigint(16)所属红包 id
user_idbigint(16)报名用户 id
create_timedatetime创建时间
update_timedatetime创建用户
create_byvarchar修改时间
update_byvarchar修改用户
deletedint0逻辑删除标识:0->否;1->是

抢红包记录表 tbl_record

字段名数据类型默认值允许空值备注
idbigint(16)红包记录id
received_user_idbigint(16)收到红包的用户id
red_idbigint(16)红包id
scoreint点击得分
receive_amountdecimal用户抢到的金额
create_timedatetime创建时间
update_timedatetime创建用户
create_byvarchar修改时间
update_byvarchar修改用户
deletedint0逻辑删除标识:0->否;1->是

消息通知表 tbl_message

字段名数据类型默认值允许空值备注
idbigint(16)消息id
receive_crowd_idbigint(16)收取人群id
send_user_idbigint(16)发送账号id
is_autoint是否自动发送
send_timedatetime发送时间
replyvarchar收到回复
methodint消息形式 1-短信发送 2-邮件发送
contentvarchar消息内容
create_timedatetime创建时间
update_timedatetime创建用户
create_byvarchar修改时间
update_byvarchar修改用户
deletedint0逻辑删除标识:0->否;1->是

人群表 tbl_group

字段名数据类型默认值允许空值备注
idbigint(16)人群id
user_idbigint(16)接收用户id
user_namevarchar用户姓名
phonevarchar手机号
emailvarchar邮箱地址
create_timedatetime创建时间
update_timedatetime创建用户
create_byvarchar修改时间
update_byvarchar修改用户
deletedint0逻辑删除标识:0->否;1->是

用户表 ums_user

字段名数据类型默认值允许空值备注
idbigint(16)
usernamevarchar用户名
passwordvarchar密码
iconvarchar头像
phonevarchar手机号
emailvarchar邮箱
nick_namevarchar昵称
account_balancedecimal0.00用户余额
withdraw_amountdecimal0.00已提现金额
notevarchar备注信息
login_timedatetime最后登录时间
statusint1帐号启用状态:0->禁用;1->启用
blackint0是否是黑名单 1 是黑名单 0不是
belong_tovarchar-1组织 id ,没有组织默认-1
role_idbigint(16)角色 1-管理员 2-用户 3-企业
black_typevarchar拉入黑名单的类型
black_reasonvarchar拉入黑名单的原因
create_timedatetime创建时间
update_timedatetime创建用户
create_byvarchar修改时间
update_byvarchar修改用户
deletedint0逻辑删除标识:0->否;1->是

角色表 ums_role

字段名数据类型默认值允许空值备注
idbigint(16)角色ID
namevarchar名称
descriptionvarchar描述
statusint1启用状态:0->禁用;1->启用
create_timedatetime创建时间
update_timedatetime创建用户
create_byvarchar修改时间
update_byvarchar修改用户
deletedint0逻辑删除标识:0->否;1->是

用户登录日志表 ums_user_login_log

字段名数据类型默认值允许空值备注
idbigint(16)日志 ID
user_idbigint(16)用户ID
ipvarchar登陆地IP
addressvarchar登录地址
user_agentvarchar浏览器登录类型
create_timedatetime创建时间
update_timedatetime创建用户
create_byvarchar修改时间
update_byvarchar修改用户
deletedint0逻辑删除标识:0->否;1->是

资源表 ums_resource

字段名数据类型默认值允许空值备注
idbigint(16)资源ID
namevarchar分类名称
descriptionvarchar描述
statusint1启用状态:0->禁用;1->启用
create_timedatetime创建时间
update_timedatetime创建用户
create_byvarchar修改时间
update_byvarchar修改用户
deletedint0逻辑删除标识:0->否;1->是

角色资源关系表 ums_role_resource_relation

字段名数据类型默认值允许空值备注
idbigint(16)
role_idbigint(16)角色ID
resource_idbigint(16)资源ID
statusint1启用状态:0->禁用;1->启用
create_timedatetime创建时间
update_timedatetime创建用户
create_byvarchar修改时间
update_byvarchar修改用户
deletedint0逻辑删除标识:0->否;1->是

Redis 设计

核心的Redis键:

  • RED_ENVELOPE_CREATE_KEY:用于存储红包拆分情况的列表。
  • RED_ENVELOPE_CREATE_COUNT:用于缓存报名人数。
  • RED_ENVELOPE_SCORE:用于存储用户得分的哈希表。
  • RED_ENVELOPE_LOCK:用于分布式锁,以确保操作的原子性。
  • RED_ENVELOPE_CONSUME_KEY:用于存储用户抢到的红包金额。

工具类

RedisService

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * redis操作Service
 */
public interface RedisService {

    /**
     * 加锁操作
     */
    public boolean acquireLock(String lockKey, String requestId, long expireTimeInSeconds);

    public boolean acquireLock(String lockKey, long expireTimeInSeconds);

    /**
     * 是否拥有锁
     */
    boolean isLocked(String key);

    /**
     * 释放锁操作
     */
    public boolean releaseLock(String lockKey, String requestId);

    public boolean releaseLock(String lockKey);

    /**
     * 异步等待锁的释放
     */
    public void waitForLockRelease(String lockKey);

    /**
     * 保存属性
     */
    void set(String key, Object value, long time);

    /**
     * 保存属性
     */
    void set(String key, Object value);

    /**
     * 获取属性
     */
    Object get(String key);

    /**
     * 删除属性
     */
    Boolean del(String key);

    /**
     * 批量删除属性
     */
    Long del(List<String> keys);

    /**
     * 设置过期时间
     */
    Boolean expire(String key, long time);

    /**
     * 获取过期时间
     */
    Long getExpire(String key);

    /**
     * 判断是否有该属性
     */
    Boolean hasKey(String key);

    /**
     * 按delta递增
     */
    Long incr(String key, long delta);

    /**
     * 按delta递减
     */
    Long decr(String key, long delta);

    /**
     * 获取Hash结构中的属性
     */
    Object hGet(String key, String hashKey);

    /**
     * 向Hash结构中放入一个属性
     */
    Boolean hSet(String key, String hashKey, Object value, long time);

    /**
     * 向Hash结构中放入一个属性
     */
    void hSet(String key, String hashKey, Object value);

    /**
     * 直接获取整个Hash结构
     */
    Map<Object, Object> hGetAll(String key);

    /**
     * 直接设置整个Hash结构
     */
    Boolean hSetAll(String key, Map<String, Object> map, long time);

    /**
     * 直接设置整个Hash结构
     */
    void hSetAll(String key, Map<String, ?> map);

    /**
     * 删除Hash结构中的属性
     */
    void hDel(String key, Object... hashKey);

    /**
     * 判断Hash结构中是否有该属性
     */
    Boolean hHasKey(String key, String hashKey);

    /**
     * Hash结构中属性递增
     */
    Long hIncr(String key, String hashKey, Long delta);

    /**
     * Hash结构中属性递减
     */
    Long hDecr(String key, String hashKey, Long delta);

    /**
     * 获取Set结构
     */
    Set<Object> sMembers(String key);

    /**
     * 向Set结构中添加属性
     */
    Long sAdd(String key, Object... values);

    /**
     * 向Set结构中添加属性
     */
    Long sAdd(String key, long time, Object... values);

    /**
     * 是否为Set中的属性
     */
    Boolean sIsMember(String key, Object value);

    /**
     * 获取Set结构的长度
     */
    Long sSize(String key);

    /**
     * 删除Set结构中的属性
     */
    Long sRemove(String key, Object... values);

    /**
     * 获取List结构中的属性
     */
    List<Object> lRange(String key, long start, long end);

    /**
     * 获取List结构的长度
     */
    Long lSize(String key);

    /**
     * 根据索引获取List中的属性
     */
    Object lIndex(String key, long index);

    /**
     * 向List结构中添加属性
     */
    Long lPush(String key, Object value);

    /**
     * 向List结构中添加属性
     */
    Long lPush(String key, Object value, long time);

    /**
     * 向List结构中批量添加属性
     */
    Long lPushAll(String key, Object... values);

    /**
     * 向List结构中批量添加属性
     */
    Long lPushAll(String key, Long time, Object... values);

    /**
     * 从List结构中移除属性
     */
    Long lRemove(String key, long count, Object value);

    /**
     * List 出栈
     */
    Object lPop(String key);

}

RedisServiceImpl

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
import com.project.common.service.RedisService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static com.project.modules.redEnvelope.constant.Constant.RED_ENVELOPE_LOCK;

/**
 * redis操作实现类
 */
public class RedisServiceImpl implements RedisService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    // 加锁操作
    @Override
    public boolean acquireLock(String lockKey, String requestId, long expireTimeInSeconds) {
        try {
            Boolean lockAcquired = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, requestId, expireTimeInSeconds, TimeUnit.SECONDS);
            return lockAcquired != null && lockAcquired;
        } catch (Exception e) {
            // 处理异常
            return false;
        }
    }

    @Override
    public boolean acquireLock(String lockKey, long expireTimeInSeconds) {
        return acquireLock(lockKey, RED_ENVELOPE_LOCK, expireTimeInSeconds);
    }

    // 释放锁操作
    @Override
    public boolean releaseLock(String lockKey, String requestId) {
        try {
            String currentValue = stringRedisTemplate.opsForValue().get(lockKey);
            if (currentValue != null && currentValue.equals(requestId)) {
                return stringRedisTemplate.delete(lockKey);
            }
            return false;
        } catch (Exception e) {
            // 处理异常
            return false;
        }
    }

    public boolean isLockOwned(String lockKey, String requestId) {
        try {
            String currentValue = stringRedisTemplate.opsForValue().get(lockKey);
            return currentValue != null && currentValue.equals(requestId);
        } catch (Exception e) {
            // 处理异常
            return false;
        }
    }

    @Override
    public boolean isLocked(String key) {
        return isLockOwned(key, RED_ENVELOPE_LOCK);
    }

    @Override
    public boolean releaseLock(String lockKey) {
        return releaseLock(lockKey, RED_ENVELOPE_LOCK);
    }

    // 异步等待锁的释放
    @Override
    public void waitForLockRelease(String lockKey) {
        // 使用BLPOP来异步等待锁的释放
        // BLPOP会一直等待直到有数据被push到lockKey,此时表示锁已释放
        stringRedisTemplate.opsForList().leftPop(lockKey);
    }

    @Override
    public void set(String key, Object value, long time) {
        redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
    }

    @Override
    public void set(String key, Object value) {
        redisTemplate.opsForValue().set(key, value);
    }

    @Override
    public Object get(String key) {
        return redisTemplate.opsForValue().get(key);
    }

    @Override
    public Boolean del(String key) {
        return redisTemplate.delete(key);
    }

    @Override
    public Long del(List<String> keys) {
        return redisTemplate.delete(keys);
    }

    @Override
    public Boolean expire(String key, long time) {
        return redisTemplate.expire(key, time, TimeUnit.SECONDS);
    }

    @Override
    public Long getExpire(String key) {
        return redisTemplate.getExpire(key, TimeUnit.SECONDS);
    }

    @Override
    public Boolean hasKey(String key) {
        return redisTemplate.hasKey(key);
    }

    @Override
    public Long incr(String key, long delta) {
        return redisTemplate.opsForValue().increment(key, delta);
    }

    @Override
    public Long decr(String key, long delta) {
        return redisTemplate.opsForValue().increment(key, -delta);
    }

    @Override
    public Object hGet(String key, String hashKey) {
        return redisTemplate.opsForHash().get(key, hashKey);
    }

    @Override
    public Boolean hSet(String key, String hashKey, Object value, long time) {
        redisTemplate.opsForHash().put(key, hashKey, value);
        return expire(key, time);
    }

    @Override
    public void hSet(String key, String hashKey, Object value) {
        redisTemplate.opsForHash().put(key, hashKey, value);
    }

    @Override
    public Map<Object, Object> hGetAll(String key) {
        return redisTemplate.opsForHash().entries(key);
    }

    @Override
    public Boolean hSetAll(String key, Map<String, Object> map, long time) {
        redisTemplate.opsForHash().putAll(key, map);
        return expire(key, time);
    }

    @Override
    public void hSetAll(String key, Map<String, ?> map) {
        redisTemplate.opsForHash().putAll(key, map);
    }

    @Override
    public void hDel(String key, Object... hashKey) {
        redisTemplate.opsForHash().delete(key, hashKey);
    }

    @Override
    public Boolean hHasKey(String key, String hashKey) {
        return redisTemplate.opsForHash().hasKey(key, hashKey);
    }

    @Override
    public Long hIncr(String key, String hashKey, Long delta) {
        return redisTemplate.opsForHash().increment(key, hashKey, delta);
    }

    @Override
    public Long hDecr(String key, String hashKey, Long delta) {
        return redisTemplate.opsForHash().increment(key, hashKey, -delta);
    }

    @Override
    public Set<Object> sMembers(String key) {
        return redisTemplate.opsForSet().members(key);
    }

    @Override
    public Long sAdd(String key, Object... values) {
        return redisTemplate.opsForSet().add(key, values);
    }

    @Override
    public Long sAdd(String key, long time, Object... values) {
        Long count = redisTemplate.opsForSet().add(key, values);
        expire(key, time);
        return count;
    }

    @Override
    public Boolean sIsMember(String key, Object value) {
        return redisTemplate.opsForSet().isMember(key, value);
    }

    @Override
    public Long sSize(String key) {
        return redisTemplate.opsForSet().size(key);
    }

    @Override
    public Long sRemove(String key, Object... values) {
        return redisTemplate.opsForSet().remove(key, values);
    }

    @Override
    public List<Object> lRange(String key, long start, long end) {
        return redisTemplate.opsForList().range(key, start, end);
    }

    @Override
    public Long lSize(String key) {
        return redisTemplate.opsForList().size(key);
    }

    @Override
    public Object lIndex(String key, long index) {
        return redisTemplate.opsForList().index(key, index);
    }

    @Override
    public Long lPush(String key, Object value) {
        return redisTemplate.opsForList().rightPush(key, value);
    }

    @Override
    public Long lPush(String key, Object value, long time) {
        Long index = redisTemplate.opsForList().rightPush(key, value);
        expire(key, time);
        return index;
    }

    @Override
    public Long lPushAll(String key, Object... values) {
        return redisTemplate.opsForList().rightPushAll(key, values);
    }

    @Override
    public Long lPushAll(String key, Long time, Object... values) {
        Long count = redisTemplate.opsForList().rightPushAll(key, values);
        expire(key, time);
        return count;
    }

    @Override
    public Long lRemove(String key, long count, Object value) {
        return redisTemplate.opsForList().remove(key, count, value);
    }

    @Override
    public Object lPop(String key) {
        return redisTemplate.opsForList().leftPop(key);
    }
}

系统管理模块

系统管理模块在系统中扮演着关键的角色,其任务是实现用户注册、登录以及敏感资源的权限控制。这一模块的核心目标是确保只有经过授权的用户才能访问敏感资源,以维护系统的安全性和数据保密性。

在认证方面,系统采用JWT(JSON Web Token)作为用户令牌的生成和验证机制,并提供相关功能,如令牌刷新和令牌撤销。为了增强令牌的安全性,系统前后端约定了一个常量值,作为令牌的前缀,以降低令牌被盗用的潜在风险。此外,为保护传输中的密码数据的机密性和完整性,前端首先对密码进行一次SM2加密,然后在后端再进行二次加密后才存储到数据库中。

在授权方面,系统设计了数据库模型来管理用户、角色、权限和资源之间的多对多关系。系统定义了三种角色:系统管理员、普通用户和企业用户。为了动态地管理用户对资源的访问权限,系统实现了AccessDecisionManager接口,该接口用于决定属于某个角色的用户是否被允许访问特定的资源。

在网络攻击处理方面,系统引入了Spring Security框架,以应对常见的作弊和Web攻击。系统采取了一系列措施,包括输入验证、输出编码、防止SQL注入和CSRF攻击等,以提高系统的安全性,并有效应对潜在的攻击威胁。

核心代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 权限控制
/**
 * 动态权限决策管理器,用于判断用户是否有访问权限、
 */
public class DynamicAccessDecisionManager implements AccessDecisionManager {
    @Override
    public void decide(Authentication authentication, Object object,
                       Collection<ConfigAttribute> configAttributes) throws AccessDeniedException, InsufficientAuthenticationException {
        // 当接口未被配置资源时直接放行
        if (CollUtil.isEmpty(configAttributes)) {
            return;
        }
        for (ConfigAttribute configAttribute : configAttributes) {
            //将访问所需资源或用户拥有资源进行比对
            String needAuthority = configAttribute.getAttribute();
            for (GrantedAuthority grantedAuthority : authentication.getAuthorities()) {
                if (needAuthority.trim().equals(grantedAuthority.getAuthority())) {
                    return;
                }
            }
        }
        throw new AccessDeniedException("抱歉,您没有访问权限");
    }
    @Override
    public boolean supports(ConfigAttribute configAttribute) {
        return true;
    }
    @Override
    public boolean supports(Class<?> aClass) {
        return true;
    }
}

红包活动模块

抢红包核心思路

该模块包括了创建红包活动、查询红包活动、用户报名抢红包、提交抢红包分数、查看红包领取情况、查看红包排行榜等功能,是红包雨系统的核心,用于管理和控制红包活动的整个生命周期。核心接口如下:

(1) 创建红包活动

企业设定活动名称、描述、红包金额、数量、抢红包时间等参数,发布红包活动。

在发布红包之后,会立刻出发一个拆红包算法。项目采用了改进的二倍均值算法来实现红包的分发,这个算法在初始化阶段确保了每个红包的金额在剩余红包平均值的50% - 150%之间浮动,这样既保持了一定的随机性,又控制了红包金额的合理范围,避免了出现过大或过小的情况。这是一种有效的策略,使得红包的分发更具趣味性和公平性。

项目计划探索一种高级红包分发策略,即利用抢红包活动的历史数据来训练聚类模型,以提升分发过程的智能化。该模型的主要目标是基于用户过去抢红包的元宝数、红包数、炸弹数等历史数据模式,来生成下一次红包的拆分比重数组,而不仅仅依赖随机性。这一方法将随机性与数据驱动的理念相结合,旨在使用户抢得的红包金额比重更加接近他们在实际抢红包活动中的表现情况。这个策略在提高用户体验和红包分发公平性方面具有潜在的价值。

(2) 查询红包活动

本接口支持前端页面根据活动状态、用户所属组织、活动名称和描述条件分页查询红包活动。在企业页面中,企业可以查询自己发起的红包活动;在用户页面中,用户可以查询自己参与的红包活动。当活动报名人数已满时,或活动报名时间截止时,会对活动的状态属性进行更改。点击某一红包活动时,可以进入详细查询该红包,并在相应页面进行报名和进入聊天等候室。

(3) 报名抢红包

用户在报名抢红包时,后台会启动定时任务,在活动开始时向前端发送开抢指令,在活动开始前发送短信和邮件通知,提醒用户即将开始抢红包。

(4) 提交抢红包分数

对于用户提交抢红包分数这一接口,我们采用了一种高效且用户友好的设计。前端设定了每1-3秒提交一次当前抢红包的总得分,并在抢红包结束时再次提交当前的总得分。这一流程的核心是在后端Redis缓存中维护一个hash结构,用于实时更新用户的总得分。这种设计带来了多重好处。首先,它允许我们保存用户在抢红包过程中的得分情况,确保即使在最终提交分数之前,用户的得分已经得到记录。这有助于防止用户因为最终提交分数失败而导致没有获得任何金额的情况。其次,这一设计有助于分散后端接口请求的压力。通过将得分提交分散在一段时间内,而不是等到抢红包结束时才提交,我们可以有效地降低了并发请求的高峰,从而提高了系统的稳定性和性能。综合而言,这一方案不仅提高了用户体验,还优化了系统的性能和稳定性。它充分利用了Redis的实时性和高性能特点,为红包雨项目的顺畅运行提供了有力支持。

(5) 查看抢得红包金额

由于分配红包需要根据用户在红包雨游戏中的得分来进行对应分配,所以本接口通过异步任务的方式,等待后台结算红包完成,分布式锁释放后,从Redis中获取当前用户在指定红包中抢到的金额。

(6) 查看抢红包排行榜

用户在抢完红包后会跳转排行榜,在此界面可以查看当前红包雨的抢红包情况,并且可以在下方进行留言互动。

拆红包算法

假设 M 为剩余红包金额,N 为剩余人数,求出剩余平均金额为 avg = M/N,每位用户抢到的金额在 avg 的上下 50% 随机浮动。为保证高并发、无锁化、原子性,使用 Redis 实现。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
    /**
     * 拆红包的算法:在剩余红包平均值的 50% - 150% 浮动
     *
     * @param totalMoney        总金额
     * @param redEnvelopeNumber 红包数
     * @return 红包分配情况,从大到小排列
     */
    protected static BigDecimal[] splitRedEnvelopeAlgorithm(BigDecimal totalMoney, int redEnvelopeNumber) {
        BigDecimal[] res = new BigDecimal[redEnvelopeNumber];
        BigDecimal useMoney = BigDecimal.ZERO;
        for (int i = 0; i < redEnvelopeNumber; i++) {
            if (i == redEnvelopeNumber - 1) {
                res[i] = totalMoney.subtract(useMoney);
            } else {
                BigDecimal avgMoney = totalMoney.subtract(useMoney)
                        .divide(BigDecimal.valueOf(redEnvelopeNumber - i), 2, RoundingMode.HALF_UP)
                        .divide(BigDecimal.valueOf(2), 2, RoundingMode.HALF_UP);
                // 在剩余红包平均值的 50% - 150% 浮动
                res[i] = RandomUtil.randomBigDecimal(avgMoney, avgMoney.multiply(BigDecimal.valueOf(3)))
                        .setScale(2, RoundingMode.HALF_UP);
            }
            useMoney = useMoney.add(res[i]);
        }
        return res;
    }

即时互动模块(聊天室)

即时互动模块是项目中的一个关键组成部分。HTTP协议是半双工的协议,同一时刻,只能有一个方向的数据传输,并且消息体冗长,不适用于即时通信系统。WebSocket提供了一种浏览器与服务器间进行全双工通信的网络技术,浏览器与服务器之间只需要做一个握手动作,之后就形成了一条快速通道,两者可以互相传输数据。WebSocket是基于TCP全双工进行消息传递,相比于HTTP半双工,性能得到很大的提升。系统基于Netty实现了WebSocket连接,用于用户之间的实时互动,包括聊天、点赞和送礼功能。该模块使得用户能够在红包活动页面下进行多种互动操作,从而提升用户参与度和用户体验。

在技术选型时,我们选择Netty,因为Netty 中有一个EventLoop 的概念,用于处理 Channel 上的事件和任务,负责监听网络事件并调用事件处理器进行相关I/O操作。每个 Channel 都会绑定一个 EventLoop,一个 EventLoop 可以处理多个 Channel 上的事件和任务。

在技术实现上,首先通过WebSocketServerProtocolHandler将HTTP协议升级为WebSocket协议,以支持WebSocket通信。

pipeline结构是一个带有head与tail指针的双向链表,其中的节点为handler,要通过ctx.fireChannelRead(msg)等方法,将当前handler的处理结果传递给下一个handler,当有入站(Inbound)操作时,会从head开始向后调用handler,直到handler不是处理Inbound操作为止,当有出站(Outbound)操作时,会从tail开始向前调用handler,直到handler不是处理Outbound操作为止。它的数据结构和调用顺序如下图所示:

我们在 pipeline 中自定义了一个WebSocketHandler处理类,用于处理WebSocket连接和消息的收发。此外,还通过二次编解码器,处理粘包与拆包。主要功能包括:

维护用户通道

WebSocketHandler维护一个用户通道管理集合,每个连接的用户都有一个唯一的通道ID,这些通道ID,与红包ID相关联,与用户通道对象进行映射。当用户连接或断开连接时,会触发相应的事件。这有助于维护通道的活跃性和清除无效的通道。通过这种方式,服务器可以轻松地将消息发送给参与特定红包活动的所有用户,实现了基于红包ID的用户通道管理。此外,异常处理的回调方法确保在通信过程中出现异常时,通道能够被正确地关闭和管理,同时记录异常信息以便进行适当的故障诊断和处理。这对于保持系统的稳定性和可靠性至关重要。核心代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
		/**
     * 用户通道管理
     * key:通道 id
     * value:通道对象
     */
    public static ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>(16);
    /**
     * 红包频道管理
     * key: 红包 id
     * value: 通道 id 的 set
     */
    public static ConcurrentHashMap<Long, ConcurrentHashSet<String>> redChannels = new ConcurrentHashMap<>();
    /**
     * 通道就绪事件
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // 放入通道组
        channelMap.put(channel.id().asLongText(), channel);
        channel.writeAndFlush(new TextWebSocketFrame(channel + "上线了"));
        log.info("[" + channelMap.size() + "]有新的连接:" + channel.remoteAddress());
    }
    /**
     * 通道未就绪事件
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        log.info("通道:" + channel.remoteAddress() + "已下线");
        // 当有客户端断开连接的时候,就移除对应的通道
        channelMap.remove(channel.id().asLongText());
        ctx.close();
}
    /**
     * 异常处理事件
     *
     * @param ctx   ctx
     * @param cause 导致
     * @throws Exception 异常
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel channel = ctx.channel();
        //移除集合
        channelMap.remove(channel.id().asLongText());
        ctx.close();
        log.error("异常信息:{}", cause.getMessage());
    }

消息处理

WebSocketHandler支持不同类型的消息发送,包括欢迎消息、指令消息(点赞和送礼)和聊天消息。根据不同的消息类型,服务器可以向用户发送个性化的消息内容。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// 消息处理
  /**
     * 发送消息
     *
     * @param redId           红包 id
     * @param responseMessage 消息内容
     */
    public static void sendMessage(Long redId, String responseMessage) {
        for (String channel : redChannels.getOrDefault(redId, new ConcurrentHashSet<>())) {
            Channel c = channelMap.get(channel);
            if (c != null && c.isActive() && c.isWritable()) {
                c.writeAndFlush(new TextWebSocketFrame(responseMessage));
                log.info("发送消息:" + responseMessage);
            }
        }
    }

小坑

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
    @Value("${netty.port}")
    private static int port;

    @PostConstruct
    public void init(){
        try {
            new NettyServer(port).start();
            log.info("netty start...");
        }catch (Exception e){
           log.error(e.getMessage());
        }
    }

报错:Parameter 0 of constructor in com.project.modules.netty.NettyServer required a bean of type 'int' that could not be found.

原因:变量加了 static,在实例化之后才被执行;同理,加 final 不能再更改,也会出问题。

解决方案:删除 static。

最终实现

server 类,启动 websocket 服务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.project.modules.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

/**
 * Netty 实现 websocket
 */
@Component
@Slf4j
public class NettyWebSocketServer implements ApplicationListener<ContextRefreshedEvent> {

    private final int port = 9702;

    @Autowired
    private WebSocketChannelInit webSocketChannelInit;
    private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    private final EventLoopGroup workerGroup = new NioEventLoopGroup();

    /**
     * 容器初始化完成后调用
     *
     * @param contextRefreshedEvent
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        // 启动netty服务器
        this.init();
    }

    public void init() {
        try {
            //1.创建服务端启动助手
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //2.设置线程组
            serverBootstrap.group(bossGroup, workerGroup);
            //3.设置参数
            serverBootstrap.channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(webSocketChannelInit);
            //4.启动  绑定端口不能和服务端口一致
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            channelFuture.channel().closeFuture().sync();
            log.info(NettyWebSocketServer.class + " 启动正在监听: " + channelFuture.channel().localAddress());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

pipelines 设置各种 handler

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.project.modules.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 通道初始化对象
 */
@Component
@Slf4j
public class WebSocketChannelInit extends ChannelInitializer {
    @Autowired
    private WebSocketHandler webSocketHandler;

    @Override
    protected void initChannel(Channel channel) throws Exception {
        log.info("收到新连接");
        ChannelPipeline pipeline = channel.pipeline();
        // websocket本身是基于http协议的,对http协议的支持.
        pipeline.addLast(new HttpServerCodec());
        // 对大数据流的支持
        pipeline.addLast(new ChunkedWriteHandler());
        //post请求分三部分. request line / request header / message body
        // HttpObjectAggregator将多个信息转化成单一的request或者response对象
        pipeline.addLast(new HttpObjectAggregator(8192));
        // 将http协议升级为ws协议. websocket的支持
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536 * 10));
        // 自定义处理handler
        pipeline.addLast(webSocketHandler);
    }
}

handler,处理通道和消息

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package com.project.modules.netty;

import cn.hutool.core.collection.ConcurrentHashSet;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.concurrent.ConcurrentHashMap;

/**
 * 自定义处理类
 */
@Component
@ChannelHandler.Sharable
@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    /**
     * 用户通道管理
     * key:通道 id
     * value:通道对象
     */
    public static ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>(16);

    /**
     * 红包频道管理
     * key: 红包 id
     * value: 通道 id 的 set
     */
    public static ConcurrentHashMap<Long, ConcurrentHashSet<String>> redChannels = new ConcurrentHashMap<>();

    /**
     * 发送消息
     *
     * @param redId           红包 id
     * @param responseMessage 消息内容
     */
    public static void sendMessage(Long redId, String responseMessage) {
        for (String channel : redChannels.getOrDefault(redId, new ConcurrentHashSet<>())) {
            Channel c = channelMap.get(channel);
            if (c != null && c.isActive() && c.isWritable()) {
                c.writeAndFlush(new TextWebSocketFrame(responseMessage));
                log.info("发送消息:" + responseMessage);
            }
        }
    }

    /**
     * 收到消息事件
     *
     * @param ctx 通道处理程序上下文
     * @param msg 文本框架网络套接字
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
        log.info("msg:" + msg.text());
        String[] params = msg.text().split("###");
        if (params.length < 2) {
            log.error("消息格式错误" + msg.text());
            return;
        }
        String channelId = ctx.channel().id().asLongText();
        Long redId = Long.valueOf(params[0]);
        // 更新红包频道中的通道
        ConcurrentHashSet<String> channelIds = redChannels.getOrDefault(redId, new ConcurrentHashSet<>());
        channelIds.add(channelId);
        redChannels.put(redId, channelIds);

        String userName = params[1];
        if (params.length == 2) {
            // xxx来了
            sendMessage(redId, "@welcome###" + userName);
            return;
        }
        String message = params[2];
        if (message.startsWith("@")) {
            // 指令消息
            sendMessage(redId, message + "###" + userName);
            return;
        }
        // 群发聊天消息
        sendMessage(redId, userName + "###" + message);
    }

    /**
     * 通道就绪事件
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // 放入通道组
        channelMap.put(channel.id().asLongText(), channel);
        channel.writeAndFlush(new TextWebSocketFrame(channel + "上线了"));
        log.info("[" + channelMap.size() + "]有新的连接:" + channel.remoteAddress());
    }

    /**
     * 通道未就绪事件
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        log.info("通道:" + channel.remoteAddress() + "已下线");
        // 当有客户端断开连接的时候,就移除对应的通道
        channelMap.remove(channel.id().asLongText());
        ctx.close();
    }

    /**
     * 异常处理事件
     *
     * @param ctx   ctx
     * @param cause 导致
     * @throws Exception 异常
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel channel = ctx.channel();
        //移除集合
        channelMap.remove(channel.id().asLongText());
        ctx.close();
        log.error("异常信息:{}", cause.getMessage());
    }
}

定时任务模块

根据模块划分,详细实现 定时任务与通知模块是项目中的一个关键组件,主要用于管理红包活动的不同阶段,并向用户和企业发送相应的通知。在项目中,定时任务分为以下几个阶段:

  1. 通知用户抢红包活动即将开始:在抢红包活动开始前的10分钟,通过邮件和短信通知用户,提醒他们即将有机会参与红包活动。
  2. 抢红包活动开始后的通知:在抢红包活动正式开始时,通过定时任务通知用户可以开始抢红包了。
  3. 抢红包结束后的结算:当抢红包活动结束后,触发定时任务执行后台结算方法。
  4. 通知企业红包退款:在结算完毕后,通过邮件通知企业红包已经退回了部分金额。

在技术实现上,采用了Quartz框架来实现定时任务的管理和调度。Quartz提供了灵活的任务调度功能,允许定义任务的执行时间、频率和触发器等属性。

首先,使用 JobBuilder.newJob(StartJob.class) 创建一个 JobDetail 对象,用于定义任务。redId 作为任务数据的参数传递。然后,使用 TriggerBuilder.newTrigger() 创建一个 Trigger 对象,并自定义任务的执行时间。最后,将 Trigger 关联到 JobDetail,并进行调度执行。

SQL

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
#
# In your Quartz properties file, you'll need to set
# org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#
#
# By: Ron Cordell - roncordell
#  I didn't see this anywhere, so I thought I'd post it here. This is the script from Quartz to create the tables in a MySQL database, modified to use INNODB instead of MYISAM.

DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
DROP TABLE IF EXISTS QRTZ_LOCKS;
DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
DROP TABLE IF EXISTS QRTZ_CALENDARS;

CREATE TABLE QRTZ_JOB_DETAILS(
SCHED_NAME VARCHAR(120) NOT NULL,
JOB_NAME VARCHAR(190) NOT NULL,
JOB_GROUP VARCHAR(190) NOT NULL,
DESCRIPTION VARCHAR(250) NULL,
JOB_CLASS_NAME VARCHAR(250) NOT NULL,
IS_DURABLE VARCHAR(1) NOT NULL,
IS_NONCONCURRENT VARCHAR(1) NOT NULL,
IS_UPDATE_DATA VARCHAR(1) NOT NULL,
REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
JOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(190) NOT NULL,
TRIGGER_GROUP VARCHAR(190) NOT NULL,
JOB_NAME VARCHAR(190) NOT NULL,
JOB_GROUP VARCHAR(190) NOT NULL,
DESCRIPTION VARCHAR(250) NULL,
NEXT_FIRE_TIME BIGINT(13) NULL,
PREV_FIRE_TIME BIGINT(13) NULL,
PRIORITY INTEGER NULL,
TRIGGER_STATE VARCHAR(16) NOT NULL,
TRIGGER_TYPE VARCHAR(8) NOT NULL,
START_TIME BIGINT(13) NOT NULL,
END_TIME BIGINT(13) NULL,
CALENDAR_NAME VARCHAR(190) NULL,
MISFIRE_INSTR SMALLINT(2) NULL,
JOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_SIMPLE_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(190) NOT NULL,
TRIGGER_GROUP VARCHAR(190) NOT NULL,
REPEAT_COUNT BIGINT(7) NOT NULL,
REPEAT_INTERVAL BIGINT(12) NOT NULL,
TIMES_TRIGGERED BIGINT(10) NOT NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_CRON_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(190) NOT NULL,
TRIGGER_GROUP VARCHAR(190) NOT NULL,
CRON_EXPRESSION VARCHAR(120) NOT NULL,
TIME_ZONE_ID VARCHAR(80),
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_SIMPROP_TRIGGERS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_NAME VARCHAR(190) NOT NULL,
    TRIGGER_GROUP VARCHAR(190) NOT NULL,
    STR_PROP_1 VARCHAR(512) NULL,
    STR_PROP_2 VARCHAR(512) NULL,
    STR_PROP_3 VARCHAR(512) NULL,
    INT_PROP_1 INT NULL,
    INT_PROP_2 INT NULL,
    LONG_PROP_1 BIGINT NULL,
    LONG_PROP_2 BIGINT NULL,
    DEC_PROP_1 NUMERIC(13,4) NULL,
    DEC_PROP_2 NUMERIC(13,4) NULL,
    BOOL_PROP_1 VARCHAR(1) NULL,
    BOOL_PROP_2 VARCHAR(1) NULL,
    PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
    REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_BLOB_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(190) NOT NULL,
TRIGGER_GROUP VARCHAR(190) NOT NULL,
BLOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
INDEX (SCHED_NAME,TRIGGER_NAME, TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_CALENDARS (
SCHED_NAME VARCHAR(120) NOT NULL,
CALENDAR_NAME VARCHAR(190) NOT NULL,
CALENDAR BLOB NOT NULL,
PRIMARY KEY (SCHED_NAME,CALENDAR_NAME))
ENGINE=InnoDB;

CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_GROUP VARCHAR(190) NOT NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_FIRED_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
ENTRY_ID VARCHAR(95) NOT NULL,
TRIGGER_NAME VARCHAR(190) NOT NULL,
TRIGGER_GROUP VARCHAR(190) NOT NULL,
INSTANCE_NAME VARCHAR(190) NOT NULL,
FIRED_TIME BIGINT(13) NOT NULL,
SCHED_TIME BIGINT(13) NOT NULL,
PRIORITY INTEGER NOT NULL,
STATE VARCHAR(16) NOT NULL,
JOB_NAME VARCHAR(190) NULL,
JOB_GROUP VARCHAR(190) NULL,
IS_NONCONCURRENT VARCHAR(1) NULL,
REQUESTS_RECOVERY VARCHAR(1) NULL,
PRIMARY KEY (SCHED_NAME,ENTRY_ID))
ENGINE=InnoDB;

CREATE TABLE QRTZ_SCHEDULER_STATE (
SCHED_NAME VARCHAR(120) NOT NULL,
INSTANCE_NAME VARCHAR(190) NOT NULL,
LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
CHECKIN_INTERVAL BIGINT(13) NOT NULL,
PRIMARY KEY (SCHED_NAME,INSTANCE_NAME))
ENGINE=InnoDB;

CREATE TABLE QRTZ_LOCKS (
SCHED_NAME VARCHAR(120) NOT NULL,
LOCK_NAME VARCHAR(40) NOT NULL,
PRIMARY KEY (SCHED_NAME,LOCK_NAME))
ENGINE=InnoDB;

CREATE INDEX IDX_QRTZ_J_REQ_RECOVERY ON QRTZ_JOB_DETAILS(SCHED_NAME,REQUESTS_RECOVERY);
CREATE INDEX IDX_QRTZ_J_GRP ON QRTZ_JOB_DETAILS(SCHED_NAME,JOB_GROUP);

CREATE INDEX IDX_QRTZ_T_J ON QRTZ_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_T_JG ON QRTZ_TRIGGERS(SCHED_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_T_C ON QRTZ_TRIGGERS(SCHED_NAME,CALENDAR_NAME);
CREATE INDEX IDX_QRTZ_T_G ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
CREATE INDEX IDX_QRTZ_T_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_N_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_N_G_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_NEXT_FIRE_TIME ON QRTZ_TRIGGERS(SCHED_NAME,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_ST ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE_GRP ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_GROUP,TRIGGER_STATE);

CREATE INDEX IDX_QRTZ_FT_TRIG_INST_NAME ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME);
CREATE INDEX IDX_QRTZ_FT_INST_JOB_REQ_RCVRY ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME,REQUESTS_RECOVERY);
CREATE INDEX IDX_QRTZ_FT_J_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_FT_JG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_FT_T_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
CREATE INDEX IDX_QRTZ_FT_TG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);

commit;

常用的表

  • qrtz_cron_triggers 存放cron类型的触发器
  • qrtz_job_details 存放jobDetail信息
  • qrtz_simple_triggers 存放简单类型的触发器
  • qrtz_simprop_triggers 存放CalendarIntervalTrigger和DailyTimeIntervalTrigger类型的触发器
  • qrtz_triggers 存放触发器基本信息
  • qrtz_locks 存放锁信息
  • qrtz_scheduler_state 存放调度器状态

yml 配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
 spring:
  quartz:
    #相关属性配置
    properties:
      org:
        quartz:
          scheduler:
            instanceName: clusteredScheduler
            instanceId: AUTO
          jobStore:
            class: org.quartz.impl.jdbcjobstore.JobStoreTX
            driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
            tablePrefix: QRTZ_
            isClustered: true
            clusterCheckinInterval: 10000
            useProperties: false
          threadPool:
            class: org.quartz.simpl.SimpleThreadPool
            threadCount: 15
            threadPriority: 5
            threadsInheritContextClassLoaderOfInitializingThread: true
    #数据库方式
    job-store-type: jdbc

核心代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// 定时任务和消息通知模块
    public void scheduleStartAndEnd(Long redId, Date startTime, Date endTime) {
        try {
            scheduleStart(redId, startTime);
            // 留一定时间给用户传输时延
            scheduleEnd(redId, DateUtil.offsetSecond(endTime, waitUserOffset));
        } catch (Exception e) {
            log.error(e.getMessage());
        }
}
    private void scheduleStart(Long redId, Date time) {
        log.info("创建定时任务-通知开始抢红包" + redId + time);
        // 创建一个JobDetail,用于定义任务
        JobDetail jobDetail = JobBuilder.newJob(StartJob.class)
                .usingJobData("redId", redId)
                .build();
        // 创建一个Trigger,定义任务的执行时间
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity(redId + START)
                .startAt(time)
                .build();
        // 调度任务
        try {
            scheduler.scheduleJob(jobDetail, trigger);
            TriggerKey triggerKey = new TriggerKey(redId + START, redId + START);
            log.info(scheduler.getTriggerState(triggerKey).name());
        } catch (SchedulerException e) {
            log.error("【任务执行异常】" + e.getMessage());
        }
}
    private void scheduleEnd(Long redId, Date time) throws SchedulerException {
        log.info("创建定时任务-结算红包" + time);

        // 创建一个JobDetail,用于定义任务
        JobDetail jobDetail = JobBuilder.newJob(EndJob.class)
                .usingJobData("redId", redId)
                .build();
        // 创建一个Trigger,定义任务的执行时间
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity(redId + "end")
                .startAt(time)
                .build();
        // 调度任务
        scheduler.scheduleJob(jobDetail, trigger);
    }

亮点

  1. 在即时互动模块,使用WebSocket协议和Netty框架可以显著提高性能、可扩展性和用户体验。采用 websocket 协议进行全双工通信,比 http 协议具有更低的延迟,并且节约了带宽。Netty 使用异步和事件驱动的编程模型,这意味着它可以处理大量的并发连接而不会阻塞主线程。此外,Netty 还提供了一些安全性特性,可以帮助WebSocket连接免受各种网络攻击,例如DDoS攻击和WebSocket协议的安全漏洞。

  2. 对于抢红包这一核心业务,我们既需要缓存来实现低延迟,又需要保证缓存服务的可靠性,于是选择采用Redis 集群这一技术方案。哨兵机制是实现 Redis 不间断服务的重要保证。具体来说,主从集群的数据同步,是数据可靠的基础保证;而在主库发生故障时,自动的主从切换是服务不间断的关键支撑。Redis 的哨兵机制自动完成了监控、选主、通知这三大功能,从而实现了主从库的自动切换,可以降低 Redis 集群的运维开销:

    1. 监控主库运行状态,并判断主库是否客观下线;
    2. 在主库客观下线后,选取新主库;
    3. 选出新主库后,通知从库和客户端。

    为了降低误判率,在实际应用时,哨兵机制通常采用多实例的方式进行部署,多个哨兵实例通过“少数服从多数”的原则,来判断主库是否客观下线。一般来说,我们可以部署三个哨兵,如果有两个哨兵认定主库“主观下线”,就可以开始切换过程。

0%