KAFKA消息拉取接口

前置说明

本接口接入前需要与@小蓝本 对接人员确定AccessID及AccessToken信息。

如数据块需要加密的,则需要与对接人员确定密钥,该系列接口仅支持AES加密算法,算法代码请@对接人员

接口说明

kafka消息提取接口请求时,需要在请求头中增加AccessID及AccessToken,用于权限校验。

在请求头中增加X-Tracking-ID参数,传入请求的唯一ID标识,可方便联调确定错误原因用于问题的反馈

如有请求BODY块的或响应信息体的,均为UTF-8的JSON

接口的http响应码为200~299之间的均为操作成功,其它的均为错误

{
  "trackingId": "dcfd388b-297a-42ae-b39d-9e413b04977a", //响应错误跟踪ID,用于问题的反馈
  "errors": [  //具体的错误信息块
    {
      "code": 403,   //详细错误码
      "message": "验签错误."   //错误的内容描述
    }
  ]
}

接口基础URL

https://api.xiaolanben.com

拉取消息接口

GET /blue-entity/api/v1/open/kafka/pull

注意:本接口数据返回的type如果是insert/update时,为防止数据重复消费问题,建议数据库的sql使用冲突更新的方式,mysql示例(user_id为主键):

insert into table(user_id, user_name, age) values(1, '张三', 18),(2, '李四', 19) on duplicate key update user_name=values(user_value), age=values(age);

如果遇到type为invalid类型则,直接跳过当前记录,但在上报offset时,需要上报。

如果拉取到的结果列表数量小于请求时的pull_size时,建议等待1分钟(至少等待1秒)后再次请求。

如果在1分钟内拉取请求数与设置offset接口请求数比大于2:1且拉取次数大于10次的,系统则会丢出异常保护无效的拉取(应是拉取代码有问题,导致未能正常处理)

请求参数

字段 数据类型 数据域 描述
topic String query 必传。访问kafka的topic,请求前@对接人员确定
pull_size Int query 必传。本次拉取的记录数,最小为1,最大为500
partition Int query 非必传。本次拉取的分区ID,请求前@对接人员确定

响应参数

http响应码200

字段 数据类型 描述
[] Array 响应为json数组,内部结构见下表
– type String 操作类型 insert/update/delete/invalid
– data String Json记录内容,如约定加密的,则为加密后的内容
– ts Long 数据记录变更时间戳,到ms
– partition Int 数据记录分区
– offset Long 记录的offset,回传设置offset时使用

CURL示例

curl -X GET --header 'Accept: application/json' --header 'AccessToken: 1234567890' --header 'AccessID: xlb' 'https://api.xiaolanben.com/blue-entity/api/v1/open/kafka/pull?topic=all_ent_info&pull_size=100'

设置offset接口

POST /blue-entity/api/v1/open/kafka/offset

请求参数

字段 数据类型 数据域 描述
topic String body 必传。访问kafka的topic,请求前@对接人员确定
partition Int body 非必传。本次设置的分区ID,请求前@对接人员确定
offset Long body 必传。设置的offset值,参考拉取接口返回列表中的最大值

响应码

http响应码204。无响应体

CURL示例

curl -X POST --header 'Content-Type: application/json' --header 'Accept: application/json' --header 'AccessToken: 1234567890' --header 'AccessID: xlb' -d '{ \ 
   "offset": 100, \ 
   "partition": 0, \ 
   "topic": "all_ent_info" \ 
 }' 'https://api.xiaolanben.com/blue-entity/api/v1/open/kafka/offset'

设置从头开始消费接口

POST /blue-entity/api/v1/open/kafka/offset/first

请求参数

字段 数据类型 数据域 描述
topic String body 必传。访问kafka的topic,请求前@对接人员确定
partition Int body 非必传。本次设置的分区ID,请求前@对接人员确定

响应码

http响应码200

字段 数据类型 描述
firstOffset Long 起始的数据位

CURL示例

curl -X POST --header 'Content-Type: application/json' --header 'Accept: application/json' --header 'AccessToken: 1234567890' --header 'AccessID: xlb' -d '{ \ 
   "partition": 0, \ 
   "topic": "all_ent_info" \ 
 }' 'https://api.xiaolanben.com/blue-entity/api/v1/open/kafka/offset/first'

设置偏移量至最后

POST /blue-entity/api/v1/open/kafka/offset/end

请求参数

字段 数据类型 数据域 描述
topic String body 必传。访问kafka的topic,请求前@对接人员确定
partition Int body 非必传。本次设置的分区ID,请求前@对接人员确定

响应码

http响应码200

字段 数据类型 描述
lastOffset Long 最后的数据位

CURL示例

curl -X POST --header 'Content-Type: application/json' --header 'Accept: application/json' --header 'AccessToken: 1234567890' --header 'AccessID: xlb' -d '{ \ 
   "partition": 0, \ 
   "topic": "all_ent_info" \ 
 }' 'https://api.xiaolanben.com/blue-entity/api/v1/open/kafka/offset/end'

获取当前消费偏移量

GET/blue-entity/api/v1/open/kafka/offset

请求参数

字段 数据类型 数据域 描述
topic String query 必传。访问kafka的topic,请求前@对接人员确定
partition Int query 非必传。本次设置的分区ID,请求前@对接人员确定

响应码

