Merge pull request #5699 from lafirest/feat/lwm2m_api

refactor(emqx_lwm2m): refactor lwm2m api use new rest framework
This commit is contained in:
lafirest 2021-09-10 10:34:18 +08:00 committed by GitHub
commit 5693981b54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 485 additions and 166 deletions

View File

@ -64,9 +64,9 @@ request(post, #{body := Body, bindings := Bindings}) ->
case call_client(ClientId, Msg2, timer:seconds(WaitTime)) of case call_client(ClientId, Msg2, timer:seconds(WaitTime)) of
timeout -> timeout ->
{504}; {504, #{code => 'CLIENT_NOT_RESPONSE'}};
not_found -> not_found ->
{404}; {404, #{code => 'CLIENT_NOT_FOUND'}};
Response -> Response ->
{200, format_to_response(CT, Response)} {200, format_to_response(CT, Response)}
end. end.
@ -101,8 +101,8 @@ request_method_meta() ->
<<"request payload, binary must encode by base64">>), <<"request payload, binary must encode by base64">>),
responses => #{ responses => #{
<<"200">> => object_schema(coap_message_properties()), <<"200">> => object_schema(coap_message_properties()),
<<"404">> => schema(<<"NotFound">>), <<"404">> => error_schema("client not found error", ['CLIENT_NOT_FOUND']),
<<"504">> => schema(<<"Timeout">>) <<"504">> => error_schema("timeout", ['CLIENT_NOT_RESPONSE'])
}}. }}.

View File

