nginx lua集成kafka的实现方法

2019-10-17 16:33:17刘景俊

这里我们看到了熟悉的mysql.lua和redis.lua,好了其他的先不要管

注意:这里的 kafka 这个包是没有的,说明opnresty么有集成kafka。此处我已经提前导入啦kafka集成包

我们看看kafka里面多有哪些包:

[root@node03 resty]# cd kafka
[root@node03 kafka]# ll
total 48
-rw-r--r-- 1 root root 1369 Aug 1 10:42 broker.lua
-rw-r--r-- 1 root root 5537 Aug 1 10:42 client.lua
-rw-r--r-- 1 root root  710 Aug 1 10:42 errors.lua
-rw-r--r-- 1 root root 10718 Aug 1 10:42 producer.lua
-rw-r--r-- 1 root root 4072 Aug 1 10:42 request.lua
-rw-r--r-- 1 root root 2118 Aug 1 10:42 response.lua
-rw-r--r-- 1 root root 1494 Aug 1 10:42 ringbuffer.lua
-rw-r--r-- 1 root root 4845 Aug 1 10:42 sendbuffer.lua

附上 kafka 集成包:kafka_jb51.rar

第二步:创建kafka测试lua文件

1.退回到openresty

[root@node03 kafka]# cd /export/servers/openresty/

2.创建测试文件

[root@node03 openresty]# mkdir -r testlua
#这里文件名自己取,文件位置自己定,但必须找得到

这里文件名自己取,文件位置自己定,但必须找得到!!!!!!!!!!!下面会用到!!!!!!!!!!

3.进入刚刚创建的文件夹并创建kafkalua.lua脚本文件

创建文件:vim kafkalua.lua或者touch kafkalua.lua

[root@node03 openresty]# cd testlua/
[root@node03 testlua]# ll
total 8
-rw-r--r-- 1 root root 3288 Aug 1 10:54 kafkalua.lua

kafkalua.lua:

--测试语句可以不用
ngx.say('hello kafka file configuration successful!!!!!!')

--数据采集阈值限制,如果lua采集超过阈值,则不采集
local DEFAULT_THRESHOLD = 100000
-- kafka分区数
local PARTITION_NUM = 6
-- kafka主题名称
local TOPIC = 'B2CDATA_COLLECTION1'
-- 轮询器共享变量KEY值
local POLLING_KEY = "POLLING_KEY"
-- kafka集群(定义kafka broker地址,ip需要和kafka的host.name配置一致)
local function partitioner(key, num, correlation_id)
  return tonumber(key)
end
--kafka broker列表
local BROKER_LIST = {{host="192.168.52.100",port=9092},{host="192.168.52.110",port=9092},{host="192.168.52.120",port=9092}}
--kafka参数,
local CONNECT_PARAMS = { producer_type = "async", socket_timeout = 30000, flush_time = 10000, request_timeout = 20000, partitioner = partitioner }
-- 共享内存计数器,用于kafka轮询使用
local shared_data = ngx.shared.shared_data
local pollingVal = shared_data:get(POLLING_KEY)
if not pollingVal then
  pollingVal = 1
  shared_data:set(POLLING_KEY, pollingVal)
end
--获取每一条消息的计数器,对PARTITION_NUM取余数,均衡分区
local partitions = '' .. (tonumber(pollingVal) % PARTITION_NUM)
shared_data:incr(POLLING_KEY, 1)

-- 并发控制
local isGone = true
--获取ngx.var.connections_active进行过载保护,即如果当前活跃连接数超过阈值进行限流保护
if tonumber(ngx.var.connections_active) > tonumber(DEFAULT_THRESHOLD) then
  isGone = false
end
-- 数据采集
if isGone then

  local time_local = ngx.var.time_local
  if time_local == nil then
    time_local = ""
  end

  local request = ngx.var.request
  if request == nil then
    request = ""
  end

  local request_method = ngx.var.request_method
  if request_method == nil then
    request_method = ""
  end

  local content_type = ngx.var.content_type
  if content_type == nil then
    content_type = ""
  end
  ngx.req.read_body()
  local request_body = ngx.var.request_body
  if request_body == nil then
    request_body = ""
  end

  local http_referer = ngx.var.http_referer
  if http_referer == nil then
    http_referer = ""
  end

  local remote_addr = ngx.var.remote_addr
  if remote_addr == nil then
    remote_addr = ""
  end

  local http_user_agent = ngx.var.http_user_agent
  if http_user_agent == nil then
    http_user_agent = ""
  end

  local time_iso8601 = ngx.var.time_iso8601
  if time_iso8601 == nil then
    time_iso8601 = ""
  end

  local server_addr = ngx.var.server_addr
  if server_addr == nil then
    server_addr = ""
  end

  local http_cookie = ngx.var.http_cookie
  if http_cookie == nil then
    http_cookie = ""
  end
--封装数据
  local message = time_local .."#CS#".. request .."#CS#".. request_method .."#CS#".. content_type .."#CS#".. request_body .."#CS#".. http_referer .."#CS#".. remote_addr .."#CS#".. http_user_agent .."#CS#".. time_iso8601 .."#CS#".. server_addr .."#CS#".. http_cookie;
--引入kafka的producer
local producer = require "resty.kafka.producer"
--创建producer
local bp = producer:new(BROKER_LIST, CONNECT_PARAMS)
--发送数据
local ok, err = bp:send(TOPIC, partitions, message)
--打印错误日志
  if not ok then
    ngx.log(ngx.ERR, "kafka send err:", err)
    return
  end
end