test(gw): add clients HTTP-API tests

This commit is contained in:
JianBo He 2021-10-18 17:39:50 +08:00
parent 6f0d0ab473
commit 46e0609544
6 changed files with 305 additions and 240 deletions

View File

@ -93,25 +93,24 @@ gateway_insta(delete, #{bindings := #{name := Name0}}) ->
end); end);
gateway_insta(get, #{bindings := #{name := Name0}}) -> gateway_insta(get, #{bindings := #{name := Name0}}) ->
try try
GwName = try binary_to_existing_atom(Name0)
binary_to_existing_atom(Name0) of
catch _ : _ -> error(badname) GwName ->
end, case emqx_gateway:lookup(GwName) of
case emqx_gateway:lookup(GwName) of undefined ->
undefined -> {200, #{name => GwName, status => unloaded}};
{200, #{name => GwName, status => unloaded}}; Gateway ->
Gateway -> GwConf = emqx_gateway_conf:gateway(Name0),
GwConf = emqx_gateway_conf:gateway(Name0), GwInfo0 = emqx_gateway_utils:unix_ts_to_rfc3339(
GwInfo0 = emqx_gateway_utils:unix_ts_to_rfc3339( [created_at, started_at, stopped_at],
[created_at, started_at, stopped_at], Gateway),
Gateway), GwInfo1 = maps:with([name,
GwInfo1 = maps:with([name, status,
status, created_at,
created_at, started_at,
started_at, stopped_at], GwInfo0),
stopped_at], GwInfo0), {200, maps:merge(GwConf, GwInfo1)}
{200, maps:merge(GwConf, GwInfo1)} end
end
catch catch
error : badname -> error : badname ->
return_http_error(400, "Bad gateway name") return_http_error(400, "Bad gateway name")

View File

@ -107,7 +107,7 @@ swagger("/gateway/:name/authentication", get) ->
} }
}; };
swagger("/gateway/:name/authentication", put) -> swagger("/gateway/:name/authentication", put) ->
#{ description => <<"Create the gateway authentication">> #{ description => <<"Update authentication for the gateway">>
, parameters => params_gateway_name_in_path() , parameters => params_gateway_name_in_path()
, requestBody => schema_authn() , requestBody => schema_authn()
, responses => , responses =>

View File

@ -123,7 +123,7 @@ clients_insta(delete, #{ bindings := #{name := Name0,
ClientId = emqx_mgmt_util:urldecode(ClientId0), ClientId = emqx_mgmt_util:urldecode(ClientId0),
with_gateway(Name0, fun(GwName, _) -> with_gateway(Name0, fun(GwName, _) ->
_ = emqx_gateway_http:kickout_client(GwName, ClientId), _ = emqx_gateway_http:kickout_client(GwName, ClientId),
{200} {204}
end). end).
%% FIXME: %% FIXME:
@ -157,7 +157,7 @@ subscriptions(post, #{ bindings := #{name := Name0,
{error, Reason} -> {error, Reason} ->
return_http_error(404, Reason); return_http_error(404, Reason);
ok -> ok ->
{200} {204}
end end
end end
end); end);
@ -172,7 +172,7 @@ subscriptions(delete, #{ bindings := #{name := Name0,
Topic = emqx_mgmt_util:urldecode(Topic0), Topic = emqx_mgmt_util:urldecode(Topic0),
with_gateway(Name0, fun(GwName, _) -> with_gateway(Name0, fun(GwName, _) ->
_ = emqx_gateway_http:client_unsubscribe(GwName, ClientId, Topic), _ = emqx_gateway_http:client_unsubscribe(GwName, ClientId, Topic),
{200} {204}
end). end).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -444,7 +444,7 @@ swagger("/gateway/:name/clients/:clientid/subscriptions", post) ->
#{ <<"400">> => schema_bad_request() #{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found() , <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error() , <<"500">> => schema_internal_error()
, <<"200">> => schema_no_content() , <<"204">> => schema_no_content()
} }
}; };
swagger("/gateway/:name/clients/:clientid/subscriptions/:topic", delete) -> swagger("/gateway/:name/clients/:clientid/subscriptions/:topic", delete) ->

