From c7ebc12ce11eb60a0f685e6cd6d46f0a047b3134 Mon Sep 17 00:00:00 2001 From: Turtle Date: Thu, 17 Jun 2021 19:29:36 +0800 Subject: [PATCH] feat(lwm2m): add emqx_lwm2m http API --- apps/emqx_lwm2m/src/emqx_lwm2m.appup.src | 22 +-- apps/emqx_lwm2m/src/emqx_lwm2m_api.erl | 162 ++++++++++++++++++ apps/emqx_lwm2m/src/emqx_lwm2m_cm.erl | 153 +++++++++++++++++ apps/emqx_lwm2m/src/emqx_lwm2m_cm_sup.erl | 41 +++++ apps/emqx_lwm2m/src/emqx_lwm2m_sup.erl | 9 +- apps/emqx_lwm2m/src/emqx_lwm2m_xml_object.erl | 11 ++ .../src/emqx_lwm2m_xml_object_db.erl | 8 +- 7 files changed, 386 insertions(+), 20 deletions(-) create mode 100644 apps/emqx_lwm2m/src/emqx_lwm2m_api.erl create mode 100644 apps/emqx_lwm2m/src/emqx_lwm2m_cm.erl create mode 100644 apps/emqx_lwm2m/src/emqx_lwm2m_cm_sup.erl diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src b/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src index db46dd42e..b6d49302f 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src +++ b/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src @@ -1,23 +1,13 @@ %% -*-: erlang -*- {VSN, [ - {"4.3.1", [ - {load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []} - ]}, - {"4.3.0", [ - {load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []}, - {load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []} - ]}, - {<<".*">>, []} + {<<"4.3.[0-1]">>, [ + {restart_application, emqx_lwm2m} + ]} ], [ - {"4.3.1", [ - {load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []} - ]}, - {"4.3.0", [ - {load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []}, - {load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []} - ]}, - {<<".*">>, []} + {<<"4.3.[0-1]">>, [ + {restart_application, emqx_lwm2m} + ]} ] }. diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m_api.erl b/apps/emqx_lwm2m/src/emqx_lwm2m_api.erl new file mode 100644 index 000000000..6018aa7c7 --- /dev/null +++ b/apps/emqx_lwm2m/src/emqx_lwm2m_api.erl @@ -0,0 +1,162 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_lwm2m_api). + +-import(minirest, [return/1]). + +-rest_api(#{name => list, + method => 'GET', + path => "/lwm2m_channels/", + func => list, + descr => "A list of all lwm2m channel" + }). + +-rest_api(#{name => list, + method => 'GET', + path => "/nodes/:atom:node/lwm2m_channels/", + func => list, + descr => "A list of lwm2m channel of a node" + }). + +-rest_api(#{name => lookup_cmd, + method => 'GET', + path => "/lookup_cmd/:bin:ep/", + func => lookup_cmd, + descr => "Send a lwm2m downlink command" + }). + +-rest_api(#{name => lookup_cmd, + method => 'GET', + path => "/nodes/:atom:node/lookup_cmd/:bin:ep/", + func => lookup_cmd, + descr => "Send a lwm2m downlink command of a node" + }). + +-export([ list/2 + , lookup_cmd/2 + ]). + +list(#{node := Node }, Params) -> + case Node = node() of + true -> list(#{}, Params); + _ -> rpc_call(Node, list, [#{}, Params]) + end; + +list(#{}, _Params) -> + Channels = emqx_lwm2m_cm:all_channels(), + return({ok, format(Channels)}). + +lookup_cmd(#{ep := Ep, node := Node}, Params) -> + case Node = node() of + true -> lookup_cmd(#{ep => Ep}, Params); + _ -> rpc_call(Node, lookup_cmd, [#{ep => Ep}, Params]) + end; + +lookup_cmd(#{ep := Ep}, Params) -> + MsgType = proplists:get_value(<<"msgType">>, Params), + Path0 = proplists:get_value(<<"path">>, Params), + case emqx_lwm2m_cm:lookup_cmd(Ep, Path0, MsgType) of + [] -> return({ok, []}); + [{_, undefined} | _] -> return({ok, []}); + [{{IMEI, Path, MsgType}, undefined}] -> + return({ok, [{imei, IMEI}, + {'msgType', IMEI}, + {'code', <<"6.01">>}, + {'codeMsg', <<"reply_not_received">>}, + {'path', Path}]}); + [{{IMEI, Path, MsgType}, {Code, CodeMsg, Content}}] -> + Payload1 = format_cmd_content(Content, MsgType), + return({ok, [{imei, IMEI}, + {'msgType', IMEI}, + {'code', Code}, + {'codeMsg', CodeMsg}, + {'path', Path}] ++ Payload1}) + end. + +rpc_call(Node, Fun, Args) -> + case rpc:call(Node, ?MODULE, Fun, Args) of + {badrpc, Reason} -> {error, Reason}; + Res -> Res + end. + +format(Channels) -> + lists:map(fun({IMEI, #{lifetime := LifeTime, + peername := Peername, + version := Version, + reg_info := RegInfo}}) -> + ObjectList = lists:map(fun(Path) -> + [ObjId | _] = path_list(Path), + case emqx_lwm2m_xml_object:get_obj_def(binary_to_integer(ObjId), true) of + {error, _} -> + {Path, Path}; + ObjDefinition -> + ObjectName = emqx_lwm2m_xml_object:get_object_name(ObjDefinition), + {Path, list_to_binary(ObjectName)} + end + end, maps:get(<<"objectList">>, RegInfo)), + {IpAddr, Port} = Peername, + [{imei, IMEI}, + {lifetime, LifeTime}, + {ip_address, iolist_to_binary(ntoa(IpAddr))}, + {port, Port}, + {version, Version}, + {'objectList', ObjectList}] + end, Channels). + +format_cmd_content(undefined, _MsgType) -> []; +format_cmd_content(Content, <<"discover">>) -> + [H | Content1] = Content, + {_, [HObjId]} = emqx_lwm2m_coap_resource:parse_object_list(H), + [ObjId | _]= path_list(HObjId), + ObjectList = case Content1 of + [Content2 | _] -> + {_, ObjL} = emqx_lwm2m_coap_resource:parse_object_list(Content2), + ObjL; + [] -> [] + end, + R = case emqx_lwm2m_xml_object:get_obj_def(binary_to_integer(ObjId), true) of + {error, _} -> + lists:map(fun(Object) -> {Object, Object} end, ObjectList); + ObjDefinition -> + lists:map(fun(Object) -> + [_, _, ResId| _] = path_list(Object), + Operations = case emqx_lwm2m_xml_object:get_resource_operations(binary_to_integer(ResId), ObjDefinition) of + "E" -> [{operations, list_to_binary("E")}]; + Oper -> [{'dataType', list_to_binary(emqx_lwm2m_xml_object:get_resource_type(binary_to_integer(ResId), ObjDefinition))}, + {operations, list_to_binary(Oper)}] + end, + [{path, Object}, + {name, list_to_binary(emqx_lwm2m_xml_object:get_resource_name(binary_to_integer(ResId), ObjDefinition))} + ] ++ Operations + end, ObjectList) + end, + [{content, R}]; +format_cmd_content(Content, _) -> + [{content, Content}]. + +ntoa({0,0,0,0,0,16#ffff,AB,CD}) -> + inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}); +ntoa(IP) -> + inet_parse:ntoa(IP). + +path_list(Path) -> + case binary:split(binary_util:trim(Path, $/), [<<$/>>], [global]) of + [ObjId, ObjInsId, ResId, ResInstId] -> [ObjId, ObjInsId, ResId, ResInstId]; + [ObjId, ObjInsId, ResId] -> [ObjId, ObjInsId, ResId]; + [ObjId, ObjInsId] -> [ObjId, ObjInsId]; + [ObjId] -> [ObjId] + end. diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m_cm.erl b/apps/emqx_lwm2m/src/emqx_lwm2m_cm.erl new file mode 100644 index 000000000..16e938b84 --- /dev/null +++ b/apps/emqx_lwm2m/src/emqx_lwm2m_cm.erl @@ -0,0 +1,153 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_lwm2m_cm). + +-export([start_link/0]). + +-export([ register_channel/5 + , update_reg_info/2 + , unregister_channel/1 + ]). + +-export([ lookup_channel/1 + , all_channels/0 + ]). + +-export([ register_cmd/3 + , register_cmd/4 + , lookup_cmd/3 + , lookup_cmd_by_imei/1 + ]). + +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +-define(LOG(Level, Format, Args), logger:Level("LWM2M-CM: " ++ Format, Args)). + +%% Server name +-define(CM, ?MODULE). + +-define(LWM2M_CHANNEL_TAB, emqx_lwm2m_channel). +-define(LWM2M_CMD_TAB, emqx_lwm2m_cmd). + +%% Batch drain +-define(BATCH_SIZE, 100000). + +%% @doc Start the channel manager. +start_link() -> + gen_server:start_link({local, ?CM}, ?MODULE, [], []). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +register_channel(IMEI, RegInfo, LifeTime, Ver, Peername) -> + Info = #{ + reg_info => RegInfo, + lifetime => LifeTime, + version => Ver, + peername => Peername + }, + true = ets:insert(?LWM2M_CHANNEL_TAB, {IMEI, Info}), + cast({registered, {IMEI, self()}}). + +update_reg_info(IMEI, RegInfo) -> + case lookup_channel(IMEI) of + [{_, RegInfo0}] -> + true = ets:insert(?LWM2M_CHANNEL_TAB, {IMEI, RegInfo0#{reg_info => RegInfo}}), + ok; + [] -> + ok + end. + +unregister_channel(IMEI) when is_binary(IMEI) -> + true = ets:delete(?LWM2M_CHANNEL_TAB, IMEI), + ok. + +lookup_channel(IMEI) -> + ets:lookup(?LWM2M_CHANNEL_TAB, IMEI). + +all_channels() -> + ets:tab2list(?LWM2M_CHANNEL_TAB). + +register_cmd(IMEI, Path, Type) -> + true = ets:insert(?LWM2M_CMD_TAB, {{IMEI, Path, Type}, undefined}). + +register_cmd(_IMEI, undefined, _Type, _Result) -> + ok; +register_cmd(IMEI, Path, Type, Result) -> + true = ets:insert(?LWM2M_CMD_TAB, {{IMEI, Path, Type}, Result}). + +lookup_cmd(IMEI, Path, Type) -> + ets:lookup(?LWM2M_CMD_TAB, {IMEI, Path, Type}). + +lookup_cmd_by_imei(IMEI) -> + ets:select(?LWM2M_CHANNEL_TAB, [{{{IMEI, '_', '_'}, '$1'}, [], ['$_']}]). + +%% @private +cast(Msg) -> gen_server:cast(?CM, Msg). + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init([]) -> + TabOpts = [public, {write_concurrency, true}, {read_concurrency, true}], + ok = emqx_tables:new(?LWM2M_CHANNEL_TAB, [set, compressed | TabOpts]), + ok = emqx_tables:new(?LWM2M_CMD_TAB, [set, compressed | TabOpts]), + {ok, #{chan_pmon => emqx_pmon:new()}}. + +handle_call(Req, _From, State) -> + ?LOG(error, "Unexpected call: ~p", [Req]), + {reply, ignored, State}. + +handle_cast({registered, {IMEI, ChanPid}}, State = #{chan_pmon := PMon}) -> + PMon1 = emqx_pmon:monitor(ChanPid, IMEI, PMon), + {noreply, State#{chan_pmon := PMon1}}; + +handle_cast(Msg, State) -> + ?LOG(error, "Unexpected cast: ~p", [Msg]), + {noreply, State}. + +handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) -> + ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)], + {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon), + ok = emqx_pool:async_submit(fun lists:foreach/2, [fun clean_down/1, Items]), + {noreply, State#{chan_pmon := PMon1}}; + +handle_info(Info, State) -> + ?LOG(error, "Unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, _State) -> + emqx_stats:cancel_update(chan_stats). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +clean_down({_ChanPid, IMEI}) -> + unregister_channel(IMEI). diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m_cm_sup.erl b/apps/emqx_lwm2m/src/emqx_lwm2m_cm_sup.erl new file mode 100644 index 000000000..b55f4f33c --- /dev/null +++ b/apps/emqx_lwm2m/src/emqx_lwm2m_cm_sup.erl @@ -0,0 +1,41 @@ + +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_lwm2m_cm_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + CM = #{id => emqx_lwm2m_cm, + start => {emqx_lwm2m_cm, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [emqx_lwm2m_cm]}, + SupFlags = #{strategy => one_for_one, + intensity => 100, + period => 10 + }, + {ok, {SupFlags, [CM]}}. + diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m_sup.erl b/apps/emqx_lwm2m/src/emqx_lwm2m_sup.erl index d28e00874..0546e0080 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m_sup.erl +++ b/apps/emqx_lwm2m/src/emqx_lwm2m_sup.erl @@ -29,4 +29,11 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init(_Args) -> - {ok, { {one_for_all, 10, 3600}, [?CHILD(emqx_lwm2m_xml_object_db)] }}. + CmSup = #{id => emqx_lwm2m_cm_sup, + start => {emqx_lwm2m_cm_sup, start_link, []}, + restart => permanent, + shutdown => infinity, + type => supervisor, + modules => [emqx_lwm2m_cm_sup] + }, + {ok, { {one_for_all, 10, 3600}, [?CHILD(emqx_lwm2m_xml_object_db), CmSup] }}. diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m_xml_object.erl b/apps/emqx_lwm2m/src/emqx_lwm2m_xml_object.erl index 5931ae75e..dd9911407 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m_xml_object.erl +++ b/apps/emqx_lwm2m/src/emqx_lwm2m_xml_object.erl @@ -21,9 +21,11 @@ -export([ get_obj_def/2 , get_object_id/1 + , get_object_name/1 , get_object_and_resource_id/2 , get_resource_type/2 , get_resource_name/2 + , get_resource_operations/2 ]). -define(LOG(Level, Format, Args), @@ -42,6 +44,10 @@ get_object_id(ObjDefinition) -> [#xmlText{value=ObjectId}] = xmerl_xpath:string("ObjectID/text()", ObjDefinition), ObjectId. +get_object_name(ObjDefinition) -> + [#xmlText{value=ObjectName}] = xmerl_xpath:string("Name/text()", ObjDefinition), + ObjectName. + get_object_and_resource_id(ResourceNameBinary, ObjDefinition) -> ResourceNameString = binary_to_list(ResourceNameBinary), @@ -60,3 +66,8 @@ get_resource_name(ResourceIdInt, ObjDefinition) -> ResourceIdString = integer_to_list(ResourceIdInt), [#xmlText{value=Name}] = xmerl_xpath:string("Resources/Item[@ID=\""++ResourceIdString++"\"]/Name/text()", ObjDefinition), Name. + +get_resource_operations(ResourceIdInt, ObjDefinition) -> + ResourceIdString = integer_to_list(ResourceIdInt), + [#xmlText{value=Operations}] = xmerl_xpath:string("Resources/Item[@ID=\""++ResourceIdString++"\"]/Operations/text()", ObjDefinition), + Operations. diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m_xml_object_db.erl b/apps/emqx_lwm2m/src/emqx_lwm2m_xml_object_db.erl index b974c148c..012d0a649 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m_xml_object_db.erl +++ b/apps/emqx_lwm2m/src/emqx_lwm2m_xml_object_db.erl @@ -58,7 +58,7 @@ find_objectid(ObjectId) -> false -> ObjectId end, case ets:lookup(?LWM2M_OBJECT_DEF_TAB, ObjectIdInt) of - [] -> error(no_xml_definition); + [] -> {error, no_xml_definition}; [{ObjectId, Xml}] -> Xml end. @@ -121,8 +121,10 @@ load(BaseDir) -> true -> BaseDir++"*.xml"; false -> BaseDir++"/*.xml" end, - AllXmlFiles = filelib:wildcard(Wild), - load_loop(AllXmlFiles). + case filelib:wildcard(Wild) of + [] -> error(no_xml_files_found, BaseDir); + AllXmlFiles -> load_loop(AllXmlFiles) + end. load_loop([]) -> ok;