feat(emqx_lwm2m): add some lwm2m api (#6047)

This commit is contained in:
lafirest 2021-11-10 14:20:33 +08:00 committed by GitHub
parent 2d159ad9a1
commit e8f6035c34
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 236 additions and 16 deletions

View File

@ -20,21 +20,28 @@
-export([api_spec/0]).
-export([lookup_cmd/2]).
-export([lookup_cmd/2, observe/2, read/2, write/2]).
-define(PREFIX, "/gateway/lwm2m/:clientid").
-import(emqx_mgmt_util, [ object_schema/1
, error_schema/2
, properties/1]).
, properties/1
, schema/1
]).
api_spec() ->
{[lookup_cmd_api()], []}.
{[ lookup_cmd_api(), observe_api(), read_api()
, write_api()
], []}.
lookup_cmd_paramters() ->
base_paramters() ->
[ make_paramter(clientid, path, true, "string")
, make_paramter(path, query, true, "string")
, make_paramter(action, query, true, "string")].
].
lookup_cmd_paramters() ->
base_paramters() ++ [make_paramter(action, query, true, "string")].
lookup_cmd_properties() ->
properties([ {clientid, string}
@ -62,6 +69,50 @@ lookup_cmd_api() ->
{?PREFIX ++ "/lookup_cmd", Metadata, lookup_cmd}.
observe_api() ->
Metadata = #{post =>
#{description => <<"(cancel)observe resource">>,
parameters => base_paramters() ++
[make_paramter(enable, query, true, "boolean")],
responses =>
#{<<"200">> => schema(<<"Successed">>),
<<"404">> => error_schema("client not found error", ['CLIENT_NOT_FOUND'])
}
}},
{?PREFIX ++ "/observe", Metadata, observe}.
read_api() ->
Metadata = #{post =>
#{description => <<"read resource">>,
parameters => base_paramters(),
responses =>
#{<<"200">> => schema(<<"Successed">>),
<<"404">> => error_schema("client not found error", ['CLIENT_NOT_FOUND'])
}
}},
{?PREFIX ++ "/read", Metadata, read}.
write_api() ->
Metadata = #{post =>
#{description => <<"write to resource">>,
parameters => base_paramters() ++
[ make_paramter(type, query, true, "string",
[<<"Integer">>,
<<"Float">>,
<<"Time">>,
<<"String">>,
<<"Boolean">>,
<<"Opaque">>,
<<"Objlnk">>])
, make_paramter(value, query, true, "string")
],
responses =>
#{<<"200">> => schema(<<"Successed">>),
<<"404">> => error_schema("client not found error", ['CLIENT_NOT_FOUND'])
}
}},
{?PREFIX ++ "/write", Metadata, write}.
lookup_cmd(get, #{bindings := Bindings, query_string := QS}) ->
ClientId = maps:get(clientid, Bindings),
case emqx_gateway_cm_registry:lookup_channels(lwm2m, ClientId) of
@ -143,3 +194,51 @@ make_paramter(Name, In, IsRequired, Type) ->
in => In,
required => IsRequired,
schema => #{type => Type}}.
make_paramter(Name, In, IsRequired, Type, Enum) ->
#{name => Name,
in => In,
required => IsRequired,
schema => #{type => Type,
enum => Enum}}.
observe(post, #{bindings := #{clientid := ClientId},
query_string := #{<<"path">> := Path, <<"enable">> := Enable}}) ->
MsgType = case Enable of
true -> <<"observe">>;
_ -> <<"cancel-observe">>
end,
Cmd = #{<<"msgType">> => MsgType,
<<"data">> => #{<<"path">> => Path}
},
send_cmd(ClientId, Cmd).
read(post, #{bindings := #{clientid := ClientId},
query_string := Qs}) ->
Cmd = #{<<"msgType">> => <<"read">>,
<<"data">> => Qs
},
send_cmd(ClientId, Cmd).
write(post, #{bindings := #{clientid := ClientId},
query_string := Qs}) ->
Cmd = #{<<"msgType">> => <<"write">>,
<<"data">> => Qs
},
send_cmd(ClientId, Cmd).
send_cmd(ClientId, Cmd) ->
case emqx_gateway_cm_registry:lookup_channels(lwm2m, ClientId) of
[Channel | _] ->
ok = emqx_lwm2m_channel:send_cmd(Channel, Cmd),
{200};
_ ->
{404, #{code => 'CLIENT_NOT_FOUND'}}
end.

View File

@ -26,7 +26,9 @@
, stats/1
, with_context/2
, do_takeover/3
, lookup_cmd/3]).
, lookup_cmd/3
, send_cmd/2
]).
-export([ init/2
, handle_in/2
@ -133,6 +135,9 @@ with_context(Ctx, ClientInfo) ->
lookup_cmd(Channel, Path, Action) ->
gen_server:call(Channel, {?FUNCTION_NAME, Path, Action}).
send_cmd(Channel, Cmd) ->
gen_server:call(Channel, {?FUNCTION_NAME, Cmd}).
%%--------------------------------------------------------------------
%% Handle incoming packet
%%--------------------------------------------------------------------
@ -171,6 +176,10 @@ handle_call({lookup_cmd, Path, Type}, _From, #channel{session = Session} = Chann
Result = emqx_lwm2m_session:find_cmd_record(Path, Type, Session),
{reply, {ok, Result}, Channel};
handle_call({send_cmd, Cmd}, _From, Channel) ->
{ok, Outs, Channel2} = call_session(send_cmd, Cmd, Channel),
{reply, ok, Outs, Channel2};
handle_call(Req, _From, Channel) ->
?SLOG(error, #{ msg => "unexpected_call"
, call => Req

View File

@ -34,6 +34,7 @@
, handle_protocol_in/3
, handle_deliver/3
, timeout/3
, send_cmd/3
, set_reply/2]).
-export_type([session/0]).
@ -235,6 +236,9 @@ set_reply(Msg, #session{coap = Coap} = Session) ->
Coap2 = emqx_coap_tm:set_reply(Msg, Coap),
Session#session{coap = Coap2}.
send_cmd(Cmd, _, Session) ->
return(send_cmd_impl(Cmd, Session)).
%%--------------------------------------------------------------------
%% Protocol Stack
%%--------------------------------------------------------------------
@ -683,6 +687,16 @@ get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' :
get_expiry_time(_) ->
0.
%%--------------------------------------------------------------------
%% Send CMD
%%--------------------------------------------------------------------
send_cmd_impl(Cmd, #session{reg_info = RegInfo} = Session) ->
CacheMode = is_cache_mode(Session),
AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
{Req, Ctx} = emqx_lwm2m_cmd:mqtt_to_coap(AlternatePath, Cmd),
Session2 = record_request(Ctx, Session),
maybe_do_deliver_to_coap(Ctx, Req, 0, CacheMode, Session2).
%%--------------------------------------------------------------------
%% Call CoAP
%%--------------------------------------------------------------------
@ -726,7 +740,6 @@ do_out([{Ctx, Out} | T], TM, Msgs) ->
do_out(_, TM, Msgs) ->
{ok, TM, Msgs}.
%%--------------------------------------------------------------------
%% CMD Record
%%--------------------------------------------------------------------

View File

@ -31,9 +31,9 @@
-define(CONF_DEFAULT, <<"
gateway.lwm2m {
xml_dir = \"../../lib/emqx_gateway/src/lwm2m/lwm2m_xml\"
lifetime_min = 1s
lifetime_min = 100s
lifetime_max = 86400s
qmode_time_window = 22
qmode_time_window = 200
auto_observe = false
mountpoint = \"lwm2m/%u\"
update_msg_publish_condition = contains_object_list
@ -90,17 +90,17 @@ init_per_testcase(_AllTestCase, Config) ->
[{sock, ClientUdpSock}, {emqx_c, C} | Config].
end_per_testcase(_AllTestCase, Config) ->
timer:sleep(300),
gen_udp:close(?config(sock, Config)),
emqtt:disconnect(?config(emqx_c, Config)),
ok = application:stop(emqx_gateway).
ok = application:stop(emqx_gateway),
timer:sleep(300).
%%--------------------------------------------------------------------
%% Cases
%%--------------------------------------------------------------------
t_lookup_cmd_read(Config) ->
UdpSock = ?config(sock, Config),
Epn = "urn:oma:lwm2m:oma:3",
Epn = "urn:oma:lwm2m:oma:1",
MsgId1 = 15,
RespTopic = list_to_binary("lwm2m/"++Epn++"/up/resp"),
emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0),
@ -147,7 +147,7 @@ t_lookup_cmd_read(Config) ->
t_lookup_cmd_discover(Config) ->
%% step 1, device register ...
Epn = "urn:oma:lwm2m:oma:3",
Epn = "urn:oma:lwm2m:oma:2",
MsgId1 = 15,
UdpSock = ?config(sock, Config),
ObjectList = <<"</1>, </2>, </3/0>, </4>, </5>">>,
@ -186,10 +186,102 @@ t_lookup_cmd_discover(Config) ->
timer:sleep(200),
discover_received_request(Epn, <<"/3/0/7">>, <<"discover">>).
t_read(Config) ->
UdpSock = ?config(sock, Config),
Epn = "urn:oma:lwm2m:oma:3",
MsgId1 = 15,
RespTopic = list_to_binary("lwm2m/"++Epn++"/up/resp"),
emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0),
timer:sleep(200),
%% step 1, device register ...
test_send_coap_request( UdpSock,
post,
sprintf("coap://127.0.0.1:~b/rd?ep=~ts&lt=600&lwm2m=1", [?PORT, Epn]),
#coap_content{content_format = <<"text/plain">>,
payload = <<"</lwm2m>;rt=\"oma.lwm2m\";ct=11543,</lwm2m/1/0>,</lwm2m/2/0>,</lwm2m/3/0>">>},
[],
MsgId1),
#coap_message{method = Method1} = test_recv_coap_response(UdpSock),
?assertEqual({ok,created}, Method1),
timer:sleep(100),
test_recv_mqtt_response(RespTopic),
%% step2, call Read API
call_send_api(Epn, "read", "path=/3/0/0"),
timer:sleep(100),
#coap_message{type = Type, method = Method, options = Opts} = test_recv_coap_request(UdpSock),
?assertEqual(con, Type),
?assertEqual(get, Method),
?assertEqual([<<"lwm2m">>, <<"3">>, <<"0">>, <<"0">>], maps:get(uri_path, Opts)).
t_write(Config) ->
UdpSock = ?config(sock, Config),
Epn = "urn:oma:lwm2m:oma:4",
MsgId1 = 15,
RespTopic = list_to_binary("lwm2m/"++Epn++"/up/resp"),
emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0),
timer:sleep(200),
%% step 1, device register ...
test_send_coap_request( UdpSock,
post,
sprintf("coap://127.0.0.1:~b/rd?ep=~ts&lt=600&lwm2m=1", [?PORT, Epn]),
#coap_content{content_format = <<"text/plain">>,
payload = <<"</lwm2m>;rt=\"oma.lwm2m\";ct=11543,</lwm2m/1/0>,</lwm2m/2/0>,</lwm2m/3/0>">>},
[],
MsgId1),
#coap_message{method = Method1} = test_recv_coap_response(UdpSock),
?assertEqual({ok,created}, Method1),
timer:sleep(100),
test_recv_mqtt_response(RespTopic),
%% step2, call write API
call_send_api(Epn, "write", "path=/3/0/13&type=Integer&value=123"),
timer:sleep(100),
#coap_message{type = Type, method = Method, options = Opts} = test_recv_coap_request(UdpSock),
?assertEqual(con, Type),
?assertEqual(put, Method),
?assertEqual([<<"lwm2m">>, <<"3">>, <<"0">>, <<"13">>], maps:get(uri_path, Opts)),
?assertEqual(<<"application/vnd.oma.lwm2m+tlv">>, maps:get(content_format, Opts)).
t_observe(Config) ->
UdpSock = ?config(sock, Config),
Epn = "urn:oma:lwm2m:oma:5",
MsgId1 = 15,
RespTopic = list_to_binary("lwm2m/"++Epn++"/up/resp"),
emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0),
timer:sleep(200),
%% step 1, device register ...
test_send_coap_request( UdpSock,
post,
sprintf("coap://127.0.0.1:~b/rd?ep=~ts&lt=600&lwm2m=1", [?PORT, Epn]),
#coap_content{content_format = <<"text/plain">>,
payload = <<"</lwm2m>;rt=\"oma.lwm2m\";ct=11543,</lwm2m/1/0>,</lwm2m/2/0>,</lwm2m/3/0>">>},
[],
MsgId1),
#coap_message{method = Method1} = test_recv_coap_response(UdpSock),
?assertEqual({ok,created}, Method1),
timer:sleep(100),
test_recv_mqtt_response(RespTopic),
%% step2, call observe API
call_send_api(Epn, "observe", "path=/3/0/1&enable=false"),
timer:sleep(100),
#coap_message{type = Type, method = Method, options = Opts} = test_recv_coap_request(UdpSock),
?assertEqual(con, Type),
?assertEqual(get, Method),
?assertEqual([<<"lwm2m">>, <<"3">>, <<"0">>, <<"1">>], maps:get(uri_path, Opts)),
?assertEqual(1, maps:get(observe, Opts)).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% Internal Functions
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
send_request(ClientId, Path, Action) ->
call_lookup_api(ClientId, Path, Action) ->
ApiPath = emqx_mgmt_api_test_util:api_path(["gateway/lwm2m", ClientId, "lookup_cmd"]),
Auth = emqx_mgmt_api_test_util:auth_header_(),
Query = io_lib:format("path=~ts&action=~ts", [Path, Action]),
@ -197,8 +289,15 @@ send_request(ClientId, Path, Action) ->
?LOGT("rest api response:~ts~n", [Response]),
Response.
call_send_api(ClientId, Cmd, Query) ->
ApiPath = emqx_mgmt_api_test_util:api_path(["gateway/lwm2m", ClientId, Cmd]),
Auth = emqx_mgmt_api_test_util:auth_header_(),
{ok, Response} = emqx_mgmt_api_test_util:request_api(post, ApiPath, Query, Auth),
?LOGT("rest api response:~ts~n", [Response]),
Response.
no_received_request(ClientId, Path, Action) ->
Response = send_request(ClientId, Path, Action),
Response = call_lookup_api(ClientId, Path, Action),
NotReceived = #{<<"clientid">> => list_to_binary(ClientId),
<<"action">> => Action,
<<"code">> => <<"6.01">>,
@ -206,7 +305,7 @@ no_received_request(ClientId, Path, Action) ->
<<"path">> => Path},
?assertEqual(NotReceived, emqx_json:decode(Response, [return_maps])).
normal_received_request(ClientId, Path, Action) ->
Response = send_request(ClientId, Path, Action),
Response = call_lookup_api(ClientId, Path, Action),
RCont = emqx_json:decode(Response, [return_maps]),
?assertEqual(list_to_binary(ClientId), maps:get(<<"clientid">>, RCont, undefined)),
?assertEqual(Path, maps:get(<<"path">>, RCont, undefined)),