diff --git a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl index fcc240ca5..b4f9e30f0 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl @@ -247,6 +247,7 @@ handle_call({subscribe, Topic, SubOpts}, _From, %% modifty session state SubReq = {Topic, Token}, TempMsg = #coap_message{type = non}, + %% FIXME: The subopts is not used for emqx_coap_session Result = emqx_coap_session:process_subscribe( SubReq, TempMsg, #{}, Session), NSession = maps:get(session, Result), diff --git a/apps/emqx_gateway/src/coap/emqx_coap_session.erl b/apps/emqx_gateway/src/coap/emqx_coap_session.erl index 52921c16a..108b4c55b 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_session.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_session.erl @@ -92,7 +92,9 @@ info(Keys, Session) when is_list(Keys) -> [{Key, info(Key, Session)} || Key <- Keys]; info(subscriptions, #session{observe_manager = OM}) -> Topics = emqx_coap_observe_res:subscriptions(OM), - lists:foldl(fun(T, Acc) -> Acc#{T => ?DEFAULT_SUBOPTS} end, #{}, Topics); + lists:foldl( + fun(T, Acc) -> Acc#{T => emqx_gateway_utils:default_subopts()} end, + #{}, Topics); info(subscriptions_cnt, #session{observe_manager = OM}) -> erlang:length(emqx_coap_observe_res:subscriptions(OM)); info(subscriptions_max, _) -> diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index 5be4adccf..86dbb69af 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -73,7 +73,7 @@ paths() -> , {<<"ip_address">>, ip} , {<<"conn_state">>, atom} , {<<"clean_start">>, atom} - , {<<"proto_ver">>, integer} + , {<<"proto_ver">>, binary} , {<<"like_clientid">>, binary} , {<<"like_username">>, binary} , {<<"gte_created_at">>, timestamp} @@ -83,15 +83,16 @@ paths() -> %% special keys for lwm2m protocol , {<<"endpoint_name">>, binary} , {<<"like_endpoint_name">>, binary} - , {<<"gte_lifetime">>, timestamp} - , {<<"lte_lifetime">>, timestamp} + , {<<"gte_lifetime">>, integer} + , {<<"lte_lifetime">>, integer} ]). -define(QUERY_FUN, {?MODULE, query}). clients(get, #{ bindings := #{name := Name0} - , query_string := Params + , query_string := Params0 }) -> + Params = emqx_mgmt_api:ensure_timestamp_format(Params0, time_keys()), with_gateway(Name0, fun(GwName, _) -> TabName = emqx_gateway_cm:tabname(info, GwName), case maps:get(<<"node">>, Params, undefined) of @@ -147,10 +148,6 @@ subscriptions(get, #{ bindings := #{name := Name0, ClientId = emqx_mgmt_util:urldecode(ClientId0), with_gateway(Name0, fun(GwName, _) -> case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of - {error, nosupport} -> - return_http_error(405, <<"Not support to list subscriptions">>); - {error, noimpl} -> - return_http_error(501, <<"Not implemented now">>); {error, Reason} -> return_http_error(500, Reason); {ok, Subs} -> @@ -171,14 +168,6 @@ subscriptions(post, #{ bindings := #{name := Name0, {Topic, SubOpts} -> case emqx_gateway_http:client_subscribe( GwName, ClientId, Topic, SubOpts) of - {error, nosupport} -> - return_http_error( - 405, - <<"Not support to add a subscription">>); - {error, noimpl} -> - return_http_error( - 501, - <<"Not implemented now">>); {error, Reason} -> return_http_error(404, Reason); {ok, {NTopic, NSubOpts}}-> @@ -221,6 +210,16 @@ extra_sub_props(Props) -> #{subid => maps:get(<<"subid">>, Props, undefined)} ). +%%-------------------------------------------------------------------- +%% QueryString data-fomrat convert +%% (try rfc3339 to timestamp or keep timestamp) + +time_keys() -> + [ <<"gte_created_at">> + , <<"lte_created_at">> + , <<"gte_connected_at">> + , <<"lte_connected_at">>]. + %%-------------------------------------------------------------------- %% query funcs @@ -264,10 +263,8 @@ ms(clientid, X) -> #{clientinfo => #{clientid => X}}; ms(username, X) -> #{clientinfo => #{username => X}}; -ms(zone, X) -> - #{clientinfo => #{zone => X}}; ms(ip_address, X) -> - #{clientinfo => #{peername => {X, '_'}}}; + #{clientinfo => #{peerhost => X}}; ms(conn_state, X) -> #{conn_state => X}; ms(clean_start, X) -> @@ -616,9 +613,6 @@ roots() -> , subscription ]. -fields(test) -> - [{key, mk(binary(), #{ desc => <<"Desc">>})}]; - fields(stomp_client) -> common_client_props(); fields(mqttsn_client) -> @@ -707,10 +701,6 @@ common_client_props() -> %, {will_msg, % mk(binary(), % #{ desc => <<"Client will message">>})} - %, {zone, - % mk(binary(), - % #{ desc => <<"Indicate the configuration group used by the " - % "client">>})} , {keepalive, mk(integer(), #{ desc => <<"keepalive time, with the unit of second">>})} diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl index 41b795b7b..843cf39d5 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl @@ -451,7 +451,7 @@ do_subscribe(TopicFilter, SubOpts, Channel = subscriptions = Subs}) -> %% Mountpoint first NTopicFilter = emqx_mountpoint:mount(Mountpoint, TopicFilter), - NSubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts), + NSubOpts = maps:merge(emqx_gateway_utils:default_subopts(), SubOpts), SubId = maps:get(clientid, ClientInfo, undefined), %% XXX: is_new? IsNew = not maps:is_key(NTopicFilter, Subs), diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index 58cc24f9d..aee4189a5 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -931,7 +931,7 @@ do_subscribe({TopicId, TopicName, SubOpts}, clientinfo = ClientInfo = #{mountpoint := Mountpoint}}) -> NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName), - NSubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts), + NSubOpts = maps:merge(emqx_gateway_utils:default_subopts(), SubOpts), case emqx_session:subscribe(ClientInfo, NTopicName, NSubOpts, Session) of {ok, NSession} -> {ok, {TopicId, NTopicName, NSubOpts}, diff --git a/apps/emqx_gateway/test/emqx_coap_SUITE.erl b/apps/emqx_gateway/test/emqx_coap_SUITE.erl index 8b336252b..e35c6e2db 100644 --- a/apps/emqx_gateway/test/emqx_coap_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_coap_SUITE.erl @@ -19,6 +19,11 @@ -compile(export_all). -compile(nowarn_export_all). +-import(emqx_gateway_test_utils, + [ request/2 + , request/3 + ]). + -include_lib("er_coap_client/include/coap.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -48,114 +53,115 @@ gateway.coap all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - emqx_common_test_helpers:start_apps([emqx_gateway], fun set_special_cfg/1), + ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT), + emqx_mgmt_api_test_util:init_suite([emqx_gateway]), Config. -set_special_cfg(emqx_gateway) -> - ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT); - -set_special_cfg(_) -> - ok. - -end_per_suite(Config) -> +end_per_suite(_) -> {ok, _} = emqx:remove_config([<<"gateway">>,<<"coap">>]), - emqx_common_test_helpers:stop_apps([emqx_gateway]), - Config. + emqx_mgmt_api_test_util:end_suite([emqx_gateway]). %%-------------------------------------------------------------------- %% Test Cases %%-------------------------------------------------------------------- -t_connection(_Config) -> + +t_connection(_) -> Action = fun(Channel) -> - %% connection - Token = connection(Channel), + %% connection + Token = connection(Channel), - timer:sleep(100), - ?assertNotEqual([], emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)), + timer:sleep(100), + ?assertNotEqual( + [], + emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)), - %% heartbeat - HeartURI = ?MQTT_PREFIX ++ "/connection?clientid=client1&token=" ++ Token, - ?LOGT("send heartbeat request:~ts~n", [HeartURI]), - {ok, changed, _} = er_coap_client:request(put, HeartURI), + %% heartbeat + HeartURI = ?MQTT_PREFIX ++ + "/connection?clientid=client1&token=" ++ + Token, - disconnection(Channel, Token), + ?LOGT("send heartbeat request:~ts~n", [HeartURI]), + {ok, changed, _} = er_coap_client:request(put, HeartURI), - timer:sleep(100), - ?assertEqual([], emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)) - end, + disconnection(Channel, Token), + + timer:sleep(100), + ?assertEqual( + [], + emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)) + end, do(Action). - -t_publish(_Config) -> +t_publish(_) -> Action = fun(Channel, Token) -> - Topic = <<"/abc">>, - Payload = <<"123">>, + Topic = <<"/abc">>, + Payload = <<"123">>, - TopicStr = binary_to_list(Topic), - URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token, + TopicStr = binary_to_list(Topic), + URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token, - %% Sub topic first - emqx:subscribe(Topic), + %% Sub topic first + emqx:subscribe(Topic), - Req = make_req(post, Payload), - {ok, changed, _} = do_request(Channel, URI, Req), - - receive - {deliver, Topic, Msg} -> - ?assertEqual(Topic, Msg#message.topic), - ?assertEqual(Payload, Msg#message.payload) - after - 500 -> - ?assert(false) - end - end, + Req = make_req(post, Payload), + {ok, changed, _} = do_request(Channel, URI, Req), + receive + {deliver, Topic, Msg} -> + ?assertEqual(Topic, Msg#message.topic), + ?assertEqual(Payload, Msg#message.payload) + after + 500 -> + ?assert(false) + end + end, with_connection(Action). - -%t_publish_authz_deny(_Config) -> +%t_publish_authz_deny(_) -> % Action = fun(Channel, Token) -> -% Topic = <<"/abc">>, -% Payload = <<"123">>, -% InvalidToken = lists:reverse(Token), +% Topic = <<"/abc">>, +% Payload = <<"123">>, +% InvalidToken = lists:reverse(Token), % -% TopicStr = binary_to_list(Topic), -% URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ InvalidToken, +% TopicStr = binary_to_list(Topic), +% URI = ?PS_PREFIX ++ +% TopicStr ++ +% "?clientid=client1&token=" ++ InvalidToken, % -% %% Sub topic first -% emqx:subscribe(Topic), +% %% Sub topic first +% emqx:subscribe(Topic), % -% Req = make_req(post, Payload), -% Result = do_request(Channel, URI, Req), -% ?assertEqual({error, reset}, Result) -% end, +% Req = make_req(post, Payload), +% Result = do_request(Channel, URI, Req), +% ?assertEqual({error, reset}, Result) +% end, % % with_connection(Action). -t_subscribe(_Config) -> +t_subscribe(_) -> Topic = <<"/abc">>, Fun = fun(Channel, Token) -> - TopicStr = binary_to_list(Topic), - Payload = <<"123">>, + TopicStr = binary_to_list(Topic), + Payload = <<"123">>, - URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token, - Req = make_req(get, Payload, [{observe, 0}]), - {ok, content, _} = do_request(Channel, URI, Req), - ?LOGT("observer topic:~ts~n", [Topic]), + URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token, + Req = make_req(get, Payload, [{observe, 0}]), + {ok, content, _} = do_request(Channel, URI, Req), + ?LOGT("observer topic:~ts~n", [Topic]), - timer:sleep(100), - [SubPid] = emqx:subscribers(Topic), - ?assert(is_pid(SubPid)), + timer:sleep(100), + [SubPid] = emqx:subscribers(Topic), + ?assert(is_pid(SubPid)), - %% Publish a message - emqx:publish(emqx_message:make(Topic, Payload)), - {ok, content, Notify} = with_response(Channel), - ?LOGT("observer get Notif=~p", [Notify]), + %% Publish a message + emqx:publish(emqx_message:make(Topic, Payload)), + {ok, content, Notify} = with_response(Channel), + ?LOGT("observer get Notif=~p", [Notify]), - #coap_content{payload = PayloadRecv} = Notify, + #coap_content{payload = PayloadRecv} = Notify, - ?assertEqual(Payload, PayloadRecv) - end, + ?assertEqual(Payload, PayloadRecv) + end, with_connection(Fun), timer:sleep(100), @@ -163,63 +169,117 @@ t_subscribe(_Config) -> ?assertEqual([], emqx:subscribers(Topic)). -t_un_subscribe(_Config) -> +t_un_subscribe(_) -> Topic = <<"/abc">>, Fun = fun(Channel, Token) -> - TopicStr = binary_to_list(Topic), - Payload = <<"123">>, + TopicStr = binary_to_list(Topic), + Payload = <<"123">>, - URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token, + URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token, - Req = make_req(get, Payload, [{observe, 0}]), - {ok, content, _} = do_request(Channel, URI, Req), - ?LOGT("observer topic:~ts~n", [Topic]), + Req = make_req(get, Payload, [{observe, 0}]), + {ok, content, _} = do_request(Channel, URI, Req), + ?LOGT("observer topic:~ts~n", [Topic]), - timer:sleep(100), - [SubPid] = emqx:subscribers(Topic), - ?assert(is_pid(SubPid)), + timer:sleep(100), + [SubPid] = emqx:subscribers(Topic), + ?assert(is_pid(SubPid)), - UnReq = make_req(get, Payload, [{observe, 1}]), - {ok, nocontent, _} = do_request(Channel, URI, UnReq), - ?LOGT("un observer topic:~ts~n", [Topic]), - timer:sleep(100), - ?assertEqual([], emqx:subscribers(Topic)) - end, + UnReq = make_req(get, Payload, [{observe, 1}]), + {ok, nocontent, _} = do_request(Channel, URI, UnReq), + ?LOGT("un observer topic:~ts~n", [Topic]), + timer:sleep(100), + ?assertEqual([], emqx:subscribers(Topic)) + end, with_connection(Fun). -t_observe_wildcard(_Config) -> +t_observe_wildcard(_) -> Fun = fun(Channel, Token) -> - %% resolve_url can't process wildcard with # - Topic = <<"/abc/+">>, - TopicStr = binary_to_list(Topic), - Payload = <<"123">>, + %% resolve_url can't process wildcard with # + Topic = <<"/abc/+">>, + TopicStr = binary_to_list(Topic), + Payload = <<"123">>, - URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token, - Req = make_req(get, Payload, [{observe, 0}]), - {ok, content, _} = do_request(Channel, URI, Req), - ?LOGT("observer topic:~ts~n", [Topic]), + URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token, + Req = make_req(get, Payload, [{observe, 0}]), + {ok, content, _} = do_request(Channel, URI, Req), + ?LOGT("observer topic:~ts~n", [Topic]), - timer:sleep(100), - [SubPid] = emqx:subscribers(Topic), - ?assert(is_pid(SubPid)), + timer:sleep(100), + [SubPid] = emqx:subscribers(Topic), + ?assert(is_pid(SubPid)), - %% Publish a message - PubTopic = <<"/abc/def">>, - emqx:publish(emqx_message:make(PubTopic, Payload)), - {ok, content, Notify} = with_response(Channel), + %% Publish a message + PubTopic = <<"/abc/def">>, + emqx:publish(emqx_message:make(PubTopic, Payload)), + {ok, content, Notify} = with_response(Channel), - ?LOGT("observer get Notif=~p", [Notify]), + ?LOGT("observer get Notif=~p", [Notify]), - #coap_content{payload = PayloadRecv} = Notify, + #coap_content{payload = PayloadRecv} = Notify, - ?assertEqual(Payload, PayloadRecv) - end, + ?assertEqual(Payload, PayloadRecv) + end, with_connection(Fun). +t_clients_api(_) -> + Fun = fun(_Channel, _Token) -> + ClientId = <<"client1">>, + %% list + {200, #{data := [Client1]}} = request(get, "/gateway/coap/clients"), + #{clientid := ClientId} = Client1, + %% searching + {200, #{data := [Client2]}} = + request(get, "/gateway/coap/clients", + [{<<"clientid">>, ClientId}]), + {200, #{data := [Client3]}} = + request(get, "/gateway/coap/clients", + [{<<"like_clientid">>, <<"cli">>}]), + %% lookup + {200, Client4} = + request(get, "/gateway/coap/clients/client1"), + %% assert + Client1 = Client2 = Client3 = Client4, + %% kickout + {204, _} = + request(delete, "/gateway/coap/clients/client1"), + {200, #{data := []}} = request(get, "/gateway/coap/clients") + end, + with_connection(Fun). + +t_clients_subscription_api(_) -> + Fun = fun(_Channel, _Token) -> + Path = "/gateway/coap/clients/client1/subscriptions", + %% list + {200, []} = request(get, Path), + %% create + SubReq = #{ topic => <<"tx">> + , qos => 0 + , nl => 0 + , rap => 0 + , rh => 0 + }, + + {201, SubsResp} = request(post, Path, SubReq), + {200, [SubsResp2]} = request(get, Path), + ?assertEqual( + maps:get(topic, SubsResp), + maps:get(topic, SubsResp2)), + + {204, _} = request(delete, Path ++ "/tx"), + + {200, []} = request(get, Path) + end, + with_connection(Fun). + +%%-------------------------------------------------------------------- +%% helpers + connection(Channel) -> - URI = ?MQTT_PREFIX ++ "/connection?clientid=client1&username=admin&password=public", + URI = ?MQTT_PREFIX ++ + "/connection?clientid=client1&username=admin&password=public", Req = make_req(post), {ok, created, Data} = do_request(Channel, URI, Req), #coap_content{payload = BinToken} = Data, @@ -252,7 +312,8 @@ do_request(Channel, URI, #coap_message{options = Opts} = Req) -> with_response(Channel) -> receive - {coap_response, _ChId, Channel, _Ref, Message=#coap_message{method=Code}} -> + {coap_response, _ChId, Channel, + _Ref, Message=#coap_message{method=Code}} -> return_response(Code, Message); {coap_error, _ChId, Channel, _Ref, reset} -> {error, reset} @@ -280,10 +341,10 @@ do(Fun) -> with_connection(Action) -> Fun = fun(Channel) -> - Token = connection(Channel), - timer:sleep(100), - Action(Channel, Token), - disconnection(Channel, Token), - timer:sleep(100) - end, + Token = connection(Channel), + timer:sleep(100), + Action(Channel, Token), + disconnection(Channel, Token), + timer:sleep(100) + end, do(Fun). diff --git a/apps/emqx_gateway/test/emqx_gateway_test_utils.erl b/apps/emqx_gateway/test/emqx_gateway_test_utils.erl index d2daf48b4..c96dee651 100644 --- a/apps/emqx_gateway/test/emqx_gateway_test_utils.erl +++ b/apps/emqx_gateway/test/emqx_gateway_test_utils.erl @@ -117,6 +117,8 @@ req(Path, Qs) -> req(Path, Qs, Body) -> {url(Path, Qs), auth([]), "application/json", emqx_json:encode(Body)}. +url(Path, []) -> + lists:concat([?http_api_host, Path]); url(Path, Qs) -> lists:concat([?http_api_host, Path, "?", binary_to_list(cow_qs:qs(Qs))]). diff --git a/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl b/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl index b6c1b837f..30e910f2b 100644 --- a/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl @@ -19,6 +19,11 @@ -compile(export_all). -compile(nowarn_export_all). +-import(emqx_gateway_test_utils, + [ request/2 + , request/3 + ]). + -define(PORT, 5783). -define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)). @@ -66,9 +71,9 @@ all() -> , {group, test_grp_4_discover} , {group, test_grp_5_write_attr} , {group, test_grp_6_observe} - %% {group, test_grp_8_object_19} , {group, test_grp_9_psm_queue_mode} + , {group, test_grp_10_rest_api} ]. suite() -> [{timetrap, {seconds, 90}}]. @@ -147,21 +152,29 @@ groups() -> [ case90_psm_mode, case90_queue_mode + ]}, + {test_grp_10_rest_api, [RepeatOpt], + [ + case100_clients_api, + case100_subscription_api ]} ]. init_per_suite(Config) -> - emqx_common_test_helpers:start_apps([emqx_conf]), + %% load application first for minirest api searching + application:load(emqx_gateway), + emqx_mgmt_api_test_util:init_suite([emqx_conf]), Config. end_per_suite(Config) -> timer:sleep(300), {ok, _} = emqx_conf:remove([<<"gateway">>,<<"lwm2m">>], #{}), - emqx_common_test_helpers:stop_apps([emqx_conf]), + emqx_mgmt_api_test_util:end_suite([emqx_conf]), Config. init_per_testcase(_AllTestCase, Config) -> ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT), + {ok, _} = application:ensure_all_started(emqx_gateway), {ok, ClientUdpSock} = gen_udp:open(0, [binary, {active, false}]), @@ -1887,6 +1900,68 @@ server_cache_mode(Config, RegOption) -> verify_read_response_1(2, UdpSock), verify_read_response_1(3, UdpSock). +case100_clients_api(Config) -> + Epn = "urn:oma:lwm2m:oma:3", + MsgId1 = 15, + UdpSock = ?config(sock, Config), + ObjectList = <<", , , , ">>, + RespTopic = list_to_binary("lwm2m/"++Epn++"/up/resp"), + std_register(UdpSock, Epn, ObjectList, MsgId1, RespTopic), + + %% list + {200, #{data := [Client1]}} = request(get, "/gateway/lwm2m/clients"), + %% searching + {200, #{data := [Client2]}} = + request(get, "/gateway/lwm2m/clients", + [{<<"endpoint_name">>, list_to_binary(Epn)}]), + {200, #{data := [Client3]}} = + request(get, "/gateway/lwm2m/clients", + [{<<"like_endpoint_name">>, list_to_binary(Epn)}, + {<<"gte_lifetime">>, <<"1">>} + ]), + %% lookup + ClientId = maps:get(clientid, Client1), + {200, Client4} = + request(get, "/gateway/lwm2m/clients/" ++ binary_to_list(ClientId)), + %% assert + Client1 = Client2 = Client3 = Client4, + %% kickout + {204, _} = + request(delete, "/gateway/lwm2m/clients/" ++ binary_to_list(ClientId)), + {200, #{data := []}} = request(get, "/gateway/lwm2m/clients"). + +case100_subscription_api(Config) -> + Epn = "urn:oma:lwm2m:oma:3", + MsgId1 = 15, + UdpSock = ?config(sock, Config), + ObjectList = <<", , , , ">>, + RespTopic = list_to_binary("lwm2m/"++Epn++"/up/resp"), + std_register(UdpSock, Epn, ObjectList, MsgId1, RespTopic), + + {200, #{data := [Client1]}} = request(get, "/gateway/lwm2m/clients"), + ClientId = maps:get(clientid, Client1), + Path = "/gateway/lwm2m/clients/" ++ + binary_to_list(ClientId) ++ + "/subscriptions", + + %% list + {200, [InitSub]} = request(get, Path), + ?assertEqual( + <<"lwm2m/", (list_to_binary(Epn))/binary, "/dn/#">>, + maps:get(topic, InitSub)), + + %% create + SubReq = #{ topic => <<"tx">> + , qos => 1 + , nl => 0 + , rap => 0 + , rh => 0 + }, + {201, _} = request(post, Path, SubReq), + {200, _} = request(get, Path), + {204, _} = request(delete, Path ++ "/tx"), + {200, [InitSub]} = request(get, Path). + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Internal Functions %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl index fff7fb8e2..d3cae8596 100644 --- a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl @@ -19,6 +19,11 @@ -compile(export_all). -compile(nowarn_export_all). +-import(emqx_gateway_test_utils, + [ request/2 + , request/3 + ]). + -include("src/mqttsn/include/emqx_sn.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -27,7 +32,6 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). - -define(HOST, {127,0,0,1}). -define(PORT, 1884). @@ -85,12 +89,12 @@ all() -> init_per_suite(Config) -> ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT), - emqx_common_test_helpers:start_apps([emqx_gateway]), + emqx_mgmt_api_test_util:init_suite([emqx_gateway]), Config. end_per_suite(_) -> {ok, _} = emqx:remove_config([gateway, mqttsn]), - emqx_common_test_helpers:stop_apps([emqx_gateway]). + emqx_mgmt_api_test_util:end_suite([emqx_gateway]). %%-------------------------------------------------------------------- %% Test cases @@ -1762,6 +1766,68 @@ t_broadcast_test1(_) -> timer:sleep(600), gen_udp:close(Socket). +t_clients_api(_) -> + TsNow = emqx_gateway_utils:unix_ts_to_rfc3339( + erlang:system_time(millisecond)), + ClientId = <<"client_id_test1">>, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, ClientId), + ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), + %% list + {200, #{data := [Client1]}} = request(get, "/gateway/mqttsn/clients"), + #{clientid := ClientId} = Client1, + %% searching + {200, #{data := [Client2]}} = + request(get, "/gateway/mqttsn/clients", [{<<"clientid">>, ClientId}]), + {200, #{data := [Client3]}} = + request(get, "/gateway/mqttsn/clients", + [{<<"like_clientid">>, <<"test1">>}, + {<<"proto_ver">>, <<"1.2">>}, + {<<"ip_address">>, <<"127.0.0.1">>}, + {<<"conn_state">>, <<"connected">>}, + {<<"clean_start">>, <<"true">>}, + {<<"gte_connected_at">>, TsNow} + ]), + %% lookup + {200, Client4} = + request(get, "/gateway/mqttsn/clients/client_id_test1"), + %% assert + Client1 = Client2 = Client3 = Client4, + %% kickout + {204, _} = + request(delete, "/gateway/mqttsn/clients/client_id_test1"), + {200, #{data := []}} = request(get, "/gateway/mqttsn/clients"), + + send_disconnect_msg(Socket, undefined), + gen_udp:close(Socket). + +t_clients_subscription_api(_) -> + ClientId = <<"client_id_test1">>, + Path = "/gateway/mqttsn/clients/client_id_test1/subscriptions", + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, ClientId), + ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), + %% list + {200, []} = request(get, Path), + %% create + SubReq = #{ topic => <<"tx">> + , qos => 1 + , nl => 0 + , rap => 0 + , rh => 0 + }, + {201, SubsResp} = request(post, Path, SubReq), + + {200, [SubsResp]} = request(get, Path), + + {204, _} = request(delete, Path ++ "/tx"), + + {200, []} = request(get, Path), + + send_disconnect_msg(Socket, undefined), + ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket). + %%-------------------------------------------------------------------- %% Helper funcs %%-------------------------------------------------------------------- diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index 2ff730a6a..6136cab3a 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -35,6 +35,15 @@ -export([do_query/6]). +-export([ ensure_timestamp_format/2 + ]). + +-export([ unix_ts_to_rfc3339_bin/1 + , unix_ts_to_rfc3339_bin/2 + , time_string_to_unix_ts_int/1 + , time_string_to_unix_ts_int/2 + ]). + paginate(Tables, Params, {Module, FormatFun}) -> Qh = query_handle(Tables), Count = count(Tables), @@ -401,6 +410,7 @@ to_integer(B) when is_binary(B) -> to_timestamp(I) when is_integer(I) -> I; to_timestamp(B) when is_binary(B) -> + binary_to_integer(B). aton(B) when is_binary(B) -> @@ -412,6 +422,41 @@ to_ip_port(IPAddress) -> Port = list_to_integer(Port0), {IP, Port}. +%%-------------------------------------------------------------------- +%% time format funcs + +ensure_timestamp_format(Qs, TimeKeys) + when is_map(Qs); + is_list(TimeKeys) -> + Fun = fun (Key, NQs) -> + case NQs of + %% TimeString likes "2021-01-01T00:00:00.000+08:00" (in rfc3339) + %% or "1609430400000" (in millisecond) + #{Key := TimeString} -> + NQs#{Key => time_string_to_unix_ts_int(TimeString)}; + #{} -> NQs + end + end, + lists:foldl(Fun, Qs, TimeKeys). + +unix_ts_to_rfc3339_bin(TimeStamp) -> + unix_ts_to_rfc3339_bin(TimeStamp, millisecond). + +unix_ts_to_rfc3339_bin(TimeStamp, Unit) when is_integer(TimeStamp) -> + list_to_binary(calendar:system_time_to_rfc3339(TimeStamp, [{unit, Unit}])). + +time_string_to_unix_ts_int(DateTime) -> + time_string_to_unix_ts_int(DateTime, millisecond). + +time_string_to_unix_ts_int(DateTime, Unit) when is_binary(DateTime) -> + try binary_to_integer(DateTime) of + TimeStamp when is_integer(TimeStamp) -> TimeStamp + catch + error:badarg -> + calendar:rfc3339_to_system_time( + binary_to_list(DateTime), [{unit, Unit}]) + end. + %%-------------------------------------------------------------------- %% EUnits %%-------------------------------------------------------------------- diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 007916d1b..be38e401d 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -44,14 +44,6 @@ %% for batch operation -export([do_subscribe/3]). -%% for test suite --export([ unix_ts_to_rfc3339_bin/1 - , unix_ts_to_rfc3339_bin/2 - , time_string_to_unix_ts_int/1 - , time_string_to_unix_ts_int/2 - ]). - - -define(CLIENT_QS_SCHEMA, {emqx_channel_info, [ {<<"node">>, atom} , {<<"username">>, binary} @@ -463,7 +455,7 @@ keepalive_api() -> %%%============================================================================================== %% parameters trans clients(get, #{query_string := Qs}) -> - list(generate_qs(Qs)). + list(emqx_mgmt_api:ensure_timestamp_format(Qs, time_keys())). client(get, #{bindings := Bindings}) -> lookup(Bindings); @@ -625,7 +617,8 @@ do_unsubscribe(ClientID, Topic) -> end. %%-------------------------------------------------------------------- -%% QueryString Generation (try rfc3339 to timestamp or keep timestamp) +%% QueryString data-fomrat convert +%% (try rfc3339 to timestamp or keep timestamp) time_keys() -> [ <<"gte_created_at">> @@ -633,18 +626,6 @@ time_keys() -> , <<"gte_connected_at">> , <<"lte_connected_at">>]. -generate_qs(Qs) -> - Fun = - fun (Key, NQs) -> - case NQs of - %% TimeString likes "2021-01-01T00:00:00.000+08:00" (in rfc3339) - %% or "1609430400000" (in millisecond) - #{Key := TimeString} -> NQs#{Key => time_string_to_unix_ts_int(TimeString)}; - #{} -> NQs - end - end, - lists:foldl(Fun, Qs, time_keys()). - %%-------------------------------------------------------------------- %% Query Functions @@ -778,8 +759,11 @@ take_maps_from_inner(Key, Value, Current) -> result_format_time_fun(Key, NClientInfoMap) -> case NClientInfoMap of - #{Key := TimeStamp} -> NClientInfoMap#{Key => unix_ts_to_rfc3339_bin(TimeStamp)}; - #{} -> NClientInfoMap + #{Key := TimeStamp} -> + NClientInfoMap#{ + Key => emqx_mgmt_api:unix_ts_to_rfc3339_bin(TimeStamp)}; + #{} -> + NClientInfoMap end. -spec(peername_dispart(emqx_types:peername()) -> {binary(), inet:port_number()}). @@ -795,22 +779,3 @@ format_authz_cache({{PubSub, Topic}, {AuthzResult, Timestamp}}) -> updated_time => Timestamp }. -%%-------------------------------------------------------------------- -%% time format funcs - -unix_ts_to_rfc3339_bin(TimeStamp) -> - unix_ts_to_rfc3339_bin(TimeStamp, millisecond). - -unix_ts_to_rfc3339_bin(TimeStamp, Unit) when is_integer(TimeStamp) -> - list_to_binary(calendar:system_time_to_rfc3339(TimeStamp, [{unit, Unit}])). - -time_string_to_unix_ts_int(DateTime) -> - time_string_to_unix_ts_int(DateTime, millisecond). - -time_string_to_unix_ts_int(DateTime, Unit) when is_binary(DateTime) -> - try binary_to_integer(DateTime) of - TimeStamp when is_integer(TimeStamp) -> TimeStamp - catch - error:badarg -> - calendar:rfc3339_to_system_time(binary_to_list(DateTime), [{unit, Unit}]) - end. diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index a80d862f7..3c4864ab7 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -129,7 +129,7 @@ t_query_clients_with_time(_) -> NowTimeStampInt = erlang:system_time(millisecond), %% Do not uri_encode `=` to `%3D` Rfc3339String = emqx_http_lib:uri_encode(binary:bin_to_list( - emqx_mgmt_api_clients:unix_ts_to_rfc3339_bin(NowTimeStampInt))), + emqx_mgmt_api:unix_ts_to_rfc3339_bin(NowTimeStampInt))), TimeStampString = emqx_http_lib:uri_encode(integer_to_list(NowTimeStampInt)), LteKeys = ["lte_created_at=", "lte_connected_at="], @@ -147,10 +147,10 @@ t_query_clients_with_time(_) -> || {ok, Response} <- RequestResults], {LteResponseDecodeds, GteResponseDecodeds} = lists:split(4, DecodedResults), %% EachData :: list() - [?assert( emqx_mgmt_api_clients:time_string_to_unix_ts_int(CreatedAt) < NowTimeStampInt) + [?assert( emqx_mgmt_api:time_string_to_unix_ts_int(CreatedAt) < NowTimeStampInt) || #{<<"data">> := EachData} <- LteResponseDecodeds, #{<<"created_at">> := CreatedAt} <- EachData], - [?assert(emqx_mgmt_api_clients:time_string_to_unix_ts_int(ConnectedAt) < NowTimeStampInt) + [?assert(emqx_mgmt_api:time_string_to_unix_ts_int(ConnectedAt) < NowTimeStampInt) || #{<<"data">> := EachData} <- LteResponseDecodeds, #{<<"connected_at">> := ConnectedAt} <- EachData], [?assertEqual(EachData, [])