Here we can see the familiar mysql.lua and redis.lua; ignore the other files for now.
Note: there is no kafka package here by default, which shows that OpenResty does not ship with Kafka integration. In this case I have already imported the Kafka integration package in advance.
Let's look at which files the kafka directory contains:
[root@node03 resty]# cd kafka
[root@node03 kafka]# ll
total 48
-rw-r--r-- 1 root root  1369 Aug  1 10:42 broker.lua
-rw-r--r-- 1 root root  5537 Aug  1 10:42 client.lua
-rw-r--r-- 1 root root   710 Aug  1 10:42 errors.lua
-rw-r--r-- 1 root root 10718 Aug  1 10:42 producer.lua
-rw-r--r-- 1 root root  4072 Aug  1 10:42 request.lua
-rw-r--r-- 1 root root  2118 Aug  1 10:42 response.lua
-rw-r--r-- 1 root root  1494 Aug  1 10:42 ringbuffer.lua
-rw-r--r-- 1 root root  4845 Aug  1 10:42 sendbuffer.lua
The Kafka integration package is attached: kafka_jb51.rar
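These modules come from the open-source lua-resty-kafka client. Before writing any script, you can confirm that OpenResty resolves the package with the resty command-line utility that ships under the install prefix (a minimal sketch; the bin path follows the /export/servers/openresty prefix used in this article):

[root@node03 kafka]# /export/servers/openresty/bin/resty -e 'print(require("resty.kafka.producer") and "resty.kafka.producer loaded")'

If the library is missing from the default lua_package_path, this prints a module-not-found error instead.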
Step 2: create the Kafka test Lua file
1. Go back to the openresty directory
[root@node03 kafka]# cd /export/servers/openresty/
2. Create the test directory
[root@node03 openresty]# mkdir -p testlua  # the directory name and location are up to you
You may pick any name and location for this directory, but you must be able to find it again: the path is used below!
3. Enter the directory just created and create the kafkalua.lua script file
Create the file: vim kafkalua.lua (or touch kafkalua.lua)
[root@node03 openresty]# cd testlua/
[root@node03 testlua]# ll
total 8
-rw-r--r-- 1 root root 3288 Aug  1 10:54 kafkalua.lua
kafkalua.lua:
-- test statement, optional; handy for verifying the wiring
ngx.say('hello kafka file configuration successful!!!!!!')
-- collection threshold: when the number of active connections exceeds
-- this value, data is not collected (overload protection, checked below)
local DEFAULT_THRESHOLD = 100000
-- number of Kafka partitions
local PARTITION_NUM = 6
-- Kafka topic name
local TOPIC = 'B2CDATA_COLLECTION1'
-- shared-dict key for the round-robin (polling) counter
local POLLING_KEY = "POLLING_KEY"
-- custom partitioner: the key passed to send() is already the target
-- partition number as a string, so just convert it back to a number
local function partitioner(key, num, correlation_id)
    return tonumber(key)
end
-- Kafka broker list (the IPs must match each broker's host.name setting)
local BROKER_LIST = {{host="192.168.52.100",port=9092},{host="192.168.52.110",port=9092},{host="192.168.52.120",port=9092}}
-- producer options (timeouts are in milliseconds)
local CONNECT_PARAMS = { producer_type = "async", socket_timeout = 30000, flush_time = 10000, request_timeout = 20000, partitioner = partitioner }
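-- Note (added): with producer_type = "async", lua-resty-kafka buffers each
-- message in a ring buffer and a timer flushes batches roughly every
-- flush_time milliseconds, so send() returns once the message is queued,
-- not once Kafka acknowledges it; delivery failures only show up in the
-- error log (or via an error_handle callback in these options).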
-- shared-memory counter used for round-robin partition selection; requires
-- a "lua_shared_dict shared_data" zone in nginx.conf (see the sketch after
-- the script)
local shared_data = ngx.shared.shared_data
local pollingVal = shared_data:get(POLLING_KEY)
if not pollingVal then
    pollingVal = 1
    shared_data:set(POLLING_KEY, pollingVal)
end
-- take the per-message counter modulo PARTITION_NUM to balance partitions
local partitions = '' .. (tonumber(pollingVal) % PARTITION_NUM)
shared_data:incr(POLLING_KEY, 1)
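-- Example (added): if pollingVal is 7 and PARTITION_NUM is 6, partitions is
-- the string "1"; successive requests cycle through "0".."5", and
-- partitioner() above converts each key back into its partition id.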
-- concurrency control
local isGone = true
-- overload protection: if the current number of active connections
-- (ngx.var.connections_active) exceeds the threshold, skip collection
if tonumber(ngx.var.connections_active) > tonumber(DEFAULT_THRESHOLD) then
    isGone = false
end
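-- Note (added): $connections_active is exported by nginx's stub_status
-- module, which OpenResty compiles in by default; it counts every open
-- client connection, including idle keep-alive ones.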
-- data collection
if isGone then
    -- the request body is only readable after ngx.req.read_body()
    ngx.req.read_body()
    -- fetch an nginx variable, falling back to the empty string when unset
    local function getvar(name)
        return ngx.var[name] or ""
    end
    local time_local      = getvar("time_local")
    local request         = getvar("request")
    local request_method  = getvar("request_method")
    local content_type    = getvar("content_type")
    local request_body    = getvar("request_body")
    local http_referer    = getvar("http_referer")
    local remote_addr     = getvar("remote_addr")
    local http_user_agent = getvar("http_user_agent")
    local time_iso8601    = getvar("time_iso8601")
    local server_addr     = getvar("server_addr")
    local http_cookie     = getvar("http_cookie")
    -- assemble the record, joining the 11 fields with the "#CS#" delimiter
    local message = time_local .."#CS#".. request .."#CS#".. request_method .."#CS#".. content_type .."#CS#".. request_body .."#CS#".. http_referer .."#CS#".. remote_addr .."#CS#".. http_user_agent .."#CS#".. time_iso8601 .."#CS#".. server_addr .."#CS#".. http_cookie
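    -- Example (added, with made-up values): a finished record looks like
    --   01/Aug/2019:10:54:00 +0800#CS#GET /page HTTP/1.1#CS#GET#CS#...
    -- and a downstream consumer splits on "#CS#" to recover the 11 fields in order.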
    -- load the lua-resty-kafka producer module
    local producer = require "resty.kafka.producer"
    -- create the producer (async, per CONNECT_PARAMS above)
    local bp = producer:new(BROKER_LIST, CONNECT_PARAMS)
    -- send the record; partitions is the key our partitioner maps to a partition id
    local ok, err = bp:send(TOPIC, partitions, message)
    -- log the error if the message could not be buffered
    if not ok then
        ngx.log(ngx.ERR, "kafka send err:", err)
        return
    end
end
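The script also depends on nginx.conf: ngx.shared.shared_data only exists if a lua_shared_dict zone named shared_data is declared, and the file itself must be wired to a location. Below is a minimal sketch of the relevant nginx.conf pieces; the /data/collect location name and the 10m zone size are assumptions, while the file path is the testlua directory created above:

http {
    # shared memory zone used by kafkalua.lua for the round-robin counter
    lua_shared_dict shared_data 10m;

    server {
        listen 80;

        location /data/collect {
            # run the collection script for every request to this location
            content_by_lua_file /export/servers/openresty/testlua/kafkalua.lua;
        }
    }
}

After a reload (nginx -s reload), requesting http://<server-ip>/data/collect should print the hello line from the top of the script and, if the brokers are reachable, push one #CS#-delimited record to the B2CDATA_COLLECTION1 topic.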