From ec0c698914f7dd3b1418994b77f98437f065e65d Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 25 Mar 2022 10:20:11 +0800 Subject: [PATCH 1/2] refactor: remove event_messages mod --- apps/emqx_modules/etc/emqx_modules.conf | 10 - apps/emqx_modules/src/emqx_event_message.erl | 320 ------------------ .../src/emqx_event_message_api.erl | 70 ---- apps/emqx_modules/src/emqx_modules_schema.erl | 71 ---- .../test/emqx_event_message_SUITE.erl | 162 --------- 5 files changed, 633 deletions(-) delete mode 100644 apps/emqx_modules/src/emqx_event_message.erl delete mode 100644 apps/emqx_modules/src/emqx_event_message_api.erl delete mode 100644 apps/emqx_modules/test/emqx_event_message_SUITE.erl diff --git a/apps/emqx_modules/etc/emqx_modules.conf b/apps/emqx_modules/etc/emqx_modules.conf index f07e740da..45549edec 100644 --- a/apps/emqx_modules/etc/emqx_modules.conf +++ b/apps/emqx_modules/etc/emqx_modules.conf @@ -13,16 +13,6 @@ telemetry { enable = true } -event_message { - client_connected = true - client_disconnected = true - # client_subscribed = false - # client_unsubscribed = false - # message_delivered = false - # message_acked = false - # message_dropped = false -} - topic_metrics: [ #{topic: "test/1"} ] diff --git a/apps/emqx_modules/src/emqx_event_message.erl b/apps/emqx_modules/src/emqx_event_message.erl deleted file mode 100644 index c462e7e25..000000000 --- a/apps/emqx_modules/src/emqx_event_message.erl +++ /dev/null @@ -1,320 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_event_message). - --include_lib("emqx/include/emqx.hrl"). --include_lib("emqx/include/logger.hrl"). --include("emqx_modules.hrl"). - --export([ - list/0, - update/1, - enable/0, - disable/0, - post_config_update/5, - init_conf_handler/0 -]). - --export([ - on_client_connected/2, - on_client_disconnected/3, - on_client_subscribed/3, - on_client_unsubscribed/3, - on_message_dropped/3, - on_message_delivered/2, - on_message_acked/2 -]). - --ifdef(TEST). --export([reason/1]). --endif. - -init_conf_handler() -> - emqx_conf:add_handler([event_message], ?MODULE). - -list() -> - emqx_conf:get([event_message], #{}). - -update(Params) -> - case - emqx_conf:update( - [event_message], - Params, - #{rawconf_with_defaults => true, override_to => cluster} - ) - of - {ok, #{raw_config := NewEventMessage}} -> - {ok, NewEventMessage}; - {error, Reason} -> - {error, Reason} - end. - -post_config_update(_KeyPath, _Config, NewConf, _OldConf, _AppEnvs) -> - disable(), - enable(maps:to_list(NewConf)). - -enable() -> - enable(maps:to_list(list())). - -disable() -> - foreach_with(fun check_enable/2, fun emqx_hooks:del/2, maps:to_list(list())). - -%%-------------------------------------------------------------------- -%% Callbacks -%%-------------------------------------------------------------------- - -on_client_connected(ClientInfo, ConnInfo) -> - Payload0 = common_infos(ClientInfo, ConnInfo), - Payload = Payload0#{ - keepalive => maps:get(keepalive, ConnInfo, 0), - clean_start => maps:get(clean_start, ConnInfo, true), - expiry_interval => maps:get(expiry_interval, ConnInfo, 0) - }, - publish_event_msg(<<"$event/client_connected">>, Payload). - -on_client_disconnected( - ClientInfo, - Reason, - ConnInfo = #{disconnected_at := DisconnectedAt} -) -> - Payload0 = common_infos(ClientInfo, ConnInfo), - Payload = Payload0#{ - reason => reason(Reason), - disconnected_at => DisconnectedAt - }, - publish_event_msg(<<"$event/client_disconnected">>, Payload). - -on_client_subscribed( - _ClientInfo = #{ - clientid := ClientId, - username := Username - }, - Topic, - SubOpts -) -> - Payload = #{ - clientid => ClientId, - username => Username, - topic => Topic, - subopts => SubOpts, - ts => erlang:system_time(millisecond) - }, - publish_event_msg(<<"$event/client_subscribed">>, Payload). - -on_client_unsubscribed( - _ClientInfo = #{ - clientid := ClientId, - username := Username - }, - Topic, - _SubOpts -) -> - Payload = #{ - clientid => ClientId, - username => Username, - topic => Topic, - ts => erlang:system_time(millisecond) - }, - publish_event_msg(<<"$event/client_unsubscribed">>, Payload). - -on_message_dropped(Message = #message{from = ClientId}, _, Reason) -> - case ignore_sys_message(Message) of - true -> - ok; - false -> - Payload0 = base_message(Message), - Payload = Payload0#{ - reason => Reason, - clientid => ClientId, - username => emqx_message:get_header(username, Message, undefined), - peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined)) - }, - publish_event_msg(<<"$event/message_dropped">>, Payload) - end, - {ok, Message}. - -on_message_delivered( - _ClientInfo = #{ - peerhost := PeerHost, - clientid := ReceiverCId, - username := ReceiverUsername - }, - #message{from = ClientId} = Message -) -> - case ignore_sys_message(Message) of - true -> - ok; - false -> - Payload0 = base_message(Message), - Payload = Payload0#{ - from_clientid => ClientId, - from_username => emqx_message:get_header(username, Message, undefined), - clientid => ReceiverCId, - username => ReceiverUsername, - peerhost => ntoa(PeerHost) - }, - publish_event_msg(<<"$event/message_delivered">>, Payload) - end, - {ok, Message}. - -on_message_acked( - _ClientInfo = #{ - peerhost := PeerHost, - clientid := ReceiverCId, - username := ReceiverUsername - }, - #message{from = ClientId} = Message -) -> - case ignore_sys_message(Message) of - true -> - ok; - false -> - Payload0 = base_message(Message), - Payload = Payload0#{ - from_clientid => ClientId, - from_username => emqx_message:get_header(username, Message, undefined), - clientid => ReceiverCId, - username => ReceiverUsername, - peerhost => ntoa(PeerHost) - }, - publish_event_msg(<<"$event/message_acked">>, Payload) - end, - {ok, Message}. - -%%-------------------------------------------------------------------- -%% Helper functions -%%-------------------------------------------------------------------- -common_infos( - _ClientInfo = #{ - clientid := ClientId, - username := Username, - peerhost := PeerHost, - sockport := SockPort - }, - _ConnInfo = #{ - proto_name := ProtoName, - proto_ver := ProtoVer, - connected_at := ConnectedAt - } -) -> - #{ - clientid => ClientId, - username => Username, - ipaddress => ntoa(PeerHost), - sockport => SockPort, - proto_name => ProtoName, - proto_ver => ProtoVer, - connected_at => ConnectedAt, - ts => erlang:system_time(millisecond) - }. - -make_msg(Topic, Payload) -> - emqx_message:set_flag( - sys, - emqx_message:make( - ?MODULE, 0, Topic, iolist_to_binary(Payload) - ) - ). - --compile({inline, [reason/1]}). -reason(Reason) when is_atom(Reason) -> Reason; -reason({shutdown, Reason}) when is_atom(Reason) -> Reason; -reason({Error, _}) when is_atom(Error) -> Error; -reason(_) -> internal_error. - -ntoa(undefined) -> undefined; -ntoa({IpAddr, Port}) -> iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]); -ntoa(IpAddr) -> iolist_to_binary(inet:ntoa(IpAddr)). - -printable_maps(undefined) -> - #{}; -printable_maps(Headers) -> - maps:fold( - fun - (K, V0, AccIn) when K =:= peerhost; K =:= peername; K =:= sockname -> - AccIn#{K => ntoa(V0)}; - ('User-Property', V0, AccIn) when is_list(V0) -> - AccIn#{ - 'User-Property' => maps:from_list(V0), - 'User-Property-Pairs' => [ - #{ - key => Key, - value => Value - } - || {Key, Value} <- V0 - ] - }; - (K, V0, AccIn) -> - AccIn#{K => V0} - end, - #{}, - Headers - ). - -base_message(Message) -> - #message{ - id = Id, - qos = QoS, - flags = Flags, - topic = Topic, - headers = Headers, - payload = Payload, - timestamp = Timestamp - } = Message, - #{ - id => emqx_guid:to_hexstr(Id), - payload => Payload, - topic => Topic, - qos => QoS, - flags => Flags, - headers => printable_maps(Headers), - pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), - publish_received_at => Timestamp - }. - -ignore_sys_message(#message{flags = Flags}) -> - maps:get(sys, Flags, false). - -publish_event_msg(Topic, Payload) -> - _ = emqx_broker:safe_publish(make_msg(Topic, emqx_json:encode(Payload))), - ok. - -enable(List) -> - foreach_with(fun check_enable/2, fun emqx_hooks:put/2, List). - -check_enable(Handler, {client_connected, true}) -> - Handler('client.connected', {?MODULE, on_client_connected, []}); -check_enable(Handler, {client_disconnected, true}) -> - Handler('client.disconnected', {?MODULE, on_client_disconnected, []}); -check_enable(Handler, {client_subscribed, true}) -> - Handler('session.subscribed', {?MODULE, on_client_subscribed, []}); -check_enable(Handler, {client_unsubscribed, true}) -> - Handler('session.unsubscribed', {?MODULE, on_client_unsubscribed, []}); -check_enable(Handler, {message_delivered, true}) -> - Handler('message.delivered', {?MODULE, on_message_delivered, []}); -check_enable(Handler, {message_acked, true}) -> - Handler('message.acked', {?MODULE, on_message_acked, []}); -check_enable(Handler, {message_dropped, true}) -> - Handler('message.dropped', {?MODULE, on_message_dropped, []}); -check_enable(_Handler, {_Topic, _Enable}) -> - ok. - -foreach_with(Fun, With, [H | T]) -> - Fun(With, H), - foreach_with(Fun, With, T); -foreach_with(_Fun, _With, []) -> - ok. diff --git a/apps/emqx_modules/src/emqx_event_message_api.erl b/apps/emqx_modules/src/emqx_event_message_api.erl deleted file mode 100644 index c6c71b804..000000000 --- a/apps/emqx_modules/src/emqx_event_message_api.erl +++ /dev/null @@ -1,70 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- --module(emqx_event_message_api). - --include("emqx_modules.hrl"). - --behaviour(minirest_api). - --import(hoconsc, [mk/2, ref/2]). - --export([ - api_spec/0, - paths/0, - schema/1 -]). - --export([event_message/2]). - -api_spec() -> - emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). - -paths() -> - ["/mqtt/event_message"]. - -schema("/mqtt/event_message") -> - #{ - 'operationId' => event_message, - get => - #{ - description => <<"Event Message">>, - tags => ?API_TAG_MQTT, - responses => - #{200 => status_schema(<<"Get Event Message config successfully">>)} - }, - put => - #{ - description => <<"Update Event Message">>, - tags => ?API_TAG_MQTT, - 'requestBody' => status_schema(<<"Update Event Message config">>), - responses => - #{200 => status_schema(<<"Update Event Message config successfully">>)} - } - }. - -status_schema(Desc) -> - mk(ref(?API_SCHEMA_MODULE, "event_message"), #{in => body, desc => Desc}). - -event_message(get, _Params) -> - {200, emqx_event_message:list()}; -event_message(put, #{body := Body}) -> - case emqx_event_message:update(Body) of - {ok, NewConfig} -> - {200, NewConfig}; - {error, Reason} -> - Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])), - {500, 'INTERNAL_ERROR', Message} - end. diff --git a/apps/emqx_modules/src/emqx_modules_schema.erl b/apps/emqx_modules/src/emqx_modules_schema.erl index 8400311d5..4b27879e2 100644 --- a/apps/emqx_modules/src/emqx_modules_schema.erl +++ b/apps/emqx_modules/src/emqx_modules_schema.erl @@ -32,7 +32,6 @@ roots() -> [ "delayed", "telemetry", - "event_message", array("rewrite"), array("topic_metrics") ]. @@ -63,76 +62,6 @@ fields("rewrite") -> )}, {re, fun regular_expression/1} ]; -fields("event_message") -> - Fields = - [ - {client_connected, - sc( - boolean(), - #{ - desc => <<"Enable/disable client_connected event messages">>, - default => false - } - )}, - {client_disconnected, - sc( - boolean(), - #{ - desc => <<"Enable/disable client_disconnected event messages">>, - default => false - } - )}, - {client_subscribed, - sc( - boolean(), - #{ - desc => <<"Enable/disable client_subscribed event messages">>, - default => false - } - )}, - {client_unsubscribed, - sc( - boolean(), - #{ - desc => <<"Enable/disable client_unsubscribed event messages">>, - default => false - } - )}, - {message_delivered, - sc( - boolean(), - #{ - desc => <<"Enable/disable message_delivered event messages">>, - default => false - } - )}, - {message_acked, - sc( - boolean(), - #{ - desc => <<"Enable/disable message_acked event messages">>, - default => false - } - )}, - {message_dropped, - sc( - boolean(), - #{ - desc => <<"Enable/disable message_dropped event messages">>, - default => false - } - )} - ], - #{ - fields => Fields, - desc => - "Enable/Disable system event messages.\n" - "The messages are published to $event prefixed topics.\n" - "For example, if `client_disconnected` is set to `true`,\n" - "a message is published to $event/client_connected topic\n" - "whenever a client is connected.\n" - "" - }; fields("topic_metrics") -> [{topic, sc(binary(), #{})}]. diff --git a/apps/emqx_modules/test/emqx_event_message_SUITE.erl b/apps/emqx_modules/test/emqx_event_message_SUITE.erl deleted file mode 100644 index beb9175fe..000000000 --- a/apps/emqx_modules/test/emqx_event_message_SUITE.erl +++ /dev/null @@ -1,162 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_event_message_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("emqx/include/emqx_mqtt.hrl"). --include_lib("eunit/include/eunit.hrl"). - --define(EVENT_MESSAGE, << - "" - "\n" - "event_message: {\n" - " client_connected: true\n" - " client_disconnected: true\n" - " client_subscribed: true\n" - " client_unsubscribed: true\n" - " message_delivered: true\n" - " message_acked: true\n" - " message_dropped: true\n" - "}" - "" ->>). - -all() -> emqx_common_test_helpers:all(?MODULE). - -init_per_suite(Config) -> - emqx_common_test_helpers:boot_modules(all), - emqx_common_test_helpers:start_apps([emqx_modules]), - load_config(), - Config. - -load_config() -> - ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?EVENT_MESSAGE). - -end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([emqx_modules]). - -t_event_topic(_) -> - ok = emqx_event_message:enable(), - {ok, C1} = emqtt:start_link([{clientid, <<"monsys">>}]), - {ok, _} = emqtt:connect(C1), - {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_connected">>, qos1), - {ok, C2} = emqtt:start_link([ - {clientid, <<"clientid">>}, - {username, <<"username">>} - ]), - {ok, _} = emqtt:connect(C2), - ok = recv_connected(<<"clientid">>), - - {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_subscribed">>, qos1), - _ = receive_publish(100), - timer:sleep(50), - {ok, _, [?QOS_1]} = emqtt:subscribe(C2, <<"test_sub">>, qos1), - ok = recv_subscribed(<<"clientid">>), - emqtt:unsubscribe(C1, <<"$event/client_subscribed">>), - timer:sleep(50), - - {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_delivered">>, qos1), - {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_acked">>, qos1), - _ = emqx:publish(emqx_message:make(<<"test">>, ?QOS_1, <<"test_sub">>, <<"test">>)), - {ok, #{qos := QOS1, topic := Topic1}} = receive_publish(100), - {ok, #{qos := QOS2, topic := Topic2}} = receive_publish(100), - recv_message_publish_or_delivered(<<"clientid">>, QOS1, Topic1), - recv_message_publish_or_delivered(<<"clientid">>, QOS2, Topic2), - recv_message_acked(<<"clientid">>), - - {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_dropped">>, qos1), - ok = emqtt:publish(C2, <<"test_sub1">>, <<"test">>), - recv_message_dropped(<<"clientid">>), - - {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_unsubscribed">>, qos1), - _ = emqtt:unsubscribe(C2, <<"test_sub">>), - ok = recv_unsubscribed(<<"clientid">>), - - {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_disconnected">>, qos1), - ok = emqtt:disconnect(C2), - ok = recv_disconnected(<<"clientid">>), - ok = emqtt:disconnect(C1), - ok = emqx_event_message:disable(). - -t_reason(_) -> - ?assertEqual(normal, emqx_event_message:reason(normal)), - ?assertEqual(discarded, emqx_event_message:reason({shutdown, discarded})), - ?assertEqual(tcp_error, emqx_event_message:reason({tcp_error, einval})), - ?assertEqual(internal_error, emqx_event_message:reason(<<"unknown error">>)). - -recv_connected(ClientId) -> - {ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100), - ?assertMatch(<<"$event/client_connected">>, Topic), - ?assertMatch( - #{ - <<"clientid">> := ClientId, - <<"username">> := <<"username">>, - <<"ipaddress">> := <<"127.0.0.1">>, - <<"proto_name">> := <<"MQTT">>, - <<"proto_ver">> := ?MQTT_PROTO_V4, - <<"clean_start">> := true, - <<"expiry_interval">> := 0, - <<"keepalive">> := 60 - }, - emqx_json:decode(Payload, [return_maps]) - ). - -recv_subscribed(_ClientId) -> - {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100), - ?assertMatch(<<"$event/client_subscribed">>, Topic). - -recv_message_dropped(_ClientId) -> - {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100), - ?assertMatch(<<"$event/message_dropped">>, Topic). - -recv_message_publish_or_delivered(_ClientId, 0, Topic) -> - ?assertMatch(<<"$event/message_delivered">>, Topic); -recv_message_publish_or_delivered(_ClientId, 1, Topic) -> - ?assertMatch(<<"test_sub">>, Topic). - -recv_message_acked(_ClientId) -> - {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100), - ?assertMatch(<<"$event/message_acked">>, Topic). - -recv_unsubscribed(_ClientId) -> - {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100), - ?assertMatch(<<"$event/client_unsubscribed">>, Topic). - -recv_disconnected(ClientId) -> - {ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100), - ?assertMatch(<<"$event/client_disconnected">>, Topic), - ?assertMatch( - #{ - <<"clientid">> := ClientId, - <<"username">> := <<"username">>, - <<"reason">> := <<"normal">> - }, - emqx_json:decode(Payload, [return_maps]) - ). - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -receive_publish(Timeout) -> - receive - {publish, Publish} -> - {ok, Publish} - after Timeout -> {error, timeout} - end. From 7f1082a0bbaf64cee782dfd743d6286f82da5b3c Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 25 Mar 2022 11:20:18 +0800 Subject: [PATCH 2/2] chore: make xref happy --- apps/emqx_modules/src/emqx_modules_app.erl | 2 -- apps/emqx_modules/src/emqx_modules_sup.erl | 1 - 2 files changed, 3 deletions(-) diff --git a/apps/emqx_modules/src/emqx_modules_app.erl b/apps/emqx_modules/src/emqx_modules_app.erl index bf8efc8f6..51b314861 100644 --- a/apps/emqx_modules/src/emqx_modules_app.erl +++ b/apps/emqx_modules/src/emqx_modules_app.erl @@ -36,7 +36,6 @@ maybe_enable_modules() -> emqx_conf:get([delayed, enable], true) andalso emqx_delayed:enable(), emqx_conf:get([telemetry, enable], true) andalso emqx_telemetry:enable(), emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:enable(), - emqx_event_message:enable(), emqx_conf_cli:load(), ok = emqx_rewrite:enable(), emqx_topic_metrics:enable(), @@ -46,7 +45,6 @@ maybe_disable_modules() -> emqx_conf:get([delayed, enable], true) andalso emqx_delayed:disable(), emqx_conf:get([telemetry, enable], true) andalso emqx_telemetry:disable(), emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:disable(), - emqx_event_message:disable(), emqx_rewrite:disable(), emqx_conf_cli:unload(), emqx_topic_metrics:disable(), diff --git a/apps/emqx_modules/src/emqx_modules_sup.erl b/apps/emqx_modules/src/emqx_modules_sup.erl index a0d71bcbc..42b000eaa 100644 --- a/apps/emqx_modules/src/emqx_modules_sup.erl +++ b/apps/emqx_modules/src/emqx_modules_sup.erl @@ -39,7 +39,6 @@ start_link() -> %% Supervisor callbacks %%-------------------------------------------------------------------- init([]) -> - emqx_event_message:init_conf_handler(), {ok, {{one_for_one, 10, 3600}, [ ?CHILD(emqx_telemetry),