%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_event_message_SUITE).
|
|
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
|
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
%% Configuration text handed to emqx_config:init_load/2 in init_per_suite/1.
%% It lists each `$event/...` topic that t_event_topic/1 subscribes to.
%%
%% Written as one ordinary string literal: on older compilers `"" "..." ""`
%% was just adjacent-literal concatenation, so the value is the same, while
%% on OTP 27+ a leading `"""` would start a triple-quoted string and this
%% definition would no longer compile.
-define(EVENT_MESSAGE, <<"
event_message: {
    topics : [
        \"$event/client_connected\",
        \"$event/client_disconnected\",
        \"$event/session_subscribed\",
        \"$event/session_unsubscribed\",
        \"$event/message_delivered\",
        \"$event/message_acked\",
        \"$event/message_dropped\"
    ]}">>).
|
|
|
|
|
|
%% Common Test callback: the test case list is whatever emqx_ct:all/1
%% returns for this module.
all() ->
    emqx_ct:all(?MODULE).
|
|
|
|
%% Suite setup. Calls emqx_ct_helpers:boot_modules(all), starts the
%% emqx_modules application through emqx_ct_helpers:start_apps/1, and then
%% loads ?EVENT_MESSAGE with emqx_config:init_load/2 (which must return ok).
%% Returns Config unchanged.
init_per_suite(Config) ->
    emqx_ct_helpers:boot_modules(all),
    emqx_ct_helpers:start_apps([emqx_modules]),
    ok = emqx_config:init_load(emqx_modules_schema, ?EVENT_MESSAGE),
    Config.
|
|
|
|
%% Suite teardown: stops the emqx_modules application that
%% init_per_suite/1 started. The Config argument is not used.
end_per_suite(_Config) ->
    emqx_ct_helpers:stop_apps([emqx_modules]).
|
|
|
|
%% Walks through every `$event/...` topic in turn.
%%
%% Client C1 ("monsys") subscribes to one event topic at a time; client C2
%% ("clientid" / "username") performs the action that should raise the event.
%% Each expected notification is then taken from this process's mailbox by
%% the recv_* helpers (via receive_publish/1). Any mismatch or timeout fails
%% the test case with a badmatch or an EUnit assertion error.
t_event_topic(_) ->
    ok = emqx_event_message:enable(),

    %% client_connected: C1 listens, C2 connects.
    {ok, C1} = emqtt:start_link([{clientid, <<"monsys">>}]),
    {ok, _} = emqtt:connect(C1),
    {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_connected">>, qos1),
    {ok, C2} = emqtt:start_link([{clientid, <<"clientid">>},
                                 {username, <<"username">>}]),
    {ok, _} = emqtt:connect(C2),
    ok = recv_connected(<<"clientid">>),

    %% session_subscribed: C2 subscribes to <<"test_sub">>.
    {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/session_subscribed">>, qos1),
    %% Discard at most one publish that is already pending before C2 acts
    %% (presumably the event raised by C1's own subscription above).
    _ = receive_publish(100),
    timer:sleep(50),
    {ok, _, [?QOS_1]} = emqtt:subscribe(C2, <<"test_sub">>, qos1),
    ok = recv_subscribed(<<"clientid">>),
    %% Stop listening for subscribe events before C1 subscribes to more topics.
    _ = emqtt:unsubscribe(C1, <<"$event/session_subscribed">>),
    timer:sleep(50),

    %% message_delivered / message_acked: publish a QoS 1 message to <<"test_sub">>.
    {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_delivered">>, qos1),
    {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_acked">>, qos1),
    _ = emqx:publish(emqx_message:make(<<"test">>, ?QOS_1, <<"test_sub">>, <<"test">>)),
    %% The next two publishes are classified by their QoS rather than by
    %% arrival order: QoS 1 must be the message itself, QoS 0 the event.
    {ok, #{qos := QOS1, topic := Topic1}} = receive_publish(100),
    {ok, #{qos := QOS2, topic := Topic2}} = receive_publish(100),
    ok = recv_message_publish_or_delivered(<<"clientid">>, QOS1, Topic1),
    ok = recv_message_publish_or_delivered(<<"clientid">>, QOS2, Topic2),
    ok = recv_message_acked(<<"clientid">>),

    %% message_dropped: C2 publishes to a topic this test never subscribes to.
    {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_dropped">>, qos1),
    ok = emqtt:publish(C2, <<"test_sub1">>, <<"test">>),
    ok = recv_message_dropped(<<"clientid">>),

    %% session_unsubscribed: C2 drops its <<"test_sub">> subscription.
    {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/session_unsubscribed">>, qos1),
    _ = emqtt:unsubscribe(C2, <<"test_sub">>),
    ok = recv_unsubscribed(<<"clientid">>),

    %% client_disconnected: C2 disconnects.
    {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_disconnected">>, qos1),
    ok = emqtt:disconnect(C2),
    ok = recv_disconnected(<<"clientid">>),

    ok = emqtt:disconnect(C1),
    ok = emqx_event_message:disable().
|
|
|
|
%% Checks emqx_event_message:reason/1 on four inputs: a plain atom is
%% returned as is, {shutdown, R} yields R, a {tcp_error, _} tuple yields
%% tcp_error, and a binary yields internal_error.
t_reason(_) ->
    ?assertEqual(normal, emqx_event_message:reason(normal)),
    ?assertEqual(discarded, emqx_event_message:reason({shutdown, discarded})),
    ?assertEqual(tcp_error, emqx_event_message:reason({tcp_error, einval})),
    ?assertEqual(internal_error, emqx_event_message:reason(<<"unknown error">>)).
|
|
|
|
%% Takes the next publish from the mailbox (100 ms timeout) and requires it
%% to be a QoS 0 message on `$event/client_connected` whose JSON payload
%% carries the given ClientId together with the fixed connection details
%% asserted below. Returns ok; fails with badmatch on timeout or with an
%% assertMatch error on a mismatch.
recv_connected(ClientId) ->
    {ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100),
    ?assertMatch(<<"$event/client_connected">>, Topic),
    ?assertMatch(#{<<"clientid">> := ClientId,
                   <<"username">> := <<"username">>,
                   <<"ipaddress">> := <<"127.0.0.1">>,
                   <<"proto_name">> := <<"MQTT">>,
                   <<"proto_ver">> := ?MQTT_PROTO_V4,
                   <<"connack">> := ?RC_SUCCESS,
                   <<"clean_start">> := true},
                 emqx_json:decode(Payload, [return_maps])).
|
|
|
|
%% Takes the next publish from the mailbox (100 ms timeout) and requires it
%% to be a QoS 0 message on `$event/session_subscribed`. The client id
%% argument is accepted but not checked. Returns ok.
recv_subscribed(_ClientId) ->
    {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
    ?assertMatch(<<"$event/session_subscribed">>, Topic).
|
|
|
|
%% Takes the next publish from the mailbox (100 ms timeout) and requires it
%% to be a QoS 0 message on `$event/message_dropped`. The client id
%% argument is accepted but not checked. Returns ok.
recv_message_dropped(_ClientId) ->
    {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
    ?assertMatch(<<"$event/message_dropped">>, Topic).
|
|
|
|
%% Classifies an already-received publish by its QoS:
%%   * QoS 0 must be the `$event/message_delivered` notification;
%%   * QoS 1 must be the application message on <<"test_sub">>.
%% Returns ok on a match and raises an assertMatch error otherwise. Any
%% other QoS has no clause and fails with function_clause. The client id
%% argument is accepted but not checked.
recv_message_publish_or_delivered(_ClientId, 0, Topic) ->
    ?assertMatch(<<"$event/message_delivered">>, Topic);
recv_message_publish_or_delivered(_ClientId, 1, Topic) ->
    ?assertMatch(<<"test_sub">>, Topic).
|
|
|
|
%% Takes the next publish from the mailbox (100 ms timeout) and requires it
%% to be a QoS 0 message on `$event/message_acked`. The client id argument
%% is accepted but not checked. Returns ok.
recv_message_acked(_ClientId) ->
    {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
    ?assertMatch(<<"$event/message_acked">>, Topic).
|
|
|
|
%% Takes the next publish from the mailbox (100 ms timeout) and requires it
%% to be a QoS 0 message on `$event/session_unsubscribed`. The client id
%% argument is accepted but not checked. Returns ok.
recv_unsubscribed(_ClientId) ->
    {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
    ?assertMatch(<<"$event/session_unsubscribed">>, Topic).
|
|
|
|
%% Takes the next publish from the mailbox (100 ms timeout) and requires it
%% to be a QoS 0 message on `$event/client_disconnected` whose JSON payload
%% names the given ClientId, the username <<"username">> and the reason
%% <<"normal">>. Returns ok; fails with badmatch on timeout or with an
%% assertMatch error on a mismatch.
recv_disconnected(ClientId) ->
    {ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100),
    ?assertMatch(<<"$event/client_disconnected">>, Topic),
    ?assertMatch(#{<<"clientid">> := ClientId,
                   <<"username">> := <<"username">>,
                   <<"reason">> := <<"normal">>},
                 emqx_json:decode(Payload, [return_maps])).
|
|
|
|
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
|
|
|
|
%% Waits up to Timeout milliseconds for a {publish, Publish} message in the
%% calling process's mailbox.
%% Returns {ok, Publish} for the first such message, or {error, timeout} if
%% none arrives in time. Messages of any other shape are left in the mailbox.
receive_publish(Timeout) ->
    receive
        {publish, Publish} ->
            {ok, Publish}
    after Timeout ->
        {error, timeout}
    end.
|