%%-------------------------------------------------------------------- %% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_stomp_SUITE). -include_lib("emqx_gateway/src/stomp/include/emqx_stomp.hrl"). -compile(export_all). -compile(nowarn_export_all). -define(HEARTBEAT, <<$\n>>). -define(CONF_DEFAULT, <<" gateway.stomp { clientinfo_override { username = \"${Packet.headers.login}\" password = \"${Packet.headers.passcode}\" } listeners.tcp.default { bind = 61613 } } ">>). all() -> emqx_ct:all(?MODULE). %%-------------------------------------------------------------------- %% Setups %%-------------------------------------------------------------------- init_per_suite(Cfg) -> ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT), emqx_ct_helpers:start_apps([emqx_gateway]), Cfg. end_per_suite(_Cfg) -> emqx_ct_helpers:stop_apps([emqx_gateway]), ok. %%-------------------------------------------------------------------- %% Test Cases %%-------------------------------------------------------------------- t_connect(_) -> %% Connect should be succeed with_connection(fun(Sock) -> gen_tcp:send(Sock, serialize(<<"CONNECT">>, [{<<"accept-version">>, ?STOMP_VER}, {<<"host">>, <<"127.0.0.1:61613">>}, {<<"login">>, <<"guest">>}, {<<"passcode">>, <<"guest">>}, {<<"heart-beat">>, <<"1000,2000">>}])), {ok, Data} = gen_tcp:recv(Sock, 0), {ok, Frame = #stomp_frame{command = <<"CONNECTED">>, headers = _, body = _}, _, _} = parse(Data), <<"2000,1000">> = proplists:get_value(<<"heart-beat">>, Frame#stomp_frame.headers), gen_tcp:send(Sock, serialize(<<"DISCONNECT">>, [{<<"receipt">>, <<"12345">>}])), {ok, Data1} = gen_tcp:recv(Sock, 0), {ok, #stomp_frame{command = <<"RECEIPT">>, headers = [{<<"receipt-id">>, <<"12345">>}], body = _}, _, _} = parse(Data1) end), %% Connect will be failed, because of bad login or passcode %% FIXME: Waiting for authentication works %with_connection(fun(Sock) -> % gen_tcp:send(Sock, serialize(<<"CONNECT">>, % [{<<"accept-version">>, ?STOMP_VER}, % {<<"host">>, <<"127.0.0.1:61613">>}, % {<<"login">>, <<"admin">>}, % {<<"passcode">>, <<"admin">>}, % {<<"heart-beat">>, <<"1000,2000">>}])), % {ok, Data} = gen_tcp:recv(Sock, 0), % {ok, #stomp_frame{command = <<"ERROR">>, % headers = _, % body = <<"Login or passcode error!">>}, _, _} = parse(Data) % end), %% Connect will be failed, because of bad version with_connection(fun(Sock) -> gen_tcp:send(Sock, serialize(<<"CONNECT">>, [{<<"accept-version">>, <<"2.0,2.1">>}, {<<"host">>, <<"127.0.0.1:61613">>}, {<<"login">>, <<"guest">>}, {<<"passcode">>, <<"guest">>}, {<<"heart-beat">>, <<"1000,2000">>}])), {ok, Data} = gen_tcp:recv(Sock, 0), {ok, #stomp_frame{command = <<"ERROR">>, headers = _, body = <<"Login Failed: Supported protocol versions < 1.2">>}, _, _} = parse(Data) end). t_heartbeat(_) -> %% Test heart beat with_connection(fun(Sock) -> gen_tcp:send(Sock, serialize(<<"CONNECT">>, [{<<"accept-version">>, ?STOMP_VER}, {<<"host">>, <<"127.0.0.1:61613">>}, {<<"login">>, <<"guest">>}, {<<"passcode">>, <<"guest">>}, {<<"heart-beat">>, <<"1000,800">>}])), {ok, Data} = gen_tcp:recv(Sock, 0), {ok, #stomp_frame{command = <<"CONNECTED">>, headers = _, body = _}, _, _} = parse(Data), {ok, ?HEARTBEAT} = gen_tcp:recv(Sock, 0), %% Server will close the connection because never receive the heart beat from client {error, closed} = gen_tcp:recv(Sock, 0) end). t_subscribe(_) -> with_connection(fun(Sock) -> gen_tcp:send(Sock, serialize(<<"CONNECT">>, [{<<"accept-version">>, ?STOMP_VER}, {<<"host">>, <<"127.0.0.1:61613">>}, {<<"login">>, <<"guest">>}, {<<"passcode">>, <<"guest">>}, {<<"heart-beat">>, <<"0,0">>}])), {ok, Data} = gen_tcp:recv(Sock, 0), {ok, #stomp_frame{command = <<"CONNECTED">>, headers = _, body = _}, _, _} = parse(Data), %% Subscribe gen_tcp:send(Sock, serialize(<<"SUBSCRIBE">>, [{<<"id">>, 0}, {<<"destination">>, <<"/queue/foo">>}, {<<"ack">>, <<"auto">>}])), %% 'user-defined' header will be retain gen_tcp:send(Sock, serialize(<<"SEND">>, [{<<"destination">>, <<"/queue/foo">>}, {<<"user-defined">>, <<"emq">>}], <<"hello">>)), {ok, Data1} = gen_tcp:recv(Sock, 0, 1000), {ok, Frame = #stomp_frame{command = <<"MESSAGE">>, headers = _, body = <<"hello">>}, _, _} = parse(Data1), lists:foreach(fun({Key, Val}) -> Val = proplists:get_value(Key, Frame#stomp_frame.headers) end, [{<<"destination">>, <<"/queue/foo">>}, {<<"subscription">>, <<"0">>}, {<<"user-defined">>, <<"emq">>}]), %% Unsubscribe gen_tcp:send(Sock, serialize(<<"UNSUBSCRIBE">>, [{<<"id">>, 0}, {<<"receipt">>, <<"12345">>}])), {ok, Data2} = gen_tcp:recv(Sock, 0, 1000), {ok, #stomp_frame{command = <<"RECEIPT">>, headers = [{<<"receipt-id">>, <<"12345">>}], body = _}, _, _} = parse(Data2), gen_tcp:send(Sock, serialize(<<"SEND">>, [{<<"destination">>, <<"/queue/foo">>}], <<"You will not receive this msg">>)), {error, timeout} = gen_tcp:recv(Sock, 0, 500) end). t_transaction(_) -> with_connection(fun(Sock) -> gen_tcp:send(Sock, serialize(<<"CONNECT">>, [{<<"accept-version">>, ?STOMP_VER}, {<<"host">>, <<"127.0.0.1:61613">>}, {<<"login">>, <<"guest">>}, {<<"passcode">>, <<"guest">>}, {<<"heart-beat">>, <<"0,0">>}])), {ok, Data} = gen_tcp:recv(Sock, 0), {ok, #stomp_frame{command = <<"CONNECTED">>, headers = _, body = _}, _, _} = parse(Data), %% Subscribe gen_tcp:send(Sock, serialize(<<"SUBSCRIBE">>, [{<<"id">>, 0}, {<<"destination">>, <<"/queue/foo">>}, {<<"ack">>, <<"auto">>}])), %% Transaction: tx1 gen_tcp:send(Sock, serialize(<<"BEGIN">>, [{<<"transaction">>, <<"tx1">>}])), gen_tcp:send(Sock, serialize(<<"SEND">>, [{<<"destination">>, <<"/queue/foo">>}, {<<"transaction">>, <<"tx1">>}], <<"hello">>)), %% You will not receive any messages {error, timeout} = gen_tcp:recv(Sock, 0, 1000), gen_tcp:send(Sock, serialize(<<"SEND">>, [{<<"destination">>, <<"/queue/foo">>}, {<<"transaction">>, <<"tx1">>}], <<"hello again">>)), gen_tcp:send(Sock, serialize(<<"COMMIT">>, [{<<"transaction">>, <<"tx1">>}])), ct:sleep(1000), {ok, Data1} = gen_tcp:recv(Sock, 0, 500), {ok, #stomp_frame{command = <<"MESSAGE">>, headers = _, body = <<"hello">>}, Rest1, _} = parse(Data1), %{ok, Data2} = gen_tcp:recv(Sock, 0, 500), {ok, #stomp_frame{command = <<"MESSAGE">>, headers = _, body = <<"hello again">>}, _Rest2, _} = parse(Rest1), %% Transaction: tx2 gen_tcp:send(Sock, serialize(<<"BEGIN">>, [{<<"transaction">>, <<"tx2">>}])), gen_tcp:send(Sock, serialize(<<"SEND">>, [{<<"destination">>, <<"/queue/foo">>}, {<<"transaction">>, <<"tx2">>}], <<"hello">>)), gen_tcp:send(Sock, serialize(<<"ABORT">>, [{<<"transaction">>, <<"tx2">>}])), %% You will not receive any messages {error, timeout} = gen_tcp:recv(Sock, 0, 1000), gen_tcp:send(Sock, serialize(<<"DISCONNECT">>, [{<<"receipt">>, <<"12345">>}])), {ok, Data3} = gen_tcp:recv(Sock, 0), {ok, #stomp_frame{command = <<"RECEIPT">>, headers = [{<<"receipt-id">>, <<"12345">>}], body = _}, _, _} = parse(Data3) end). t_receipt_in_error(_) -> with_connection(fun(Sock) -> gen_tcp:send(Sock, serialize(<<"CONNECT">>, [{<<"accept-version">>, ?STOMP_VER}, {<<"host">>, <<"127.0.0.1:61613">>}, {<<"login">>, <<"guest">>}, {<<"passcode">>, <<"guest">>}, {<<"heart-beat">>, <<"0,0">>}])), {ok, Data} = gen_tcp:recv(Sock, 0), {ok, #stomp_frame{command = <<"CONNECTED">>, headers = _, body = _}, _, _} = parse(Data), gen_tcp:send(Sock, serialize(<<"ABORT">>, [{<<"transaction">>, <<"tx1">>}, {<<"receipt">>, <<"12345">>}])), {ok, Data1} = gen_tcp:recv(Sock, 0), {ok, Frame = #stomp_frame{command = <<"ERROR">>, headers = _, body = <<"Transaction tx1 not found">>}, _, _} = parse(Data1), <<"12345">> = proplists:get_value(<<"receipt-id">>, Frame#stomp_frame.headers) end). t_ack(_) -> with_connection(fun(Sock) -> gen_tcp:send(Sock, serialize(<<"CONNECT">>, [{<<"accept-version">>, ?STOMP_VER}, {<<"host">>, <<"127.0.0.1:61613">>}, {<<"login">>, <<"guest">>}, {<<"passcode">>, <<"guest">>}, {<<"heart-beat">>, <<"0,0">>}])), {ok, Data} = gen_tcp:recv(Sock, 0), {ok, #stomp_frame{command = <<"CONNECTED">>, headers = _, body = _}, _, _} = parse(Data), %% Subscribe gen_tcp:send(Sock, serialize(<<"SUBSCRIBE">>, [{<<"id">>, 0}, {<<"destination">>, <<"/queue/foo">>}, {<<"ack">>, <<"client">>}])), gen_tcp:send(Sock, serialize(<<"SEND">>, [{<<"destination">>, <<"/queue/foo">>}], <<"ack test">>)), {ok, Data1} = gen_tcp:recv(Sock, 0), {ok, Frame = #stomp_frame{command = <<"MESSAGE">>, headers = _, body = <<"ack test">>}, _, _} = parse(Data1), AckId = proplists:get_value(<<"ack">>, Frame#stomp_frame.headers), gen_tcp:send(Sock, serialize(<<"ACK">>, [{<<"id">>, AckId}, {<<"receipt">>, <<"12345">>}])), {ok, Data2} = gen_tcp:recv(Sock, 0), {ok, #stomp_frame{command = <<"RECEIPT">>, headers = [{<<"receipt-id">>, <<"12345">>}], body = _}, _, _} = parse(Data2), gen_tcp:send(Sock, serialize(<<"SEND">>, [{<<"destination">>, <<"/queue/foo">>}], <<"nack test">>)), {ok, Data3} = gen_tcp:recv(Sock, 0), {ok, Frame1 = #stomp_frame{command = <<"MESSAGE">>, headers = _, body = <<"nack test">>}, _, _} = parse(Data3), AckId1 = proplists:get_value(<<"ack">>, Frame1#stomp_frame.headers), gen_tcp:send(Sock, serialize(<<"NACK">>, [{<<"id">>, AckId1}, {<<"receipt">>, <<"12345">>}])), {ok, Data4} = gen_tcp:recv(Sock, 0), {ok, #stomp_frame{command = <<"RECEIPT">>, headers = [{<<"receipt-id">>, <<"12345">>}], body = _}, _, _} = parse(Data4) end). %% TODO: Mountpoint, AuthChain, Authorization + Mountpoint, ClientInfoOverride, %% Listeners, Metrics, Stats, ClientInfo %% %% TODO: Start/Stop, List Instace %% %% TODO: RateLimit, OOM, with_connection(DoFun) -> {ok, Sock} = gen_tcp:connect({127, 0, 0, 1}, 61613, [binary, {packet, raw}, {active, false}], 3000), try DoFun(Sock) after gen_tcp:close(Sock) end. serialize(Command, Headers) -> emqx_stomp_frame:serialize_pkt(emqx_stomp_frame:make(Command, Headers), #{}). serialize(Command, Headers, Body) -> emqx_stomp_frame:serialize_pkt(emqx_stomp_frame:make(Command, Headers, Body), #{}). parse(Data) -> ProtoEnv = #{max_headers => 10, max_header_length => 1024, max_body_length => 8192 }, Parser = emqx_stomp_frame:initial_parse_state(ProtoEnv), emqx_stomp_frame:parse(Data, Parser).