通知与订阅
这里介绍一下通知与订阅的使用。
通知功能介绍
支持有:
整个模块基于责任链来实现。
为了能统一实现便于扩展,提供配置表messsage_template
来实现发送渠道的维护。如下图:
notify_biz_type,notify_biz_sub_type 字段是关键。通过它来进行业务区分。发送渠道则由 app_push/wx_ma_push 等几个 push 结尾的字段控制。
先来看下公共核心代码实现:
java
/**
* 消息推送应用实现类
*
* @author mjyang
* @date 2023/6/27 14:45
*/
@Service
@RequiredArgsConstructor
public class MessagePushServiceImpl implements MessagePushService {
@Resource
private AbstractChainContext<MessagePushContextDto> chain;
private final MessageTemplateRepository messageTemplateRepository;
@Override
public void push(MessagePushDto dto) {
MessageTemplate messageTemplate = this.messageTemplateRepository.get(dto.getNotifyBizType(), dto.getNotifyBizSubType());
MessagePushContextDto contextDto = new MessagePushContextDto(
messageTemplate.getCode(),
messageTemplate.getSign(),
messageTemplate.getSmsSign(),
messageTemplate.getName(),
messageTemplate.getNotifyBizType(),
messageTemplate.getNotifyBizSubType(),
messageTemplate.getAppContent(),
messageTemplate.getWxMaContent(),
messageTemplate.getSmsContent(),
messageTemplate.getPage(),
dto.getPageParams(),
dto.getContentParams(),
dto.getReceiverIdList(),
messageTemplate.getWxMaPush(),
messageTemplate.getEmailMessagePush(),
messageTemplate.getSmsMessagePush(),
MapUtil.builder("miniProgramState", messageTemplate.getWxPageEnv())
.put("autoLookupOpenId", Optional.ofNullable(dto.getAutoLookupOpenId()).orElse(Boolean.FALSE).toString())
.build()
);
this.chain.handler(MessagePushType.getMessagePushChainKey(), contextDto);
}
}
java
/**
* 消息推送责任链上下文
*
* @author mjyang
* @date 2023/6/27 14:46
*/
@AllArgsConstructor
@Getter
@ToString
public class MessagePushContextDto implements Serializable {
private static final long serialVersionUID = 1L;
private String code;
/**
* 模板签名1,其他类型使用
*/
private String sign;
/**
* 模板签名2,sms 推送使用
*/
private String smsSign;
/**
* 模板名称
*/
private String name;
/**
* 业务通知类型。{@link NotifyBizType}
*/
private String notifyBizType;
/**
* 业务通知子类型。{@link NotifyBizSubType}
*/
private String notifyBizSubType;
/**
* 站内消息,app 推送使用
*/
private String appContent;
/**
* 微信小程序 推送使用
*/
private String wxMaContent;
/**
* 短信内容
*/
private String smsContent;
/**
* 页面地址
*/
private String page;
/**
* 页面跳转需要参数时指定
*/
private List<Object> pageParams;
/**
* 内容通知参数
*/
private List<Object> contentParams;
/**
* 接收人 id 列表。当微信小程序通知时,为 openId(如果 autoLookupOpenId 设为 true,则该属性可为用户id)。邮件时则为邮箱,短信发送时为手机号。
*/
private final List<String> receiverIdList;
/**
* 微信小程序发送
*/
private Boolean wxMaPush;
/**
* 邮件发送
*/
private Boolean emailPush;
/**
* 短信发送
*/
private Boolean smsPush;
/**
* 扩展参数
*/
private Map<String, String> extendParams;
}
java
/**
* 消息推送模型
*
* @author mjyang
* @date 2023/6/27 14:46
*/
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
public class MessagePushDto implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 业务通知类型。{@link NotifyBizType}
*/
private String notifyBizType;
/**
* 业务通知子类型。{@link NotifyBizSubType}
*/
private String notifyBizSubType;
/**
* 页面跳转需要参数时指定
*/
private List<Object> pageParams;
/**
* 渠道通知参数
*/
private List<Object> contentParams;
/**
* 接收人 id 列表。当微信小程序通知时,为 openId(如果 autoLookupOpenId 设为 true,则该属性可为用户id)。邮件时则为邮箱,短信发送时为手机号。
*/
private List<String> receiverIdList;
/**
* 是否自动寻找 openid
*/
private Boolean autoLookupOpenId;
}
java
/**
* 业务通知类型
*
* @author mjyang
* @date 2023/6/26 11:23
*/
public enum NotifyBizType {
/**
* 订单
*/
ORDER,
/**
* 商品
*/
GOODS,
/**
* 优惠券
*/
COUPON,
/**
* 会员
*/
MEMBER,
/**
* 权益
*/
RIGHTS
}
java
/**
* 业务通知子类型
*
* @author mjyang
* @date 2023/6/26 11:23
*/
@Getter
@AllArgsConstructor
public enum NotifyBizSubType implements EnumValue {
/**
* 订单待付款通知
*/
ORDER_ORDERED_NOTICE("订单待付款通知", 1),
/**
* 订单发货通知
*/
ORDER_SHIPPED_NOTICE("订单发货通知", 2),
/**
* 商品到货通知
*/
GOODS_ARRIVAL_REMINDERS_NOTICE("商品到货通知", 3),
/**
* 券过期提醒
*/
COUPON_OVERDUE_NOTICE("券过期提醒", 4),
/**
* 邀请成功通知
*/
MEMBER_VISIT_NOTICE("邀请成功通知", 5),
/**
* 收益到账提醒
*/
RIGHTS_ARRIVED_NOTICE("收益到账提醒", 6),
/**
* 奖励领取通知
*/
RIGHTS_RECEIVE_NOTICE("奖励领取通知", 7);
private final String key;
private final Integer value;
public static NotifyBizSubType getInstance(String name) {
NotifyBizSubType notifyBizSubType = null;
for (NotifyBizSubType value : values()) {
if (value.name().equals(name)) {
notifyBizSubType = value;
break;
}
}
Assert.notNull(notifyBizSubType, "找不到匹配的业务通知子类型:" + name);
return notifyBizSubType;
}
}
短信
配置一条短信发送模板。那我们需要这么几个元素:
- 发送业务场景(notify_biz_type/notify_biz_sub_type)
- 模板编号(code)
- 模板参数(content2)变量形式
{"url":""}
- 开启短信推送配置(sms_push)
看下核心代码实现:
java
/**
* sms 消息推送
*
* @author mjyang
* @date 2023/6/27 14:45
*/
@Slf4j
@Service
public class SmsMessagePushHandler implements MessagePushHandler {
@Resource
private SmsSendHandler smsSendHandler;
@Override
public void handler(MessagePushContextDto context) {
if (Boolean.FALSE.equals(context.getSmsPush()) || CollUtil.isEmpty(context.getReceiverIdList())) {
return;
}
try {
for (String mobile : context.getReceiverIdList()) {
if (CollUtil.isEmpty(context.getContentParams())) {
log.error("短信发送参数为空,无法发送,参数:" + context.toString());
return;
}
JSONObject jsonObject = JSON.parseObject(context.getSmsContent(), Feature.OrderedField);
int i = 0;
for (Map.Entry<String, Object> entry : jsonObject.entrySet()) {
entry.setValue(context.getContentParams().get(i));
i++;
}
smsSendHandler.sendMessage(context.getCode(), mobile, jsonObject.getInnerMap(), context.getSmsSign());
}
} catch (IllegalArgumentException ex) {
// ignore,内部已记录异常,不在重复记录
}
}
@Override
public int getOrder() {
return MessagePushType.SMS.getValue();
}
}
短信配置支持两种形式配置:
- 统一的
message_template
数据表 - 外部配置文件,基于场景值,如下
yaml
wmeimob:
sms:
smsType: aliyun
accessKeyId: xx
accessKeySecret: xxx
configs: #场景值(根据业务场景自定义)
CHANGE_MOBILE:
sign: xxx
template-code: xxx
timeout: 60
expire: 60
length: 6
enabledTest: false
邮箱
配置一条邮箱发送模板。那我们需要这么几个元素:
- 发送业务场景(notify_biz_type/notify_biz_sub_type)
- 模板内容(content3)
- 邮箱发送账号(配置文件)
- 开启邮箱推送配置(email_push)
看下核心代码实现:
java
/**
* 邮件消息推送服务
*
* @author mjyang
* @date 2024/4/15 10:32
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class EmailMessagePushHandler implements MessagePushHandler {
@Override
public void handler(MessagePushContextDto context) {
if (Boolean.FALSE.equals(context.getEmailPush()) || CollUtil.isEmpty(context.getReceiverIdList())) {
return;
}
for (String email : context.getReceiverIdList()) {
String subject = context.getSign() + context.getName();
String content = StrUtil.format(context.getAppContent(), Optional.ofNullable(context.getContentParams())
.orElse(new ArrayList<>())
.toArray());
try {
MailUtil.send(email, subject, content, false);
} catch (MailException ex) {
log.error("邮件发送出现异常,参数:" + context.toString(), ex);
}
}
}
@Override
public int getOrder() {
return MessagePushType.Email.getValue();
}
}
txt
# 邮件服务器的SMTP地址,可选,默认为smtp.<发件人邮箱后缀>
host = xxx
# 邮件服务器的SMTP端口,可选,默认25
port = 465
# 发件人(必须正确,否则发送失败)
from = xxx
# 用户名,默认为发件人邮箱前缀
user = xxx
# 密码(注意,某些邮箱需要为SMTP服务单独设置授权码,详情查看相关帮助)
pass = xxxx
#使用 STARTTLS安全连接,STARTTLS是对纯文本通信协议的扩展。
starttlsEnable = true
# 使用SSL安全连接
sslEnable = true
# 指定实现javax.net.SocketFactory接口的类的名称,这个类将被用于创建SMTP的套接字
socketFactoryClass = javax.net.ssl.SSLSocketFactory
# 如果设置为true,未能创建一个套接字使用指定的套接字工厂类将导致使用java.net.Socket创建的套接字类, 默认值为true
socketFactoryFallback = true
# 指定的端口连接到在使用指定的套接字工厂。如果没有设置,将使用默认端口456
socketFactoryPort = 465
# SMTP超时时长,单位毫秒,缺省值不超时
timeout = 0
# Socket连接超时值,单位毫秒,缺省值不超时
connectionTimeout = 0
TIP
配置文件中 xxx 参数为需要替换修改的,对于单体服务来说,此配置文件可放置在 src/main/resources 下。对于 k8s 服务来说,可参考这里(证书文件如何配置)。如果配置了,MailUtil.send 需要传入配置的地址。
微信小程序订阅消息
配置一条小程序发送订阅模板。那我们需要这么几个元素:
- 发送业务场景(notify_biz_type/notify_biz_sub_type)
- 模板编号(code)
- 模板参数及是否有跳转页面(content1,page)
- 发送环境(wx_page_env)
- 开启微信推送配置(wx_ma_push)
那现在在messsage_template
里很容易就能维护上这块信息,如下图:
看下核心代码实现:
java
/**
* 消息推送小程序订阅消息处理器
*
* @author mjyang
* @date 2023/5/24 17:24
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class WxMaMessagePushHandler implements MessagePushHandler {
private final WxMaService wxMaService;
private final SubscriptionUtil subscriptionUtil;
/**
* 微信小程序订阅消息格式正则
*/
private final Pattern PATTERN = Pattern.compile("\\{\\{(.*?)\\}\\}");
private List<String> spiltContent(String text) {
Matcher matcher = PATTERN.matcher(text);
List<String> resultList = new ArrayList<>();
while (matcher.find()) {
String matchStr = matcher.group(1);
String[] attrs = matchStr.split("\\.");
resultList.add(attrs[0]);
}
return resultList;
}
@Override
public void handler(MessagePushContextDto context) {
if (Boolean.FALSE.equals(context.getWxMaPush()) || CollUtil.isEmpty(context.getReceiverIdList())) {
return;
}
boolean autoLookupOpenId = BooleanUtil.toBoolean(context.getExtendParams().get("autoLookupOpenId"));
if (autoLookupOpenId) {
NotifyBizSubType notifyBizSubType = NotifyBizSubType.getInstance(context.getNotifyBizSubType());
Map<Long, String> subscriptionUserMap = subscriptionUtil.getSubscriptionUserMap(notifyBizSubType.getValue(), "*");
if (MapUtil.isEmpty(subscriptionUserMap)) {
return;
}
Map<Long, String> matchedUserMap = MapUtil.newHashMap(subscriptionUserMap.size());
for (String userId : context.getReceiverIdList()) {
Long uid = Convert.toLong(userId);
String openId = subscriptionUserMap.get(uid);
if (StrUtil.isBlank(openId)) {
continue;
}
matchedUserMap.put(uid, openId);
sendSubscribeMsg(context, openId);
}
subscriptionUtil.removeSubscription(notifyBizSubType.getValue(), matchedUserMap);
} else {
for (String openId : context.getReceiverIdList()) {
sendSubscribeMsg(context, openId);
}
}
}
private void sendSubscribeMsg(MessagePushContextDto context, String openId) {
if (StrUtil.isBlank(openId)) {
return;
}
WxMaSubscribeService subscribeService = this.wxMaService.getSubscribeService();
WxMaSubscribeMessage wxMaSubscribeMessage = new WxMaSubscribeMessage();
wxMaSubscribeMessage.setToUser(openId);
wxMaSubscribeMessage.setTemplateId(context.getCode());
wxMaSubscribeMessage.setMiniprogramState(StrUtil.blankToDefault(context.getExtendParams().get("miniProgramState"), WxMaConstants.MiniProgramState.FORMAL));
if (StrUtil.isNotBlank(context.getPage())) {
if (CollUtil.isNotEmpty(context.getPageParams())) {
String page = StrUtil.format(context.getPage(), context.getPageParams().toArray());
wxMaSubscribeMessage.setPage(page);
} else {
wxMaSubscribeMessage.setPage(context.getPage());
}
}
List<String> templateDataList = this.spiltContent(context.getWxMaContent());
WxMaSubscribeMessage.MsgData wxMaSubscribeData;
for (int i = 0, j = templateDataList.size(); i < j; i++) {
wxMaSubscribeData = new WxMaSubscribeMessage.MsgData();
wxMaSubscribeData.setName(templateDataList.get(i));
String value = Optional.ofNullable(context.getContentParams().get(i)).orElse(StrUtil.EMPTY).toString();
wxMaSubscribeData.setValue(value);
wxMaSubscribeMessage.addData(wxMaSubscribeData);
}
try {
subscribeService.sendSubscribeMsg(wxMaSubscribeMessage);
} catch (WxErrorException e) {
log.error("微信小程序发送订阅消息出现异常,模板:" + context.getCode(), e);
}
}
@Override
public int getOrder() {
return MessagePushType.WX_MA.getValue();
}
}
模板参数会自动解析,页面跳转参数也是依赖变量自动赋值,open_id 这里有两个机制(一个是自动解析,另一个是手动传入)
那如何调用呢?
java
@Resource
private MessagePushService messagePushService;
private void sendOrderNotice() {
MessagePushDto messagePushDto = new MessagePushDto();
messagePushDto.setAutoLookupOpenId(Boolean.TRUE);
messagePushDto.setReceiverIdList(ListUtil.of(obj.getMemberId().toString()));
messagePushDto.setNotifyBizType(NotifyBizType.ORDER.name());
messagePushDto.setNotifyBizSubType(NotifyBizSubType.ORDER_SHIPPED_NOTICE.name());
List<Object> pageParams = new ArrayList<>();
pageParams.add(obj.getId());
messagePushDto.setPageParams(pageParams);
List<Object> contentParams = new ArrayList<>();
String goodsName = orderItemDtos.get(0).getGoodsName();
if (goodsName.length() > 20) {
goodsName = goodsName.substring(0, 19);
}
if (orderItemDtos.size()>IntegerConstant.ONE) {
goodsName = goodsName+ "等";
}
contentParams.add(goodsName);
contentParams.add(orderItemDtos.stream().mapToInt(OrderItemPO::getQuantity).sum());
contentParams.add(obj.getOrderNo());
contentParams.add(DateUtil.format(DateUtil.date(), "yyyy年MM月dd日"));
contentParams.add("宝贝已发货,点击查看物流信息");
messagePushDto.setContentParams(contentParams);
// 调用通知
messagePushService.push(messagePushDto);
}
订阅
java
/**
* 接收人 id 列表。当微信小程序通知时,为 openId(如果 autoLookupOpenId 设为 true,则该属性可为用户id)。邮件时则为邮箱,短信发送时为手机号。
*/
private List<String> receiverIdList;
/**
* 是否自动寻找 openid
*/
private Boolean autoLookupOpenId;
这里有个重要的概念。openid 自动寻找。当业务场景需要授权订阅触发通知时,便可以利用 openid 自动寻找机制。这背后得益于统一订阅逻辑的实现。
核心代码实现如下:
java
/**
* 公用订阅工具
*
* @author mjyang
* @date 2023/9/28 14:11
*/
@Component
@Slf4j
public class SubscriptionUtil {
/**
* 订阅用户类型
*/
private static final String SUBSCRIPTION = "SUBSCRIPTION:%s";
/**
* 订阅用户标识。用户id_openid
*/
private static final String SUBSCRIPTION_USER = "%s_%s";
@Resource
private DistributeCacheService distributeCacheService;
/**
* 获取对应类型的订阅信息
*
* @param type 订阅类型
* @param pattern 匹配模式。例如 用户id*
* @return map key:用户id value:openid
*/
public Map<Long, String> getSubscriptionUserMap(Integer type, String pattern) {
Map<Long, String> userMap = MapUtil.newHashMap();
ScanOptions scanOptions = ScanOptions.scanOptions().match(pattern).count(1000).build();
String cacheKey = String.format(SUBSCRIPTION, type);
try (Cursor<Map.Entry<Object, Object>> cursor = this.distributeCacheService.executeRtn(x -> x.opsForHash().scan(cacheKey, scanOptions))) {
while (cursor.hasNext()) {
Map.Entry<Object, Object> entry = cursor.next();
Object key = entry.getKey();
Long value = Convert.toLong(entry.getValue());
if (value <= 0) {
continue;
}
String[] userData = StrUtil.splitToArray(key.toString(), StrUtil.UNDERLINE);
userMap.put(Convert.toLong(userData[0]), userData[1]);
}
}
return userMap;
}
public void removeSubscription(Integer type, Map<Long, String> userMap) {
if (MapUtil.isEmpty(userMap)) {
return;
}
String cacheKey = String.format(SUBSCRIPTION, type);
this.distributeCacheService.execute(x -> x.executePipelined((RedisCallback<String>) connection -> {
userMap.forEach((uid, openId) -> {
connection.hIncrBy(cacheKey.getBytes(), String.format(SUBSCRIPTION_USER, uid, openId).getBytes(), -1);
});
return null;
}));
}
public void addSubscription(Integer type, Long userId, String openId) {
String hashKey = String.format(SUBSCRIPTION_USER, userId, openId);
String cacheKey = String.format(SUBSCRIPTION, type);
if (this.distributeCacheService.executeRtn(x -> x.opsForHash().hasKey(cacheKey, hashKey))) {
this.distributeCacheService.execute(x -> x.opsForHash().increment(cacheKey, hashKey, 1));
} else {
this.distributeCacheService.execute(x -> x.opsForHash().put(cacheKey, hashKey, "1"));
}
}
}
核心逻辑是利用 redis hash 结构来存储订阅对应业务类型的用户信息(多次订阅会累加次数)。当对应业务触发通知时,便可以去获取订阅对应类型的人来触发通知,同时减少订阅次数。