View File

@ -21,6 +21,7 @@
-import(emqx_gateway_test_utils, -import(emqx_gateway_test_utils,
[ assert_confs/2 [ assert_confs/2
, assert_feilds_apperence/2
, request/2 , request/2
, request/3 , request/3
]). ]).
@ -271,10 +272,3 @@ assert_gw_unloaded(Gateway) ->
assert_bad_request(BadReq) -> assert_bad_request(BadReq) ->
?assertEqual(<<"BAD_REQUEST">>, maps:get(code, BadReq)). ?assertEqual(<<"BAD_REQUEST">>, maps:get(code, BadReq)).
assert_feilds_apperence(Ks, Map) ->
lists:foreach(fun(K) ->
_ = maps:get(K, Map)
end, Ks).

View File

@ -60,6 +60,11 @@ maybe_unconvert_listeners(Conf) when is_map(Conf) ->
maybe_unconvert_listeners(Conf) -> maybe_unconvert_listeners(Conf) ->
Conf. Conf.
assert_feilds_apperence(Ks, Map) ->
lists:foreach(fun(K) ->
_ = maps:get(K, Map)
end, Ks).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% http %% http

View File

@ -16,11 +16,18 @@
-module(emqx_stomp_SUITE). -module(emqx_stomp_SUITE).
-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx_gateway/src/stomp/include/emqx_stomp.hrl"). -include_lib("emqx_gateway/src/stomp/include/emqx_stomp.hrl").
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-import(emqx_gateway_test_utils,
[ assert_feilds_apperence/2
, request/2
, request/3
]).
-define(HEARTBEAT, <<$\n>>). -define(HEARTBEAT, <<$\n>>).
-define(CONF_DEFAULT, <<" -define(CONF_DEFAULT, <<"
@ -43,11 +50,11 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Cfg) -> init_per_suite(Cfg) ->
ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT), ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
emqx_common_test_helpers:start_apps([emqx_gateway]), emqx_mgmt_api_test_util:init_suite([emqx_gateway]),
Cfg. Cfg.
end_per_suite(_Cfg) -> end_per_suite(_Cfg) ->
emqx_common_test_helpers:stop_apps([emqx_gateway]), emqx_mgmt_api_test_util:end_suite([emqx_gateway]),
ok. ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -57,26 +64,26 @@ end_per_suite(_Cfg) ->
t_connect(_) -> t_connect(_) ->
%% Connect should be succeed %% Connect should be succeed
with_connection(fun(Sock) -> with_connection(fun(Sock) ->
gen_tcp:send(Sock, serialize(<<"CONNECT">>, gen_tcp:send(Sock, serialize(<<"CONNECT">>,
[{<<"accept-version">>, ?STOMP_VER}, [{<<"accept-version">>, ?STOMP_VER},
{<<"host">>, <<"127.0.0.1:61613">>}, {<<"host">>, <<"127.0.0.1:61613">>},
{<<"login">>, <<"guest">>}, {<<"login">>, <<"guest">>},
{<<"passcode">>, <<"guest">>}, {<<"passcode">>, <<"guest">>},
{<<"heart-beat">>, <<"1000,2000">>}])), {<<"heart-beat">>, <<"1000,2000">>}])),
{ok, Data} = gen_tcp:recv(Sock, 0), {ok, Data} = gen_tcp:recv(Sock, 0),
{ok, Frame = #stomp_frame{command = <<"CONNECTED">>, {ok, Frame = #stomp_frame{command = <<"CONNECTED">>,
headers = _, headers = _,
body = _}, _, _} = parse(Data), body = _}, _, _} = parse(Data),
<<"2000,1000">> = proplists:get_value(<<"heart-beat">>, Frame#stomp_frame.headers), <<"2000,1000">> = proplists:get_value(<<"heart-beat">>, Frame#stomp_frame.headers),
gen_tcp:send(Sock, serialize(<<"DISCONNECT">>, gen_tcp:send(Sock, serialize(<<"DISCONNECT">>,
[{<<"receipt">>, <<"12345">>}])), [{<<"receipt">>, <<"12345">>}])),
{ok, Data1} = gen_tcp:recv(Sock, 0), {ok, Data1} = gen_tcp:recv(Sock, 0),
{ok, #stomp_frame{command = <<"RECEIPT">>, {ok, #stomp_frame{command = <<"RECEIPT">>,
headers = [{<<"receipt-id">>, <<"12345">>}], headers = [{<<"receipt-id">>, <<"12345">>}],
body = _}, _, _} = parse(Data1) body = _}, _, _} = parse(Data1)
end), end),
%% Connect will be failed, because of bad login or passcode %% Connect will be failed, because of bad login or passcode
%% FIXME: Waiting for authentication works %% FIXME: Waiting for authentication works
@ -95,249 +102,309 @@ t_connect(_) ->
%% Connect will be failed, because of bad version %% Connect will be failed, because of bad version
with_connection(fun(Sock) -> with_connection(fun(Sock) ->
gen_tcp:send(Sock, serialize(<<"CONNECT">>, gen_tcp:send(Sock, serialize(<<"CONNECT">>,
[{<<"accept-version">>, <<"2.0,2.1">>}, [{<<"accept-version">>, <<"2.0,2.1">>},
{<<"host">>, <<"127.0.0.1:61613">>}, {<<"host">>, <<"127.0.0.1:61613">>},
{<<"login">>, <<"guest">>}, {<<"login">>, <<"guest">>},
{<<"passcode">>, <<"guest">>}, {<<"passcode">>, <<"guest">>},
{<<"heart-beat">>, <<"1000,2000">>}])), {<<"heart-beat">>, <<"1000,2000">>}])),
{ok, Data} = gen_tcp:recv(Sock, 0), {ok, Data} = gen_tcp:recv(Sock, 0),
{ok, #stomp_frame{command = <<"ERROR">>, {ok, #stomp_frame{command = <<"ERROR">>,
headers = _, headers = _,
body = <<"Login Failed: Supported protocol versions < 1.2">>}, _, _} = parse(Data) body = <<"Login Failed: Supported protocol versions < 1.2">>}, _, _} = parse(Data)
end). end).
t_heartbeat(_) -> t_heartbeat(_) ->
%% Test heart beat %% Test heart beat
with_connection(fun(Sock) -> with_connection(fun(Sock) ->
gen_tcp:send(Sock, serialize(<<"CONNECT">>, gen_tcp:send(Sock, serialize(<<"CONNECT">>,
[{<<"accept-version">>, ?STOMP_VER}, [{<<"accept-version">>, ?STOMP_VER},
{<<"host">>, <<"127.0.0.1:61613">>}, {<<"host">>, <<"127.0.0.1:61613">>},
{<<"login">>, <<"guest">>}, {<<"login">>, <<"guest">>},
{<<"passcode">>, <<"guest">>}, {<<"passcode">>, <<"guest">>},
{<<"heart-beat">>, <<"1000,800">>}])), {<<"heart-beat">>, <<"1000,800">>}])),
{ok, Data} = gen_tcp:recv(Sock, 0), {ok, Data} = gen_tcp:recv(Sock, 0),
{ok, #stomp_frame{command = <<"CONNECTED">>, {ok, #stomp_frame{command = <<"CONNECTED">>,
headers = _, headers = _,
body = _}, _, _} = parse(Data), body = _}, _, _} = parse(Data),
{ok, ?HEARTBEAT} = gen_tcp:recv(Sock, 0), {ok, ?HEARTBEAT} = gen_tcp:recv(Sock, 0),
%% Server will close the connection because never receive the heart beat from client %% Server will close the connection because never receive the heart beat from client
{error, closed} = gen_tcp:recv(Sock, 0) {error, closed} = gen_tcp:recv(Sock, 0)
end). end).
t_subscribe(_) -> t_subscribe(_) ->
with_connection(fun(Sock) -> with_connection(fun(Sock) ->
gen_tcp:send(Sock, serialize(<<"CONNECT">>, gen_tcp:send(Sock, serialize(<<"CONNECT">>,
[{<<"accept-version">>, ?STOMP_VER}, [{<<"accept-version">>, ?STOMP_VER},
{<<"host">>, <<"127.0.0.1:61613">>}, {<<"host">>, <<"127.0.0.1:61613">>},
{<<"login">>, <<"guest">>}, {<<"login">>, <<"guest">>},
{<<"passcode">>, <<"guest">>}, {<<"passcode">>, <<"guest">>},
{<<"heart-beat">>, <<"0,0">>}])), {<<"heart-beat">>, <<"0,0">>}])),
{ok, Data} = gen_tcp:recv(Sock, 0), {ok, Data} = gen_tcp:recv(Sock, 0),
{ok, #stomp_frame{command = <<"CONNECTED">>, {ok, #stomp_frame{command = <<"CONNECTED">>,
headers = _, headers = _,
body = _}, _, _} = parse(Data), body = _}, _, _} = parse(Data),
%% Subscribe %% Subscribe
gen_tcp:send(Sock, serialize(<<"SUBSCRIBE">>, gen_tcp:send(Sock, serialize(<<"SUBSCRIBE">>,
[{<<"id">>, 0}, [{<<"id">>, 0},
{<<"destination">>, <<"/queue/foo">>}, {<<"destination">>, <<"/queue/foo">>},
{<<"ack">>, <<"auto">>}])), {<<"ack">>, <<"auto">>}])),
%% 'user-defined' header will be retain %% 'user-defined' header will be retain
gen_tcp:send(Sock, serialize(<<"SEND">>, gen_tcp:send(Sock, serialize(<<"SEND">>,
[{<<"destination">>, <<"/queue/foo">>}, [{<<"destination">>, <<"/queue/foo">>},
{<<"user-defined">>, <<"emq">>}], {<<"user-defined">>, <<"emq">>}],
<<"hello">>)), <<"hello">>)),
{ok, Data1} = gen_tcp:recv(Sock, 0, 1000), {ok, Data1} = gen_tcp:recv(Sock, 0, 1000),
{ok, Frame = #stomp_frame{command = <<"MESSAGE">>, {ok, Frame = #stomp_frame{command = <<"MESSAGE">>,
headers = _, headers = _,
body = <<"hello">>}, _, _} = parse(Data1), body = <<"hello">>}, _, _} = parse(Data1),
lists:foreach(fun({Key, Val}) -> lists:foreach(fun({Key, Val}) ->
Val = proplists:get_value(Key, Frame#stomp_frame.headers) Val = proplists:get_value(Key, Frame#stomp_frame.headers)
end, [{<<"destination">>, <<"/queue/foo">>}, end, [{<<"destination">>, <<"/queue/foo">>},
{<<"subscription">>, <<"0">>}, {<<"subscription">>, <<"0">>},
{<<"user-defined">>, <<"emq">>}]), {<<"user-defined">>, <<"emq">>}]),
%% Unsubscribe %% Unsubscribe
gen_tcp:send(Sock, serialize(<<"UNSUBSCRIBE">>, gen_tcp:send(Sock, serialize(<<"UNSUBSCRIBE">>,
[{<<"id">>, 0}, [{<<"id">>, 0},
{<<"receipt">>, <<"12345">>}])), {<<"receipt">>, <<"12345">>}])),
{ok, Data2} = gen_tcp:recv(Sock, 0, 1000), {ok, Data2} = gen_tcp:recv(Sock, 0, 1000),
{ok, #stomp_frame{command = <<"RECEIPT">>, {ok, #stomp_frame{command = <<"RECEIPT">>,
headers = [{<<"receipt-id">>, <<"12345">>}], headers = [{<<"receipt-id">>, <<"12345">>}],
body = _}, _, _} = parse(Data2), body = _}, _, _} = parse(Data2),
gen_tcp:send(Sock, serialize(<<"SEND">>, gen_tcp:send(Sock, serialize(<<"SEND">>,
[{<<"destination">>, <<"/queue/foo">>}], [{<<"destination">>, <<"/queue/foo">>}],
<<"You will not receive this msg">>)), <<"You will not receive this msg">>)),
{error, timeout} = gen_tcp:recv(Sock, 0, 500) {error, timeout} = gen_tcp:recv(Sock, 0, 500)
end). end).
t_transaction(_) -> t_transaction(_) ->
with_connection(fun(Sock) -> with_connection(fun(Sock) ->
gen_tcp:send(Sock, serialize(<<"CONNECT">>, gen_tcp:send(Sock, serialize(<<"CONNECT">>,
[{<<"accept-version">>, ?STOMP_VER}, [{<<"accept-version">>, ?STOMP_VER},
{<<"host">>, <<"127.0.0.1:61613">>}, {<<"host">>, <<"127.0.0.1:61613">>},
{<<"login">>, <<"guest">>}, {<<"login">>, <<"guest">>},
{<<"passcode">>, <<"guest">>}, {<<"passcode">>, <<"guest">>},
{<<"heart-beat">>, <<"0,0">>}])), {<<"heart-beat">>, <<"0,0">>}])),
{ok, Data} = gen_tcp:recv(Sock, 0), {ok, Data} = gen_tcp:recv(Sock, 0),
{ok, #stomp_frame{command = <<"CONNECTED">>, {ok, #stomp_frame{command = <<"CONNECTED">>,
headers = _, headers = _,
body = _}, _, _} = parse(Data), body = _}, _, _} = parse(Data),
%% Subscribe %% Subscribe
gen_tcp:send(Sock, serialize(<<"SUBSCRIBE">>, gen_tcp:send(Sock, serialize(<<"SUBSCRIBE">>,
[{<<"id">>, 0}, [{<<"id">>, 0},
{<<"destination">>, <<"/queue/foo">>}, {<<"destination">>, <<"/queue/foo">>},
{<<"ack">>, <<"auto">>}])), {<<"ack">>, <<"auto">>}])),
%% Transaction: tx1 %% Transaction: tx1
gen_tcp:send(Sock, serialize(<<"BEGIN">>, gen_tcp:send(Sock, serialize(<<"BEGIN">>,
[{<<"transaction">>, <<"tx1">>}])), [{<<"transaction">>, <<"tx1">>}])),
gen_tcp:send(Sock, serialize(<<"SEND">>, gen_tcp:send(Sock, serialize(<<"SEND">>,
[{<<"destination">>, <<"/queue/foo">>}, [{<<"destination">>, <<"/queue/foo">>},
{<<"transaction">>, <<"tx1">>}], {<<"transaction">>, <<"tx1">>}],
<<"hello">>)), <<"hello">>)),
%% You will not receive any messages %% You will not receive any messages
{error, timeout} = gen_tcp:recv(Sock, 0, 1000), {error, timeout} = gen_tcp:recv(Sock, 0, 1000),
gen_tcp:send(Sock, serialize(<<"SEND">>, gen_tcp:send(Sock, serialize(<<"SEND">>,
[{<<"destination">>, <<"/queue/foo">>}, [{<<"destination">>, <<"/queue/foo">>},
{<<"transaction">>, <<"tx1">>}], {<<"transaction">>, <<"tx1">>}],
<<"hello again">>)), <<"hello again">>)),
gen_tcp:send(Sock, serialize(<<"COMMIT">>, gen_tcp:send(Sock, serialize(<<"COMMIT">>,
[{<<"transaction">>, <<"tx1">>}])), [{<<"transaction">>, <<"tx1">>}])),
ct:sleep(1000), ct:sleep(1000),
{ok, Data1} = gen_tcp:recv(Sock, 0, 500), {ok, Data1} = gen_tcp:recv(Sock, 0, 500),
{ok, #stomp_frame{command = <<"MESSAGE">>, {ok, #stomp_frame{command = <<"MESSAGE">>,
headers = _, headers = _,
body = <<"hello">>}, Rest1, _} = parse(Data1), body = <<"hello">>}, Rest1, _} = parse(Data1),
%{ok, Data2} = gen_tcp:recv(Sock, 0, 500), %{ok, Data2} = gen_tcp:recv(Sock, 0, 500),
{ok, #stomp_frame{command = <<"MESSAGE">>, {ok, #stomp_frame{command = <<"MESSAGE">>,
headers = _, headers = _,
body = <<"hello again">>}, _Rest2, _} = parse(Rest1), body = <<"hello again">>}, _Rest2, _} = parse(Rest1),
%% Transaction: tx2 %% Transaction: tx2
gen_tcp:send(Sock, serialize(<<"BEGIN">>, gen_tcp:send(Sock, serialize(<<"BEGIN">>,
[{<<"transaction">>, <<"tx2">>}])), [{<<"transaction">>, <<"tx2">>}])),
gen_tcp:send(Sock, serialize(<<"SEND">>, gen_tcp:send(Sock, serialize(<<"SEND">>,
[{<<"destination">>, <<"/queue/foo">>}, [{<<"destination">>, <<"/queue/foo">>},
{<<"transaction">>, <<"tx2">>}], {<<"transaction">>, <<"tx2">>}],
<<"hello">>)), <<"hello">>)),
gen_tcp:send(Sock, serialize(<<"ABORT">>, gen_tcp:send(Sock, serialize(<<"ABORT">>,
[{<<"transaction">>, <<"tx2">>}])), [{<<"transaction">>, <<"tx2">>}])),
%% You will not receive any messages %% You will not receive any messages
{error, timeout} = gen_tcp:recv(Sock, 0, 1000), {error, timeout} = gen_tcp:recv(Sock, 0, 1000),
gen_tcp:send(Sock, serialize(<<"DISCONNECT">>, gen_tcp:send(Sock, serialize(<<"DISCONNECT">>,
[{<<"receipt">>, <<"12345">>}])), [{<<"receipt">>, <<"12345">>}])),
{ok, Data3} = gen_tcp:recv(Sock, 0), {ok, Data3} = gen_tcp:recv(Sock, 0),
{ok, #stomp_frame{command = <<"RECEIPT">>, {ok, #stomp_frame{command = <<"RECEIPT">>,
headers = [{<<"receipt-id">>, <<"12345">>}], headers = [{<<"receipt-id">>, <<"12345">>}],
body = _}, _, _} = parse(Data3) body = _}, _, _} = parse(Data3)
end). end).
t_receipt_in_error(_) -> t_receipt_in_error(_) ->
with_connection(fun(Sock) -> with_connection(fun(Sock) ->
gen_tcp:send(Sock, serialize(<<"CONNECT">>, gen_tcp:send(Sock, serialize(<<"CONNECT">>,
[{<<"accept-version">>, ?STOMP_VER}, [{<<"accept-version">>, ?STOMP_VER},
{<<"host">>, <<"127.0.0.1:61613">>}, {<<"host">>, <<"127.0.0.1:61613">>},
{<<"login">>, <<"guest">>}, {<<"login">>, <<"guest">>},
{<<"passcode">>, <<"guest">>}, {<<"passcode">>, <<"guest">>},
{<<"heart-beat">>, <<"0,0">>}])), {<<"heart-beat">>, <<"0,0">>}])),
{ok, Data} = gen_tcp:recv(Sock, 0), {ok, Data} = gen_tcp:recv(Sock, 0),
{ok, #stomp_frame{command = <<"CONNECTED">>, {ok, #stomp_frame{command = <<"CONNECTED">>,
headers = _, headers = _,
body = _}, _, _} = parse(Data), body = _}, _, _} = parse(Data),
gen_tcp:send(Sock, serialize(<<"ABORT">>, gen_tcp:send(Sock, serialize(<<"ABORT">>,
[{<<"transaction">>, <<"tx1">>}, [{<<"transaction">>, <<"tx1">>},
{<<"receipt">>, <<"12345">>}])), {<<"receipt">>, <<"12345">>}])),
{ok, Data1} = gen_tcp:recv(Sock, 0), {ok, Data1} = gen_tcp:recv(Sock, 0),
{ok, Frame = #stomp_frame{command = <<"ERROR">>, {ok, Frame = #stomp_frame{command = <<"ERROR">>,
headers = _, headers = _,
body = <<"Transaction tx1 not found">>}, _, _} = parse(Data1), body = <<"Transaction tx1 not found">>}, _, _} = parse(Data1),
<<"12345">> = proplists:get_value(<<"receipt-id">>, Frame#stomp_frame.headers) <<"12345">> = proplists:get_value(<<"receipt-id">>, Frame#stomp_frame.headers)
end). end).
t_ack(_) -> t_ack(_) ->
with_connection(fun(Sock) -> with_connection(fun(Sock) ->
gen_tcp:send(Sock, serialize(<<"CONNECT">>, gen_tcp:send(Sock, serialize(<<"CONNECT">>,
[{<<"accept-version">>, ?STOMP_VER}, [{<<"accept-version">>, ?STOMP_VER},
{<<"host">>, <<"127.0.0.1:61613">>}, {<<"host">>, <<"127.0.0.1:61613">>},
{<<"login">>, <<"guest">>}, {<<"login">>, <<"guest">>},
{<<"passcode">>, <<"guest">>}, {<<"passcode">>, <<"guest">>},
{<<"heart-beat">>, <<"0,0">>}])), {<<"heart-beat">>, <<"0,0">>}])),
{ok, Data} = gen_tcp:recv(Sock, 0), {ok, Data} = gen_tcp:recv(Sock, 0),
{ok, #stomp_frame{command = <<"CONNECTED">>, {ok, #stomp_frame{command = <<"CONNECTED">>,
headers = _, headers = _,
body = _}, _, _} = parse(Data), body = _}, _, _} = parse(Data),
%% Subscribe %% Subscribe
gen_tcp:send(Sock, serialize(<<"SUBSCRIBE">>, gen_tcp:send(Sock, serialize(<<"SUBSCRIBE">>,
[{<<"id">>, 0}, [{<<"id">>, 0},
{<<"destination">>, <<"/queue/foo">>}, {<<"destination">>, <<"/queue/foo">>},
{<<"ack">>, <<"client">>}])), {<<"ack">>, <<"client">>}])),
gen_tcp:send(Sock, serialize(<<"SEND">>, gen_tcp:send(Sock, serialize(<<"SEND">>,
[{<<"destination">>, <<"/queue/foo">>}], [{<<"destination">>, <<"/queue/foo">>}],
<<"ack test">>)), <<"ack test">>)),
{ok, Data1} = gen_tcp:recv(Sock, 0), {ok, Data1} = gen_tcp:recv(Sock, 0),
{ok, Frame = #stomp_frame{command = <<"MESSAGE">>, {ok, Frame = #stomp_frame{command = <<"MESSAGE">>,
headers = _, headers = _,
body = <<"ack test">>}, _, _} = parse(Data1), body = <<"ack test">>}, _, _} = parse(Data1),
AckId = proplists:get_value(<<"ack">>, Frame#stomp_frame.headers), AckId = proplists:get_value(<<"ack">>, Frame#stomp_frame.headers),
gen_tcp:send(Sock, serialize(<<"ACK">>, gen_tcp:send(Sock, serialize(<<"ACK">>,
[{<<"id">>, AckId}, [{<<"id">>, AckId},
{<<"receipt">>, <<"12345">>}])), {<<"receipt">>, <<"12345">>}])),
{ok, Data2} = gen_tcp:recv(Sock, 0), {ok, Data2} = gen_tcp:recv(Sock, 0),
{ok, #stomp_frame{command = <<"RECEIPT">>, {ok, #stomp_frame{command = <<"RECEIPT">>,
headers = [{<<"receipt-id">>, <<"12345">>}], headers = [{<<"receipt-id">>, <<"12345">>}],
body = _}, _, _} = parse(Data2), body = _}, _, _} = parse(Data2),
gen_tcp:send(Sock, serialize(<<"SEND">>, gen_tcp:send(Sock, serialize(<<"SEND">>,
[{<<"destination">>, <<"/queue/foo">>}], [{<<"destination">>, <<"/queue/foo">>}],
<<"nack test">>)), <<"nack test">>)),
{ok, Data3} = gen_tcp:recv(Sock, 0), {ok, Data3} = gen_tcp:recv(Sock, 0),
{ok, Frame1 = #stomp_frame{command = <<"MESSAGE">>, {ok, Frame1 = #stomp_frame{command = <<"MESSAGE">>,
headers = _, headers = _,
body = <<"nack test">>}, _, _} = parse(Data3), body = <<"nack test">>}, _, _} = parse(Data3),
AckId1 = proplists:get_value(<<"ack">>, Frame1#stomp_frame.headers), AckId1 = proplists:get_value(<<"ack">>, Frame1#stomp_frame.headers),
gen_tcp:send(Sock, serialize(<<"NACK">>, gen_tcp:send(Sock, serialize(<<"NACK">>,
[{<<"id">>, AckId1}, [{<<"id">>, AckId1},
{<<"receipt">>, <<"12345">>}])), {<<"receipt">>, <<"12345">>}])),
{ok, Data4} = gen_tcp:recv(Sock, 0), {ok, Data4} = gen_tcp:recv(Sock, 0),
{ok, #stomp_frame{command = <<"RECEIPT">>, {ok, #stomp_frame{command = <<"RECEIPT">>,
headers = [{<<"receipt-id">>, <<"12345">>}], headers = [{<<"receipt-id">>, <<"12345">>}],
body = _}, _, _} = parse(Data4) body = _}, _, _} = parse(Data4)
end). end).
t_rest_clienit_info(_) ->
with_connection(fun(Sock) ->
gen_tcp:send(Sock, serialize(<<"CONNECT">>,
[{<<"accept-version">>, ?STOMP_VER},
{<<"host">>, <<"127.0.0.1:61613">>},
{<<"login">>, <<"guest">>},
{<<"passcode">>, <<"guest">>},
{<<"heart-beat">>, <<"0,0">>}])),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, #stomp_frame{command = <<"CONNECTED">>,
headers = _,
body = _}, _, _} = parse(Data),
%% client lists
{200, Clients} = request(get, "/gateway/stomp/clients"),
?assertEqual(1, length(maps:get(data, Clients))),
StompClient = lists:nth(1, maps:get(data, Clients)),
ClientId = maps:get(clientid, StompClient),
ClientPath = "/gateway/stomp/clients/"
++ binary_to_list(ClientId),
{200, StompClient1} = request(get, ClientPath),
?assertEqual(StompClient, StompClient1),
assert_feilds_apperence(
[proto_name, awaiting_rel_max, inflight_cnt, disconnected_at,
send_msg, heap_size, connected, recv_cnt, send_pkt, mailbox_len,
username, recv_pkt, expiry_interval, clientid, mqueue_max,
send_oct, ip_address, is_bridge, awaiting_rel_cnt, mqueue_dropped,
mqueue_len, node, inflight_max, reductions, subscriptions_max,
connected_at, keepalive, created_at, clean_start,
subscriptions_cnt, recv_msg, send_cnt, proto_ver, recv_oct
], StompClient),
%% sub & unsub
{200, []} = request(get, ClientPath ++ "/subscriptions"),
gen_tcp:send(Sock, serialize(<<"SUBSCRIBE">>,
[{<<"id">>, 0},
{<<"destination">>, <<"/queue/foo">>},
{<<"ack">>, <<"client">>}])),
{200, Subs} = request(get, ClientPath ++ "/subscriptions"),
?assertEqual(1, length(Subs)),
assert_feilds_apperence([topic, qos], lists:nth(1, Subs)),
{204, _} = request(post, ClientPath ++ "/subscriptions",
#{topic => <<"t/a">>, qos => 1,
sub_props => #{subid => <<"1001">>}}),
{200, Subs1} = request(get, ClientPath ++ "/subscriptions"),
?assertEqual(2, length(Subs1)),
{204, _} = request(delete, ClientPath ++ "/subscriptions/t%2Fa"),
{200, Subs2} = request(get, ClientPath ++ "/subscriptions"),
?assertEqual(1, length(Subs2)),
%% kickout
{204, _} = request(delete, ClientPath),
{200, Clients2} = request(get, "/gateway/stomp/clients"),
?assertEqual(0, length(maps:get(data, Clients2)))
end).
%% TODO: Mountpoint, AuthChain, Authorization + Mountpoint, ClientInfoOverride, %% TODO: Mountpoint, AuthChain, Authorization + Mountpoint, ClientInfoOverride,
%% Listeners, Metrics, Stats, ClientInfo %% Listeners, Metrics, Stats, ClientInfo