350 lines
18 KiB
Erlang
350 lines
18 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_stomp_SUITE).
|
|
|
|
-include_lib("emqx_stomp/include/emqx_stomp.hrl").
|
|
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
|
|
-define(HEARTBEAT, <<$\n>>).
|
|
|
|
all() -> emqx_ct:all(?MODULE).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Setups
|
|
%%--------------------------------------------------------------------
|
|
|
|
init_per_suite(Config) ->
|
|
emqx_ct_helpers:start_apps([emqx_stomp]),
|
|
Config.
|
|
|
|
end_per_suite(_Config) ->
|
|
emqx_ct_helpers:stop_apps([emqx_stomp]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Test Cases
|
|
%%--------------------------------------------------------------------
|
|
|
|
t_connect(_) ->
|
|
%% Connect should be succeed
|
|
with_connection(fun(Sock) ->
|
|
gen_tcp:send(Sock, serialize(<<"CONNECT">>,
|
|
[{<<"accept-version">>, ?STOMP_VER},
|
|
{<<"host">>, <<"127.0.0.1:61613">>},
|
|
{<<"login">>, <<"guest">>},
|
|
{<<"passcode">>, <<"guest">>},
|
|
{<<"heart-beat">>, <<"1000,2000">>}])),
|
|
{ok, Data} = gen_tcp:recv(Sock, 0),
|
|
{ok, Frame = #stomp_frame{command = <<"CONNECTED">>,
|
|
headers = _,
|
|
body = _}, _} = parse(Data),
|
|
<<"2000,1000">> = proplists:get_value(<<"heart-beat">>, Frame#stomp_frame.headers),
|
|
|
|
gen_tcp:send(Sock, serialize(<<"DISCONNECT">>,
|
|
[{<<"receipt">>, <<"12345">>}])),
|
|
|
|
{ok, Data1} = gen_tcp:recv(Sock, 0),
|
|
{ok, #stomp_frame{command = <<"RECEIPT">>,
|
|
headers = [{<<"receipt-id">>, <<"12345">>}],
|
|
body = _}, _} = parse(Data1)
|
|
end),
|
|
|
|
%% Connect will be failed, because of bad login or passcode
|
|
with_connection(fun(Sock) ->
|
|
gen_tcp:send(Sock, serialize(<<"CONNECT">>,
|
|
[{<<"accept-version">>, ?STOMP_VER},
|
|
{<<"host">>, <<"127.0.0.1:61613">>},
|
|
{<<"login">>, <<"admin">>},
|
|
{<<"passcode">>, <<"admin">>},
|
|
{<<"heart-beat">>, <<"1000,2000">>}])),
|
|
{ok, Data} = gen_tcp:recv(Sock, 0),
|
|
{ok, #stomp_frame{command = <<"ERROR">>,
|
|
headers = _,
|
|
body = <<"Login or passcode error!">>}, _} = parse(Data)
|
|
end),
|
|
|
|
%% Connect will be failed, because of bad version
|
|
with_connection(fun(Sock) ->
|
|
gen_tcp:send(Sock, serialize(<<"CONNECT">>,
|
|
[{<<"accept-version">>, <<"2.0,2.1">>},
|
|
{<<"host">>, <<"127.0.0.1:61613">>},
|
|
{<<"login">>, <<"guest">>},
|
|
{<<"passcode">>, <<"guest">>},
|
|
{<<"heart-beat">>, <<"1000,2000">>}])),
|
|
{ok, Data} = gen_tcp:recv(Sock, 0),
|
|
{ok, #stomp_frame{command = <<"ERROR">>,
|
|
headers = _,
|
|
body = <<"Supported protocol versions < 1.2">>}, _} = parse(Data)
|
|
end).
|
|
|
|
t_heartbeat(_) ->
|
|
%% Test heart beat
|
|
with_connection(fun(Sock) ->
|
|
gen_tcp:send(Sock, serialize(<<"CONNECT">>,
|
|
[{<<"accept-version">>, ?STOMP_VER},
|
|
{<<"host">>, <<"127.0.0.1:61613">>},
|
|
{<<"login">>, <<"guest">>},
|
|
{<<"passcode">>, <<"guest">>},
|
|
{<<"heart-beat">>, <<"1000,2000">>}])),
|
|
{ok, Data} = gen_tcp:recv(Sock, 0),
|
|
{ok, #stomp_frame{command = <<"CONNECTED">>,
|
|
headers = _,
|
|
body = _}, _} = parse(Data),
|
|
|
|
{ok, ?HEARTBEAT} = gen_tcp:recv(Sock, 0),
|
|
%% Server will close the connection because never receive the heart beat from client
|
|
{error, closed} = gen_tcp:recv(Sock, 0)
|
|
end).
|
|
|
|
t_subscribe(_) ->
|
|
with_connection(fun(Sock) ->
|
|
gen_tcp:send(Sock, serialize(<<"CONNECT">>,
|
|
[{<<"accept-version">>, ?STOMP_VER},
|
|
{<<"host">>, <<"127.0.0.1:61613">>},
|
|
{<<"login">>, <<"guest">>},
|
|
{<<"passcode">>, <<"guest">>},
|
|
{<<"heart-beat">>, <<"0,0">>}])),
|
|
{ok, Data} = gen_tcp:recv(Sock, 0),
|
|
{ok, #stomp_frame{command = <<"CONNECTED">>,
|
|
headers = _,
|
|
body = _}, _} = parse(Data),
|
|
|
|
%% Subscribe
|
|
gen_tcp:send(Sock, serialize(<<"SUBSCRIBE">>,
|
|
[{<<"id">>, 0},
|
|
{<<"destination">>, <<"/queue/foo">>},
|
|
{<<"ack">>, <<"auto">>}])),
|
|
|
|
%% 'user-defined' header will be retain
|
|
gen_tcp:send(Sock, serialize(<<"SEND">>,
|
|
[{<<"destination">>, <<"/queue/foo">>},
|
|
{<<"user-defined">>, <<"emq">>}],
|
|
<<"hello">>)),
|
|
|
|
{ok, Data1} = gen_tcp:recv(Sock, 0, 1000),
|
|
{ok, Frame = #stomp_frame{command = <<"MESSAGE">>,
|
|
headers = _,
|
|
body = <<"hello">>}, _} = parse(Data1),
|
|
lists:foreach(fun({Key, Val}) ->
|
|
Val = proplists:get_value(Key, Frame#stomp_frame.headers)
|
|
end, [{<<"destination">>, <<"/queue/foo">>},
|
|
{<<"subscription">>, <<"0">>},
|
|
{<<"user-defined">>, <<"emq">>}]),
|
|
|
|
%% Unsubscribe
|
|
gen_tcp:send(Sock, serialize(<<"UNSUBSCRIBE">>,
|
|
[{<<"id">>, 0},
|
|
{<<"receipt">>, <<"12345">>}])),
|
|
|
|
{ok, Data2} = gen_tcp:recv(Sock, 0, 1000),
|
|
|
|
{ok, #stomp_frame{command = <<"RECEIPT">>,
|
|
headers = [{<<"receipt-id">>, <<"12345">>}],
|
|
body = _}, _} = parse(Data2),
|
|
|
|
gen_tcp:send(Sock, serialize(<<"SEND">>,
|
|
[{<<"destination">>, <<"/queue/foo">>}],
|
|
<<"You will not receive this msg">>)),
|
|
|
|
{error, timeout} = gen_tcp:recv(Sock, 0, 500)
|
|
end).
|
|
|
|
t_transaction(_) ->
|
|
with_connection(fun(Sock) ->
|
|
gen_tcp:send(Sock, serialize(<<"CONNECT">>,
|
|
[{<<"accept-version">>, ?STOMP_VER},
|
|
{<<"host">>, <<"127.0.0.1:61613">>},
|
|
{<<"login">>, <<"guest">>},
|
|
{<<"passcode">>, <<"guest">>},
|
|
{<<"heart-beat">>, <<"0,0">>}])),
|
|
{ok, Data} = gen_tcp:recv(Sock, 0),
|
|
{ok, #stomp_frame{command = <<"CONNECTED">>,
|
|
headers = _,
|
|
body = _}, _} = parse(Data),
|
|
|
|
%% Subscribe
|
|
gen_tcp:send(Sock, serialize(<<"SUBSCRIBE">>,
|
|
[{<<"id">>, 0},
|
|
{<<"destination">>, <<"/queue/foo">>},
|
|
{<<"ack">>, <<"auto">>}])),
|
|
|
|
%% Transaction: tx1
|
|
gen_tcp:send(Sock, serialize(<<"BEGIN">>,
|
|
[{<<"transaction">>, <<"tx1">>}])),
|
|
|
|
gen_tcp:send(Sock, serialize(<<"SEND">>,
|
|
[{<<"destination">>, <<"/queue/foo">>},
|
|
{<<"transaction">>, <<"tx1">>}],
|
|
<<"hello">>)),
|
|
|
|
%% You will not receive any messages
|
|
{error, timeout} = gen_tcp:recv(Sock, 0, 1000),
|
|
|
|
gen_tcp:send(Sock, serialize(<<"SEND">>,
|
|
[{<<"destination">>, <<"/queue/foo">>},
|
|
{<<"transaction">>, <<"tx1">>}],
|
|
<<"hello again">>)),
|
|
|
|
gen_tcp:send(Sock, serialize(<<"COMMIT">>,
|
|
[{<<"transaction">>, <<"tx1">>}])),
|
|
|
|
ct:sleep(1000),
|
|
{ok, Data1} = gen_tcp:recv(Sock, 0, 500),
|
|
|
|
{ok, #stomp_frame{command = <<"MESSAGE">>,
|
|
headers = _,
|
|
body = <<"hello">>}, Rest1} = parse(Data1),
|
|
|
|
%{ok, Data2} = gen_tcp:recv(Sock, 0, 500),
|
|
{ok, #stomp_frame{command = <<"MESSAGE">>,
|
|
headers = _,
|
|
body = <<"hello again">>}, _Rest2} = parse(Rest1),
|
|
|
|
%% Transaction: tx2
|
|
gen_tcp:send(Sock, serialize(<<"BEGIN">>,
|
|
[{<<"transaction">>, <<"tx2">>}])),
|
|
|
|
gen_tcp:send(Sock, serialize(<<"SEND">>,
|
|
[{<<"destination">>, <<"/queue/foo">>},
|
|
{<<"transaction">>, <<"tx2">>}],
|
|
<<"hello">>)),
|
|
|
|
gen_tcp:send(Sock, serialize(<<"ABORT">>,
|
|
[{<<"transaction">>, <<"tx2">>}])),
|
|
|
|
%% You will not receive any messages
|
|
{error, timeout} = gen_tcp:recv(Sock, 0, 1000),
|
|
|
|
gen_tcp:send(Sock, serialize(<<"DISCONNECT">>,
|
|
[{<<"receipt">>, <<"12345">>}])),
|
|
|
|
{ok, Data3} = gen_tcp:recv(Sock, 0),
|
|
{ok, #stomp_frame{command = <<"RECEIPT">>,
|
|
headers = [{<<"receipt-id">>, <<"12345">>}],
|
|
body = _}, _} = parse(Data3)
|
|
end).
|
|
|
|
t_receipt_in_error(_) ->
|
|
with_connection(fun(Sock) ->
|
|
gen_tcp:send(Sock, serialize(<<"CONNECT">>,
|
|
[{<<"accept-version">>, ?STOMP_VER},
|
|
{<<"host">>, <<"127.0.0.1:61613">>},
|
|
{<<"login">>, <<"guest">>},
|
|
{<<"passcode">>, <<"guest">>},
|
|
{<<"heart-beat">>, <<"0,0">>}])),
|
|
{ok, Data} = gen_tcp:recv(Sock, 0),
|
|
{ok, #stomp_frame{command = <<"CONNECTED">>,
|
|
headers = _,
|
|
body = _}, _} = parse(Data),
|
|
|
|
gen_tcp:send(Sock, serialize(<<"ABORT">>,
|
|
[{<<"transaction">>, <<"tx1">>},
|
|
{<<"receipt">>, <<"12345">>}])),
|
|
|
|
{ok, Data1} = gen_tcp:recv(Sock, 0),
|
|
{ok, Frame = #stomp_frame{command = <<"ERROR">>,
|
|
headers = _,
|
|
body = <<"Transaction tx1 not found">>}, _} = parse(Data1),
|
|
|
|
<<"12345">> = proplists:get_value(<<"receipt-id">>, Frame#stomp_frame.headers)
|
|
end).
|
|
|
|
t_ack(_) ->
|
|
with_connection(fun(Sock) ->
|
|
gen_tcp:send(Sock, serialize(<<"CONNECT">>,
|
|
[{<<"accept-version">>, ?STOMP_VER},
|
|
{<<"host">>, <<"127.0.0.1:61613">>},
|
|
{<<"login">>, <<"guest">>},
|
|
{<<"passcode">>, <<"guest">>},
|
|
{<<"heart-beat">>, <<"0,0">>}])),
|
|
{ok, Data} = gen_tcp:recv(Sock, 0),
|
|
{ok, #stomp_frame{command = <<"CONNECTED">>,
|
|
headers = _,
|
|
body = _}, _} = parse(Data),
|
|
|
|
%% Subscribe
|
|
gen_tcp:send(Sock, serialize(<<"SUBSCRIBE">>,
|
|
[{<<"id">>, 0},
|
|
{<<"destination">>, <<"/queue/foo">>},
|
|
{<<"ack">>, <<"client">>}])),
|
|
|
|
gen_tcp:send(Sock, serialize(<<"SEND">>,
|
|
[{<<"destination">>, <<"/queue/foo">>}],
|
|
<<"ack test">>)),
|
|
|
|
{ok, Data1} = gen_tcp:recv(Sock, 0),
|
|
{ok, Frame = #stomp_frame{command = <<"MESSAGE">>,
|
|
headers = _,
|
|
body = <<"ack test">>}, _} = parse(Data1),
|
|
|
|
AckId = proplists:get_value(<<"ack">>, Frame#stomp_frame.headers),
|
|
|
|
gen_tcp:send(Sock, serialize(<<"ACK">>,
|
|
[{<<"id">>, AckId},
|
|
{<<"receipt">>, <<"12345">>}])),
|
|
|
|
{ok, Data2} = gen_tcp:recv(Sock, 0),
|
|
{ok, #stomp_frame{command = <<"RECEIPT">>,
|
|
headers = [{<<"receipt-id">>, <<"12345">>}],
|
|
body = _}, _} = parse(Data2),
|
|
|
|
gen_tcp:send(Sock, serialize(<<"SEND">>,
|
|
[{<<"destination">>, <<"/queue/foo">>}],
|
|
<<"nack test">>)),
|
|
|
|
{ok, Data3} = gen_tcp:recv(Sock, 0),
|
|
{ok, Frame1 = #stomp_frame{command = <<"MESSAGE">>,
|
|
headers = _,
|
|
body = <<"nack test">>}, _} = parse(Data3),
|
|
|
|
AckId1 = proplists:get_value(<<"ack">>, Frame1#stomp_frame.headers),
|
|
|
|
gen_tcp:send(Sock, serialize(<<"NACK">>,
|
|
[{<<"id">>, AckId1},
|
|
{<<"receipt">>, <<"12345">>}])),
|
|
|
|
{ok, Data4} = gen_tcp:recv(Sock, 0),
|
|
{ok, #stomp_frame{command = <<"RECEIPT">>,
|
|
headers = [{<<"receipt-id">>, <<"12345">>}],
|
|
body = _}, _} = parse(Data4)
|
|
end).
|
|
|
|
with_connection(DoFun) ->
|
|
{ok, Sock} = gen_tcp:connect({127, 0, 0, 1},
|
|
61613,
|
|
[binary, {packet, raw}, {active, false}],
|
|
3000),
|
|
try
|
|
DoFun(Sock)
|
|
after
|
|
gen_tcp:close(Sock)
|
|
end.
|
|
|
|
serialize(Command, Headers) ->
|
|
emqx_stomp_frame:serialize(emqx_stomp_frame:make(Command, Headers)).
|
|
|
|
serialize(Command, Headers, Body) ->
|
|
emqx_stomp_frame:serialize(emqx_stomp_frame:make(Command, Headers, Body)).
|
|
|
|
parse(Data) ->
|
|
ProtoEnv = [{max_headers, 10},
|
|
{max_header_length, 1024},
|
|
{max_body_length, 8192}],
|
|
Parser = emqx_stomp_frame:init_parer_state(ProtoEnv),
|
|
emqx_stomp_frame:parse(Data, Parser).
|