From 60914697b2be6983e9b6b83abb58fc661ac56190 Mon Sep 17 00:00:00 2001 From: lafirest Date: Wed, 8 Sep 2021 17:59:53 +0800 Subject: [PATCH] refactor(emqx_lwm2m): refactor lwm2m api use new rest framework --- apps/emqx_gateway/src/coap/emqx_coap_api.erl | 8 +- .../emqx_gateway/src/lwm2m/emqx_lwm2m_api.erl | 232 ++++++------- .../src/lwm2m/emqx_lwm2m_channel.erl | 10 +- .../emqx_gateway/src/lwm2m/emqx_lwm2m_cmd.erl | 4 +- .../src/lwm2m/emqx_lwm2m_session.erl | 56 +++- .../emqx_gateway/test/emqx_coap_api_SUITE.erl | 24 -- .../test/emqx_lwm2m_api_SUITE.erl | 317 ++++++++++++++++++ 7 files changed, 485 insertions(+), 166 deletions(-) create mode 100644 apps/emqx_gateway/test/emqx_lwm2m_api_SUITE.erl diff --git a/apps/emqx_gateway/src/coap/emqx_coap_api.erl b/apps/emqx_gateway/src/coap/emqx_coap_api.erl index 428e99ac5..4d0e8aff8 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_api.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_api.erl @@ -64,9 +64,9 @@ request(post, #{body := Body, bindings := Bindings}) -> case call_client(ClientId, Msg2, timer:seconds(WaitTime)) of timeout -> - {504}; + {504, #{code => 'CLIENT_NOT_RESPONSE'}}; not_found -> - {404}; + {404, #{code => 'CLIENT_NOT_FOUND'}}; Response -> {200, format_to_response(CT, Response)} end. @@ -101,8 +101,8 @@ request_method_meta() -> <<"request payload, binary must encode by base64">>), responses => #{ <<"200">> => object_schema(coap_message_properties()), - <<"404">> => schema(<<"NotFound">>), - <<"504">> => schema(<<"Timeout">>) + <<"404">> => error_schema("client not found error", ['CLIENT_NOT_FOUND']), + <<"504">> => error_schema("timeout", ['CLIENT_NOT_RESPONSE']) }}. diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_api.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_api.erl index 03c3a6bc2..98c9fabe8 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_api.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_api.erl @@ -16,143 +16,119 @@ -module(emqx_lwm2m_api). --rest_api(#{name => list, - method => 'GET', - path => "/lwm2m_channels/", - func => list, - descr => "A list of all lwm2m channel" - }). +-behaviour(minirest_api). --rest_api(#{name => list, - method => 'GET', - path => "/nodes/:atom:node/lwm2m_channels/", - func => list, - descr => "A list of lwm2m channel of a node" - }). +-export([api_spec/0]). --rest_api(#{name => lookup_cmd, - method => 'GET', - path => "/lookup_cmd/:bin:ep/", - func => lookup_cmd, - descr => "Send a lwm2m downlink command" - }). +-export([lookup_cmd/2]). --rest_api(#{name => lookup_cmd, - method => 'GET', - path => "/nodes/:atom:node/lookup_cmd/:bin:ep/", - func => lookup_cmd, - descr => "Send a lwm2m downlink command of a node" - }). +-define(PREFIX, "/gateway/lwm2m/:clientid"). --export([ list/2 - , lookup_cmd/2 - ]). +-import(emqx_mgmt_util, [ object_schema/1 + , error_schema/2 + , properties/1]). -list(#{node := Node }, Params) -> - case Node = node() of - true -> list(#{}, Params); - _ -> rpc_call(Node, list, [#{}, Params]) - end; +api_spec() -> + {[lookup_cmd_api()], []}. -list(#{}, _Params) -> - %% Channels = emqx_lwm2m_cm:all_channels(), - Channels = [], - return({ok, format(Channels)}). +lookup_cmd_paramters() -> + [ make_paramter(clientid, path, true, "string") + , make_paramter(path, query, true, "string") + , make_paramter(action, query, true, "string")]. -lookup_cmd(#{ep := Ep, node := Node}, Params) -> - case Node = node() of - true -> lookup_cmd(#{ep => Ep}, Params); - _ -> rpc_call(Node, lookup_cmd, [#{ep => Ep}, Params]) - end; +lookup_cmd_properties() -> + properties([ {clientid, string} + , {path, string} + , {action, string} + , {code, string} + , {codeMsg, string} + , {content, {array, object}, lookup_cmd_content_props()}]). -lookup_cmd(#{ep := _Ep}, Params) -> - _MsgType = proplists:get_value(<<"msgType">>, Params), - _Path0 = proplists:get_value(<<"path">>, Params), - %% case emqx_lwm2m_cm:lookup_cmd(Ep, Path0, MsgType) of - %% [] -> return({ok, []}); - %% [{_, undefined} | _] -> return({ok, []}); - %% [{{IMEI, Path, MsgType}, undefined}] -> - %% return({ok, [{imei, IMEI}, - %% {'msgType', IMEI}, - %% {'code', <<"6.01">>}, - %% {'codeMsg', <<"reply_not_received">>}, - %% {'path', Path}]}); - %% [{{IMEI, Path, MsgType}, {Code, CodeMsg, Content}}] -> - %% Payload1 = format_cmd_content(Content, MsgType), - %% return({ok, [{imei, IMEI}, - %% {'msgType', IMEI}, - %% {'code', Code}, - %% {'codeMsg', CodeMsg}, - %% {'path', Path}] ++ Payload1}) - %% end. - return({ok, []}). +lookup_cmd_content_props() -> + [ {operations, string, <<"Resource Operations">>} + , {dataType, string, <<"Resource Type">>} + , {path, string, <<"Resource Path">>} + , {name, string, <<"Resource Name">>}]. -rpc_call(Node, Fun, Args) -> - case rpc:call(Node, ?MODULE, Fun, Args) of - {badrpc, Reason} -> {error, Reason}; - Res -> Res +lookup_cmd_api() -> + Metadata = #{get => + #{description => <<"look up resource">>, + parameters => lookup_cmd_paramters(), + responses => + #{<<"200">> => object_schema(lookup_cmd_properties()), + <<"404">> => error_schema("client not found error", ['CLIENT_NOT_FOUND']) + } + }}, + {?PREFIX ++ "/lookup_cmd", Metadata, lookup_cmd}. + + +lookup_cmd(get, #{bindings := Bindings, query_string := QS}) -> + ClientId = maps:get(clientid, Bindings), + case emqx_gateway_cm_registry:lookup_channels(lwm2m, ClientId) of + [Channel | _] -> + #{<<"path">> := Path, + <<"action">> := Action} = QS, + {ok, Result} = emqx_lwm2m_channel:lookup_cmd(Channel, Path, Action), + lookup_cmd_return(Result, ClientId, Action, Path); + _ -> + {404, #{code => 'CLIENT_NOT_FOUND'}} end. -format(Channels) -> - lists:map(fun({IMEI, #{lifetime := LifeTime, - peername := Peername, - version := Version, - reg_info := RegInfo}}) -> - ObjectList = lists:map(fun(Path) -> - [ObjId | _] = path_list(Path), - case emqx_lwm2m_xml_object:get_obj_def(binary_to_integer(ObjId), true) of - {error, _} -> - {Path, Path}; - ObjDefinition -> - ObjectName = emqx_lwm2m_xml_object:get_object_name(ObjDefinition), - {Path, list_to_binary(ObjectName)} - end - end, maps:get(<<"objectList">>, RegInfo)), - {IpAddr, Port} = Peername, - [{imei, IMEI}, - {lifetime, LifeTime}, - {ip_address, iolist_to_binary(ntoa(IpAddr))}, - {port, Port}, - {version, Version}, - {'objectList', ObjectList}] - end, Channels). +lookup_cmd_return(undefined, ClientId, Action, Path) -> + {200, + #{clientid => ClientId, + action => Action, + code => <<"6.01">>, + codeMsg => <<"reply_not_received">>, + path => Path}}; -%% format_cmd_content(undefined, _MsgType) -> []; -%% format_cmd_content(_Content, <<"discover">>) -> -%% %% [H | Content1] = Content, -%% %% {_, [HObjId]} = emqx_lwm2m_coap_resource:parse_object_list(H), -%% %% [ObjId | _]= path_list(HObjId), -%% %% ObjectList = case Content1 of -%% %% [Content2 | _] -> -%% %% {_, ObjL} = emqx_lwm2m_coap_resource:parse_object_list(Content2), -%% %% ObjL; -%% %% [] -> [] -%% %% end, -%% %% R = case emqx_lwm2m_xml_object:get_obj_def(binary_to_integer(ObjId), true) of -%% %% {error, _} -> -%% %% lists:map(fun(Object) -> {Object, Object} end, ObjectList); -%% %% ObjDefinition -> -%% %% lists:map(fun(Object) -> -%% %% [_, _, ResId| _] = path_list(Object), -%% %% Operations = case emqx_lwm2m_xml_object:get_resource_operations(binary_to_integer(ResId), ObjDefinition) of -%% %% "E" -> [{operations, list_to_binary("E")}]; -%% %% Oper -> [{'dataType', list_to_binary(emqx_lwm2m_xml_object:get_resource_type(binary_to_integer(ResId), ObjDefinition))}, -%% %% {operations, list_to_binary(Oper)}] -%% %% end, -%% %% [{path, Object}, -%% %% {name, list_to_binary(emqx_lwm2m_xml_object:get_resource_name(binary_to_integer(ResId), ObjDefinition))} -%% %% ] ++ Operations -%% %% end, ObjectList) -%% %% end, -%% %% [{content, R}]; -%% []; -%% format_cmd_content(Content, _) -> -%% [{content, Content}]. +lookup_cmd_return({Code, CodeMsg, Content}, ClientId, Action, Path) -> + {200, + format_cmd_content(Content, + Action, + #{clientid => ClientId, + action => Action, + code => Code, + codeMsg => CodeMsg, + path => Path})}. -ntoa({0,0,0,0,0,16#ffff,AB,CD}) -> - inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}); -ntoa(IP) -> - inet_parse:ntoa(IP). +format_cmd_content(undefined, _MsgType, Result) -> + Result; + +format_cmd_content(Content, <<"discover">>, Result) -> + [H | Content1] = Content, + {_, [HObjId]} = emqx_lwm2m_session:parse_object_list(H), + [ObjId | _]= path_list(HObjId), + ObjectList = case Content1 of + [Content2 | _] -> + {_, ObjL} = emqx_lwm2m_session:parse_object_list(Content2), + ObjL; + [] -> [] + end, + + R = case emqx_lwm2m_xml_object:get_obj_def(binary_to_integer(ObjId), true) of + {error, _} -> + lists:map(fun(Object) -> #{Object => Object} end, ObjectList); + ObjDefinition -> + lists:map( + fun(Object) -> + [_, _, RawResId| _] = path_list(Object), + ResId = binary_to_integer(RawResId), + Operations = case emqx_lwm2m_xml_object:get_resource_operations(ResId, ObjDefinition) of + "E" -> + #{operations => list_to_binary("E")}; + Oper -> + #{'dataType' => list_to_binary(emqx_lwm2m_xml_object:get_resource_type(ResId, ObjDefinition)), + operations => list_to_binary(Oper)} + end, + Operations#{path => Object, + name => list_to_binary(emqx_lwm2m_xml_object:get_resource_name(ResId, ObjDefinition))} + end, ObjectList) + end, + Result#{content => R}; + +format_cmd_content(Content, _, Result) -> + Result#{content => Content}. path_list(Path) -> case binary:split(binary_util:trim(Path, $/), [<<$/>>], [global]) of @@ -162,6 +138,8 @@ path_list(Path) -> [ObjId] -> [ObjId] end. -return(_) -> -%% TODO: V5 API - ok. +make_paramter(Name, In, IsRequired, Type) -> + #{name => Name, + in => In, + required => IsRequired, + schema => #{type => Type}}. diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl index d0647897b..6ad78742f 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl @@ -25,7 +25,8 @@ , info/2 , stats/1 , with_context/2 - , do_takeover/3]). + , do_takeover/3 + , lookup_cmd/3]). -export([ init/2 , handle_in/2 @@ -116,6 +117,9 @@ with_context(Ctx, ClientInfo) -> with_context(Type, Topic, Ctx, ClientInfo) end. +lookup_cmd(Channel, Path, Action) -> + gen_server:call(Channel, {?FUNCTION_NAME, Path, Action}). + %%-------------------------------------------------------------------- %% Handle incoming packet %%-------------------------------------------------------------------- @@ -150,6 +154,10 @@ handle_timeout(_, _, Channel) -> %%-------------------------------------------------------------------- %% Handle call %%-------------------------------------------------------------------- +handle_call({lookup_cmd, Path, Type}, _From, #channel{session = Session} = Channel) -> + Result = emqx_lwm2m_session:find_cmd_record(Path, Type, Session), + {reply, {ok, Result}, Channel}; + handle_call(Req, _From, Channel) -> ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, Channel}. diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_cmd.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_cmd.erl index 7c0cc95cd..e17a83195 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_cmd.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_cmd.erl @@ -341,8 +341,8 @@ extract_path(Ref = #{}) -> drop_query( case Ref of #{<<"data">> := Data} -> - case maps:get(<<"path">>, Data, nil) of - nil -> maps:get(<<"basePath">>, Data, undefined); + case maps:get(<<"path">>, Data, undefined) of + undefined -> maps:get(<<"basePath">>, Data, undefined); Path -> Path end; #{<<"path">> := Path} -> diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl index a1d03e04f..ab27dfbca 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl @@ -22,7 +22,8 @@ -include_lib("emqx_gateway/src/lwm2m/include/emqx_lwm2m.hrl"). %% API --export([new/0, init/4, update/3, reregister/3, on_close/1]). +-export([ new/0, init/4, update/3, parse_object_list/1 + , reregister/3, on_close/1, find_cmd_record/3]). -export([ info/1 , info/2 @@ -42,6 +43,15 @@ -type timestamp() :: non_neg_integer(). -type queued_request() :: {timestamp(), request_context(), emqx_coap_message()}. +-type cmd_path() :: binary(). +-type cmd_type() :: binary(). +-type cmd_record_key() :: {cmd_path(), cmd_type()}. +-type cmd_code() :: binary(). +-type cmd_code_msg() :: binary(). +-type cmd_code_content() :: list(map()). +-type cmd_result() :: undefined | {cmd_code(), cmd_code_msg(), cmd_code_content()}. +-type cmd_record() :: #{cmd_record_key() => cmd_result()}. + -record(session, { coap :: emqx_coap_tm:manager() , queue :: queue:queue(queued_request()) , wait_ack :: request_context() | undefined @@ -52,6 +62,7 @@ , is_cache_mode :: boolean() , mountpoint :: binary() , last_active_at :: non_neg_integer() + , cmd_record :: cmd_record() }). -type session() :: #session{}. @@ -61,6 +72,8 @@ -define(IGNORE_OBJECT, [<<"0">>, <<"1">>, <<"2">>, <<"4">>, <<"5">>, <<"6">>, <<"7">>, <<"9">>, <<"15">>]). +-define(CMD_KEY(Path, Type), {Path, Type}). + %% uplink and downlink topic configuration -define(lwm2m_up_dm_topic, {<<"/v1/up/dm">>, 0}). @@ -98,6 +111,7 @@ new() -> , last_active_at = ?NOW , is_cache_mode = false , mountpoint = <<>> + , cmd_record = #{} , lifetime = emqx:get_config([gateway, lwm2m, lifetime_max])}. -spec init(emqx_coap_message(), binary(), function(), session()) -> map(). @@ -135,6 +149,10 @@ on_close(Session) -> emqx:unsubscribe(MountedTopic), MountedTopic. +-spec find_cmd_record(cmd_path(), cmd_type(), session()) -> cmd_result(). +find_cmd_record(Path, Type, #session{cmd_record = Record}) -> + maps:get(?CMD_KEY(Path, Type), Record, undefined). + %%-------------------------------------------------------------------- %% Info, Stats %%-------------------------------------------------------------------- @@ -271,7 +289,7 @@ parse_object_list(FullObjLinkList) -> (<>) when Prefix =:= AlterPath -> trim(Link); (Link) -> Link - end, ObjLinkList), + end, ObjLinkList), {AlterPath, WithOutPrefix} end. @@ -443,19 +461,20 @@ handle_coap_response({Ctx = #{<<"msgType">> := EventType}, Session) -> MqttPayload = emqx_lwm2m_cmd:coap_to_mqtt(CoapMsgMethod, CoapMsgPayload, CoapMsgOpts, Ctx), {ReqPath, _} = emqx_lwm2m_cmd:path_list(emqx_lwm2m_cmd:extract_path(Ctx)), - Session2 = + Session2 = record_response(EventType, MqttPayload, Session), + Session3 = case {ReqPath, MqttPayload, EventType, CoapMsgType} of {[<<"5">>| _], _, <<"observe">>, CoapMsgType} when CoapMsgType =/= ack -> %% this is a notification for status update during NB firmware upgrade. %% need to reply to DM http callbacks - send_to_mqtt(Ctx, <<"notify">>, MqttPayload, ?lwm2m_up_dm_topic, WithContext, Session); + send_to_mqtt(Ctx, <<"notify">>, MqttPayload, ?lwm2m_up_dm_topic, WithContext, Session2); {_ReqPath, _, <<"observe">>, CoapMsgType} when CoapMsgType =/= ack -> %% this is actually a notification, correct the msgType - send_to_mqtt(Ctx, <<"notify">>, MqttPayload, WithContext, Session); + send_to_mqtt(Ctx, <<"notify">>, MqttPayload, WithContext, Session2); _ -> - send_to_mqtt(Ctx, EventType, MqttPayload, WithContext, Session) + send_to_mqtt(Ctx, EventType, MqttPayload, WithContext, Session2) end, - send_dl_msg(Ctx, Session2). + send_dl_msg(Ctx, Session3). %%-------------------------------------------------------------------- %% Ack @@ -624,7 +643,8 @@ deliver_to_coap(AlternatePath, TermData, MQTT, CacheMode, WithContext, Session) WithContext(metrics, 'messages.delivered'), {Req, Ctx} = emqx_lwm2m_cmd:mqtt_to_coap(AlternatePath, TermData), ExpiryTime = get_expiry_time(MQTT), - maybe_do_deliver_to_coap(Ctx, Req, ExpiryTime, CacheMode, Session). + Session2 = record_request(Ctx, Session), + maybe_do_deliver_to_coap(Ctx, Req, ExpiryTime, CacheMode, Session2). maybe_do_deliver_to_coap(Ctx, Req, ExpiryTime, CacheMode, #session{wait_ack = WaitAck, @@ -692,3 +712,23 @@ do_out([{Ctx, Out} | T], TM, Msgs) -> do_out(_, TM, Msgs) -> {ok, TM, Msgs}. + + +%%-------------------------------------------------------------------- +%% CMD Record +%%-------------------------------------------------------------------- +-spec record_request(request_context(), session()) -> session(). +record_request(#{<<"msgType">> := Type} = Context, Session) -> + Path = emqx_lwm2m_cmd:extract_path(Context), + record_cmd(Path, Type, undefined, Session). + +record_response(EventType, #{<<"data">> := Data}, Session) -> + ReqPath = maps:get(<<"reqPath">>, Data, undefined), + Code = maps:get(<<"code">>, Data, undefined), + CodeMsg = maps:get(<<"codeMsg">>, Data, undefined), + Content = maps:get(<<"content">>, Data, undefined), + record_cmd(ReqPath, EventType, {Code, CodeMsg, Content}, Session). + +record_cmd(Path, Type, Result, #session{cmd_record = Record} = Session) -> + Record2 = Record#{?CMD_KEY(Path, Type) => Result}, + Session#session{cmd_record = Record2}. diff --git a/apps/emqx_gateway/test/emqx_coap_api_SUITE.erl b/apps/emqx_gateway/test/emqx_coap_api_SUITE.erl index 74b0cadc8..83521f5cd 100644 --- a/apps/emqx_gateway/test/emqx_coap_api_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_coap_api_SUITE.erl @@ -62,12 +62,6 @@ end_per_suite(Config) -> emqx_mgmt_api_test_util:end_suite([emqx_gateway]), Config. -set_special_configs(emqx_gatewway) -> - ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT); - -set_special_configs(_) -> - ok. - %%-------------------------------------------------------------------- %% Cases %%-------------------------------------------------------------------- @@ -187,17 +181,6 @@ split_segments(Path, Char, Acc) -> make_segment(Seg) -> list_to_binary(emqx_http_lib:uri_decode(Seg)). - -get_coap_path(Options) -> - get_path(Options, <<>>). - -get_coap_query(Options) -> - proplists:get_value(uri_query, Options, []). - -get_coap_observe(Options) -> - get_observe(Options). - - get_path([], Acc) -> %?LOGT("get_path Acc=~p", [Acc]), Acc; @@ -207,13 +190,6 @@ get_path([{uri_path, Path1}|T], Acc) -> get_path([{_, _}|T], Acc) -> get_path(T, Acc). -get_observe([]) -> - undefined; -get_observe([{observe, V}|_T]) -> - V; -get_observe([{_, _}|T]) -> - get_observe(T). - join_path([], Acc) -> Acc; join_path([<<"/">>|T], Acc) -> join_path(T, Acc); diff --git a/apps/emqx_gateway/test/emqx_lwm2m_api_SUITE.erl b/apps/emqx_gateway/test/emqx_lwm2m_api_SUITE.erl new file mode 100644 index 000000000..cb2ccf3f8 --- /dev/null +++ b/apps/emqx_gateway/test/emqx_lwm2m_api_SUITE.erl @@ -0,0 +1,317 @@ +%%-------------------------------------------------------------------- +%% Copyright (C) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_lwm2m_api_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-define(PORT, 5783). + +-define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)). + +-include_lib("emqx_gateway/src/lwm2m/include/emqx_lwm2m.hrl"). +-include_lib("lwm2m_coap/include/coap.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-define(CONF_DEFAULT, <<" +gateway.lwm2m { + xml_dir = \"../../lib/emqx_gateway/src/lwm2m/lwm2m_xml\" + lifetime_min = 1s + lifetime_max = 86400s + qmode_time_windonw = 22 + auto_observe = false + mountpoint = \"lwm2m/%u\" + update_msg_publish_condition = contains_object_list + translators { + command = {topic = \"/dn/#\", qos = 0} + response = {topic = \"/up/resp\", qos = 0} + notify = {topic = \"/up/notify\", qos = 0} + register = {topic = \"/up/resp\", qos = 0} + update = {topic = \"/up/resp\", qos = 0} + } + listeners.udp.default { + bind = 5783 + } +} +">>). + +-define(assertExists(Map, Key), + ?assertNotEqual(maps:get(Key, Map, undefined), undefined)). + +%%-------------------------------------------------------------------- +%% Setups +%%-------------------------------------------------------------------- + +all() -> + emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT), + emqx_mgmt_api_test_util:init_suite([emqx_gateway]), + Config. + +end_per_suite(Config) -> + timer:sleep(300), + emqx_mgmt_api_test_util:end_suite([emqx_gateway]), + Config. + +init_per_testcase(_AllTestCase, Config) -> + ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT), + {ok, _} = application:ensure_all_started(emqx_gateway), + {ok, ClientUdpSock} = gen_udp:open(0, [binary, {active, false}]), + + {ok, C} = emqtt:start_link([{host, "localhost"},{port, 1883},{clientid, <<"c1">>}]), + {ok, _} = emqtt:connect(C), + timer:sleep(100), + + [{sock, ClientUdpSock}, {emqx_c, C} | Config]. + +end_per_testcase(_AllTestCase, Config) -> + timer:sleep(300), + gen_udp:close(?config(sock, Config)), + emqtt:disconnect(?config(emqx_c, Config)), + ok = application:stop(emqx_gateway). + +%%-------------------------------------------------------------------- +%% Cases +%%-------------------------------------------------------------------- +t_lookup_cmd_read(Config) -> + UdpSock = ?config(sock, Config), + Epn = "urn:oma:lwm2m:oma:3", + MsgId1 = 15, + RespTopic = list_to_binary("lwm2m/"++Epn++"/up/resp"), + emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0), + timer:sleep(200), + %% step 1, device register ... + test_send_coap_request( UdpSock, + post, + sprintf("coap://127.0.0.1:~b/rd?ep=~s<=345&lwm2m=1", [?PORT, Epn]), + #coap_content{content_format = <<"text/plain">>, + payload = <<";rt=\"oma.lwm2m\";ct=11543,,,">>}, + [], + MsgId1), + #coap_message{method = Method1} = test_recv_coap_response(UdpSock), + ?assertEqual({ok,created}, Method1), + test_recv_mqtt_response(RespTopic), + + %% step2, send a READ command to device + CmdId = 206, + CommandTopic = <<"lwm2m/", (list_to_binary(Epn))/binary, "/dn/dm">>, + Command = #{ + <<"requestID">> => CmdId, <<"cacheID">> => CmdId, + <<"msgType">> => <<"read">>, + <<"data">> => #{ + <<"path">> => <<"/3/0/0">> + } + }, + CommandJson = emqx_json:encode(Command), + ?LOGT("CommandJson=~p", [CommandJson]), + test_mqtt_broker:publish(CommandTopic, CommandJson, 0), + timer:sleep(50), + + no_received_request(Epn, <<"/3/0/0">>, <<"read">>), + + Request2 = test_recv_coap_request(UdpSock), + ?LOGT("LwM2M client got ~p", [Request2]), + timer:sleep(50), + + test_send_coap_response(UdpSock, "127.0.0.1", ?PORT, {ok, content}, #coap_content{content_format = <<"text/plain">>, payload = <<"EMQ">>}, Request2, true), + timer:sleep(100), + + normal_received_request(Epn, <<"/3/0/0">>, <<"read">>). + +t_lookup_cmd_discover(Config) -> + %% step 1, device register ... + Epn = "urn:oma:lwm2m:oma:3", + MsgId1 = 15, + UdpSock = ?config(sock, Config), + ObjectList = <<", , , , ">>, + RespTopic = list_to_binary("lwm2m/"++Epn++"/up/resp"), + emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0), + timer:sleep(200), + + std_register(UdpSock, Epn, ObjectList, MsgId1, RespTopic), + + %% step2, send a WRITE command to device + CommandTopic = <<"lwm2m/", (list_to_binary(Epn))/binary, "/dn/dm">>, + CmdId = 307, + Command = #{<<"requestID">> => CmdId, <<"cacheID">> => CmdId, + <<"msgType">> => <<"discover">>, + <<"data">> => #{ + <<"path">> => <<"/3/0/7">> + } }, + CommandJson = emqx_json:encode(Command), + test_mqtt_broker:publish(CommandTopic, CommandJson, 0), + + no_received_request(Epn, <<"/3/0/7">>, <<"discover">>), + + timer:sleep(50), + Request2 = test_recv_coap_request(UdpSock), + timer:sleep(50), + + PayloadDiscover = <<";dim=8;pmin=10;pmax=60;gt=50;lt=42.2,">>, + test_send_coap_response(UdpSock, + "127.0.0.1", + ?PORT, + {ok, content}, + #coap_content{content_format = <<"application/link-format">>, payload = PayloadDiscover}, + Request2, + true), + timer:sleep(100), + discover_received_request(Epn, <<"/3/0/7">>, <<"discover">>). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%% Internal Functions +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +send_request(ClientId, Path, Action) -> + ApiPath = emqx_mgmt_api_test_util:api_path(["gateway/lwm2m", ClientId, "lookup_cmd"]), + Auth = emqx_mgmt_api_test_util:auth_header_(), + Query = io_lib:format("path=~s&action=~s", [Path, Action]), + {ok, Response} = emqx_mgmt_api_test_util:request_api(get, ApiPath, Query, Auth), + ?LOGT("rest api response:~s~n", [Response]), + Response. + +no_received_request(ClientId, Path, Action) -> + Response = send_request(ClientId, Path, Action), + NotReceived = #{<<"clientid">> => list_to_binary(ClientId), + <<"action">> => Action, + <<"code">> => <<"6.01">>, + <<"codeMsg">> => <<"reply_not_received">>, + <<"path">> => Path}, + ?assertEqual(NotReceived, emqx_json:decode(Response, [return_maps])). +normal_received_request(ClientId, Path, Action) -> + Response = send_request(ClientId, Path, Action), + RCont = emqx_json:decode(Response, [return_maps]), + ?assertEqual(list_to_binary(ClientId), maps:get(<<"clientid">>, RCont, undefined)), + ?assertEqual(Path, maps:get(<<"path">>, RCont, undefined)), + ?assertEqual(Action, maps:get(<<"action">>, RCont, undefined)), + ?assertExists(RCont, <<"code">>), + ?assertExists(RCont, <<"codeMsg">>), + ?assertExists(RCont, <<"content">>), + RCont. + +discover_received_request(ClientId, Path, Action) -> + RCont = normal_received_request(ClientId, Path, Action), + [Res | _] = maps:get(<<"content">>, RCont), + ?assertExists(Res, <<"path">>), + ?assertExists(Res, <<"name">>), + ?assertExists(Res, <<"operations">>). + +test_recv_mqtt_response(RespTopic) -> + receive + {publish, #{topic := RespTopic, payload := RM}} -> + ?LOGT("test_recv_mqtt_response Response=~p", [RM]), + RM + after 1000 -> timeout_test_recv_mqtt_response + end. + +test_send_coap_request(UdpSock, Method, Uri, Content, Options, MsgId) -> + is_record(Content, coap_content) orelse error("Content must be a #coap_content!"), + is_list(Options) orelse error("Options must be a list"), + case resolve_uri(Uri) of + {coap, {IpAddr, Port}, Path, Query} -> + Request0 = lwm2m_coap_message:request(con, Method, Content, [{uri_path, Path}, {uri_query, Query} | Options]), + Request = Request0#coap_message{id = MsgId}, + ?LOGT("send_coap_request Request=~p", [Request]), + RequestBinary = lwm2m_coap_message_parser:encode(Request), + ?LOGT("test udp socket send to ~p:~p, data=~p", [IpAddr, Port, RequestBinary]), + ok = gen_udp:send(UdpSock, IpAddr, Port, RequestBinary); + {SchemeDiff, ChIdDiff, _, _} -> + error(lists:flatten(io_lib:format("scheme ~s or ChId ~s does not match with socket", [SchemeDiff, ChIdDiff]))) + end. + +test_recv_coap_response(UdpSock) -> + {ok, {Address, Port, Packet}} = gen_udp:recv(UdpSock, 0, 2000), + Response = lwm2m_coap_message_parser:decode(Packet), + ?LOGT("test udp receive from ~p:~p, data1=~p, Response=~p", [Address, Port, Packet, Response]), + #coap_message{type = ack, method = Method, id=Id, token = Token, options = Options, payload = Payload} = Response, + ?LOGT("receive coap response Method=~p, Id=~p, Token=~p, Options=~p, Payload=~p", [Method, Id, Token, Options, Payload]), + Response. + +test_recv_coap_request(UdpSock) -> + case gen_udp:recv(UdpSock, 0, 2000) of + {ok, {_Address, _Port, Packet}} -> + Request = lwm2m_coap_message_parser:decode(Packet), + #coap_message{type = con, method = Method, id=Id, token = Token, payload = Payload, options = Options} = Request, + ?LOGT("receive coap request Method=~p, Id=~p, Token=~p, Options=~p, Payload=~p", [Method, Id, Token, Options, Payload]), + Request; + {error, Reason} -> + ?LOGT("test_recv_coap_request failed, Reason=~p", [Reason]), + timeout_test_recv_coap_request + end. + +test_send_coap_response(UdpSock, Host, Port, Code, Content, Request, Ack) -> + is_record(Content, coap_content) orelse error("Content must be a #coap_content!"), + is_list(Host) orelse error("Host is not a string"), + + {ok, IpAddr} = inet:getaddr(Host, inet), + Response = lwm2m_coap_message:response(Code, Content, Request), + Response2 = case Ack of + true -> Response#coap_message{type = ack}; + false -> Response + end, + ?LOGT("test_send_coap_response Response=~p", [Response2]), + ok = gen_udp:send(UdpSock, IpAddr, Port, lwm2m_coap_message_parser:encode(Response2)). + +std_register(UdpSock, Epn, ObjectList, MsgId1, RespTopic) -> + test_send_coap_request( UdpSock, + post, + sprintf("coap://127.0.0.1:~b/rd?ep=~s<=345&lwm2m=1", [?PORT, Epn]), + #coap_content{content_format = <<"text/plain">>, payload = ObjectList}, + [], + MsgId1), + #coap_message{method = {ok,created}} = test_recv_coap_response(UdpSock), + test_recv_mqtt_response(RespTopic), + timer:sleep(100). + +resolve_uri(Uri) -> + {ok, #{scheme := Scheme, + host := Host, + port := PortNo, + path := Path} = URIMap} = emqx_http_lib:uri_parse(Uri), + Query = maps:get(query, URIMap, ""), + {ok, PeerIP} = inet:getaddr(Host, inet), + {Scheme, {PeerIP, PortNo}, split_path(Path), split_query(Query)}. + +split_path([]) -> []; +split_path([$/]) -> []; +split_path([$/ | Path]) -> split_segments(Path, $/, []). + +split_query([]) -> []; +split_query(Path) -> split_segments(Path, $&, []). + +split_segments(Path, Char, Acc) -> + case string:rchr(Path, Char) of + 0 -> + [make_segment(Path) | Acc]; + N when N > 0 -> + split_segments(string:substr(Path, 1, N-1), Char, + [make_segment(string:substr(Path, N+1)) | Acc]) + end. + +make_segment(Seg) -> + list_to_binary(emqx_http_lib:uri_decode(Seg)). + +join_path([], Acc) -> Acc; +join_path([<<"/">>|T], Acc) -> + join_path(T, Acc); +join_path([H|T], Acc) -> + join_path(T, <>). + +sprintf(Format, Args) -> + lists:flatten(io_lib:format(Format, Args)).