refactor: remove event_messages mod

This commit is contained in:
JianBo He 2022-03-25 10:20:11 +08:00
parent 68961c7479
commit ec0c698914
5 changed files with 0 additions and 633 deletions

View File

@ -13,16 +13,6 @@ telemetry {
enable = true enable = true
} }
event_message {
client_connected = true
client_disconnected = true
# client_subscribed = false
# client_unsubscribed = false
# message_delivered = false
# message_acked = false
# message_dropped = false
}
topic_metrics: [ topic_metrics: [
#{topic: "test/1"} #{topic: "test/1"}
] ]

View File

@ -1,320 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_event_message).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-include("emqx_modules.hrl").
-export([
list/0,
update/1,
enable/0,
disable/0,
post_config_update/5,
init_conf_handler/0
]).
-export([
on_client_connected/2,
on_client_disconnected/3,
on_client_subscribed/3,
on_client_unsubscribed/3,
on_message_dropped/3,
on_message_delivered/2,
on_message_acked/2
]).
-ifdef(TEST).
-export([reason/1]).
-endif.
init_conf_handler() ->
emqx_conf:add_handler([event_message], ?MODULE).
list() ->
emqx_conf:get([event_message], #{}).
update(Params) ->
case
emqx_conf:update(
[event_message],
Params,
#{rawconf_with_defaults => true, override_to => cluster}
)
of
{ok, #{raw_config := NewEventMessage}} ->
{ok, NewEventMessage};
{error, Reason} ->
{error, Reason}
end.
post_config_update(_KeyPath, _Config, NewConf, _OldConf, _AppEnvs) ->
disable(),
enable(maps:to_list(NewConf)).
enable() ->
enable(maps:to_list(list())).
disable() ->
foreach_with(fun check_enable/2, fun emqx_hooks:del/2, maps:to_list(list())).
%%--------------------------------------------------------------------
%% Callbacks
%%--------------------------------------------------------------------
on_client_connected(ClientInfo, ConnInfo) ->
Payload0 = common_infos(ClientInfo, ConnInfo),
Payload = Payload0#{
keepalive => maps:get(keepalive, ConnInfo, 0),
clean_start => maps:get(clean_start, ConnInfo, true),
expiry_interval => maps:get(expiry_interval, ConnInfo, 0)
},
publish_event_msg(<<"$event/client_connected">>, Payload).
on_client_disconnected(
ClientInfo,
Reason,
ConnInfo = #{disconnected_at := DisconnectedAt}
) ->
Payload0 = common_infos(ClientInfo, ConnInfo),
Payload = Payload0#{
reason => reason(Reason),
disconnected_at => DisconnectedAt
},
publish_event_msg(<<"$event/client_disconnected">>, Payload).
on_client_subscribed(
_ClientInfo = #{
clientid := ClientId,
username := Username
},
Topic,
SubOpts
) ->
Payload = #{
clientid => ClientId,
username => Username,
topic => Topic,
subopts => SubOpts,
ts => erlang:system_time(millisecond)
},
publish_event_msg(<<"$event/client_subscribed">>, Payload).
on_client_unsubscribed(
_ClientInfo = #{
clientid := ClientId,
username := Username
},
Topic,
_SubOpts
) ->
Payload = #{
clientid => ClientId,
username => Username,
topic => Topic,
ts => erlang:system_time(millisecond)
},
publish_event_msg(<<"$event/client_unsubscribed">>, Payload).
on_message_dropped(Message = #message{from = ClientId}, _, Reason) ->
case ignore_sys_message(Message) of
true ->
ok;
false ->
Payload0 = base_message(Message),
Payload = Payload0#{
reason => Reason,
clientid => ClientId,
username => emqx_message:get_header(username, Message, undefined),
peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined))
},
publish_event_msg(<<"$event/message_dropped">>, Payload)
end,
{ok, Message}.
on_message_delivered(
_ClientInfo = #{
peerhost := PeerHost,
clientid := ReceiverCId,
username := ReceiverUsername
},
#message{from = ClientId} = Message
) ->
case ignore_sys_message(Message) of
true ->
ok;
false ->
Payload0 = base_message(Message),
Payload = Payload0#{
from_clientid => ClientId,
from_username => emqx_message:get_header(username, Message, undefined),
clientid => ReceiverCId,
username => ReceiverUsername,
peerhost => ntoa(PeerHost)
},
publish_event_msg(<<"$event/message_delivered">>, Payload)
end,
{ok, Message}.
on_message_acked(
_ClientInfo = #{
peerhost := PeerHost,
clientid := ReceiverCId,
username := ReceiverUsername
},
#message{from = ClientId} = Message
) ->
case ignore_sys_message(Message) of
true ->
ok;
false ->
Payload0 = base_message(Message),
Payload = Payload0#{
from_clientid => ClientId,
from_username => emqx_message:get_header(username, Message, undefined),
clientid => ReceiverCId,
username => ReceiverUsername,
peerhost => ntoa(PeerHost)
},
publish_event_msg(<<"$event/message_acked">>, Payload)
end,
{ok, Message}.
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
common_infos(
_ClientInfo = #{
clientid := ClientId,
username := Username,
peerhost := PeerHost,
sockport := SockPort
},
_ConnInfo = #{
proto_name := ProtoName,
proto_ver := ProtoVer,
connected_at := ConnectedAt
}
) ->
#{
clientid => ClientId,
username => Username,
ipaddress => ntoa(PeerHost),
sockport => SockPort,
proto_name => ProtoName,
proto_ver => ProtoVer,
connected_at => ConnectedAt,
ts => erlang:system_time(millisecond)
}.
make_msg(Topic, Payload) ->
emqx_message:set_flag(
sys,
emqx_message:make(
?MODULE, 0, Topic, iolist_to_binary(Payload)
)
).
-compile({inline, [reason/1]}).
reason(Reason) when is_atom(Reason) -> Reason;
reason({shutdown, Reason}) when is_atom(Reason) -> Reason;
reason({Error, _}) when is_atom(Error) -> Error;
reason(_) -> internal_error.
ntoa(undefined) -> undefined;
ntoa({IpAddr, Port}) -> iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]);
ntoa(IpAddr) -> iolist_to_binary(inet:ntoa(IpAddr)).
printable_maps(undefined) ->
#{};
printable_maps(Headers) ->
maps:fold(
fun
(K, V0, AccIn) when K =:= peerhost; K =:= peername; K =:= sockname ->
AccIn#{K => ntoa(V0)};
('User-Property', V0, AccIn) when is_list(V0) ->
AccIn#{
'User-Property' => maps:from_list(V0),
'User-Property-Pairs' => [
#{
key => Key,
value => Value
}
|| {Key, Value} <- V0
]
};
(K, V0, AccIn) ->
AccIn#{K => V0}
end,
#{},
Headers
).
base_message(Message) ->
#message{
id = Id,
qos = QoS,
flags = Flags,
topic = Topic,
headers = Headers,
payload = Payload,
timestamp = Timestamp
} = Message,
#{
id => emqx_guid:to_hexstr(Id),
payload => Payload,
topic => Topic,
qos => QoS,
flags => Flags,
headers => printable_maps(Headers),
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
publish_received_at => Timestamp
}.
ignore_sys_message(#message{flags = Flags}) ->
maps:get(sys, Flags, false).
publish_event_msg(Topic, Payload) ->
_ = emqx_broker:safe_publish(make_msg(Topic, emqx_json:encode(Payload))),
ok.
enable(List) ->
foreach_with(fun check_enable/2, fun emqx_hooks:put/2, List).
check_enable(Handler, {client_connected, true}) ->
Handler('client.connected', {?MODULE, on_client_connected, []});
check_enable(Handler, {client_disconnected, true}) ->
Handler('client.disconnected', {?MODULE, on_client_disconnected, []});
check_enable(Handler, {client_subscribed, true}) ->
Handler('session.subscribed', {?MODULE, on_client_subscribed, []});
check_enable(Handler, {client_unsubscribed, true}) ->
Handler('session.unsubscribed', {?MODULE, on_client_unsubscribed, []});
check_enable(Handler, {message_delivered, true}) ->
Handler('message.delivered', {?MODULE, on_message_delivered, []});
check_enable(Handler, {message_acked, true}) ->
Handler('message.acked', {?MODULE, on_message_acked, []});
check_enable(Handler, {message_dropped, true}) ->
Handler('message.dropped', {?MODULE, on_message_dropped, []});
check_enable(_Handler, {_Topic, _Enable}) ->
ok.
foreach_with(Fun, With, [H | T]) ->
Fun(With, H),
foreach_with(Fun, With, T);
foreach_with(_Fun, _With, []) ->
ok.

View File

@ -1,70 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_event_message_api).
-include("emqx_modules.hrl").
-behaviour(minirest_api).
-import(hoconsc, [mk/2, ref/2]).
-export([
api_spec/0,
paths/0,
schema/1
]).
-export([event_message/2]).
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
paths() ->
["/mqtt/event_message"].
schema("/mqtt/event_message") ->
#{
'operationId' => event_message,
get =>
#{
description => <<"Event Message">>,
tags => ?API_TAG_MQTT,
responses =>
#{200 => status_schema(<<"Get Event Message config successfully">>)}
},
put =>
#{
description => <<"Update Event Message">>,
tags => ?API_TAG_MQTT,
'requestBody' => status_schema(<<"Update Event Message config">>),
responses =>
#{200 => status_schema(<<"Update Event Message config successfully">>)}
}
}.
status_schema(Desc) ->
mk(ref(?API_SCHEMA_MODULE, "event_message"), #{in => body, desc => Desc}).
event_message(get, _Params) ->
{200, emqx_event_message:list()};
event_message(put, #{body := Body}) ->
case emqx_event_message:update(Body) of
{ok, NewConfig} ->
{200, NewConfig};
{error, Reason} ->
Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),
{500, 'INTERNAL_ERROR', Message}
end.

View File

@ -32,7 +32,6 @@ roots() ->
[ [
"delayed", "delayed",
"telemetry", "telemetry",
"event_message",
array("rewrite"), array("rewrite"),
array("topic_metrics") array("topic_metrics")
]. ].
@ -63,76 +62,6 @@ fields("rewrite") ->
)}, )},
{re, fun regular_expression/1} {re, fun regular_expression/1}
]; ];
fields("event_message") ->
Fields =
[
{client_connected,
sc(
boolean(),
#{
desc => <<"Enable/disable client_connected event messages">>,
default => false
}
)},
{client_disconnected,
sc(
boolean(),
#{
desc => <<"Enable/disable client_disconnected event messages">>,
default => false
}
)},
{client_subscribed,
sc(
boolean(),
#{
desc => <<"Enable/disable client_subscribed event messages">>,
default => false
}
)},
{client_unsubscribed,
sc(
boolean(),
#{
desc => <<"Enable/disable client_unsubscribed event messages">>,
default => false
}
)},
{message_delivered,
sc(
boolean(),
#{
desc => <<"Enable/disable message_delivered event messages">>,
default => false
}
)},
{message_acked,
sc(
boolean(),
#{
desc => <<"Enable/disable message_acked event messages">>,
default => false
}
)},
{message_dropped,
sc(
boolean(),
#{
desc => <<"Enable/disable message_dropped event messages">>,
default => false
}
)}
],
#{
fields => Fields,
desc =>
"Enable/Disable system event messages.\n"
"The messages are published to <code>$event</code> prefixed topics.\n"
"For example, if `client_disconnected` is set to `true`,\n"
"a message is published to <code>$event/client_connected</code> topic\n"
"whenever a client is connected.\n"
""
};
fields("topic_metrics") -> fields("topic_metrics") ->
[{topic, sc(binary(), #{})}]. [{topic, sc(binary(), #{})}].

View File

@ -1,162 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_event_message_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(EVENT_MESSAGE, <<
""
"\n"
"event_message: {\n"
" client_connected: true\n"
" client_disconnected: true\n"
" client_subscribed: true\n"
" client_unsubscribed: true\n"
" message_delivered: true\n"
" message_acked: true\n"
" message_dropped: true\n"
"}"
""
>>).
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([emqx_modules]),
load_config(),
Config.
load_config() ->
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?EVENT_MESSAGE).
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_modules]).
t_event_topic(_) ->
ok = emqx_event_message:enable(),
{ok, C1} = emqtt:start_link([{clientid, <<"monsys">>}]),
{ok, _} = emqtt:connect(C1),
{ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_connected">>, qos1),
{ok, C2} = emqtt:start_link([
{clientid, <<"clientid">>},
{username, <<"username">>}
]),
{ok, _} = emqtt:connect(C2),
ok = recv_connected(<<"clientid">>),
{ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_subscribed">>, qos1),
_ = receive_publish(100),
timer:sleep(50),
{ok, _, [?QOS_1]} = emqtt:subscribe(C2, <<"test_sub">>, qos1),
ok = recv_subscribed(<<"clientid">>),
emqtt:unsubscribe(C1, <<"$event/client_subscribed">>),
timer:sleep(50),
{ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_delivered">>, qos1),
{ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_acked">>, qos1),
_ = emqx:publish(emqx_message:make(<<"test">>, ?QOS_1, <<"test_sub">>, <<"test">>)),
{ok, #{qos := QOS1, topic := Topic1}} = receive_publish(100),
{ok, #{qos := QOS2, topic := Topic2}} = receive_publish(100),
recv_message_publish_or_delivered(<<"clientid">>, QOS1, Topic1),
recv_message_publish_or_delivered(<<"clientid">>, QOS2, Topic2),
recv_message_acked(<<"clientid">>),
{ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_dropped">>, qos1),
ok = emqtt:publish(C2, <<"test_sub1">>, <<"test">>),
recv_message_dropped(<<"clientid">>),
{ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_unsubscribed">>, qos1),
_ = emqtt:unsubscribe(C2, <<"test_sub">>),
ok = recv_unsubscribed(<<"clientid">>),
{ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_disconnected">>, qos1),
ok = emqtt:disconnect(C2),
ok = recv_disconnected(<<"clientid">>),
ok = emqtt:disconnect(C1),
ok = emqx_event_message:disable().
t_reason(_) ->
?assertEqual(normal, emqx_event_message:reason(normal)),
?assertEqual(discarded, emqx_event_message:reason({shutdown, discarded})),
?assertEqual(tcp_error, emqx_event_message:reason({tcp_error, einval})),
?assertEqual(internal_error, emqx_event_message:reason(<<"unknown error">>)).
recv_connected(ClientId) ->
{ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100),
?assertMatch(<<"$event/client_connected">>, Topic),
?assertMatch(
#{
<<"clientid">> := ClientId,
<<"username">> := <<"username">>,
<<"ipaddress">> := <<"127.0.0.1">>,
<<"proto_name">> := <<"MQTT">>,
<<"proto_ver">> := ?MQTT_PROTO_V4,
<<"clean_start">> := true,
<<"expiry_interval">> := 0,
<<"keepalive">> := 60
},
emqx_json:decode(Payload, [return_maps])
).
recv_subscribed(_ClientId) ->
{ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
?assertMatch(<<"$event/client_subscribed">>, Topic).
recv_message_dropped(_ClientId) ->
{ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
?assertMatch(<<"$event/message_dropped">>, Topic).
recv_message_publish_or_delivered(_ClientId, 0, Topic) ->
?assertMatch(<<"$event/message_delivered">>, Topic);
recv_message_publish_or_delivered(_ClientId, 1, Topic) ->
?assertMatch(<<"test_sub">>, Topic).
recv_message_acked(_ClientId) ->
{ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
?assertMatch(<<"$event/message_acked">>, Topic).
recv_unsubscribed(_ClientId) ->
{ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
?assertMatch(<<"$event/client_unsubscribed">>, Topic).
recv_disconnected(ClientId) ->
{ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100),
?assertMatch(<<"$event/client_disconnected">>, Topic),
?assertMatch(
#{
<<"clientid">> := ClientId,
<<"username">> := <<"username">>,
<<"reason">> := <<"normal">>
},
emqx_json:decode(Payload, [return_maps])
).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
receive_publish(Timeout) ->
receive
{publish, Publish} ->
{ok, Publish}
after Timeout -> {error, timeout}
end.