@ -16,143 +16,119 @@
-module(emqx_lwm2m_api). -module(emqx_lwm2m_api).
-rest_api(#{name => list, -behaviour(minirest_api).
method => 'GET',
path => "/lwm2m_channels/",
func => list,
descr => "A list of all lwm2m channel"
}).
-rest_api(#{name => list, -export([api_spec/0]).
method => 'GET',
path => "/nodes/:atom:node/lwm2m_channels/",
func => list,
descr => "A list of lwm2m channel of a node"
}).
-rest_api(#{name => lookup_cmd, -export([lookup_cmd/2]).
method => 'GET',
path => "/lookup_cmd/:bin:ep/",
func => lookup_cmd,
descr => "Send a lwm2m downlink command"
}).
-rest_api(#{name => lookup_cmd, -define(PREFIX, "/gateway/lwm2m/:clientid").
method => 'GET',
path => "/nodes/:atom:node/lookup_cmd/:bin:ep/",
func => lookup_cmd,
descr => "Send a lwm2m downlink command of a node"
}).
-export([ list/2 -import(emqx_mgmt_util, [ object_schema/1
, lookup_cmd/2 , error_schema/2
]). , properties/1]).
list(#{node := Node }, Params) -> api_spec() ->
case Node = node() of {[lookup_cmd_api()], []}.
true -> list(#{}, Params);
_ -> rpc_call(Node, list, [#{}, Params])
end;
list(#{}, _Params) -> lookup_cmd_paramters() ->
%% Channels = emqx_lwm2m_cm:all_channels(), [ make_paramter(clientid, path, true, "string")
Channels = [], , make_paramter(path, query, true, "string")
return({ok, format(Channels)}). , make_paramter(action, query, true, "string")].
lookup_cmd(#{ep := Ep, node := Node}, Params) -> lookup_cmd_properties() ->
case Node = node() of properties([ {clientid, string}
true -> lookup_cmd(#{ep => Ep}, Params); , {path, string}
_ -> rpc_call(Node, lookup_cmd, [#{ep => Ep}, Params]) , {action, string}
end; , {code, string}
, {codeMsg, string}
, {content, {array, object}, lookup_cmd_content_props()}]).
lookup_cmd(#{ep := _Ep}, Params) -> lookup_cmd_content_props() ->
_MsgType = proplists:get_value(<<"msgType">>, Params), [ {operations, string, <<"Resource Operations">>}
_Path0 = proplists:get_value(<<"path">>, Params), , {dataType, string, <<"Resource Type">>}
%% case emqx_lwm2m_cm:lookup_cmd(Ep, Path0, MsgType) of , {path, string, <<"Resource Path">>}
%% [] -> return({ok, []}); , {name, string, <<"Resource Name">>}].
%% [{_, undefined} | _] -> return({ok, []});
%% [{{IMEI, Path, MsgType}, undefined}] ->
%% return({ok, [{imei, IMEI},
%% {'msgType', IMEI},
%% {'code', <<"6.01">>},
%% {'codeMsg', <<"reply_not_received">>},
%% {'path', Path}]});
%% [{{IMEI, Path, MsgType}, {Code, CodeMsg, Content}}] ->
%% Payload1 = format_cmd_content(Content, MsgType),
%% return({ok, [{imei, IMEI},
%% {'msgType', IMEI},
%% {'code', Code},
%% {'codeMsg', CodeMsg},
%% {'path', Path}] ++ Payload1})
%% end.
return({ok, []}).
rpc_call(Node, Fun, Args) -> lookup_cmd_api() ->
case rpc:call(Node, ?MODULE, Fun, Args) of Metadata = #{get =>
{badrpc, Reason} -> {error, Reason}; #{description => <<"look up resource">>,
Res -> Res parameters => lookup_cmd_paramters(),
responses =>
#{<<"200">> => object_schema(lookup_cmd_properties()),
<<"404">> => error_schema("client not found error", ['CLIENT_NOT_FOUND'])
}
}},
{?PREFIX ++ "/lookup_cmd", Metadata, lookup_cmd}.
lookup_cmd(get, #{bindings := Bindings, query_string := QS}) ->
ClientId = maps:get(clientid, Bindings),
case emqx_gateway_cm_registry:lookup_channels(lwm2m, ClientId) of
[Channel | _] ->
#{<<"path">> := Path,
<<"action">> := Action} = QS,
{ok, Result} = emqx_lwm2m_channel:lookup_cmd(Channel, Path, Action),
lookup_cmd_return(Result, ClientId, Action, Path);
_ ->
{404, #{code => 'CLIENT_NOT_FOUND'}}
end. end.
format(Channels) -> lookup_cmd_return(undefined, ClientId, Action, Path) ->
lists:map(fun({IMEI, #{lifetime := LifeTime, {200,
peername := Peername, #{clientid => ClientId,
version := Version, action => Action,
reg_info := RegInfo}}) -> code => <<"6.01">>,
ObjectList = lists:map(fun(Path) -> codeMsg => <<"reply_not_received">>,
[ObjId | _] = path_list(Path), path => Path}};
case emqx_lwm2m_xml_object:get_obj_def(binary_to_integer(ObjId), true) of
{error, _} ->
{Path, Path};
ObjDefinition ->
ObjectName = emqx_lwm2m_xml_object:get_object_name(ObjDefinition),
{Path, list_to_binary(ObjectName)}
end
end, maps:get(<<"objectList">>, RegInfo)),
{IpAddr, Port} = Peername,
[{imei, IMEI},
{lifetime, LifeTime},
{ip_address, iolist_to_binary(ntoa(IpAddr))},
{port, Port},
{version, Version},
{'objectList', ObjectList}]
end, Channels).
%% format_cmd_content(undefined, _MsgType) -> []; lookup_cmd_return({Code, CodeMsg, Content}, ClientId, Action, Path) ->
%% format_cmd_content(_Content, <<"discover">>) -> {200,
%% %% [H | Content1] = Content, format_cmd_content(Content,
%% %% {_, [HObjId]} = emqx_lwm2m_coap_resource:parse_object_list(H), Action,
%% %% [ObjId | _]= path_list(HObjId), #{clientid => ClientId,
%% %% ObjectList = case Content1 of action => Action,
%% %% [Content2 | _] -> code => Code,
%% %% {_, ObjL} = emqx_lwm2m_coap_resource:parse_object_list(Content2), codeMsg => CodeMsg,
%% %% ObjL; path => Path})}.
%% %% [] -> []
%% %% end,
%% %% R = case emqx_lwm2m_xml_object:get_obj_def(binary_to_integer(ObjId), true) of
%% %% {error, _} ->
%% %% lists:map(fun(Object) -> {Object, Object} end, ObjectList);
%% %% ObjDefinition ->
%% %% lists:map(fun(Object) ->
%% %% [_, _, ResId| _] = path_list(Object),
%% %% Operations = case emqx_lwm2m_xml_object:get_resource_operations(binary_to_integer(ResId), ObjDefinition) of
%% %% "E" -> [{operations, list_to_binary("E")}];
%% %% Oper -> [{'dataType', list_to_binary(emqx_lwm2m_xml_object:get_resource_type(binary_to_integer(ResId), ObjDefinition))},
%% %% {operations, list_to_binary(Oper)}]
%% %% end,
%% %% [{path, Object},
%% %% {name, list_to_binary(emqx_lwm2m_xml_object:get_resource_name(binary_to_integer(ResId), ObjDefinition))}
%% %% ] ++ Operations
%% %% end, ObjectList)
%% %% end,
%% %% [{content, R}];
%% [];
%% format_cmd_content(Content, _) ->
%% [{content, Content}].
ntoa({0,0,0,0,0,16#ffff,AB,CD}) -> format_cmd_content(undefined, _MsgType, Result) ->
inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}); Result;
ntoa(IP) ->
inet_parse:ntoa(IP). format_cmd_content(Content, <<"discover">>, Result) ->
[H | Content1] = Content,
{_, [HObjId]} = emqx_lwm2m_session:parse_object_list(H),
[ObjId | _]= path_list(HObjId),
ObjectList = case Content1 of
[Content2 | _] ->
{_, ObjL} = emqx_lwm2m_session:parse_object_list(Content2),
ObjL;
[] -> []
end,
R = case emqx_lwm2m_xml_object:get_obj_def(binary_to_integer(ObjId), true) of
{error, _} ->
lists:map(fun(Object) -> #{Object => Object} end, ObjectList);
ObjDefinition ->
lists:map(
fun(Object) ->
[_, _, RawResId| _] = path_list(Object),
ResId = binary_to_integer(RawResId),
Operations = case emqx_lwm2m_xml_object:get_resource_operations(ResId, ObjDefinition) of
"E" ->
#{operations => list_to_binary("E")};
Oper ->
#{'dataType' => list_to_binary(emqx_lwm2m_xml_object:get_resource_type(ResId, ObjDefinition)),
operations => list_to_binary(Oper)}
end,
Operations#{path => Object,
name => list_to_binary(emqx_lwm2m_xml_object:get_resource_name(ResId, ObjDefinition))}
end, ObjectList)
end,
Result#{content => R};
format_cmd_content(Content, _, Result) ->
Result#{content => Content}.
path_list(Path) -> path_list(Path) ->
case binary:split(binary_util:trim(Path, $/), [<<$/>>], [global]) of case binary:split(binary_util:trim(Path, $/), [<<$/>>], [global]) of
@ -162,6 +138,8 @@ path_list(Path) ->
[ObjId] -> [ObjId] [ObjId] -> [ObjId]
end. end.
return(_) -> make_paramter(Name, In, IsRequired, Type) ->
%% TODO: V5 API #{name => Name,
ok. in => In,
required => IsRequired,
schema => #{type => Type}}.

View File

@ -25,7 +25,8 @@
, info/2 , info/2
, stats/1 , stats/1
, with_context/2 , with_context/2
, do_takeover/3]). , do_takeover/3
, lookup_cmd/3]).
-export([ init/2 -export([ init/2
, handle_in/2 , handle_in/2
@ -116,6 +117,9 @@ with_context(Ctx, ClientInfo) ->
with_context(Type, Topic, Ctx, ClientInfo) with_context(Type, Topic, Ctx, ClientInfo)
end. end.
lookup_cmd(Channel, Path, Action) ->
gen_server:call(Channel, {?FUNCTION_NAME, Path, Action}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle incoming packet %% Handle incoming packet
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -150,6 +154,10 @@ handle_timeout(_, _, Channel) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle call %% Handle call
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_call({lookup_cmd, Path, Type}, _From, #channel{session = Session} = Channel) ->
Result = emqx_lwm2m_session:find_cmd_record(Path, Type, Session),
{reply, {ok, Result}, Channel};
handle_call(Req, _From, Channel) -> handle_call(Req, _From, Channel) ->
?LOG(error, "Unexpected call: ~p", [Req]), ?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, Channel}. {reply, ignored, Channel}.

View File

@ -341,8 +341,8 @@ extract_path(Ref = #{}) ->
drop_query( drop_query(
case Ref of case Ref of
#{<<"data">> := Data} -> #{<<"data">> := Data} ->
case maps:get(<<"path">>, Data, nil) of case maps:get(<<"path">>, Data, undefined) of
nil -> maps:get(<<"basePath">>, Data, undefined); undefined -> maps:get(<<"basePath">>, Data, undefined);
Path -> Path Path -> Path
end; end;
#{<<"path">> := Path} -> #{<<"path">> := Path} ->

View File

@ -22,7 +22,8 @@
-include_lib("emqx_gateway/src/lwm2m/include/emqx_lwm2m.hrl"). -include_lib("emqx_gateway/src/lwm2m/include/emqx_lwm2m.hrl").
%% API %% API
-export([new/0, init/4, update/3, reregister/3, on_close/1]). -export([ new/0, init/4, update/3, parse_object_list/1
, reregister/3, on_close/1, find_cmd_record/3]).
-export([ info/1 -export([ info/1
, info/2 , info/2
@ -42,6 +43,15 @@
-type timestamp() :: non_neg_integer(). -type timestamp() :: non_neg_integer().
-type queued_request() :: {timestamp(), request_context(), emqx_coap_message()}. -type queued_request() :: {timestamp(), request_context(), emqx_coap_message()}.
-type cmd_path() :: binary().
-type cmd_type() :: binary().
-type cmd_record_key() :: {cmd_path(), cmd_type()}.
-type cmd_code() :: binary().
-type cmd_code_msg() :: binary().
-type cmd_code_content() :: list(map()).
-type cmd_result() :: undefined | {cmd_code(), cmd_code_msg(), cmd_code_content()}.
-type cmd_record() :: #{cmd_record_key() => cmd_result()}.
-record(session, { coap :: emqx_coap_tm:manager() -record(session, { coap :: emqx_coap_tm:manager()
, queue :: queue:queue(queued_request()) , queue :: queue:queue(queued_request())
, wait_ack :: request_context() | undefined , wait_ack :: request_context() | undefined
@ -52,6 +62,7 @@
, is_cache_mode :: boolean() , is_cache_mode :: boolean()
, mountpoint :: binary() , mountpoint :: binary()
, last_active_at :: non_neg_integer() , last_active_at :: non_neg_integer()
, cmd_record :: cmd_record()
}). }).
-type session() :: #session{}. -type session() :: #session{}.
@ -61,6 +72,8 @@
-define(IGNORE_OBJECT, [<<"0">>, <<"1">>, <<"2">>, <<"4">>, <<"5">>, <<"6">>, -define(IGNORE_OBJECT, [<<"0">>, <<"1">>, <<"2">>, <<"4">>, <<"5">>, <<"6">>,
<<"7">>, <<"9">>, <<"15">>]). <<"7">>, <<"9">>, <<"15">>]).
-define(CMD_KEY(Path, Type), {Path, Type}).
%% uplink and downlink topic configuration %% uplink and downlink topic configuration
-define(lwm2m_up_dm_topic, {<<"/v1/up/dm">>, 0}). -define(lwm2m_up_dm_topic, {<<"/v1/up/dm">>, 0}).
@ -98,6 +111,7 @@ new() ->
, last_active_at = ?NOW , last_active_at = ?NOW
, is_cache_mode = false , is_cache_mode = false
, mountpoint = <<>> , mountpoint = <<>>
, cmd_record = #{}
, lifetime = emqx:get_config([gateway, lwm2m, lifetime_max])}. , lifetime = emqx:get_config([gateway, lwm2m, lifetime_max])}.
-spec init(emqx_coap_message(), binary(), function(), session()) -> map(). -spec init(emqx_coap_message(), binary(), function(), session()) -> map().
@ -135,6 +149,10 @@ on_close(Session) ->
emqx:unsubscribe(MountedTopic), emqx:unsubscribe(MountedTopic),
MountedTopic. MountedTopic.
-spec find_cmd_record(cmd_path(), cmd_type(), session()) -> cmd_result().
find_cmd_record(Path, Type, #session{cmd_record = Record}) ->
maps:get(?CMD_KEY(Path, Type), Record, undefined).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Info, Stats %% Info, Stats
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -271,7 +289,7 @@ parse_object_list(FullObjLinkList) ->
(<<Prefix:LenAlterPath/binary, Link/binary>>) when Prefix =:= AlterPath -> (<<Prefix:LenAlterPath/binary, Link/binary>>) when Prefix =:= AlterPath ->
trim(Link); trim(Link);
(Link) -> Link (Link) -> Link
end, ObjLinkList), end, ObjLinkList),
{AlterPath, WithOutPrefix} {AlterPath, WithOutPrefix}
end. end.
@ -443,19 +461,20 @@ handle_coap_response({Ctx = #{<<"msgType">> := EventType},
Session) -> Session) ->
MqttPayload = emqx_lwm2m_cmd:coap_to_mqtt(CoapMsgMethod, CoapMsgPayload, CoapMsgOpts, Ctx), MqttPayload = emqx_lwm2m_cmd:coap_to_mqtt(CoapMsgMethod, CoapMsgPayload, CoapMsgOpts, Ctx),
{ReqPath, _} = emqx_lwm2m_cmd:path_list(emqx_lwm2m_cmd:extract_path(Ctx)), {ReqPath, _} = emqx_lwm2m_cmd:path_list(emqx_lwm2m_cmd:extract_path(Ctx)),
Session2 = Session2 = record_response(EventType, MqttPayload, Session),
Session3 =
case {ReqPath, MqttPayload, EventType, CoapMsgType} of case {ReqPath, MqttPayload, EventType, CoapMsgType} of
{[<<"5">>| _], _, <<"observe">>, CoapMsgType} when CoapMsgType =/= ack -> {[<<"5">>| _], _, <<"observe">>, CoapMsgType} when CoapMsgType =/= ack ->
%% this is a notification for status update during NB firmware upgrade. %% this is a notification for status update during NB firmware upgrade.
%% need to reply to DM http callbacks %% need to reply to DM http callbacks
send_to_mqtt(Ctx, <<"notify">>, MqttPayload, ?lwm2m_up_dm_topic, WithContext, Session); send_to_mqtt(Ctx, <<"notify">>, MqttPayload, ?lwm2m_up_dm_topic, WithContext, Session2);
{_ReqPath, _, <<"observe">>, CoapMsgType} when CoapMsgType =/= ack -> {_ReqPath, _, <<"observe">>, CoapMsgType} when CoapMsgType =/= ack ->
%% this is actually a notification, correct the msgType %% this is actually a notification, correct the msgType
send_to_mqtt(Ctx, <<"notify">>, MqttPayload, WithContext, Session); send_to_mqtt(Ctx, <<"notify">>, MqttPayload, WithContext, Session2);
_ -> _ ->
send_to_mqtt(Ctx, EventType, MqttPayload, WithContext, Session) send_to_mqtt(Ctx, EventType, MqttPayload, WithContext, Session2)
end, end,
send_dl_msg(Ctx, Session2). send_dl_msg(Ctx, Session3).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Ack %% Ack
@ -624,7 +643,8 @@ deliver_to_coap(AlternatePath, TermData, MQTT, CacheMode, WithContext, Session)
WithContext(metrics, 'messages.delivered'), WithContext(metrics, 'messages.delivered'),
{Req, Ctx} = emqx_lwm2m_cmd:mqtt_to_coap(AlternatePath, TermData), {Req, Ctx} = emqx_lwm2m_cmd:mqtt_to_coap(AlternatePath, TermData),
ExpiryTime = get_expiry_time(MQTT), ExpiryTime = get_expiry_time(MQTT),
maybe_do_deliver_to_coap(Ctx, Req, ExpiryTime, CacheMode, Session). Session2 = record_request(Ctx, Session),
maybe_do_deliver_to_coap(Ctx, Req, ExpiryTime, CacheMode, Session2).
maybe_do_deliver_to_coap(Ctx, Req, ExpiryTime, CacheMode, maybe_do_deliver_to_coap(Ctx, Req, ExpiryTime, CacheMode,
#session{wait_ack = WaitAck, #session{wait_ack = WaitAck,
@ -692,3 +712,23 @@ do_out([{Ctx, Out} | T], TM, Msgs) ->
do_out(_, TM, Msgs) -> do_out(_, TM, Msgs) ->
{ok, TM, Msgs}. {ok, TM, Msgs}.
%%--------------------------------------------------------------------
%% CMD Record
%%--------------------------------------------------------------------
-spec record_request(request_context(), session()) -> session().
record_request(#{<<"msgType">> := Type} = Context, Session) ->
Path = emqx_lwm2m_cmd:extract_path(Context),
record_cmd(Path, Type, undefined, Session).
record_response(EventType, #{<<"data">> := Data}, Session) ->
ReqPath = maps:get(<<"reqPath">>, Data, undefined),
Code = maps:get(<<"code">>, Data, undefined),
CodeMsg = maps:get(<<"codeMsg">>, Data, undefined),
Content = maps:get(<<"content">>, Data, undefined),
record_cmd(ReqPath, EventType, {Code, CodeMsg, Content}, Session).
record_cmd(Path, Type, Result, #session{cmd_record = Record} = Session) ->
Record2 = Record#{?CMD_KEY(Path, Type) => Result},
Session#session{cmd_record = Record2}.

View File

@ -62,12 +62,6 @@ end_per_suite(Config) ->
emqx_mgmt_api_test_util:end_suite([emqx_gateway]), emqx_mgmt_api_test_util:end_suite([emqx_gateway]),
Config. Config.
set_special_configs(emqx_gatewway) ->
ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT);
set_special_configs(_) ->
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Cases %% Cases
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -187,17 +181,6 @@ split_segments(Path, Char, Acc) ->
make_segment(Seg) -> make_segment(Seg) ->
list_to_binary(emqx_http_lib:uri_decode(Seg)). list_to_binary(emqx_http_lib:uri_decode(Seg)).
get_coap_path(Options) ->
get_path(Options, <<>>).
get_coap_query(Options) ->
proplists:get_value(uri_query, Options, []).
get_coap_observe(Options) ->
get_observe(Options).
get_path([], Acc) -> get_path([], Acc) ->
%?LOGT("get_path Acc=~p", [Acc]), %?LOGT("get_path Acc=~p", [Acc]),
Acc; Acc;
@ -207,13 +190,6 @@ get_path([{uri_path, Path1}|T], Acc) ->
get_path([{_, _}|T], Acc) -> get_path([{_, _}|T], Acc) ->
get_path(T, Acc). get_path(T, Acc).
get_observe([]) ->
undefined;
get_observe([{observe, V}|_T]) ->
V;
get_observe([{_, _}|T]) ->
get_observe(T).
join_path([], Acc) -> Acc; join_path([], Acc) -> Acc;
join_path([<<"/">>|T], Acc) -> join_path([<<"/">>|T], Acc) ->
join_path(T, Acc); join_path(T, Acc);

View File

@ -0,0 +1,317 @@
%%--------------------------------------------------------------------
%% Copyright (C) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_lwm2m_api_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-define(PORT, 5783).
-define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)).
-include_lib("emqx_gateway/src/lwm2m/include/emqx_lwm2m.hrl").
-include_lib("lwm2m_coap/include/coap.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<"
gateway.lwm2m {
xml_dir = \"../../lib/emqx_gateway/src/lwm2m/lwm2m_xml\"
lifetime_min = 1s
lifetime_max = 86400s
qmode_time_windonw = 22
auto_observe = false
mountpoint = \"lwm2m/%u\"
update_msg_publish_condition = contains_object_list
translators {
command = {topic = \"/dn/#\", qos = 0}
response = {topic = \"/up/resp\", qos = 0}
notify = {topic = \"/up/notify\", qos = 0}
register = {topic = \"/up/resp\", qos = 0}
update = {topic = \"/up/resp\", qos = 0}
}
listeners.udp.default {
bind = 5783
}
}
">>).
-define(assertExists(Map, Key),
?assertNotEqual(maps:get(Key, Map, undefined), undefined)).
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
all() ->
emqx_ct:all(?MODULE).
init_per_suite(Config) ->
ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
emqx_mgmt_api_test_util:init_suite([emqx_gateway]),
Config.
end_per_suite(Config) ->
timer:sleep(300),
emqx_mgmt_api_test_util:end_suite([emqx_gateway]),
Config.
init_per_testcase(_AllTestCase, Config) ->
ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
{ok, _} = application:ensure_all_started(emqx_gateway),
{ok, ClientUdpSock} = gen_udp:open(0, [binary, {active, false}]),
{ok, C} = emqtt:start_link([{host, "localhost"},{port, 1883},{clientid, <<"c1">>}]),
{ok, _} = emqtt:connect(C),
timer:sleep(100),
[{sock, ClientUdpSock}, {emqx_c, C} | Config].
end_per_testcase(_AllTestCase, Config) ->
timer:sleep(300),
gen_udp:close(?config(sock, Config)),
emqtt:disconnect(?config(emqx_c, Config)),
ok = application:stop(emqx_gateway).
%%--------------------------------------------------------------------
%% Cases
%%--------------------------------------------------------------------
t_lookup_cmd_read(Config) ->
UdpSock = ?config(sock, Config),
Epn = "urn:oma:lwm2m:oma:3",
MsgId1 = 15,
RespTopic = list_to_binary("lwm2m/"++Epn++"/up/resp"),
emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0),
timer:sleep(200),
%% step 1, device register ...
test_send_coap_request( UdpSock,
post,
sprintf("coap://127.0.0.1:~b/rd?ep=~s&lt=345&lwm2m=1", [?PORT, Epn]),
#coap_content{content_format = <<"text/plain">>,
payload = <<"</lwm2m>;rt=\"oma.lwm2m\";ct=11543,</lwm2m/1/0>,</lwm2m/2/0>,</lwm2m/3/0>">>},
[],
MsgId1),
#coap_message{method = Method1} = test_recv_coap_response(UdpSock),
?assertEqual({ok,created}, Method1),
test_recv_mqtt_response(RespTopic),
%% step2, send a READ command to device
CmdId = 206,
CommandTopic = <<"lwm2m/", (list_to_binary(Epn))/binary, "/dn/dm">>,
Command = #{
<<"requestID">> => CmdId, <<"cacheID">> => CmdId,
<<"msgType">> => <<"read">>,
<<"data">> => #{
<<"path">> => <<"/3/0/0">>
}
},
CommandJson = emqx_json:encode(Command),
?LOGT("CommandJson=~p", [CommandJson]),
test_mqtt_broker:publish(CommandTopic, CommandJson, 0),
timer:sleep(50),
no_received_request(Epn, <<"/3/0/0">>, <<"read">>),
Request2 = test_recv_coap_request(UdpSock),
?LOGT("LwM2M client got ~p", [Request2]),
timer:sleep(50),
test_send_coap_response(UdpSock, "127.0.0.1", ?PORT, {ok, content}, #coap_content{content_format = <<"text/plain">>, payload = <<"EMQ">>}, Request2, true),
timer:sleep(100),
normal_received_request(Epn, <<"/3/0/0">>, <<"read">>).
t_lookup_cmd_discover(Config) ->
%% step 1, device register ...
Epn = "urn:oma:lwm2m:oma:3",
MsgId1 = 15,
UdpSock = ?config(sock, Config),
ObjectList = <<"</1>, </2>, </3/0>, </4>, </5>">>,
RespTopic = list_to_binary("lwm2m/"++Epn++"/up/resp"),
emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0),
timer:sleep(200),
std_register(UdpSock, Epn, ObjectList, MsgId1, RespTopic),
%% step2, send a WRITE command to device
CommandTopic = <<"lwm2m/", (list_to_binary(Epn))/binary, "/dn/dm">>,
CmdId = 307,
Command = #{<<"requestID">> => CmdId, <<"cacheID">> => CmdId,
<<"msgType">> => <<"discover">>,
<<"data">> => #{
<<"path">> => <<"/3/0/7">>
} },
CommandJson = emqx_json:encode(Command),
test_mqtt_broker:publish(CommandTopic, CommandJson, 0),
no_received_request(Epn, <<"/3/0/7">>, <<"discover">>),
timer:sleep(50),
Request2 = test_recv_coap_request(UdpSock),
timer:sleep(50),
PayloadDiscover = <<"</3/0/7>;dim=8;pmin=10;pmax=60;gt=50;lt=42.2,</3/0/8>">>,
test_send_coap_response(UdpSock,
"127.0.0.1",
?PORT,
{ok, content},
#coap_content{content_format = <<"application/link-format">>, payload = PayloadDiscover},
Request2,
true),
timer:sleep(100),
discover_received_request(Epn, <<"/3/0/7">>, <<"discover">>).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% Internal Functions
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
send_request(ClientId, Path, Action) ->
ApiPath = emqx_mgmt_api_test_util:api_path(["gateway/lwm2m", ClientId, "lookup_cmd"]),
Auth = emqx_mgmt_api_test_util:auth_header_(),
Query = io_lib:format("path=~s&action=~s", [Path, Action]),
{ok, Response} = emqx_mgmt_api_test_util:request_api(get, ApiPath, Query, Auth),
?LOGT("rest api response:~s~n", [Response]),
Response.
no_received_request(ClientId, Path, Action) ->
Response = send_request(ClientId, Path, Action),
NotReceived = #{<<"clientid">> => list_to_binary(ClientId),
<<"action">> => Action,
<<"code">> => <<"6.01">>,
<<"codeMsg">> => <<"reply_not_received">>,
<<"path">> => Path},
?assertEqual(NotReceived, emqx_json:decode(Response, [return_maps])).
normal_received_request(ClientId, Path, Action) ->
Response = send_request(ClientId, Path, Action),
RCont = emqx_json:decode(Response, [return_maps]),
?assertEqual(list_to_binary(ClientId), maps:get(<<"clientid">>, RCont, undefined)),
?assertEqual(Path, maps:get(<<"path">>, RCont, undefined)),
?assertEqual(Action, maps:get(<<"action">>, RCont, undefined)),
?assertExists(RCont, <<"code">>),
?assertExists(RCont, <<"codeMsg">>),
?assertExists(RCont, <<"content">>),
RCont.
discover_received_request(ClientId, Path, Action) ->
RCont = normal_received_request(ClientId, Path, Action),
[Res | _] = maps:get(<<"content">>, RCont),
?assertExists(Res, <<"path">>),
?assertExists(Res, <<"name">>),
?assertExists(Res, <<"operations">>).
test_recv_mqtt_response(RespTopic) ->
receive
{publish, #{topic := RespTopic, payload := RM}} ->
?LOGT("test_recv_mqtt_response Response=~p", [RM]),
RM
after 1000 -> timeout_test_recv_mqtt_response
end.
test_send_coap_request(UdpSock, Method, Uri, Content, Options, MsgId) ->
is_record(Content, coap_content) orelse error("Content must be a #coap_content!"),
is_list(Options) orelse error("Options must be a list"),
case resolve_uri(Uri) of
{coap, {IpAddr, Port}, Path, Query} ->
Request0 = lwm2m_coap_message:request(con, Method, Content, [{uri_path, Path}, {uri_query, Query} | Options]),
Request = Request0#coap_message{id = MsgId},
?LOGT("send_coap_request Request=~p", [Request]),
RequestBinary = lwm2m_coap_message_parser:encode(Request),
?LOGT("test udp socket send to ~p:~p, data=~p", [IpAddr, Port, RequestBinary]),
ok = gen_udp:send(UdpSock, IpAddr, Port, RequestBinary);
{SchemeDiff, ChIdDiff, _, _} ->
error(lists:flatten(io_lib:format("scheme ~s or ChId ~s does not match with socket", [SchemeDiff, ChIdDiff])))
end.
test_recv_coap_response(UdpSock) ->
{ok, {Address, Port, Packet}} = gen_udp:recv(UdpSock, 0, 2000),
Response = lwm2m_coap_message_parser:decode(Packet),
?LOGT("test udp receive from ~p:~p, data1=~p, Response=~p", [Address, Port, Packet, Response]),
#coap_message{type = ack, method = Method, id=Id, token = Token, options = Options, payload = Payload} = Response,
?LOGT("receive coap response Method=~p, Id=~p, Token=~p, Options=~p, Payload=~p", [Method, Id, Token, Options, Payload]),
Response.
test_recv_coap_request(UdpSock) ->
case gen_udp:recv(UdpSock, 0, 2000) of
{ok, {_Address, _Port, Packet}} ->
Request = lwm2m_coap_message_parser:decode(Packet),
#coap_message{type = con, method = Method, id=Id, token = Token, payload = Payload, options = Options} = Request,
?LOGT("receive coap request Method=~p, Id=~p, Token=~p, Options=~p, Payload=~p", [Method, Id, Token, Options, Payload]),
Request;
{error, Reason} ->
?LOGT("test_recv_coap_request failed, Reason=~p", [Reason]),
timeout_test_recv_coap_request
end.
test_send_coap_response(UdpSock, Host, Port, Code, Content, Request, Ack) ->
is_record(Content, coap_content) orelse error("Content must be a #coap_content!"),
is_list(Host) orelse error("Host is not a string"),
{ok, IpAddr} = inet:getaddr(Host, inet),
Response = lwm2m_coap_message:response(Code, Content, Request),
Response2 = case Ack of
true -> Response#coap_message{type = ack};
false -> Response
end,
?LOGT("test_send_coap_response Response=~p", [Response2]),
ok = gen_udp:send(UdpSock, IpAddr, Port, lwm2m_coap_message_parser:encode(Response2)).
std_register(UdpSock, Epn, ObjectList, MsgId1, RespTopic) ->
test_send_coap_request( UdpSock,
post,
sprintf("coap://127.0.0.1:~b/rd?ep=~s&lt=345&lwm2m=1", [?PORT, Epn]),
#coap_content{content_format = <<"text/plain">>, payload = ObjectList},
[],
MsgId1),
#coap_message{method = {ok,created}} = test_recv_coap_response(UdpSock),
test_recv_mqtt_response(RespTopic),
timer:sleep(100).
resolve_uri(Uri) ->
{ok, #{scheme := Scheme,
host := Host,
port := PortNo,
path := Path} = URIMap} = emqx_http_lib:uri_parse(Uri),
Query = maps:get(query, URIMap, ""),
{ok, PeerIP} = inet:getaddr(Host, inet),
{Scheme, {PeerIP, PortNo}, split_path(Path), split_query(Query)}.
split_path([]) -> [];
split_path([$/]) -> [];
split_path([$/ | Path]) -> split_segments(Path, $/, []).
split_query([]) -> [];
split_query(Path) -> split_segments(Path, $&, []).
split_segments(Path, Char, Acc) ->
case string:rchr(Path, Char) of
0 ->
[make_segment(Path) | Acc];
N when N > 0 ->
split_segments(string:substr(Path, 1, N-1), Char,
[make_segment(string:substr(Path, N+1)) | Acc])
end.
make_segment(Seg) ->
list_to_binary(emqx_http_lib:uri_decode(Seg)).
join_path([], Acc) -> Acc;
join_path([<<"/">>|T], Acc) ->
join_path(T, Acc);
join_path([H|T], Acc) ->
join_path(T, <<Acc/binary, $/, H/binary>>).
sprintf(Format, Args) ->
lists:flatten(io_lib:format(Format, Args)).