KAFKA消息推送标准文档

前置说明

本方案接入前需要与@小蓝本 相关人员确定方案

通过kafka消息推送方案,仅用于小蓝本将生产的业务数据推送到你方的kafka服务器集群中。因此你该需要先行准备kafka集群,并做好相应的网络安全策略后,将服务器的地址端口信息提供给@小蓝本 技术人员。

推送消息格式

小蓝本推送的数据消息的标准为每个数据表一个topic;每个表根据数据信息确定一个字段为唯一键,如uk;具体数据表结构信息,需要根据业务需求确定。

每次推送到kafka中的一条消息则代表一条数据记录变更;系统第一次推送时为全量方案,后续每日会做增量处理。

kafka中的json消息格式为:

字段 数据类型 备注
type String 操作类型 insert/delete
data String 对应表记录的json内容
ts Long 数据记录变更时间戳,到ms

数据示例

{"type":"insert","data":"{\"inv_pid\":\"p_k1631d59d3dc3e936b57478bcc658289d\",\"is_active\":\"Y\",\"ent_name\":\"郑州广宏企业管理有限公司\",\"uk\":\"00001cce3f50431df4446f4a448ce156\",\"inv_id\":\"inv_7db2c905a4827674771072c686186db3\",\"blic_type\":null,\"inv_type\":\"自然人股东\",\"ac_conam\":0.0,\"s_ext_timestamp\":\"2024-09-11 15:31:06\",\"inv_name_reg\":\"张照民\",\"currency_name\":null,\"sub_conam\":0.3,\"inv_name\":\"张照民\",\"inv_eid\":null,\"con_date\":\"2029-06-30\",\"eid\":\"qb4997d586ea411ef984200163e0ee983\",\"pid_fix\":null}","ts":1726113778896}

当type=insert时,则说明该表记录为新增或更新的记录,你收到后需要做upsert处理,如在mysql中的(user_id为约定的唯一键)逻辑为:

insert into table(user_id, user_name, age) values(1, '张三', 18),(2, '李四', 19) on duplicate key update user_name=values(user_value), age=values(age);

当type=delete时,则说明该表记录在小蓝本已经删除,你收到后需要做删除或逻辑删除处理