Merge pull request #7405 from HJianBo/remove-event-message
refactor: remove event_messages mod
This commit is contained in:
commit
e6cd3ad3ac
|
@ -13,16 +13,6 @@ telemetry {
|
|||
enable = true
|
||||
}
|
||||
|
||||
event_message {
|
||||
client_connected = true
|
||||
client_disconnected = true
|
||||
# client_subscribed = false
|
||||
# client_unsubscribed = false
|
||||
# message_delivered = false
|
||||
# message_acked = false
|
||||
# message_dropped = false
|
||||
}
|
||||
|
||||
topic_metrics: [
|
||||
#{topic: "test/1"}
|
||||
]
|
||||
|
|
|
@ -1,320 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_event_message).
|
||||
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include("emqx_modules.hrl").
|
||||
|
||||
-export([
|
||||
list/0,
|
||||
update/1,
|
||||
enable/0,
|
||||
disable/0,
|
||||
post_config_update/5,
|
||||
init_conf_handler/0
|
||||
]).
|
||||
|
||||
-export([
|
||||
on_client_connected/2,
|
||||
on_client_disconnected/3,
|
||||
on_client_subscribed/3,
|
||||
on_client_unsubscribed/3,
|
||||
on_message_dropped/3,
|
||||
on_message_delivered/2,
|
||||
on_message_acked/2
|
||||
]).
|
||||
|
||||
-ifdef(TEST).
|
||||
-export([reason/1]).
|
||||
-endif.
|
||||
|
||||
init_conf_handler() ->
|
||||
emqx_conf:add_handler([event_message], ?MODULE).
|
||||
|
||||
list() ->
|
||||
emqx_conf:get([event_message], #{}).
|
||||
|
||||
update(Params) ->
|
||||
case
|
||||
emqx_conf:update(
|
||||
[event_message],
|
||||
Params,
|
||||
#{rawconf_with_defaults => true, override_to => cluster}
|
||||
)
|
||||
of
|
||||
{ok, #{raw_config := NewEventMessage}} ->
|
||||
{ok, NewEventMessage};
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
post_config_update(_KeyPath, _Config, NewConf, _OldConf, _AppEnvs) ->
|
||||
disable(),
|
||||
enable(maps:to_list(NewConf)).
|
||||
|
||||
enable() ->
|
||||
enable(maps:to_list(list())).
|
||||
|
||||
disable() ->
|
||||
foreach_with(fun check_enable/2, fun emqx_hooks:del/2, maps:to_list(list())).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
on_client_connected(ClientInfo, ConnInfo) ->
|
||||
Payload0 = common_infos(ClientInfo, ConnInfo),
|
||||
Payload = Payload0#{
|
||||
keepalive => maps:get(keepalive, ConnInfo, 0),
|
||||
clean_start => maps:get(clean_start, ConnInfo, true),
|
||||
expiry_interval => maps:get(expiry_interval, ConnInfo, 0)
|
||||
},
|
||||
publish_event_msg(<<"$event/client_connected">>, Payload).
|
||||
|
||||
on_client_disconnected(
|
||||
ClientInfo,
|
||||
Reason,
|
||||
ConnInfo = #{disconnected_at := DisconnectedAt}
|
||||
) ->
|
||||
Payload0 = common_infos(ClientInfo, ConnInfo),
|
||||
Payload = Payload0#{
|
||||
reason => reason(Reason),
|
||||
disconnected_at => DisconnectedAt
|
||||
},
|
||||
publish_event_msg(<<"$event/client_disconnected">>, Payload).
|
||||
|
||||
on_client_subscribed(
|
||||
_ClientInfo = #{
|
||||
clientid := ClientId,
|
||||
username := Username
|
||||
},
|
||||
Topic,
|
||||
SubOpts
|
||||
) ->
|
||||
Payload = #{
|
||||
clientid => ClientId,
|
||||
username => Username,
|
||||
topic => Topic,
|
||||
subopts => SubOpts,
|
||||
ts => erlang:system_time(millisecond)
|
||||
},
|
||||
publish_event_msg(<<"$event/client_subscribed">>, Payload).
|
||||
|
||||
on_client_unsubscribed(
|
||||
_ClientInfo = #{
|
||||
clientid := ClientId,
|
||||
username := Username
|
||||
},
|
||||
Topic,
|
||||
_SubOpts
|
||||
) ->
|
||||
Payload = #{
|
||||
clientid => ClientId,
|
||||
username => Username,
|
||||
topic => Topic,
|
||||
ts => erlang:system_time(millisecond)
|
||||
},
|
||||
publish_event_msg(<<"$event/client_unsubscribed">>, Payload).
|
||||
|
||||
on_message_dropped(Message = #message{from = ClientId}, _, Reason) ->
|
||||
case ignore_sys_message(Message) of
|
||||
true ->
|
||||
ok;
|
||||
false ->
|
||||
Payload0 = base_message(Message),
|
||||
Payload = Payload0#{
|
||||
reason => Reason,
|
||||
clientid => ClientId,
|
||||
username => emqx_message:get_header(username, Message, undefined),
|
||||
peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined))
|
||||
},
|
||||
publish_event_msg(<<"$event/message_dropped">>, Payload)
|
||||
end,
|
||||
{ok, Message}.
|
||||
|
||||
on_message_delivered(
|
||||
_ClientInfo = #{
|
||||
peerhost := PeerHost,
|
||||
clientid := ReceiverCId,
|
||||
username := ReceiverUsername
|
||||
},
|
||||
#message{from = ClientId} = Message
|
||||
) ->
|
||||
case ignore_sys_message(Message) of
|
||||
true ->
|
||||
ok;
|
||||
false ->
|
||||
Payload0 = base_message(Message),
|
||||
Payload = Payload0#{
|
||||
from_clientid => ClientId,
|
||||
from_username => emqx_message:get_header(username, Message, undefined),
|
||||
clientid => ReceiverCId,
|
||||
username => ReceiverUsername,
|
||||
peerhost => ntoa(PeerHost)
|
||||
},
|
||||
publish_event_msg(<<"$event/message_delivered">>, Payload)
|
||||
end,
|
||||
{ok, Message}.
|
||||
|
||||
on_message_acked(
|
||||
_ClientInfo = #{
|
||||
peerhost := PeerHost,
|
||||
clientid := ReceiverCId,
|
||||
username := ReceiverUsername
|
||||
},
|
||||
#message{from = ClientId} = Message
|
||||
) ->
|
||||
case ignore_sys_message(Message) of
|
||||
true ->
|
||||
ok;
|
||||
false ->
|
||||
Payload0 = base_message(Message),
|
||||
Payload = Payload0#{
|
||||
from_clientid => ClientId,
|
||||
from_username => emqx_message:get_header(username, Message, undefined),
|
||||
clientid => ReceiverCId,
|
||||
username => ReceiverUsername,
|
||||
peerhost => ntoa(PeerHost)
|
||||
},
|
||||
publish_event_msg(<<"$event/message_acked">>, Payload)
|
||||
end,
|
||||
{ok, Message}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helper functions
|
||||
%%--------------------------------------------------------------------
|
||||
common_infos(
|
||||
_ClientInfo = #{
|
||||
clientid := ClientId,
|
||||
username := Username,
|
||||
peerhost := PeerHost,
|
||||
sockport := SockPort
|
||||
},
|
||||
_ConnInfo = #{
|
||||
proto_name := ProtoName,
|
||||
proto_ver := ProtoVer,
|
||||
connected_at := ConnectedAt
|
||||
}
|
||||
) ->
|
||||
#{
|
||||
clientid => ClientId,
|
||||
username => Username,
|
||||
ipaddress => ntoa(PeerHost),
|
||||
sockport => SockPort,
|
||||
proto_name => ProtoName,
|
||||
proto_ver => ProtoVer,
|
||||
connected_at => ConnectedAt,
|
||||
ts => erlang:system_time(millisecond)
|
||||
}.
|
||||
|
||||
make_msg(Topic, Payload) ->
|
||||
emqx_message:set_flag(
|
||||
sys,
|
||||
emqx_message:make(
|
||||
?MODULE, 0, Topic, iolist_to_binary(Payload)
|
||||
)
|
||||
).
|
||||
|
||||
-compile({inline, [reason/1]}).
|
||||
reason(Reason) when is_atom(Reason) -> Reason;
|
||||
reason({shutdown, Reason}) when is_atom(Reason) -> Reason;
|
||||
reason({Error, _}) when is_atom(Error) -> Error;
|
||||
reason(_) -> internal_error.
|
||||
|
||||
ntoa(undefined) -> undefined;
|
||||
ntoa({IpAddr, Port}) -> iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]);
|
||||
ntoa(IpAddr) -> iolist_to_binary(inet:ntoa(IpAddr)).
|
||||
|
||||
printable_maps(undefined) ->
|
||||
#{};
|
||||
printable_maps(Headers) ->
|
||||
maps:fold(
|
||||
fun
|
||||
(K, V0, AccIn) when K =:= peerhost; K =:= peername; K =:= sockname ->
|
||||
AccIn#{K => ntoa(V0)};
|
||||
('User-Property', V0, AccIn) when is_list(V0) ->
|
||||
AccIn#{
|
||||
'User-Property' => maps:from_list(V0),
|
||||
'User-Property-Pairs' => [
|
||||
#{
|
||||
key => Key,
|
||||
value => Value
|
||||
}
|
||||
|| {Key, Value} <- V0
|
||||
]
|
||||
};
|
||||
(K, V0, AccIn) ->
|
||||
AccIn#{K => V0}
|
||||
end,
|
||||
#{},
|
||||
Headers
|
||||
).
|
||||
|
||||
base_message(Message) ->
|
||||
#message{
|
||||
id = Id,
|
||||
qos = QoS,
|
||||
flags = Flags,
|
||||
topic = Topic,
|
||||
headers = Headers,
|
||||
payload = Payload,
|
||||
timestamp = Timestamp
|
||||
} = Message,
|
||||
#{
|
||||
id => emqx_guid:to_hexstr(Id),
|
||||
payload => Payload,
|
||||
topic => Topic,
|
||||
qos => QoS,
|
||||
flags => Flags,
|
||||
headers => printable_maps(Headers),
|
||||
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
||||
publish_received_at => Timestamp
|
||||
}.
|
||||
|
||||
ignore_sys_message(#message{flags = Flags}) ->
|
||||
maps:get(sys, Flags, false).
|
||||
|
||||
publish_event_msg(Topic, Payload) ->
|
||||
_ = emqx_broker:safe_publish(make_msg(Topic, emqx_json:encode(Payload))),
|
||||
ok.
|
||||
|
||||
enable(List) ->
|
||||
foreach_with(fun check_enable/2, fun emqx_hooks:put/2, List).
|
||||
|
||||
check_enable(Handler, {client_connected, true}) ->
|
||||
Handler('client.connected', {?MODULE, on_client_connected, []});
|
||||
check_enable(Handler, {client_disconnected, true}) ->
|
||||
Handler('client.disconnected', {?MODULE, on_client_disconnected, []});
|
||||
check_enable(Handler, {client_subscribed, true}) ->
|
||||
Handler('session.subscribed', {?MODULE, on_client_subscribed, []});
|
||||
check_enable(Handler, {client_unsubscribed, true}) ->
|
||||
Handler('session.unsubscribed', {?MODULE, on_client_unsubscribed, []});
|
||||
check_enable(Handler, {message_delivered, true}) ->
|
||||
Handler('message.delivered', {?MODULE, on_message_delivered, []});
|
||||
check_enable(Handler, {message_acked, true}) ->
|
||||
Handler('message.acked', {?MODULE, on_message_acked, []});
|
||||
check_enable(Handler, {message_dropped, true}) ->
|
||||
Handler('message.dropped', {?MODULE, on_message_dropped, []});
|
||||
check_enable(_Handler, {_Topic, _Enable}) ->
|
||||
ok.
|
||||
|
||||
foreach_with(Fun, With, [H | T]) ->
|
||||
Fun(With, H),
|
||||
foreach_with(Fun, With, T);
|
||||
foreach_with(_Fun, _With, []) ->
|
||||
ok.
|
|
@ -1,70 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_event_message_api).
|
||||
|
||||
-include("emqx_modules.hrl").
|
||||
|
||||
-behaviour(minirest_api).
|
||||
|
||||
-import(hoconsc, [mk/2, ref/2]).
|
||||
|
||||
-export([
|
||||
api_spec/0,
|
||||
paths/0,
|
||||
schema/1
|
||||
]).
|
||||
|
||||
-export([event_message/2]).
|
||||
|
||||
api_spec() ->
|
||||
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
|
||||
|
||||
paths() ->
|
||||
["/mqtt/event_message"].
|
||||
|
||||
schema("/mqtt/event_message") ->
|
||||
#{
|
||||
'operationId' => event_message,
|
||||
get =>
|
||||
#{
|
||||
description => <<"Event Message">>,
|
||||
tags => ?API_TAG_MQTT,
|
||||
responses =>
|
||||
#{200 => status_schema(<<"Get Event Message config successfully">>)}
|
||||
},
|
||||
put =>
|
||||
#{
|
||||
description => <<"Update Event Message">>,
|
||||
tags => ?API_TAG_MQTT,
|
||||
'requestBody' => status_schema(<<"Update Event Message config">>),
|
||||
responses =>
|
||||
#{200 => status_schema(<<"Update Event Message config successfully">>)}
|
||||
}
|
||||
}.
|
||||
|
||||
status_schema(Desc) ->
|
||||
mk(ref(?API_SCHEMA_MODULE, "event_message"), #{in => body, desc => Desc}).
|
||||
|
||||
event_message(get, _Params) ->
|
||||
{200, emqx_event_message:list()};
|
||||
event_message(put, #{body := Body}) ->
|
||||
case emqx_event_message:update(Body) of
|
||||
{ok, NewConfig} ->
|
||||
{200, NewConfig};
|
||||
{error, Reason} ->
|
||||
Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),
|
||||
{500, 'INTERNAL_ERROR', Message}
|
||||
end.
|
|
@ -36,7 +36,6 @@ maybe_enable_modules() ->
|
|||
emqx_conf:get([delayed, enable], true) andalso emqx_delayed:enable(),
|
||||
emqx_conf:get([telemetry, enable], true) andalso emqx_telemetry:enable(),
|
||||
emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:enable(),
|
||||
emqx_event_message:enable(),
|
||||
emqx_conf_cli:load(),
|
||||
ok = emqx_rewrite:enable(),
|
||||
emqx_topic_metrics:enable(),
|
||||
|
@ -46,7 +45,6 @@ maybe_disable_modules() ->
|
|||
emqx_conf:get([delayed, enable], true) andalso emqx_delayed:disable(),
|
||||
emqx_conf:get([telemetry, enable], true) andalso emqx_telemetry:disable(),
|
||||
emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:disable(),
|
||||
emqx_event_message:disable(),
|
||||
emqx_rewrite:disable(),
|
||||
emqx_conf_cli:unload(),
|
||||
emqx_topic_metrics:disable(),
|
||||
|
|
|
@ -32,7 +32,6 @@ roots() ->
|
|||
[
|
||||
"delayed",
|
||||
"telemetry",
|
||||
"event_message",
|
||||
array("rewrite"),
|
||||
array("topic_metrics")
|
||||
].
|
||||
|
@ -63,76 +62,6 @@ fields("rewrite") ->
|
|||
)},
|
||||
{re, fun regular_expression/1}
|
||||
];
|
||||
fields("event_message") ->
|
||||
Fields =
|
||||
[
|
||||
{client_connected,
|
||||
sc(
|
||||
boolean(),
|
||||
#{
|
||||
desc => <<"Enable/disable client_connected event messages">>,
|
||||
default => false
|
||||
}
|
||||
)},
|
||||
{client_disconnected,
|
||||
sc(
|
||||
boolean(),
|
||||
#{
|
||||
desc => <<"Enable/disable client_disconnected event messages">>,
|
||||
default => false
|
||||
}
|
||||
)},
|
||||
{client_subscribed,
|
||||
sc(
|
||||
boolean(),
|
||||
#{
|
||||
desc => <<"Enable/disable client_subscribed event messages">>,
|
||||
default => false
|
||||
}
|
||||
)},
|
||||
{client_unsubscribed,
|
||||
sc(
|
||||
boolean(),
|
||||
#{
|
||||
desc => <<"Enable/disable client_unsubscribed event messages">>,
|
||||
default => false
|
||||
}
|
||||
)},
|
||||
{message_delivered,
|
||||
sc(
|
||||
boolean(),
|
||||
#{
|
||||
desc => <<"Enable/disable message_delivered event messages">>,
|
||||
default => false
|
||||
}
|
||||
)},
|
||||
{message_acked,
|
||||
sc(
|
||||
boolean(),
|
||||
#{
|
||||
desc => <<"Enable/disable message_acked event messages">>,
|
||||
default => false
|
||||
}
|
||||
)},
|
||||
{message_dropped,
|
||||
sc(
|
||||
boolean(),
|
||||
#{
|
||||
desc => <<"Enable/disable message_dropped event messages">>,
|
||||
default => false
|
||||
}
|
||||
)}
|
||||
],
|
||||
#{
|
||||
fields => Fields,
|
||||
desc =>
|
||||
"Enable/Disable system event messages.\n"
|
||||
"The messages are published to <code>$event</code> prefixed topics.\n"
|
||||
"For example, if `client_disconnected` is set to `true`,\n"
|
||||
"a message is published to <code>$event/client_connected</code> topic\n"
|
||||
"whenever a client is connected.\n"
|
||||
""
|
||||
};
|
||||
fields("topic_metrics") ->
|
||||
[{topic, sc(binary(), #{})}].
|
||||
|
||||
|
|
|
@ -39,7 +39,6 @@ start_link() ->
|
|||
%% Supervisor callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
init([]) ->
|
||||
emqx_event_message:init_conf_handler(),
|
||||
{ok,
|
||||
{{one_for_one, 10, 3600}, [
|
||||
?CHILD(emqx_telemetry),
|
||||
|
|
|
@ -1,162 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_event_message_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-define(EVENT_MESSAGE, <<
|
||||
""
|
||||
"\n"
|
||||
"event_message: {\n"
|
||||
" client_connected: true\n"
|
||||
" client_disconnected: true\n"
|
||||
" client_subscribed: true\n"
|
||||
" client_unsubscribed: true\n"
|
||||
" message_delivered: true\n"
|
||||
" message_acked: true\n"
|
||||
" message_dropped: true\n"
|
||||
"}"
|
||||
""
|
||||
>>).
|
||||
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_common_test_helpers:boot_modules(all),
|
||||
emqx_common_test_helpers:start_apps([emqx_modules]),
|
||||
load_config(),
|
||||
Config.
|
||||
|
||||
load_config() ->
|
||||
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?EVENT_MESSAGE).
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_common_test_helpers:stop_apps([emqx_modules]).
|
||||
|
||||
t_event_topic(_) ->
|
||||
ok = emqx_event_message:enable(),
|
||||
{ok, C1} = emqtt:start_link([{clientid, <<"monsys">>}]),
|
||||
{ok, _} = emqtt:connect(C1),
|
||||
{ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_connected">>, qos1),
|
||||
{ok, C2} = emqtt:start_link([
|
||||
{clientid, <<"clientid">>},
|
||||
{username, <<"username">>}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(C2),
|
||||
ok = recv_connected(<<"clientid">>),
|
||||
|
||||
{ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_subscribed">>, qos1),
|
||||
_ = receive_publish(100),
|
||||
timer:sleep(50),
|
||||
{ok, _, [?QOS_1]} = emqtt:subscribe(C2, <<"test_sub">>, qos1),
|
||||
ok = recv_subscribed(<<"clientid">>),
|
||||
emqtt:unsubscribe(C1, <<"$event/client_subscribed">>),
|
||||
timer:sleep(50),
|
||||
|
||||
{ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_delivered">>, qos1),
|
||||
{ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_acked">>, qos1),
|
||||
_ = emqx:publish(emqx_message:make(<<"test">>, ?QOS_1, <<"test_sub">>, <<"test">>)),
|
||||
{ok, #{qos := QOS1, topic := Topic1}} = receive_publish(100),
|
||||
{ok, #{qos := QOS2, topic := Topic2}} = receive_publish(100),
|
||||
recv_message_publish_or_delivered(<<"clientid">>, QOS1, Topic1),
|
||||
recv_message_publish_or_delivered(<<"clientid">>, QOS2, Topic2),
|
||||
recv_message_acked(<<"clientid">>),
|
||||
|
||||
{ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_dropped">>, qos1),
|
||||
ok = emqtt:publish(C2, <<"test_sub1">>, <<"test">>),
|
||||
recv_message_dropped(<<"clientid">>),
|
||||
|
||||
{ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_unsubscribed">>, qos1),
|
||||
_ = emqtt:unsubscribe(C2, <<"test_sub">>),
|
||||
ok = recv_unsubscribed(<<"clientid">>),
|
||||
|
||||
{ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_disconnected">>, qos1),
|
||||
ok = emqtt:disconnect(C2),
|
||||
ok = recv_disconnected(<<"clientid">>),
|
||||
ok = emqtt:disconnect(C1),
|
||||
ok = emqx_event_message:disable().
|
||||
|
||||
t_reason(_) ->
|
||||
?assertEqual(normal, emqx_event_message:reason(normal)),
|
||||
?assertEqual(discarded, emqx_event_message:reason({shutdown, discarded})),
|
||||
?assertEqual(tcp_error, emqx_event_message:reason({tcp_error, einval})),
|
||||
?assertEqual(internal_error, emqx_event_message:reason(<<"unknown error">>)).
|
||||
|
||||
recv_connected(ClientId) ->
|
||||
{ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100),
|
||||
?assertMatch(<<"$event/client_connected">>, Topic),
|
||||
?assertMatch(
|
||||
#{
|
||||
<<"clientid">> := ClientId,
|
||||
<<"username">> := <<"username">>,
|
||||
<<"ipaddress">> := <<"127.0.0.1">>,
|
||||
<<"proto_name">> := <<"MQTT">>,
|
||||
<<"proto_ver">> := ?MQTT_PROTO_V4,
|
||||
<<"clean_start">> := true,
|
||||
<<"expiry_interval">> := 0,
|
||||
<<"keepalive">> := 60
|
||||
},
|
||||
emqx_json:decode(Payload, [return_maps])
|
||||
).
|
||||
|
||||
recv_subscribed(_ClientId) ->
|
||||
{ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
|
||||
?assertMatch(<<"$event/client_subscribed">>, Topic).
|
||||
|
||||
recv_message_dropped(_ClientId) ->
|
||||
{ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
|
||||
?assertMatch(<<"$event/message_dropped">>, Topic).
|
||||
|
||||
recv_message_publish_or_delivered(_ClientId, 0, Topic) ->
|
||||
?assertMatch(<<"$event/message_delivered">>, Topic);
|
||||
recv_message_publish_or_delivered(_ClientId, 1, Topic) ->
|
||||
?assertMatch(<<"test_sub">>, Topic).
|
||||
|
||||
recv_message_acked(_ClientId) ->
|
||||
{ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
|
||||
?assertMatch(<<"$event/message_acked">>, Topic).
|
||||
|
||||
recv_unsubscribed(_ClientId) ->
|
||||
{ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
|
||||
?assertMatch(<<"$event/client_unsubscribed">>, Topic).
|
||||
|
||||
recv_disconnected(ClientId) ->
|
||||
{ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100),
|
||||
?assertMatch(<<"$event/client_disconnected">>, Topic),
|
||||
?assertMatch(
|
||||
#{
|
||||
<<"clientid">> := ClientId,
|
||||
<<"username">> := <<"username">>,
|
||||
<<"reason">> := <<"normal">>
|
||||
},
|
||||
emqx_json:decode(Payload, [return_maps])
|
||||
).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
receive_publish(Timeout) ->
|
||||
receive
|
||||
{publish, Publish} ->
|
||||
{ok, Publish}
|
||||
after Timeout -> {error, timeout}
|
||||
end.
|
Loading…
Reference in New Issue