diff --git a/apps/emqx_gateway/src/emqx_gateway_api.erl b/apps/emqx_gateway/src/emqx_gateway_api.erl index 87193c6a3..e8c13e7ac 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api.erl @@ -93,25 +93,24 @@ gateway_insta(delete, #{bindings := #{name := Name0}}) -> end); gateway_insta(get, #{bindings := #{name := Name0}}) -> try - GwName = try - binary_to_existing_atom(Name0) - catch _ : _ -> error(badname) - end, - case emqx_gateway:lookup(GwName) of - undefined -> - {200, #{name => GwName, status => unloaded}}; - Gateway -> - GwConf = emqx_gateway_conf:gateway(Name0), - GwInfo0 = emqx_gateway_utils:unix_ts_to_rfc3339( - [created_at, started_at, stopped_at], - Gateway), - GwInfo1 = maps:with([name, - status, - created_at, - started_at, - stopped_at], GwInfo0), - {200, maps:merge(GwConf, GwInfo1)} - end + binary_to_existing_atom(Name0) + of + GwName -> + case emqx_gateway:lookup(GwName) of + undefined -> + {200, #{name => GwName, status => unloaded}}; + Gateway -> + GwConf = emqx_gateway_conf:gateway(Name0), + GwInfo0 = emqx_gateway_utils:unix_ts_to_rfc3339( + [created_at, started_at, stopped_at], + Gateway), + GwInfo1 = maps:with([name, + status, + created_at, + started_at, + stopped_at], GwInfo0), + {200, maps:merge(GwConf, GwInfo1)} + end catch error : badname -> return_http_error(400, "Bad gateway name") diff --git a/apps/emqx_gateway/src/emqx_gateway_api_authn.erl b/apps/emqx_gateway/src/emqx_gateway_api_authn.erl index ac5ee17a5..2136a5b3d 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_authn.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_authn.erl @@ -107,7 +107,7 @@ swagger("/gateway/:name/authentication", get) -> } }; swagger("/gateway/:name/authentication", put) -> - #{ description => <<"Create the gateway authentication">> + #{ description => <<"Update authentication for the gateway">> , parameters => params_gateway_name_in_path() , requestBody => schema_authn() , responses => diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index 1eb44d3dd..71191c773 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -123,7 +123,7 @@ clients_insta(delete, #{ bindings := #{name := Name0, ClientId = emqx_mgmt_util:urldecode(ClientId0), with_gateway(Name0, fun(GwName, _) -> _ = emqx_gateway_http:kickout_client(GwName, ClientId), - {200} + {204} end). %% FIXME: @@ -157,7 +157,7 @@ subscriptions(post, #{ bindings := #{name := Name0, {error, Reason} -> return_http_error(404, Reason); ok -> - {200} + {204} end end end); @@ -172,7 +172,7 @@ subscriptions(delete, #{ bindings := #{name := Name0, Topic = emqx_mgmt_util:urldecode(Topic0), with_gateway(Name0, fun(GwName, _) -> _ = emqx_gateway_http:client_unsubscribe(GwName, ClientId, Topic), - {200} + {204} end). %%-------------------------------------------------------------------- @@ -444,7 +444,7 @@ swagger("/gateway/:name/clients/:clientid/subscriptions", post) -> #{ <<"400">> => schema_bad_request() , <<"404">> => schema_not_found() , <<"500">> => schema_internal_error() - , <<"200">> => schema_no_content() + , <<"204">> => schema_no_content() } }; swagger("/gateway/:name/clients/:clientid/subscriptions/:topic", delete) -> diff --git a/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl index 10cf134fa..85125f466 100644 --- a/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl @@ -21,6 +21,7 @@ -import(emqx_gateway_test_utils, [ assert_confs/2 + , assert_feilds_apperence/2 , request/2 , request/3 ]). @@ -271,10 +272,3 @@ assert_gw_unloaded(Gateway) -> assert_bad_request(BadReq) -> ?assertEqual(<<"BAD_REQUEST">>, maps:get(code, BadReq)). - -assert_feilds_apperence(Ks, Map) -> - lists:foreach(fun(K) -> - _ = maps:get(K, Map) - end, Ks). - - diff --git a/apps/emqx_gateway/test/emqx_gateway_test_utils.erl b/apps/emqx_gateway/test/emqx_gateway_test_utils.erl index 3dafab38f..d7fd12c3d 100644 --- a/apps/emqx_gateway/test/emqx_gateway_test_utils.erl +++ b/apps/emqx_gateway/test/emqx_gateway_test_utils.erl @@ -60,6 +60,11 @@ maybe_unconvert_listeners(Conf) when is_map(Conf) -> maybe_unconvert_listeners(Conf) -> Conf. +assert_feilds_apperence(Ks, Map) -> + lists:foreach(fun(K) -> + _ = maps:get(K, Map) + end, Ks). + %%-------------------------------------------------------------------- %% http diff --git a/apps/emqx_gateway/test/emqx_stomp_SUITE.erl b/apps/emqx_gateway/test/emqx_stomp_SUITE.erl index 56b1174a2..83da59999 100644 --- a/apps/emqx_gateway/test/emqx_stomp_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_stomp_SUITE.erl @@ -16,11 +16,18 @@ -module(emqx_stomp_SUITE). +-include_lib("eunit/include/eunit.hrl"). -include_lib("emqx_gateway/src/stomp/include/emqx_stomp.hrl"). -compile(export_all). -compile(nowarn_export_all). +-import(emqx_gateway_test_utils, + [ assert_feilds_apperence/2 + , request/2 + , request/3 + ]). + -define(HEARTBEAT, <<$\n>>). -define(CONF_DEFAULT, <<" @@ -43,11 +50,11 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Cfg) -> ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT), - emqx_common_test_helpers:start_apps([emqx_gateway]), + emqx_mgmt_api_test_util:init_suite([emqx_gateway]), Cfg. end_per_suite(_Cfg) -> - emqx_common_test_helpers:stop_apps([emqx_gateway]), + emqx_mgmt_api_test_util:end_suite([emqx_gateway]), ok. %%-------------------------------------------------------------------- @@ -57,26 +64,26 @@ end_per_suite(_Cfg) -> t_connect(_) -> %% Connect should be succeed with_connection(fun(Sock) -> - gen_tcp:send(Sock, serialize(<<"CONNECT">>, - [{<<"accept-version">>, ?STOMP_VER}, - {<<"host">>, <<"127.0.0.1:61613">>}, - {<<"login">>, <<"guest">>}, - {<<"passcode">>, <<"guest">>}, - {<<"heart-beat">>, <<"1000,2000">>}])), - {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, Frame = #stomp_frame{command = <<"CONNECTED">>, - headers = _, - body = _}, _, _} = parse(Data), - <<"2000,1000">> = proplists:get_value(<<"heart-beat">>, Frame#stomp_frame.headers), + gen_tcp:send(Sock, serialize(<<"CONNECT">>, + [{<<"accept-version">>, ?STOMP_VER}, + {<<"host">>, <<"127.0.0.1:61613">>}, + {<<"login">>, <<"guest">>}, + {<<"passcode">>, <<"guest">>}, + {<<"heart-beat">>, <<"1000,2000">>}])), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, Frame = #stomp_frame{command = <<"CONNECTED">>, + headers = _, + body = _}, _, _} = parse(Data), + <<"2000,1000">> = proplists:get_value(<<"heart-beat">>, Frame#stomp_frame.headers), - gen_tcp:send(Sock, serialize(<<"DISCONNECT">>, - [{<<"receipt">>, <<"12345">>}])), + gen_tcp:send(Sock, serialize(<<"DISCONNECT">>, + [{<<"receipt">>, <<"12345">>}])), - {ok, Data1} = gen_tcp:recv(Sock, 0), - {ok, #stomp_frame{command = <<"RECEIPT">>, - headers = [{<<"receipt-id">>, <<"12345">>}], - body = _}, _, _} = parse(Data1) - end), + {ok, Data1} = gen_tcp:recv(Sock, 0), + {ok, #stomp_frame{command = <<"RECEIPT">>, + headers = [{<<"receipt-id">>, <<"12345">>}], + body = _}, _, _} = parse(Data1) + end), %% Connect will be failed, because of bad login or passcode %% FIXME: Waiting for authentication works @@ -95,249 +102,309 @@ t_connect(_) -> %% Connect will be failed, because of bad version with_connection(fun(Sock) -> - gen_tcp:send(Sock, serialize(<<"CONNECT">>, - [{<<"accept-version">>, <<"2.0,2.1">>}, - {<<"host">>, <<"127.0.0.1:61613">>}, - {<<"login">>, <<"guest">>}, - {<<"passcode">>, <<"guest">>}, - {<<"heart-beat">>, <<"1000,2000">>}])), - {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, #stomp_frame{command = <<"ERROR">>, - headers = _, - body = <<"Login Failed: Supported protocol versions < 1.2">>}, _, _} = parse(Data) - end). + gen_tcp:send(Sock, serialize(<<"CONNECT">>, + [{<<"accept-version">>, <<"2.0,2.1">>}, + {<<"host">>, <<"127.0.0.1:61613">>}, + {<<"login">>, <<"guest">>}, + {<<"passcode">>, <<"guest">>}, + {<<"heart-beat">>, <<"1000,2000">>}])), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, #stomp_frame{command = <<"ERROR">>, + headers = _, + body = <<"Login Failed: Supported protocol versions < 1.2">>}, _, _} = parse(Data) + end). t_heartbeat(_) -> %% Test heart beat with_connection(fun(Sock) -> - gen_tcp:send(Sock, serialize(<<"CONNECT">>, - [{<<"accept-version">>, ?STOMP_VER}, - {<<"host">>, <<"127.0.0.1:61613">>}, - {<<"login">>, <<"guest">>}, - {<<"passcode">>, <<"guest">>}, - {<<"heart-beat">>, <<"1000,800">>}])), - {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, #stomp_frame{command = <<"CONNECTED">>, - headers = _, - body = _}, _, _} = parse(Data), + gen_tcp:send(Sock, serialize(<<"CONNECT">>, + [{<<"accept-version">>, ?STOMP_VER}, + {<<"host">>, <<"127.0.0.1:61613">>}, + {<<"login">>, <<"guest">>}, + {<<"passcode">>, <<"guest">>}, + {<<"heart-beat">>, <<"1000,800">>}])), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, #stomp_frame{command = <<"CONNECTED">>, + headers = _, + body = _}, _, _} = parse(Data), - {ok, ?HEARTBEAT} = gen_tcp:recv(Sock, 0), - %% Server will close the connection because never receive the heart beat from client - {error, closed} = gen_tcp:recv(Sock, 0) - end). + {ok, ?HEARTBEAT} = gen_tcp:recv(Sock, 0), + %% Server will close the connection because never receive the heart beat from client + {error, closed} = gen_tcp:recv(Sock, 0) + end). t_subscribe(_) -> with_connection(fun(Sock) -> - gen_tcp:send(Sock, serialize(<<"CONNECT">>, - [{<<"accept-version">>, ?STOMP_VER}, - {<<"host">>, <<"127.0.0.1:61613">>}, - {<<"login">>, <<"guest">>}, - {<<"passcode">>, <<"guest">>}, - {<<"heart-beat">>, <<"0,0">>}])), - {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, #stomp_frame{command = <<"CONNECTED">>, - headers = _, - body = _}, _, _} = parse(Data), + gen_tcp:send(Sock, serialize(<<"CONNECT">>, + [{<<"accept-version">>, ?STOMP_VER}, + {<<"host">>, <<"127.0.0.1:61613">>}, + {<<"login">>, <<"guest">>}, + {<<"passcode">>, <<"guest">>}, + {<<"heart-beat">>, <<"0,0">>}])), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, #stomp_frame{command = <<"CONNECTED">>, + headers = _, + body = _}, _, _} = parse(Data), - %% Subscribe - gen_tcp:send(Sock, serialize(<<"SUBSCRIBE">>, - [{<<"id">>, 0}, - {<<"destination">>, <<"/queue/foo">>}, - {<<"ack">>, <<"auto">>}])), + %% Subscribe + gen_tcp:send(Sock, serialize(<<"SUBSCRIBE">>, + [{<<"id">>, 0}, + {<<"destination">>, <<"/queue/foo">>}, + {<<"ack">>, <<"auto">>}])), - %% 'user-defined' header will be retain - gen_tcp:send(Sock, serialize(<<"SEND">>, - [{<<"destination">>, <<"/queue/foo">>}, - {<<"user-defined">>, <<"emq">>}], - <<"hello">>)), + %% 'user-defined' header will be retain + gen_tcp:send(Sock, serialize(<<"SEND">>, + [{<<"destination">>, <<"/queue/foo">>}, + {<<"user-defined">>, <<"emq">>}], + <<"hello">>)), - {ok, Data1} = gen_tcp:recv(Sock, 0, 1000), - {ok, Frame = #stomp_frame{command = <<"MESSAGE">>, - headers = _, - body = <<"hello">>}, _, _} = parse(Data1), - lists:foreach(fun({Key, Val}) -> - Val = proplists:get_value(Key, Frame#stomp_frame.headers) - end, [{<<"destination">>, <<"/queue/foo">>}, - {<<"subscription">>, <<"0">>}, - {<<"user-defined">>, <<"emq">>}]), + {ok, Data1} = gen_tcp:recv(Sock, 0, 1000), + {ok, Frame = #stomp_frame{command = <<"MESSAGE">>, + headers = _, + body = <<"hello">>}, _, _} = parse(Data1), + lists:foreach(fun({Key, Val}) -> + Val = proplists:get_value(Key, Frame#stomp_frame.headers) + end, [{<<"destination">>, <<"/queue/foo">>}, + {<<"subscription">>, <<"0">>}, + {<<"user-defined">>, <<"emq">>}]), - %% Unsubscribe - gen_tcp:send(Sock, serialize(<<"UNSUBSCRIBE">>, - [{<<"id">>, 0}, - {<<"receipt">>, <<"12345">>}])), + %% Unsubscribe + gen_tcp:send(Sock, serialize(<<"UNSUBSCRIBE">>, + [{<<"id">>, 0}, + {<<"receipt">>, <<"12345">>}])), - {ok, Data2} = gen_tcp:recv(Sock, 0, 1000), + {ok, Data2} = gen_tcp:recv(Sock, 0, 1000), - {ok, #stomp_frame{command = <<"RECEIPT">>, - headers = [{<<"receipt-id">>, <<"12345">>}], - body = _}, _, _} = parse(Data2), + {ok, #stomp_frame{command = <<"RECEIPT">>, + headers = [{<<"receipt-id">>, <<"12345">>}], + body = _}, _, _} = parse(Data2), - gen_tcp:send(Sock, serialize(<<"SEND">>, - [{<<"destination">>, <<"/queue/foo">>}], - <<"You will not receive this msg">>)), + gen_tcp:send(Sock, serialize(<<"SEND">>, + [{<<"destination">>, <<"/queue/foo">>}], + <<"You will not receive this msg">>)), - {error, timeout} = gen_tcp:recv(Sock, 0, 500) - end). + {error, timeout} = gen_tcp:recv(Sock, 0, 500) + end). t_transaction(_) -> with_connection(fun(Sock) -> - gen_tcp:send(Sock, serialize(<<"CONNECT">>, - [{<<"accept-version">>, ?STOMP_VER}, - {<<"host">>, <<"127.0.0.1:61613">>}, - {<<"login">>, <<"guest">>}, - {<<"passcode">>, <<"guest">>}, - {<<"heart-beat">>, <<"0,0">>}])), - {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, #stomp_frame{command = <<"CONNECTED">>, - headers = _, - body = _}, _, _} = parse(Data), + gen_tcp:send(Sock, serialize(<<"CONNECT">>, + [{<<"accept-version">>, ?STOMP_VER}, + {<<"host">>, <<"127.0.0.1:61613">>}, + {<<"login">>, <<"guest">>}, + {<<"passcode">>, <<"guest">>}, + {<<"heart-beat">>, <<"0,0">>}])), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, #stomp_frame{command = <<"CONNECTED">>, + headers = _, + body = _}, _, _} = parse(Data), - %% Subscribe - gen_tcp:send(Sock, serialize(<<"SUBSCRIBE">>, - [{<<"id">>, 0}, - {<<"destination">>, <<"/queue/foo">>}, - {<<"ack">>, <<"auto">>}])), + %% Subscribe + gen_tcp:send(Sock, serialize(<<"SUBSCRIBE">>, + [{<<"id">>, 0}, + {<<"destination">>, <<"/queue/foo">>}, + {<<"ack">>, <<"auto">>}])), - %% Transaction: tx1 - gen_tcp:send(Sock, serialize(<<"BEGIN">>, - [{<<"transaction">>, <<"tx1">>}])), + %% Transaction: tx1 + gen_tcp:send(Sock, serialize(<<"BEGIN">>, + [{<<"transaction">>, <<"tx1">>}])), - gen_tcp:send(Sock, serialize(<<"SEND">>, - [{<<"destination">>, <<"/queue/foo">>}, - {<<"transaction">>, <<"tx1">>}], - <<"hello">>)), + gen_tcp:send(Sock, serialize(<<"SEND">>, + [{<<"destination">>, <<"/queue/foo">>}, + {<<"transaction">>, <<"tx1">>}], + <<"hello">>)), - %% You will not receive any messages - {error, timeout} = gen_tcp:recv(Sock, 0, 1000), + %% You will not receive any messages + {error, timeout} = gen_tcp:recv(Sock, 0, 1000), - gen_tcp:send(Sock, serialize(<<"SEND">>, - [{<<"destination">>, <<"/queue/foo">>}, - {<<"transaction">>, <<"tx1">>}], - <<"hello again">>)), + gen_tcp:send(Sock, serialize(<<"SEND">>, + [{<<"destination">>, <<"/queue/foo">>}, + {<<"transaction">>, <<"tx1">>}], + <<"hello again">>)), - gen_tcp:send(Sock, serialize(<<"COMMIT">>, - [{<<"transaction">>, <<"tx1">>}])), + gen_tcp:send(Sock, serialize(<<"COMMIT">>, + [{<<"transaction">>, <<"tx1">>}])), - ct:sleep(1000), - {ok, Data1} = gen_tcp:recv(Sock, 0, 500), + ct:sleep(1000), + {ok, Data1} = gen_tcp:recv(Sock, 0, 500), - {ok, #stomp_frame{command = <<"MESSAGE">>, - headers = _, - body = <<"hello">>}, Rest1, _} = parse(Data1), + {ok, #stomp_frame{command = <<"MESSAGE">>, + headers = _, + body = <<"hello">>}, Rest1, _} = parse(Data1), - %{ok, Data2} = gen_tcp:recv(Sock, 0, 500), - {ok, #stomp_frame{command = <<"MESSAGE">>, - headers = _, - body = <<"hello again">>}, _Rest2, _} = parse(Rest1), + %{ok, Data2} = gen_tcp:recv(Sock, 0, 500), + {ok, #stomp_frame{command = <<"MESSAGE">>, + headers = _, + body = <<"hello again">>}, _Rest2, _} = parse(Rest1), - %% Transaction: tx2 - gen_tcp:send(Sock, serialize(<<"BEGIN">>, - [{<<"transaction">>, <<"tx2">>}])), + %% Transaction: tx2 + gen_tcp:send(Sock, serialize(<<"BEGIN">>, + [{<<"transaction">>, <<"tx2">>}])), - gen_tcp:send(Sock, serialize(<<"SEND">>, - [{<<"destination">>, <<"/queue/foo">>}, - {<<"transaction">>, <<"tx2">>}], - <<"hello">>)), + gen_tcp:send(Sock, serialize(<<"SEND">>, + [{<<"destination">>, <<"/queue/foo">>}, + {<<"transaction">>, <<"tx2">>}], + <<"hello">>)), - gen_tcp:send(Sock, serialize(<<"ABORT">>, - [{<<"transaction">>, <<"tx2">>}])), + gen_tcp:send(Sock, serialize(<<"ABORT">>, + [{<<"transaction">>, <<"tx2">>}])), - %% You will not receive any messages - {error, timeout} = gen_tcp:recv(Sock, 0, 1000), + %% You will not receive any messages + {error, timeout} = gen_tcp:recv(Sock, 0, 1000), - gen_tcp:send(Sock, serialize(<<"DISCONNECT">>, - [{<<"receipt">>, <<"12345">>}])), + gen_tcp:send(Sock, serialize(<<"DISCONNECT">>, + [{<<"receipt">>, <<"12345">>}])), - {ok, Data3} = gen_tcp:recv(Sock, 0), - {ok, #stomp_frame{command = <<"RECEIPT">>, - headers = [{<<"receipt-id">>, <<"12345">>}], - body = _}, _, _} = parse(Data3) - end). + {ok, Data3} = gen_tcp:recv(Sock, 0), + {ok, #stomp_frame{command = <<"RECEIPT">>, + headers = [{<<"receipt-id">>, <<"12345">>}], + body = _}, _, _} = parse(Data3) + end). t_receipt_in_error(_) -> with_connection(fun(Sock) -> - gen_tcp:send(Sock, serialize(<<"CONNECT">>, - [{<<"accept-version">>, ?STOMP_VER}, - {<<"host">>, <<"127.0.0.1:61613">>}, - {<<"login">>, <<"guest">>}, - {<<"passcode">>, <<"guest">>}, - {<<"heart-beat">>, <<"0,0">>}])), - {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, #stomp_frame{command = <<"CONNECTED">>, - headers = _, - body = _}, _, _} = parse(Data), + gen_tcp:send(Sock, serialize(<<"CONNECT">>, + [{<<"accept-version">>, ?STOMP_VER}, + {<<"host">>, <<"127.0.0.1:61613">>}, + {<<"login">>, <<"guest">>}, + {<<"passcode">>, <<"guest">>}, + {<<"heart-beat">>, <<"0,0">>}])), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, #stomp_frame{command = <<"CONNECTED">>, + headers = _, + body = _}, _, _} = parse(Data), - gen_tcp:send(Sock, serialize(<<"ABORT">>, - [{<<"transaction">>, <<"tx1">>}, - {<<"receipt">>, <<"12345">>}])), + gen_tcp:send(Sock, serialize(<<"ABORT">>, + [{<<"transaction">>, <<"tx1">>}, + {<<"receipt">>, <<"12345">>}])), - {ok, Data1} = gen_tcp:recv(Sock, 0), - {ok, Frame = #stomp_frame{command = <<"ERROR">>, - headers = _, - body = <<"Transaction tx1 not found">>}, _, _} = parse(Data1), + {ok, Data1} = gen_tcp:recv(Sock, 0), + {ok, Frame = #stomp_frame{command = <<"ERROR">>, + headers = _, + body = <<"Transaction tx1 not found">>}, _, _} = parse(Data1), - <<"12345">> = proplists:get_value(<<"receipt-id">>, Frame#stomp_frame.headers) - end). + <<"12345">> = proplists:get_value(<<"receipt-id">>, Frame#stomp_frame.headers) + end). t_ack(_) -> with_connection(fun(Sock) -> - gen_tcp:send(Sock, serialize(<<"CONNECT">>, - [{<<"accept-version">>, ?STOMP_VER}, - {<<"host">>, <<"127.0.0.1:61613">>}, - {<<"login">>, <<"guest">>}, - {<<"passcode">>, <<"guest">>}, - {<<"heart-beat">>, <<"0,0">>}])), - {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, #stomp_frame{command = <<"CONNECTED">>, - headers = _, - body = _}, _, _} = parse(Data), + gen_tcp:send(Sock, serialize(<<"CONNECT">>, + [{<<"accept-version">>, ?STOMP_VER}, + {<<"host">>, <<"127.0.0.1:61613">>}, + {<<"login">>, <<"guest">>}, + {<<"passcode">>, <<"guest">>}, + {<<"heart-beat">>, <<"0,0">>}])), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, #stomp_frame{command = <<"CONNECTED">>, + headers = _, + body = _}, _, _} = parse(Data), - %% Subscribe - gen_tcp:send(Sock, serialize(<<"SUBSCRIBE">>, - [{<<"id">>, 0}, - {<<"destination">>, <<"/queue/foo">>}, - {<<"ack">>, <<"client">>}])), + %% Subscribe + gen_tcp:send(Sock, serialize(<<"SUBSCRIBE">>, + [{<<"id">>, 0}, + {<<"destination">>, <<"/queue/foo">>}, + {<<"ack">>, <<"client">>}])), - gen_tcp:send(Sock, serialize(<<"SEND">>, - [{<<"destination">>, <<"/queue/foo">>}], - <<"ack test">>)), + gen_tcp:send(Sock, serialize(<<"SEND">>, + [{<<"destination">>, <<"/queue/foo">>}], + <<"ack test">>)), - {ok, Data1} = gen_tcp:recv(Sock, 0), - {ok, Frame = #stomp_frame{command = <<"MESSAGE">>, - headers = _, - body = <<"ack test">>}, _, _} = parse(Data1), + {ok, Data1} = gen_tcp:recv(Sock, 0), + {ok, Frame = #stomp_frame{command = <<"MESSAGE">>, + headers = _, + body = <<"ack test">>}, _, _} = parse(Data1), - AckId = proplists:get_value(<<"ack">>, Frame#stomp_frame.headers), + AckId = proplists:get_value(<<"ack">>, Frame#stomp_frame.headers), - gen_tcp:send(Sock, serialize(<<"ACK">>, - [{<<"id">>, AckId}, - {<<"receipt">>, <<"12345">>}])), + gen_tcp:send(Sock, serialize(<<"ACK">>, + [{<<"id">>, AckId}, + {<<"receipt">>, <<"12345">>}])), - {ok, Data2} = gen_tcp:recv(Sock, 0), - {ok, #stomp_frame{command = <<"RECEIPT">>, - headers = [{<<"receipt-id">>, <<"12345">>}], - body = _}, _, _} = parse(Data2), + {ok, Data2} = gen_tcp:recv(Sock, 0), + {ok, #stomp_frame{command = <<"RECEIPT">>, + headers = [{<<"receipt-id">>, <<"12345">>}], + body = _}, _, _} = parse(Data2), - gen_tcp:send(Sock, serialize(<<"SEND">>, - [{<<"destination">>, <<"/queue/foo">>}], - <<"nack test">>)), + gen_tcp:send(Sock, serialize(<<"SEND">>, + [{<<"destination">>, <<"/queue/foo">>}], + <<"nack test">>)), - {ok, Data3} = gen_tcp:recv(Sock, 0), - {ok, Frame1 = #stomp_frame{command = <<"MESSAGE">>, - headers = _, - body = <<"nack test">>}, _, _} = parse(Data3), + {ok, Data3} = gen_tcp:recv(Sock, 0), + {ok, Frame1 = #stomp_frame{command = <<"MESSAGE">>, + headers = _, + body = <<"nack test">>}, _, _} = parse(Data3), - AckId1 = proplists:get_value(<<"ack">>, Frame1#stomp_frame.headers), + AckId1 = proplists:get_value(<<"ack">>, Frame1#stomp_frame.headers), - gen_tcp:send(Sock, serialize(<<"NACK">>, - [{<<"id">>, AckId1}, - {<<"receipt">>, <<"12345">>}])), + gen_tcp:send(Sock, serialize(<<"NACK">>, + [{<<"id">>, AckId1}, + {<<"receipt">>, <<"12345">>}])), - {ok, Data4} = gen_tcp:recv(Sock, 0), - {ok, #stomp_frame{command = <<"RECEIPT">>, - headers = [{<<"receipt-id">>, <<"12345">>}], - body = _}, _, _} = parse(Data4) - end). + {ok, Data4} = gen_tcp:recv(Sock, 0), + {ok, #stomp_frame{command = <<"RECEIPT">>, + headers = [{<<"receipt-id">>, <<"12345">>}], + body = _}, _, _} = parse(Data4) + end). + +t_rest_clienit_info(_) -> + with_connection(fun(Sock) -> + gen_tcp:send(Sock, serialize(<<"CONNECT">>, + [{<<"accept-version">>, ?STOMP_VER}, + {<<"host">>, <<"127.0.0.1:61613">>}, + {<<"login">>, <<"guest">>}, + {<<"passcode">>, <<"guest">>}, + {<<"heart-beat">>, <<"0,0">>}])), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, #stomp_frame{command = <<"CONNECTED">>, + headers = _, + body = _}, _, _} = parse(Data), + + %% client lists + {200, Clients} = request(get, "/gateway/stomp/clients"), + ?assertEqual(1, length(maps:get(data, Clients))), + StompClient = lists:nth(1, maps:get(data, Clients)), + ClientId = maps:get(clientid, StompClient), + ClientPath = "/gateway/stomp/clients/" + ++ binary_to_list(ClientId), + {200, StompClient1} = request(get, ClientPath), + ?assertEqual(StompClient, StompClient1), + assert_feilds_apperence( + [proto_name, awaiting_rel_max, inflight_cnt, disconnected_at, + send_msg, heap_size, connected, recv_cnt, send_pkt, mailbox_len, + username, recv_pkt, expiry_interval, clientid, mqueue_max, + send_oct, ip_address, is_bridge, awaiting_rel_cnt, mqueue_dropped, + mqueue_len, node, inflight_max, reductions, subscriptions_max, + connected_at, keepalive, created_at, clean_start, + subscriptions_cnt, recv_msg, send_cnt, proto_ver, recv_oct + ], StompClient), + + %% sub & unsub + {200, []} = request(get, ClientPath ++ "/subscriptions"), + gen_tcp:send(Sock, serialize(<<"SUBSCRIBE">>, + [{<<"id">>, 0}, + {<<"destination">>, <<"/queue/foo">>}, + {<<"ack">>, <<"client">>}])), + + {200, Subs} = request(get, ClientPath ++ "/subscriptions"), + ?assertEqual(1, length(Subs)), + assert_feilds_apperence([topic, qos], lists:nth(1, Subs)), + + {204, _} = request(post, ClientPath ++ "/subscriptions", + #{topic => <<"t/a">>, qos => 1, + sub_props => #{subid => <<"1001">>}}), + + {200, Subs1} = request(get, ClientPath ++ "/subscriptions"), + ?assertEqual(2, length(Subs1)), + + {204, _} = request(delete, ClientPath ++ "/subscriptions/t%2Fa"), + {200, Subs2} = request(get, ClientPath ++ "/subscriptions"), + ?assertEqual(1, length(Subs2)), + + %% kickout + {204, _} = request(delete, ClientPath), + {200, Clients2} = request(get, "/gateway/stomp/clients"), + ?assertEqual(0, length(maps:get(data, Clients2))) + end). %% TODO: Mountpoint, AuthChain, Authorization + Mountpoint, ClientInfoOverride, %% Listeners, Metrics, Stats, ClientInfo