前置说明
本接口接入前需要与@小蓝本 对接人员确定AccessID及AccessToken信息。
如数据块需要加密的,则需要与对接人员确定密钥,该系列接口仅支持AES加密算法,算法代码请@对接人员
接口说明
kafka消息提取接口请求时,需要在请求头中增加AccessID及AccessToken,用于权限校验。
在请求头中增加X-Tracking-ID参数,传入请求的唯一ID标识,可方便联调确定错误原因用于问题的反馈
如有请求BODY块的或响应信息体的,均为UTF-8的JSON
接口的http响应码为200~299之间的均为操作成功,其它的均为错误
{
"trackingId": "dcfd388b-297a-42ae-b39d-9e413b04977a", //响应错误跟踪ID,用于问题的反馈
"errors": [ //具体的错误信息块
{
"code": 403, //详细错误码
"message": "验签错误." //错误的内容描述
}
]
}
接口基础URL
https://api.xiaolanben.com
拉取消息接口
GET /blue-entity/api/v1/open/kafka/pull
注意:本接口数据返回的type如果是insert/update时,为防止数据重复消费问题,建议数据库的sql使用冲突更新的方式,mysql示例(user_id为主键):
insert into table(user_id, user_name, age) values(1, '张三', 18),(2, '李四', 19) on duplicate key update user_name=values(user_value), age=values(age);
如果遇到type为invalid类型则,直接跳过当前记录,但在上报offset时,需要上报。
如果拉取到的结果列表数量小于请求时的pull_size时,建议等待1分钟(至少等待1秒)后再次请求。
如果在1分钟内拉取请求数与设置offset接口请求数比大于2:1且拉取次数大于10次的,系统则会丢出异常保护无效的拉取(应是拉取代码有问题,导致未能正常处理)
请求参数
字段 | 数据类型 | 数据域 | 描述 |
topic | String | query | 必传。访问kafka的topic,请求前@对接人员确定 |
pull_size | Int | query | 必传。本次拉取的记录数,最小为1,最大为500 |
partition | Int | query | 非必传。本次拉取的分区ID,请求前@对接人员确定 |
响应参数
http响应码200
字段 | 数据类型 | 描述 |
[] | Array | 响应为json数组,内部结构见下表 |
– type | String | 操作类型 insert/update/delete/invalid |
– data | String | Json记录内容,如约定加密的,则为加密后的内容 |
– ts | Long | 数据记录变更时间戳,到ms |
– partition | Int | 数据记录分区 |
– offset | Long | 记录的offset,回传设置offset时使用 |
CURL示例
curl -X GET --header 'Accept: application/json' --header 'AccessToken: 1234567890' --header 'AccessID: xlb' 'https://api.xiaolanben.com/blue-entity/api/v1/open/kafka/pull?topic=all_ent_info&pull_size=100'
设置offset接口
POST /blue-entity/api/v1/open/kafka/offset
请求参数
字段 | 数据类型 | 数据域 | 描述 |
topic | String | body | 必传。访问kafka的topic,请求前@对接人员确定 |
partition | Int | body | 非必传。本次设置的分区ID,请求前@对接人员确定 |
offset | Long | body | 必传。设置的offset值,参考拉取接口返回列表中的最大值 |
响应码
http响应码204。无响应体
CURL示例
curl -X POST --header 'Content-Type: application/json' --header 'Accept: application/json' --header 'AccessToken: 1234567890' --header 'AccessID: xlb' -d '{ \
"offset": 100, \
"partition": 0, \
"topic": "all_ent_info" \
}' 'https://api.xiaolanben.com/blue-entity/api/v1/open/kafka/offset'
设置从头开始消费接口
POST /blue-entity/api/v1/open/kafka/offset/first
请求参数
字段 | 数据类型 | 数据域 | 描述 |
topic | String | body | 必传。访问kafka的topic,请求前@对接人员确定 |
partition | Int | body | 非必传。本次设置的分区ID,请求前@对接人员确定 |
响应码
http响应码200
字段 | 数据类型 | 描述 |
firstOffset | Long | 起始的数据位 |
CURL示例
curl -X POST --header 'Content-Type: application/json' --header 'Accept: application/json' --header 'AccessToken: 1234567890' --header 'AccessID: xlb' -d '{ \
"partition": 0, \
"topic": "all_ent_info" \
}' 'https://api.xiaolanben.com/blue-entity/api/v1/open/kafka/offset/first'
设置偏移量至最后
POST /blue-entity/api/v1/open/kafka/offset/end
请求参数
字段 | 数据类型 | 数据域 | 描述 |
topic | String | body | 必传。访问kafka的topic,请求前@对接人员确定 |
partition | Int | body | 非必传。本次设置的分区ID,请求前@对接人员确定 |
响应码
http响应码200
字段 | 数据类型 | 描述 |
lastOffset | Long | 最后的数据位 |
CURL示例
curl -X POST --header 'Content-Type: application/json' --header 'Accept: application/json' --header 'AccessToken: 1234567890' --header 'AccessID: xlb' -d '{ \
"partition": 0, \
"topic": "all_ent_info" \
}' 'https://api.xiaolanben.com/blue-entity/api/v1/open/kafka/offset/end'
获取当前消费偏移量
GET/blue-entity/api/v1/open/kafka/offset
请求参数
字段 | 数据类型 | 数据域 | 描述 |
topic | String | query | 必传。访问kafka的topic,请求前@对接人员确定 |
partition | Int | query | 非必传。本次设置的分区ID,请求前@对接人员确定 |
响应码
http响应码200
字段 | 数据类型 | 描述 |
offset | Long | 当前的数据位 |
CURL示例
curl -X GET --header 'Accept: application/json' --header 'AccessToken: 1234567890' --header 'AccessID: xlb' 'https://api.xiaolanben.com/blue-entity/api/v1/open/kafka/offset?topic=all_ent_info'
JAVA DEMO
PY代码示例
# -*- coding: utf-8 -*-
import requests, json, re, time, sys, urllib
requests.adapters.DEFAULT_RETRIES = 5
batch_size = 500
ACCESS_ID = '你的access_id'
ACCESS_TOKEN = '你的access_token'
get_headers = {
'AccessID': ACCESS_ID,
'AccessToken': ACCESS_TOKEN,
# 'Connection': 'close'
}
post_headers = {
'AccessID': ACCESS_ID,
'AccessToken': ACCESS_TOKEN,
# 'Connection': 'close',
'content-type': 'application/json'
}
def assert_resp(resp):
if resp.status_code>=300:
raise Exception(f"调用接口出错{resp.status_code} -- {resp.content.decode()}")
def do_pull(topic, partition):
#拉取指定的topic数据
payload = {
'topic': topic,
'partition': str(partition),
'pull_size': str(batch_size)
}
params = urllib.parse.urlencode(payload, safe=':+')
try:
resp = requests.get("https://api.xiaolanben.com/blue-entity/api/v1/open/kafka/pull", params=params, headers=get_headers, timeout=10)
except:
resp = requests.get("https://api.xiaolanben.com/blue-entity/api/v1/open/kafka/pull", params=params, headers=get_headers, timeout=10)
assert_resp(resp)
data_list = json.loads(resp.content)
print(f"获取到{topic}-{partition}共{len(data_list)}条")
return data_list
def set_offset(topic, partition, offset):
#设置offset
payload = {
'topic': topic,
'partition': str(partition),
'offset': offset
}
data = json.dumps(payload)
try:
resp = requests.post("https://api.xiaolanben.com/blue-entity/api/v1/open/kafka/offset", data=data, headers=post_headers, timeout=10)
except:
resp = requests.post("https://api.xiaolanben.com/blue-entity/api/v1/open/kafka/offset", data=data, headers=post_headers, timeout=10)
assert_resp(resp)
print(f"设置{topic}-{partition}的offset为{offset}")
def set_first(topic, partition):
#设置offset为开始位
payload = {
'topic': topic,
'partition': str(partition)
}
data = json.dumps(payload)
try:
resp = requests.post("https://api.xiaolanben.com/blue-entity/api/v1/open/kafka/offset/first", data=data, headers=post_headers, timeout=10)
except:
resp = requests.post("https://api.xiaolanben.com/blue-entity/api/v1/open/kafka/offset/first", data=data, headers=post_headers, timeout=10)
assert_resp(resp)
resp_map = json.loads(resp.content)
print(f"获取到{topic}-{partition}的头部offset为{resp_map['firstOffset']}")
def set_last(topic, partition):
#设置offset为开始位
payload = {
'topic': topic,
'partition': str(partition)
}
data = json.dumps(payload)
try:
resp = requests.post("https://api.xiaolanben.com/blue-entity/api/v1/open/kafka/offset/end", data=data, headers=post_headers, timeout=10)
except:
resp = requests.post("https://api.xiaolanben.com/blue-entity/api/v1/open/kafka/offset/end", data=data, headers=post_headers, timeout=10)
assert_resp(resp)
resp_map = json.loads(resp.content)
print(f"获取到{topic}-{partition}的尾部offset为{resp_map['lastOffset']}")
def batch_pull(topic, partition):
#循环消费
uk_set = set() #我这只是用来存贮uk列,判断数量
empty_times = 0
while(True):
data_list = do_pull(topic, partition)
if len(data_list) == 0:
if empty_times>3:
# 连续拉取3次都是空的,再退出,有时可能会拉取不到数据
break
empty_times = empty_times + 1
continue
empty_times = 0 #如果有拉取到数据,则重置
max_offset = 0
for item in data_list:
max_offset = max(max_offset, item['offset'])
item_type = item['type']
data = json.loads(item['data'])
uk = data['uk']
if 'delete' == item_type:
if uk in uk_set:
uk_set.remove(uk)
else:
uk_set.add(uk)
#你需要在此批量入库你的数据,处理你的业务逻辑
set_offset(topic, partition, max_offset)
print(f"消费{topic}-{partition}完成, 共消费到{len(uk_set)}条")
def process(argv):
topic = '你的topic'
partition = 0
op = argv[1]
if op == 'pull':
do_pull(topic, partition)
return
if op == 'first':
set_first(topic, partition)
return
if op == 'last':
set_last(topic, partition)
return
if op == 'batch':
batch_pull(topic, partition)
return
print(f"错误的操作码{op}")
if __name__ == "__main__":
if len(sys.argv) < 2:
raise Exception("请输入操作 pull/first/last/batch")
process(sys.argv)