refactor(event-message): refactor event_message

* refactor(event-message): refactor event_message configuration

* feat(event-message): add event_message REST API
This commit is contained in:
turtleDeng 2021-08-18 16:26:15 +08:00 committed by GitHub
parent e8e95d39ef
commit f01b77e4fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 176 additions and 89 deletions

View File

@ -11,17 +11,14 @@ telemetry: {
enable: true
}
event_message: {
topics: [
"$event/client_connected",
"$event/client_disconnected",
"$event/session_subscribed",
"$event/session_unsubscribed",
"$event/message_delivered",
"$event/message_acked",
"$event/message_dropped"
]
event_message {
"$event/client_connected": true
"$event/client_disconnected": true
# "$event/client_subscribed": false
# "$event/client_unsubscribed": false
# "$event/message_delivered": false
# "$event/message_acked": false
# "$event/message_dropped": false
}
topic_metrics:{

View File

@ -3,11 +3,3 @@
%% Interval for reporting telemetry data, Default: 7d
-define(REPORT_INTERVAR, 604800).
-define(BASE_TOPICS, [<<"$event/client_connected">>,
<<"$event/client_disconnected">>,
<<"$event/session_subscribed">>,
<<"$event/session_unsubscribed">>,
<<"$event/message_delivered">>,
<<"$event/message_acked">>,
<<"$event/message_dropped">>]).

View File

@ -20,14 +20,16 @@
-include_lib("emqx/include/logger.hrl").
-include("emqx_modules.hrl").
-export([ enable/0
-export([ list/0
, update/1
, enable/0
, disable/0
]).
-export([ on_client_connected/2
, on_client_disconnected/3
, on_session_subscribed/3
, on_session_unsubscribed/3
, on_client_subscribed/3
, on_client_unsubscribed/3
, on_message_dropped/3
, on_message_delivered/2
, on_message_acked/2
@ -37,51 +39,59 @@
-export([reason/1]).
-endif.
list() ->
emqx:get_config([event_message], #{}).
update(Params) ->
disable(),
{ok, _} = emqx:update_config([event_message], Params),
enable().
enable() ->
Topics = emqx:get_config([event_message, topics], []),
lists:foreach(fun(Topic) ->
lists:foreach(fun({_Topic, false}) -> ok;
({Topic, true}) ->
case Topic of
<<"$event/client_connected">> ->
'$event/client_connected' ->
emqx_hooks:put('client.connected', {?MODULE, on_client_connected, []});
<<"$event/client_disconnected">> ->
'$event/client_disconnected' ->
emqx_hooks:put('client.disconnected', {?MODULE, on_client_disconnected, []});
<<"$event/session_subscribed">> ->
emqx_hooks:put('session.subscribed', {?MODULE, on_session_subscribed, []});
<<"$event/session_unsubscribed">> ->
emqx_hooks:put('session.unsubscribed', {?MODULE, on_session_unsubscribed, []});
<<"$event/message_delivered">> ->
'$event/client_subscribed' ->
emqx_hooks:put('session.subscribed', {?MODULE, on_client_subscribed, []});
'$event/client_unsubscribed' ->
emqx_hooks:put('session.unsubscribed', {?MODULE, on_client_unsubscribed, []});
'$event/message_delivered' ->
emqx_hooks:put('message.delivered', {?MODULE, on_message_delivered, []});
<<"$event/message_acked">> ->
'$event/message_acked' ->
emqx_hooks:put('message.acked', {?MODULE, on_message_acked, []});
<<"$event/message_dropped">> ->
'$event/message_dropped' ->
emqx_hooks:put('message.dropped', {?MODULE, on_message_dropped, []});
_ ->
ok
end
end, Topics).
end, maps:to_list(list())).
disable() ->
Topics = emqx:get_config([event_message, topics], []),
lists:foreach(fun(Topic) ->
lists:foreach(fun({_Topic, false}) -> ok;
({Topic, true}) ->
case Topic of
<<"$event/client_connected">> ->
'$event/client_connected' ->
emqx_hooks:del('client.connected', {?MODULE, on_client_connected});
<<"$event/client_disconnected">> ->
'$event/client_disconnected' ->
emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected});
<<"$event/session_subscribed">> ->
emqx_hooks:del('session.subscribed', {?MODULE, on_session_subscribed});
<<"$event/session_unsubscribed">> ->
emqx_hooks:del('session.unsubscribed', {?MODULE, on_session_unsubscribed});
<<"$event/message_delivered">> ->
'$event/client_subscribed' ->
emqx_hooks:del('session.subscribed', {?MODULE, on_client_subscribed});
'$event/client_unsubscribed' ->
emqx_hooks:del('session.unsubscribed', {?MODULE, on_client_unsubscribed});
'$event/message_delivered' ->
emqx_hooks:del('message.delivered', {?MODULE, on_message_delivered});
<<"$event/message_acked">> ->
'$event/message_acked' ->
emqx_hooks:del('message.acked', {?MODULE, on_message_acked});
<<"$event/message_dropped">> ->
'$event/message_dropped' ->
emqx_hooks:del('message.dropped', {?MODULE, on_message_dropped});
_ ->
ok
end
end, ?BASE_TOPICS -- Topics).
end, maps:to_list(list())).
%%--------------------------------------------------------------------
%% Callbacks
@ -90,7 +100,6 @@ disable() ->
on_client_connected(ClientInfo, ConnInfo) ->
Payload0 = common_infos(ClientInfo, ConnInfo),
Payload = Payload0#{
connack => 0, %% XXX: connack will be removed in 5.0
keepalive => maps:get(keepalive, ConnInfo, 0),
clean_start => maps:get(clean_start, ConnInfo, true),
expiry_interval => maps:get(expiry_interval, ConnInfo, 0),
@ -108,7 +117,7 @@ on_client_disconnected(ClientInfo,
},
publish_event_msg(<<"$event/client_disconnected">>, Payload).
on_session_subscribed(_ClientInfo = #{clientid := ClientId,
on_client_subscribed(_ClientInfo = #{clientid := ClientId,
username := Username},
Topic, SubOpts) ->
Payload = #{clientid => ClientId,
@ -117,9 +126,9 @@ on_session_subscribed(_ClientInfo = #{clientid := ClientId,
subopts => SubOpts,
ts => erlang:system_time(millisecond)
},
publish_event_msg(<<"$event/session_subscribed">>, Payload).
publish_event_msg(<<"$event/client_subscribed">>, Payload).
on_session_unsubscribed(_ClientInfo = #{clientid := ClientId,
on_client_unsubscribed(_ClientInfo = #{clientid := ClientId,
username := Username},
Topic, _SubOpts) ->
Payload = #{clientid => ClientId,
@ -127,7 +136,7 @@ on_session_unsubscribed(_ClientInfo = #{clientid := ClientId,
topic => Topic,
ts => erlang:system_time(millisecond)
},
publish_event_msg(<<"$event/session_unsubscribed">>, Payload).
publish_event_msg(<<"$event/client_unsubscribed">>, Payload).
on_message_dropped(Message = #message{from = ClientId}, _, Reason) ->
case ignore_sys_message(Message) of
@ -201,8 +210,7 @@ common_infos(
ipaddress => ntoa(PeerHost),
sockport => SockPort,
proto_name => ProtoName,
proto_ver => ProtoVer,
ts => erlang:system_time(millisecond)
proto_ver => ProtoVer
}.
make_msg(Topic, Payload) ->

View File

@ -0,0 +1,99 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_event_message_api).
-behaviour(minirest_api).
-export([api_spec/0]).
-export([event_message/2]).
api_spec() ->
{[event_message_api()], [event_message_schema()]}.
event_message_schema() ->
#{
type => object,
properties => #{
'$event/client_connected' => #{
type => boolean,
description => <<"Client connected event">>,
example => get_raw(<<"$event/client_connected">>)
},
'$event/client_disconnected' => #{
type => boolean,
description => <<"client_disconnected">>,
example => get_raw(<<"Client disconnected event">>)
},
'$event/client_subscribed' => #{
type => boolean,
description => <<"client_subscribed">>,
example => get_raw(<<"Client subscribed event">>)
},
'$event/client_unsubscribed' => #{
type => boolean,
description => <<"client_unsubscribed">>,
example => get_raw(<<"Client unsubscribed event">>)
},
'$event/message_delivered' => #{
type => boolean,
description => <<"message_delivered">>,
example => get_raw(<<"Message delivered event">>)
},
'$event/message_acked' => #{
type => boolean,
description => <<"message_acked">>,
example => get_raw(<<"Message acked event">>)
},
'$event/message_dropped' => #{
type => boolean,
description => <<"message_dropped">>,
example => get_raw(<<"Message dropped event">>)
}
}
}.
event_message_api() ->
Path = "/mqtt/event_message",
Metadata = #{
get => #{
description => <<"Event Message">>,
responses => #{
<<"200">> =>
emqx_mgmt_util:response_schema(<<>>, event_message_schema())}},
post => #{
description => <<"">>,
'requestBody' => emqx_mgmt_util:request_body_schema(event_message_schema()),
responses => #{
<<"200">> =>
emqx_mgmt_util:response_schema(<<>>, event_message_schema())
}
}
},
{Path, Metadata, event_message}.
event_message(get, _Request) ->
{200, emqx_event_message:list()};
event_message(post, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Params = emqx_json:decode(Body, [return_maps]),
_ = emqx_event_message:update(Params),
{200, emqx_event_message:list()}.
get_raw(Key) ->
emqx_config:get_raw([<<"event_message">>] ++ [Key], false).

View File

@ -45,8 +45,15 @@ fields("rewrite") ->
[ {rules, hoconsc:array(hoconsc:ref(?MODULE, "rules"))}
];
fields("event_message") ->
[ {topics, fun topics/1}
[ {"$event/client_connected", emqx_schema:t(boolean(), undefined, false)}
, {"$event/client_disconnected", emqx_schema:t(boolean(), undefined, false)}
, {"$event/client_subscribed", emqx_schema:t(boolean(), undefined, false)}
, {"$event/client_unsubscribed", emqx_schema:t(boolean(), undefined, false)}
, {"$event/message_delivered", emqx_schema:t(boolean(), undefined, false)}
, {"$event/message_acked", emqx_schema:t(boolean(), undefined, false)}
, {"$event/message_dropped", emqx_schema:t(boolean(), undefined, false)}
];
fields("topic_metrics") ->
@ -60,19 +67,3 @@ fields("rules") ->
, {dest_topic, emqx_schema:t(binary())}
].
topics(type) -> hoconsc:array(binary());
topics(default) -> [];
% topics(validator) -> [
% fun(Conf) ->
% case lists:member(Conf, ["$event/client_connected",
% "$event/client_disconnected",
% "$event/session_subscribed",
% "$event/session_unsubscribed",
% "$event/message_delivered",
% "$event/message_acked",
% "$event/message_dropped"]) of
% true -> ok;
% false -> {error, "Bad event topic"}
% end
% end];
topics(_) -> undefined.

View File

@ -24,16 +24,14 @@
-define(EVENT_MESSAGE, <<"""
event_message: {
topics : [
\"$event/client_connected\",
\"$event/client_disconnected\",
\"$event/session_subscribed\",
\"$event/session_unsubscribed\",
\"$event/message_delivered\",
\"$event/message_acked\",
\"$event/message_dropped\"
]}""">>).
\"$event/client_connected\": true
\"$event/client_disconnected\": true
\"$event/client_subscribed\": true
\"$event/client_unsubscribed\": true
\"$event/message_delivered\": true
\"$event/message_acked\": true
\"$event/message_dropped\": true
}""">>).
all() -> emqx_ct:all(?MODULE).
@ -56,12 +54,12 @@ t_event_topic(_) ->
{ok, _} = emqtt:connect(C2),
ok = recv_connected(<<"clientid">>),
{ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/session_subscribed">>, qos1),
{ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_subscribed">>, qos1),
_ = receive_publish(100),
timer:sleep(50),
{ok, _, [?QOS_1]} = emqtt:subscribe(C2, <<"test_sub">>, qos1),
ok = recv_subscribed(<<"clientid">>),
emqtt:unsubscribe(C1, <<"$event/session_subscribed">>),
emqtt:unsubscribe(C1, <<"$event/client_subscribed">>),
timer:sleep(50),
{ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_delivered">>, qos1),
@ -77,7 +75,7 @@ t_event_topic(_) ->
ok= emqtt:publish(C2, <<"test_sub1">>, <<"test">>),
recv_message_dropped(<<"clientid">>),
{ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/session_unsubscribed">>, qos1),
{ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_unsubscribed">>, qos1),
_ = emqtt:unsubscribe(C2, <<"test_sub">>),
ok = recv_unsubscribed(<<"clientid">>),
@ -101,12 +99,14 @@ recv_connected(ClientId) ->
<<"ipaddress">> := <<"127.0.0.1">>,
<<"proto_name">> := <<"MQTT">>,
<<"proto_ver">> := ?MQTT_PROTO_V4,
<<"connack">> := ?RC_SUCCESS,
<<"clean_start">> := true}, emqx_json:decode(Payload, [return_maps])).
<<"clean_start">> := true,
<<"expiry_interval">> := 0,
<<"keepalive">> := 60
}, emqx_json:decode(Payload, [return_maps])).
recv_subscribed(_ClientId) ->
{ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
?assertMatch(<<"$event/session_subscribed">>, Topic).
?assertMatch(<<"$event/client_subscribed">>, Topic).
recv_message_dropped(_ClientId) ->
{ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
@ -123,7 +123,7 @@ recv_message_acked(_ClientId) ->
recv_unsubscribed(_ClientId) ->
{ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
?assertMatch(<<"$event/session_unsubscribed">>, Topic).
?assertMatch(<<"$event/client_unsubscribed">>, Topic).
recv_disconnected(ClientId) ->
{ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100),