chore: cover stomp mountpoint tests

This commit is contained in:
JianBo He 2023-06-13 11:24:09 +08:00
parent 4212d90672
commit c4c32f5032
1 changed files with 72 additions and 2 deletions

View File

@ -60,11 +60,11 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Cfg) ->
application:load(emqx_gateway_stomp),
ok = emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
emqx_mgmt_api_test_util:init_suite([emqx_authn, emqx_gateway]),
emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_authn, emqx_gateway]),
Cfg.
end_per_suite(_Cfg) ->
emqx_mgmt_api_test_util:end_suite([emqx_gateway, emqx_authn]),
emqx_mgmt_api_test_util:end_suite([emqx_gateway, emqx_authn, emqx_conf]),
ok.
default_config() ->
@ -73,6 +73,13 @@ default_config() ->
stomp_ver() ->
?STOMP_VER.
restart_stomp_with_mountpoint(Mountpoint) ->
Conf = emqx:get_raw_config([gateway, stomp]),
emqx_gateway_conf:update_gateway(
stomp,
Conf#{<<"mountpoint">> => Mountpoint}
).
%%--------------------------------------------------------------------
%% Test Cases
%%--------------------------------------------------------------------
@ -938,6 +945,69 @@ t_authn_superuser(_) ->
with_connection(LoginAsSuperUser),
meck:unload(emqx_access_control).
t_mountpoint(_) ->
restart_stomp_with_mountpoint(<<"stomp/">>),
PubSub = fun(Sock) ->
ok = send_connection_frame(Sock, <<"user1">>, <<"public">>),
?assertMatch({ok, #stomp_frame{command = <<"CONNECTED">>}}, recv_a_frame(Sock)),
ok = send_subscribe_frame(Sock, 0, <<"t/a">>),
?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)),
ok = send_message_frame(Sock, <<"t/a">>, <<"hello">>),
?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)),
{ok, #stomp_frame{
command = <<"MESSAGE">>,
headers = Headers,
body = <<"hello">>
}} = recv_a_frame(Sock),
?assertEqual(<<"t/a">>, proplists:get_value(<<"destination">>, Headers)),
ok = send_disconnect_frame(Sock)
end,
PubToMqtt = fun(Sock) ->
ok = send_connection_frame(Sock, <<"user1">>, <<"public">>),
?assertMatch({ok, #stomp_frame{command = <<"CONNECTED">>}}, recv_a_frame(Sock)),
ok = emqx:subscribe(<<"stomp/t/a">>),
ok = send_message_frame(Sock, <<"t/a">>, <<"hello">>),
?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)),
receive
{deliver, Topic, Msg} ->
?assertEqual(<<"stomp/t/a">>, Topic),
?assertEqual(<<"hello">>, emqx_message:payload(Msg))
after 100 ->
?assert(false, "waiting message timeout")
end,
ok = send_disconnect_frame(Sock)
end,
ReceiveMsgFromMqtt = fun(Sock) ->
ok = send_connection_frame(Sock, <<"user1">>, <<"public">>),
?assertMatch({ok, #stomp_frame{command = <<"CONNECTED">>}}, recv_a_frame(Sock)),
ok = send_subscribe_frame(Sock, 0, <<"t/a">>),
?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)),
Msg = emqx_message:make(<<"stomp/t/a">>, <<"hello">>),
emqx:publish(Msg),
{ok, #stomp_frame{
command = <<"MESSAGE">>,
headers = Headers,
body = <<"hello">>
}} = recv_a_frame(Sock),
?assertEqual(<<"t/a">>, proplists:get_value(<<"destination">>, Headers)),
ok = send_disconnect_frame(Sock)
end,
with_connection(PubSub),
with_connection(PubToMqtt),
with_connection(ReceiveMsgFromMqtt),
restart_stomp_with_mountpoint(<<>>).
%% TODO: Mountpoint, AuthChain, Authorization + Mountpoint, ClientInfoOverride,
%% Listeners, Metrics, Stats, ClientInfo
%%