http响应码200

字段 数据类型 描述
offset Long 当前的数据位

CURL示例

curl -X GET --header 'Accept: application/json' --header 'AccessToken: 1234567890' --header 'AccessID: xlb' 'https://api.xiaolanben.com/blue-entity/api/v1/open/kafka/offset?topic=all_ent_info'

JAVA DEMO

KafkaPullDemo

PY代码示例

# -*- coding: utf-8 -*-
import requests, json, re, time, sys, urllib

requests.adapters.DEFAULT_RETRIES = 5

batch_size = 500
ACCESS_ID = '你的access_id'
ACCESS_TOKEN = '你的access_token'
get_headers = {
  'AccessID': ACCESS_ID,
  'AccessToken': ACCESS_TOKEN,
  # 'Connection': 'close'
}
post_headers = {
  'AccessID': ACCESS_ID,
  'AccessToken': ACCESS_TOKEN,
  # 'Connection': 'close',
  'content-type': 'application/json'
}

def assert_resp(resp):
   if resp.status_code>=300:
      raise Exception(f"调用接口出错{resp.status_code} -- {resp.content.decode()}")
      
def do_pull(topic, partition):
  #拉取指定的topic数据
  payload = {
    'topic': topic, 
    'partition': str(partition), 
    'pull_size': str(batch_size)
  }
  params = urllib.parse.urlencode(payload, safe=':+')
  try:
    resp = requests.get("https://api.xiaolanben.com/blue-entity/api/v1/open/kafka/pull", params=params, headers=get_headers, timeout=10)
  except:
    resp = requests.get("https://api.xiaolanben.com/blue-entity/api/v1/open/kafka/pull", params=params, headers=get_headers, timeout=10)
  assert_resp(resp)
  data_list = json.loads(resp.content)
  print(f"获取到{topic}-{partition}共{len(data_list)}条")
  return data_list

def set_offset(topic, partition, offset):
  #设置offset
  payload = {
    'topic': topic, 
    'partition': str(partition),
    'offset': offset
  }
  data = json.dumps(payload)
  try:
    resp = requests.post("https://api.xiaolanben.com/blue-entity/api/v1/open/kafka/offset", data=data, headers=post_headers, timeout=10)
  except:
    resp = requests.post("https://api.xiaolanben.com/blue-entity/api/v1/open/kafka/offset", data=data, headers=post_headers, timeout=10)
  assert_resp(resp)
  print(f"设置{topic}-{partition}的offset为{offset}")

def set_first(topic, partition):
  #设置offset为开始位
  payload = {
    'topic': topic, 
    'partition': str(partition)
  }
  data = json.dumps(payload)
  try:
    resp = requests.post("https://api.xiaolanben.com/blue-entity/api/v1/open/kafka/offset/first", data=data, headers=post_headers, timeout=10)
  except:
    resp = requests.post("https://api.xiaolanben.com/blue-entity/api/v1/open/kafka/offset/first", data=data, headers=post_headers, timeout=10)
  assert_resp(resp)
  resp_map = json.loads(resp.content)
  print(f"获取到{topic}-{partition}的头部offset为{resp_map['firstOffset']}")

def set_last(topic, partition):
  #设置offset为开始位
  payload = {
    'topic': topic, 
    'partition': str(partition)
  }
  data = json.dumps(payload)
  try:
    resp = requests.post("https://api.xiaolanben.com/blue-entity/api/v1/open/kafka/offset/end", data=data, headers=post_headers, timeout=10)
  except:
    resp = requests.post("https://api.xiaolanben.com/blue-entity/api/v1/open/kafka/offset/end", data=data, headers=post_headers, timeout=10)
  assert_resp(resp)
  resp_map = json.loads(resp.content)
  print(f"获取到{topic}-{partition}的尾部offset为{resp_map['lastOffset']}")

def batch_pull(topic, partition):
  #循环消费
  uk_set = set()  #我这只是用来存贮uk列,判断数量 
  empty_times = 0
  while(True):
    data_list = do_pull(topic, partition)
    if len(data_list) == 0:
        if empty_times>3:
          # 连续拉取3次都是空的,再退出,有时可能会拉取不到数据
          break
        empty_times = empty_times + 1
        continue
    empty_times = 0 #如果有拉取到数据,则重置 
    max_offset = 0
    for item in data_list:
      max_offset = max(max_offset, item['offset'])
      item_type = item['type']
      data = json.loads(item['data'])
      uk = data['uk']
      if 'delete' == item_type:
        if uk in uk_set:
          uk_set.remove(uk)
      else:
          uk_set.add(uk)
    #你需要在此批量入库你的数据,处理你的业务逻辑
    set_offset(topic, partition, max_offset)
  print(f"消费{topic}-{partition}完成, 共消费到{len(uk_set)}条")

def process(argv):
  topic = '你的topic'
  partition = 0
  op = argv[1]
  if op == 'pull':
      do_pull(topic, partition)
      return
  if op == 'first':
     set_first(topic, partition)
     return
  if op == 'last':
     set_last(topic, partition)
     return
  if op == 'batch':
     batch_pull(topic, partition)
     return
  print(f"错误的操作码{op}")
   
if __name__ == "__main__":
    if len(sys.argv) < 2:
        raise Exception("请输入操作 pull/first/last/batch")
    process(sys.argv)