diff --git a/apps/emqx_modules/etc/emqx_modules.conf b/apps/emqx_modules/etc/emqx_modules.conf index 92f563342..bae8fd3ed 100644 --- a/apps/emqx_modules/etc/emqx_modules.conf +++ b/apps/emqx_modules/etc/emqx_modules.conf @@ -11,17 +11,14 @@ telemetry: { enable: true } - -event_message: { - topics: [ - "$event/client_connected", - "$event/client_disconnected", - "$event/session_subscribed", - "$event/session_unsubscribed", - "$event/message_delivered", - "$event/message_acked", - "$event/message_dropped" - ] +event_message { + "$event/client_connected": true + "$event/client_disconnected": true + # "$event/client_subscribed": false + # "$event/client_unsubscribed": false + # "$event/message_delivered": false + # "$event/message_acked": false + # "$event/message_dropped": false } topic_metrics:{ diff --git a/apps/emqx_modules/include/emqx_modules.hrl b/apps/emqx_modules/include/emqx_modules.hrl index b7cdb154e..334173015 100644 --- a/apps/emqx_modules/include/emqx_modules.hrl +++ b/apps/emqx_modules/include/emqx_modules.hrl @@ -3,11 +3,3 @@ %% Interval for reporting telemetry data, Default: 7d -define(REPORT_INTERVAR, 604800). - --define(BASE_TOPICS, [<<"$event/client_connected">>, - <<"$event/client_disconnected">>, - <<"$event/session_subscribed">>, - <<"$event/session_unsubscribed">>, - <<"$event/message_delivered">>, - <<"$event/message_acked">>, - <<"$event/message_dropped">>]). diff --git a/apps/emqx_modules/src/emqx_event_message.erl b/apps/emqx_modules/src/emqx_event_message.erl index 5017ed3e4..842ddae04 100644 --- a/apps/emqx_modules/src/emqx_event_message.erl +++ b/apps/emqx_modules/src/emqx_event_message.erl @@ -20,14 +20,16 @@ -include_lib("emqx/include/logger.hrl"). -include("emqx_modules.hrl"). --export([ enable/0 +-export([ list/0 + , update/1 + , enable/0 , disable/0 ]). -export([ on_client_connected/2 , on_client_disconnected/3 - , on_session_subscribed/3 - , on_session_unsubscribed/3 + , on_client_subscribed/3 + , on_client_unsubscribed/3 , on_message_dropped/3 , on_message_delivered/2 , on_message_acked/2 @@ -37,51 +39,59 @@ -export([reason/1]). -endif. +list() -> + emqx:get_config([event_message], #{}). + +update(Params) -> + disable(), + {ok, _} = emqx:update_config([event_message], Params), + enable(). + enable() -> - Topics = emqx:get_config([event_message, topics], []), - lists:foreach(fun(Topic) -> + lists:foreach(fun({_Topic, false}) -> ok; + ({Topic, true}) -> case Topic of - <<"$event/client_connected">> -> + '$event/client_connected' -> emqx_hooks:put('client.connected', {?MODULE, on_client_connected, []}); - <<"$event/client_disconnected">> -> + '$event/client_disconnected' -> emqx_hooks:put('client.disconnected', {?MODULE, on_client_disconnected, []}); - <<"$event/session_subscribed">> -> - emqx_hooks:put('session.subscribed', {?MODULE, on_session_subscribed, []}); - <<"$event/session_unsubscribed">> -> - emqx_hooks:put('session.unsubscribed', {?MODULE, on_session_unsubscribed, []}); - <<"$event/message_delivered">> -> + '$event/client_subscribed' -> + emqx_hooks:put('session.subscribed', {?MODULE, on_client_subscribed, []}); + '$event/client_unsubscribed' -> + emqx_hooks:put('session.unsubscribed', {?MODULE, on_client_unsubscribed, []}); + '$event/message_delivered' -> emqx_hooks:put('message.delivered', {?MODULE, on_message_delivered, []}); - <<"$event/message_acked">> -> + '$event/message_acked' -> emqx_hooks:put('message.acked', {?MODULE, on_message_acked, []}); - <<"$event/message_dropped">> -> + '$event/message_dropped' -> emqx_hooks:put('message.dropped', {?MODULE, on_message_dropped, []}); _ -> ok end - end, Topics). + end, maps:to_list(list())). disable() -> - Topics = emqx:get_config([event_message, topics], []), - lists:foreach(fun(Topic) -> + lists:foreach(fun({_Topic, false}) -> ok; + ({Topic, true}) -> case Topic of - <<"$event/client_connected">> -> + '$event/client_connected' -> emqx_hooks:del('client.connected', {?MODULE, on_client_connected}); - <<"$event/client_disconnected">> -> + '$event/client_disconnected' -> emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}); - <<"$event/session_subscribed">> -> - emqx_hooks:del('session.subscribed', {?MODULE, on_session_subscribed}); - <<"$event/session_unsubscribed">> -> - emqx_hooks:del('session.unsubscribed', {?MODULE, on_session_unsubscribed}); - <<"$event/message_delivered">> -> + '$event/client_subscribed' -> + emqx_hooks:del('session.subscribed', {?MODULE, on_client_subscribed}); + '$event/client_unsubscribed' -> + emqx_hooks:del('session.unsubscribed', {?MODULE, on_client_unsubscribed}); + '$event/message_delivered' -> emqx_hooks:del('message.delivered', {?MODULE, on_message_delivered}); - <<"$event/message_acked">> -> + '$event/message_acked' -> emqx_hooks:del('message.acked', {?MODULE, on_message_acked}); - <<"$event/message_dropped">> -> + '$event/message_dropped' -> emqx_hooks:del('message.dropped', {?MODULE, on_message_dropped}); _ -> ok end - end, ?BASE_TOPICS -- Topics). + end, maps:to_list(list())). %%-------------------------------------------------------------------- %% Callbacks @@ -90,7 +100,6 @@ disable() -> on_client_connected(ClientInfo, ConnInfo) -> Payload0 = common_infos(ClientInfo, ConnInfo), Payload = Payload0#{ - connack => 0, %% XXX: connack will be removed in 5.0 keepalive => maps:get(keepalive, ConnInfo, 0), clean_start => maps:get(clean_start, ConnInfo, true), expiry_interval => maps:get(expiry_interval, ConnInfo, 0), @@ -108,8 +117,8 @@ on_client_disconnected(ClientInfo, }, publish_event_msg(<<"$event/client_disconnected">>, Payload). -on_session_subscribed(_ClientInfo = #{clientid := ClientId, - username := Username}, +on_client_subscribed(_ClientInfo = #{clientid := ClientId, + username := Username}, Topic, SubOpts) -> Payload = #{clientid => ClientId, username => Username, @@ -117,17 +126,17 @@ on_session_subscribed(_ClientInfo = #{clientid := ClientId, subopts => SubOpts, ts => erlang:system_time(millisecond) }, - publish_event_msg(<<"$event/session_subscribed">>, Payload). + publish_event_msg(<<"$event/client_subscribed">>, Payload). -on_session_unsubscribed(_ClientInfo = #{clientid := ClientId, - username := Username}, +on_client_unsubscribed(_ClientInfo = #{clientid := ClientId, + username := Username}, Topic, _SubOpts) -> Payload = #{clientid => ClientId, username => Username, topic => Topic, ts => erlang:system_time(millisecond) }, - publish_event_msg(<<"$event/session_unsubscribed">>, Payload). + publish_event_msg(<<"$event/client_unsubscribed">>, Payload). on_message_dropped(Message = #message{from = ClientId}, _, Reason) -> case ignore_sys_message(Message) of @@ -201,8 +210,7 @@ common_infos( ipaddress => ntoa(PeerHost), sockport => SockPort, proto_name => ProtoName, - proto_ver => ProtoVer, - ts => erlang:system_time(millisecond) + proto_ver => ProtoVer }. make_msg(Topic, Payload) -> diff --git a/apps/emqx_modules/src/emqx_event_message_api.erl b/apps/emqx_modules/src/emqx_event_message_api.erl new file mode 100644 index 000000000..86c3255e1 --- /dev/null +++ b/apps/emqx_modules/src/emqx_event_message_api.erl @@ -0,0 +1,99 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_event_message_api). + +-behaviour(minirest_api). + +-export([api_spec/0]). + +-export([event_message/2]). + + +api_spec() -> + {[event_message_api()], [event_message_schema()]}. + +event_message_schema() -> + #{ + type => object, + properties => #{ + '$event/client_connected' => #{ + type => boolean, + description => <<"Client connected event">>, + example => get_raw(<<"$event/client_connected">>) + }, + '$event/client_disconnected' => #{ + type => boolean, + description => <<"client_disconnected">>, + example => get_raw(<<"Client disconnected event">>) + }, + '$event/client_subscribed' => #{ + type => boolean, + description => <<"client_subscribed">>, + example => get_raw(<<"Client subscribed event">>) + }, + '$event/client_unsubscribed' => #{ + type => boolean, + description => <<"client_unsubscribed">>, + example => get_raw(<<"Client unsubscribed event">>) + }, + '$event/message_delivered' => #{ + type => boolean, + description => <<"message_delivered">>, + example => get_raw(<<"Message delivered event">>) + }, + '$event/message_acked' => #{ + type => boolean, + description => <<"message_acked">>, + example => get_raw(<<"Message acked event">>) + }, + '$event/message_dropped' => #{ + type => boolean, + description => <<"message_dropped">>, + example => get_raw(<<"Message dropped event">>) + } + } + }. + +event_message_api() -> + Path = "/mqtt/event_message", + Metadata = #{ + get => #{ + description => <<"Event Message">>, + responses => #{ + <<"200">> => + emqx_mgmt_util:response_schema(<<>>, event_message_schema())}}, + post => #{ + description => <<"">>, + 'requestBody' => emqx_mgmt_util:request_body_schema(event_message_schema()), + responses => #{ + <<"200">> => + emqx_mgmt_util:response_schema(<<>>, event_message_schema()) + } + } + }, + {Path, Metadata, event_message}. + +event_message(get, _Request) -> + {200, emqx_event_message:list()}; + +event_message(post, Request) -> + {ok, Body, _} = cowboy_req:read_body(Request), + Params = emqx_json:decode(Body, [return_maps]), + _ = emqx_event_message:update(Params), + {200, emqx_event_message:list()}. + +get_raw(Key) -> + emqx_config:get_raw([<<"event_message">>] ++ [Key], false). diff --git a/apps/emqx_modules/src/emqx_modules_schema.erl b/apps/emqx_modules/src/emqx_modules_schema.erl index 0097fdbbe..0c5e716bf 100644 --- a/apps/emqx_modules/src/emqx_modules_schema.erl +++ b/apps/emqx_modules/src/emqx_modules_schema.erl @@ -45,8 +45,15 @@ fields("rewrite") -> [ {rules, hoconsc:array(hoconsc:ref(?MODULE, "rules"))} ]; + fields("event_message") -> - [ {topics, fun topics/1} + [ {"$event/client_connected", emqx_schema:t(boolean(), undefined, false)} + , {"$event/client_disconnected", emqx_schema:t(boolean(), undefined, false)} + , {"$event/client_subscribed", emqx_schema:t(boolean(), undefined, false)} + , {"$event/client_unsubscribed", emqx_schema:t(boolean(), undefined, false)} + , {"$event/message_delivered", emqx_schema:t(boolean(), undefined, false)} + , {"$event/message_acked", emqx_schema:t(boolean(), undefined, false)} + , {"$event/message_dropped", emqx_schema:t(boolean(), undefined, false)} ]; fields("topic_metrics") -> @@ -60,19 +67,3 @@ fields("rules") -> , {dest_topic, emqx_schema:t(binary())} ]. -topics(type) -> hoconsc:array(binary()); -topics(default) -> []; -% topics(validator) -> [ -% fun(Conf) -> -% case lists:member(Conf, ["$event/client_connected", -% "$event/client_disconnected", -% "$event/session_subscribed", -% "$event/session_unsubscribed", -% "$event/message_delivered", -% "$event/message_acked", -% "$event/message_dropped"]) of -% true -> ok; -% false -> {error, "Bad event topic"} -% end -% end]; -topics(_) -> undefined. diff --git a/apps/emqx_modules/test/emqx_event_message_SUITE.erl b/apps/emqx_modules/test/emqx_event_message_SUITE.erl index 97235ac1f..526113bcc 100644 --- a/apps/emqx_modules/test/emqx_event_message_SUITE.erl +++ b/apps/emqx_modules/test/emqx_event_message_SUITE.erl @@ -24,16 +24,14 @@ -define(EVENT_MESSAGE, <<""" event_message: { - topics : [ - \"$event/client_connected\", - \"$event/client_disconnected\", - \"$event/session_subscribed\", - \"$event/session_unsubscribed\", - \"$event/message_delivered\", - \"$event/message_acked\", - \"$event/message_dropped\" - ]}""">>). - + \"$event/client_connected\": true + \"$event/client_disconnected\": true + \"$event/client_subscribed\": true + \"$event/client_unsubscribed\": true + \"$event/message_delivered\": true + \"$event/message_acked\": true + \"$event/message_dropped\": true + }""">>). all() -> emqx_ct:all(?MODULE). @@ -56,12 +54,12 @@ t_event_topic(_) -> {ok, _} = emqtt:connect(C2), ok = recv_connected(<<"clientid">>), - {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/session_subscribed">>, qos1), + {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_subscribed">>, qos1), _ = receive_publish(100), timer:sleep(50), {ok, _, [?QOS_1]} = emqtt:subscribe(C2, <<"test_sub">>, qos1), ok = recv_subscribed(<<"clientid">>), - emqtt:unsubscribe(C1, <<"$event/session_subscribed">>), + emqtt:unsubscribe(C1, <<"$event/client_subscribed">>), timer:sleep(50), {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_delivered">>, qos1), @@ -77,7 +75,7 @@ t_event_topic(_) -> ok= emqtt:publish(C2, <<"test_sub1">>, <<"test">>), recv_message_dropped(<<"clientid">>), - {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/session_unsubscribed">>, qos1), + {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_unsubscribed">>, qos1), _ = emqtt:unsubscribe(C2, <<"test_sub">>), ok = recv_unsubscribed(<<"clientid">>), @@ -101,12 +99,14 @@ recv_connected(ClientId) -> <<"ipaddress">> := <<"127.0.0.1">>, <<"proto_name">> := <<"MQTT">>, <<"proto_ver">> := ?MQTT_PROTO_V4, - <<"connack">> := ?RC_SUCCESS, - <<"clean_start">> := true}, emqx_json:decode(Payload, [return_maps])). + <<"clean_start">> := true, + <<"expiry_interval">> := 0, + <<"keepalive">> := 60 + }, emqx_json:decode(Payload, [return_maps])). recv_subscribed(_ClientId) -> {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100), - ?assertMatch(<<"$event/session_subscribed">>, Topic). + ?assertMatch(<<"$event/client_subscribed">>, Topic). recv_message_dropped(_ClientId) -> {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100), @@ -123,7 +123,7 @@ recv_message_acked(_ClientId) -> recv_unsubscribed(_ClientId) -> {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100), - ?assertMatch(<<"$event/session_unsubscribed">>, Topic). + ?assertMatch(<<"$event/client_unsubscribed">>, Topic). recv_disconnected(ClientId) -> {ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100),