From e8f6035c3401e8de734b5ecfe91364115150a76e Mon Sep 17 00:00:00 2001 From: lafirest Date: Wed, 10 Nov 2021 14:20:33 +0800 Subject: [PATCH] feat(emqx_lwm2m): add some lwm2m api (#6047) --- .../emqx_gateway/src/lwm2m/emqx_lwm2m_api.erl | 109 +++++++++++++++- .../src/lwm2m/emqx_lwm2m_channel.erl | 11 +- .../src/lwm2m/emqx_lwm2m_session.erl | 15 ++- .../test/emqx_lwm2m_api_SUITE.erl | 117 ++++++++++++++++-- 4 files changed, 236 insertions(+), 16 deletions(-) diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_api.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_api.erl index 98c9fabe8..caf683455 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_api.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_api.erl @@ -20,21 +20,28 @@ -export([api_spec/0]). --export([lookup_cmd/2]). +-export([lookup_cmd/2, observe/2, read/2, write/2]). -define(PREFIX, "/gateway/lwm2m/:clientid"). -import(emqx_mgmt_util, [ object_schema/1 , error_schema/2 - , properties/1]). + , properties/1 + , schema/1 + ]). api_spec() -> - {[lookup_cmd_api()], []}. + {[ lookup_cmd_api(), observe_api(), read_api() + , write_api() + ], []}. -lookup_cmd_paramters() -> +base_paramters() -> [ make_paramter(clientid, path, true, "string") , make_paramter(path, query, true, "string") - , make_paramter(action, query, true, "string")]. + ]. + +lookup_cmd_paramters() -> + base_paramters() ++ [make_paramter(action, query, true, "string")]. lookup_cmd_properties() -> properties([ {clientid, string} @@ -62,6 +69,50 @@ lookup_cmd_api() -> {?PREFIX ++ "/lookup_cmd", Metadata, lookup_cmd}. +observe_api() -> + Metadata = #{post => + #{description => <<"(cancel)observe resource">>, + parameters => base_paramters() ++ + [make_paramter(enable, query, true, "boolean")], + responses => + #{<<"200">> => schema(<<"Successed">>), + <<"404">> => error_schema("client not found error", ['CLIENT_NOT_FOUND']) + } + }}, + {?PREFIX ++ "/observe", Metadata, observe}. + +read_api() -> + Metadata = #{post => + #{description => <<"read resource">>, + parameters => base_paramters(), + responses => + #{<<"200">> => schema(<<"Successed">>), + <<"404">> => error_schema("client not found error", ['CLIENT_NOT_FOUND']) + } + }}, + {?PREFIX ++ "/read", Metadata, read}. + +write_api() -> + Metadata = #{post => + #{description => <<"write to resource">>, + parameters => base_paramters() ++ + [ make_paramter(type, query, true, "string", + [<<"Integer">>, + <<"Float">>, + <<"Time">>, + <<"String">>, + <<"Boolean">>, + <<"Opaque">>, + <<"Objlnk">>]) + , make_paramter(value, query, true, "string") + ], + responses => + #{<<"200">> => schema(<<"Successed">>), + <<"404">> => error_schema("client not found error", ['CLIENT_NOT_FOUND']) + } + }}, + {?PREFIX ++ "/write", Metadata, write}. + lookup_cmd(get, #{bindings := Bindings, query_string := QS}) -> ClientId = maps:get(clientid, Bindings), case emqx_gateway_cm_registry:lookup_channels(lwm2m, ClientId) of @@ -143,3 +194,51 @@ make_paramter(Name, In, IsRequired, Type) -> in => In, required => IsRequired, schema => #{type => Type}}. + +make_paramter(Name, In, IsRequired, Type, Enum) -> + #{name => Name, + in => In, + required => IsRequired, + schema => #{type => Type, + enum => Enum}}. + +observe(post, #{bindings := #{clientid := ClientId}, + query_string := #{<<"path">> := Path, <<"enable">> := Enable}}) -> + MsgType = case Enable of + true -> <<"observe">>; + _ -> <<"cancel-observe">> + end, + + Cmd = #{<<"msgType">> => MsgType, + <<"data">> => #{<<"path">> => Path} + }, + + send_cmd(ClientId, Cmd). + + +read(post, #{bindings := #{clientid := ClientId}, + query_string := Qs}) -> + + Cmd = #{<<"msgType">> => <<"read">>, + <<"data">> => Qs + }, + + send_cmd(ClientId, Cmd). + +write(post, #{bindings := #{clientid := ClientId}, + query_string := Qs}) -> + + Cmd = #{<<"msgType">> => <<"write">>, + <<"data">> => Qs + }, + + send_cmd(ClientId, Cmd). + +send_cmd(ClientId, Cmd) -> + case emqx_gateway_cm_registry:lookup_channels(lwm2m, ClientId) of + [Channel | _] -> + ok = emqx_lwm2m_channel:send_cmd(Channel, Cmd), + {200}; + _ -> + {404, #{code => 'CLIENT_NOT_FOUND'}} + end. diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl index 6d100b943..be328f32f 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl @@ -26,7 +26,9 @@ , stats/1 , with_context/2 , do_takeover/3 - , lookup_cmd/3]). + , lookup_cmd/3 + , send_cmd/2 + ]). -export([ init/2 , handle_in/2 @@ -133,6 +135,9 @@ with_context(Ctx, ClientInfo) -> lookup_cmd(Channel, Path, Action) -> gen_server:call(Channel, {?FUNCTION_NAME, Path, Action}). +send_cmd(Channel, Cmd) -> + gen_server:call(Channel, {?FUNCTION_NAME, Cmd}). + %%-------------------------------------------------------------------- %% Handle incoming packet %%-------------------------------------------------------------------- @@ -171,6 +176,10 @@ handle_call({lookup_cmd, Path, Type}, _From, #channel{session = Session} = Chann Result = emqx_lwm2m_session:find_cmd_record(Path, Type, Session), {reply, {ok, Result}, Channel}; +handle_call({send_cmd, Cmd}, _From, Channel) -> + {ok, Outs, Channel2} = call_session(send_cmd, Cmd, Channel), + {reply, ok, Outs, Channel2}; + handle_call(Req, _From, Channel) -> ?SLOG(error, #{ msg => "unexpected_call" , call => Req diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl index dbb6b7566..f59ec219e 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl @@ -34,6 +34,7 @@ , handle_protocol_in/3 , handle_deliver/3 , timeout/3 + , send_cmd/3 , set_reply/2]). -export_type([session/0]). @@ -235,6 +236,9 @@ set_reply(Msg, #session{coap = Coap} = Session) -> Coap2 = emqx_coap_tm:set_reply(Msg, Coap), Session#session{coap = Coap2}. +send_cmd(Cmd, _, Session) -> + return(send_cmd_impl(Cmd, Session)). + %%-------------------------------------------------------------------- %% Protocol Stack %%-------------------------------------------------------------------- @@ -683,6 +687,16 @@ get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' : get_expiry_time(_) -> 0. +%%-------------------------------------------------------------------- +%% Send CMD +%%-------------------------------------------------------------------- +send_cmd_impl(Cmd, #session{reg_info = RegInfo} = Session) -> + CacheMode = is_cache_mode(Session), + AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>), + {Req, Ctx} = emqx_lwm2m_cmd:mqtt_to_coap(AlternatePath, Cmd), + Session2 = record_request(Ctx, Session), + maybe_do_deliver_to_coap(Ctx, Req, 0, CacheMode, Session2). + %%-------------------------------------------------------------------- %% Call CoAP %%-------------------------------------------------------------------- @@ -726,7 +740,6 @@ do_out([{Ctx, Out} | T], TM, Msgs) -> do_out(_, TM, Msgs) -> {ok, TM, Msgs}. - %%-------------------------------------------------------------------- %% CMD Record %%-------------------------------------------------------------------- diff --git a/apps/emqx_gateway/test/emqx_lwm2m_api_SUITE.erl b/apps/emqx_gateway/test/emqx_lwm2m_api_SUITE.erl index ed817dbd8..da3a3ed1f 100644 --- a/apps/emqx_gateway/test/emqx_lwm2m_api_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_lwm2m_api_SUITE.erl @@ -31,9 +31,9 @@ -define(CONF_DEFAULT, <<" gateway.lwm2m { xml_dir = \"../../lib/emqx_gateway/src/lwm2m/lwm2m_xml\" - lifetime_min = 1s + lifetime_min = 100s lifetime_max = 86400s - qmode_time_window = 22 + qmode_time_window = 200 auto_observe = false mountpoint = \"lwm2m/%u\" update_msg_publish_condition = contains_object_list @@ -90,17 +90,17 @@ init_per_testcase(_AllTestCase, Config) -> [{sock, ClientUdpSock}, {emqx_c, C} | Config]. end_per_testcase(_AllTestCase, Config) -> - timer:sleep(300), gen_udp:close(?config(sock, Config)), emqtt:disconnect(?config(emqx_c, Config)), - ok = application:stop(emqx_gateway). + ok = application:stop(emqx_gateway), + timer:sleep(300). %%-------------------------------------------------------------------- %% Cases %%-------------------------------------------------------------------- t_lookup_cmd_read(Config) -> UdpSock = ?config(sock, Config), - Epn = "urn:oma:lwm2m:oma:3", + Epn = "urn:oma:lwm2m:oma:1", MsgId1 = 15, RespTopic = list_to_binary("lwm2m/"++Epn++"/up/resp"), emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0), @@ -147,7 +147,7 @@ t_lookup_cmd_read(Config) -> t_lookup_cmd_discover(Config) -> %% step 1, device register ... - Epn = "urn:oma:lwm2m:oma:3", + Epn = "urn:oma:lwm2m:oma:2", MsgId1 = 15, UdpSock = ?config(sock, Config), ObjectList = <<", , , , ">>, @@ -186,10 +186,102 @@ t_lookup_cmd_discover(Config) -> timer:sleep(200), discover_received_request(Epn, <<"/3/0/7">>, <<"discover">>). +t_read(Config) -> + UdpSock = ?config(sock, Config), + Epn = "urn:oma:lwm2m:oma:3", + MsgId1 = 15, + RespTopic = list_to_binary("lwm2m/"++Epn++"/up/resp"), + emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0), + timer:sleep(200), + %% step 1, device register ... + test_send_coap_request( UdpSock, + post, + sprintf("coap://127.0.0.1:~b/rd?ep=~ts<=600&lwm2m=1", [?PORT, Epn]), + #coap_content{content_format = <<"text/plain">>, + payload = <<";rt=\"oma.lwm2m\";ct=11543,,,">>}, + [], + MsgId1), + #coap_message{method = Method1} = test_recv_coap_response(UdpSock), + ?assertEqual({ok,created}, Method1), + + timer:sleep(100), + test_recv_mqtt_response(RespTopic), + + %% step2, call Read API + call_send_api(Epn, "read", "path=/3/0/0"), + timer:sleep(100), + #coap_message{type = Type, method = Method, options = Opts} = test_recv_coap_request(UdpSock), + ?assertEqual(con, Type), + ?assertEqual(get, Method), + ?assertEqual([<<"lwm2m">>, <<"3">>, <<"0">>, <<"0">>], maps:get(uri_path, Opts)). + + +t_write(Config) -> + UdpSock = ?config(sock, Config), + Epn = "urn:oma:lwm2m:oma:4", + MsgId1 = 15, + RespTopic = list_to_binary("lwm2m/"++Epn++"/up/resp"), + emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0), + timer:sleep(200), + %% step 1, device register ... + test_send_coap_request( UdpSock, + post, + sprintf("coap://127.0.0.1:~b/rd?ep=~ts<=600&lwm2m=1", [?PORT, Epn]), + #coap_content{content_format = <<"text/plain">>, + payload = <<";rt=\"oma.lwm2m\";ct=11543,,,">>}, + [], + MsgId1), + #coap_message{method = Method1} = test_recv_coap_response(UdpSock), + ?assertEqual({ok,created}, Method1), + + timer:sleep(100), + test_recv_mqtt_response(RespTopic), + + %% step2, call write API + call_send_api(Epn, "write", "path=/3/0/13&type=Integer&value=123"), + timer:sleep(100), + #coap_message{type = Type, method = Method, options = Opts} = test_recv_coap_request(UdpSock), + ?assertEqual(con, Type), + ?assertEqual(put, Method), + ?assertEqual([<<"lwm2m">>, <<"3">>, <<"0">>, <<"13">>], maps:get(uri_path, Opts)), + ?assertEqual(<<"application/vnd.oma.lwm2m+tlv">>, maps:get(content_format, Opts)). + + + +t_observe(Config) -> + UdpSock = ?config(sock, Config), + Epn = "urn:oma:lwm2m:oma:5", + MsgId1 = 15, + RespTopic = list_to_binary("lwm2m/"++Epn++"/up/resp"), + emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0), + timer:sleep(200), + %% step 1, device register ... + test_send_coap_request( UdpSock, + post, + sprintf("coap://127.0.0.1:~b/rd?ep=~ts<=600&lwm2m=1", [?PORT, Epn]), + #coap_content{content_format = <<"text/plain">>, + payload = <<";rt=\"oma.lwm2m\";ct=11543,,,">>}, + [], + MsgId1), + #coap_message{method = Method1} = test_recv_coap_response(UdpSock), + ?assertEqual({ok,created}, Method1), + + timer:sleep(100), + test_recv_mqtt_response(RespTopic), + + %% step2, call observe API + call_send_api(Epn, "observe", "path=/3/0/1&enable=false"), + timer:sleep(100), + #coap_message{type = Type, method = Method, options = Opts} = test_recv_coap_request(UdpSock), + ?assertEqual(con, Type), + ?assertEqual(get, Method), + ?assertEqual([<<"lwm2m">>, <<"3">>, <<"0">>, <<"1">>], maps:get(uri_path, Opts)), + ?assertEqual(1, maps:get(observe, Opts)). + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Internal Functions %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -send_request(ClientId, Path, Action) -> +call_lookup_api(ClientId, Path, Action) -> ApiPath = emqx_mgmt_api_test_util:api_path(["gateway/lwm2m", ClientId, "lookup_cmd"]), Auth = emqx_mgmt_api_test_util:auth_header_(), Query = io_lib:format("path=~ts&action=~ts", [Path, Action]), @@ -197,8 +289,15 @@ send_request(ClientId, Path, Action) -> ?LOGT("rest api response:~ts~n", [Response]), Response. +call_send_api(ClientId, Cmd, Query) -> + ApiPath = emqx_mgmt_api_test_util:api_path(["gateway/lwm2m", ClientId, Cmd]), + Auth = emqx_mgmt_api_test_util:auth_header_(), + {ok, Response} = emqx_mgmt_api_test_util:request_api(post, ApiPath, Query, Auth), + ?LOGT("rest api response:~ts~n", [Response]), + Response. + no_received_request(ClientId, Path, Action) -> - Response = send_request(ClientId, Path, Action), + Response = call_lookup_api(ClientId, Path, Action), NotReceived = #{<<"clientid">> => list_to_binary(ClientId), <<"action">> => Action, <<"code">> => <<"6.01">>, @@ -206,7 +305,7 @@ no_received_request(ClientId, Path, Action) -> <<"path">> => Path}, ?assertEqual(NotReceived, emqx_json:decode(Response, [return_maps])). normal_received_request(ClientId, Path, Action) -> - Response = send_request(ClientId, Path, Action), + Response = call_lookup_api(ClientId, Path, Action), RCont = emqx_json:decode(Response, [return_maps]), ?assertEqual(list_to_binary(ClientId), maps:get(<<"clientid">>, RCont, undefined)), ?assertEqual(Path, maps:get(<<"path">>, RCont, undefined)),