feat(lwm2m): add emqx_lwm2m http API
This commit is contained in:
parent
8e08e83090
commit
c7ebc12ce1
|
@ -1,23 +1,13 @@
|
|||
%% -*-: erlang -*-
|
||||
{VSN,
|
||||
[
|
||||
{"4.3.1", [
|
||||
{load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{"4.3.0", [
|
||||
{load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{<<".*">>, []}
|
||||
{<<"4.3.[0-1]">>, [
|
||||
{restart_application, emqx_lwm2m}
|
||||
]}
|
||||
],
|
||||
[
|
||||
{"4.3.1", [
|
||||
{load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{"4.3.0", [
|
||||
{load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{<<".*">>, []}
|
||||
{<<"4.3.[0-1]">>, [
|
||||
{restart_application, emqx_lwm2m}
|
||||
]}
|
||||
]
|
||||
}.
|
||||
|
|
|
@ -0,0 +1,162 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_lwm2m_api).
|
||||
|
||||
-import(minirest, [return/1]).
|
||||
|
||||
-rest_api(#{name => list,
|
||||
method => 'GET',
|
||||
path => "/lwm2m_channels/",
|
||||
func => list,
|
||||
descr => "A list of all lwm2m channel"
|
||||
}).
|
||||
|
||||
-rest_api(#{name => list,
|
||||
method => 'GET',
|
||||
path => "/nodes/:atom:node/lwm2m_channels/",
|
||||
func => list,
|
||||
descr => "A list of lwm2m channel of a node"
|
||||
}).
|
||||
|
||||
-rest_api(#{name => lookup_cmd,
|
||||
method => 'GET',
|
||||
path => "/lookup_cmd/:bin:ep/",
|
||||
func => lookup_cmd,
|
||||
descr => "Send a lwm2m downlink command"
|
||||
}).
|
||||
|
||||
-rest_api(#{name => lookup_cmd,
|
||||
method => 'GET',
|
||||
path => "/nodes/:atom:node/lookup_cmd/:bin:ep/",
|
||||
func => lookup_cmd,
|
||||
descr => "Send a lwm2m downlink command of a node"
|
||||
}).
|
||||
|
||||
-export([ list/2
|
||||
, lookup_cmd/2
|
||||
]).
|
||||
|
||||
list(#{node := Node }, Params) ->
|
||||
case Node = node() of
|
||||
true -> list(#{}, Params);
|
||||
_ -> rpc_call(Node, list, [#{}, Params])
|
||||
end;
|
||||
|
||||
list(#{}, _Params) ->
|
||||
Channels = emqx_lwm2m_cm:all_channels(),
|
||||
return({ok, format(Channels)}).
|
||||
|
||||
lookup_cmd(#{ep := Ep, node := Node}, Params) ->
|
||||
case Node = node() of
|
||||
true -> lookup_cmd(#{ep => Ep}, Params);
|
||||
_ -> rpc_call(Node, lookup_cmd, [#{ep => Ep}, Params])
|
||||
end;
|
||||
|
||||
lookup_cmd(#{ep := Ep}, Params) ->
|
||||
MsgType = proplists:get_value(<<"msgType">>, Params),
|
||||
Path0 = proplists:get_value(<<"path">>, Params),
|
||||
case emqx_lwm2m_cm:lookup_cmd(Ep, Path0, MsgType) of
|
||||
[] -> return({ok, []});
|
||||
[{_, undefined} | _] -> return({ok, []});
|
||||
[{{IMEI, Path, MsgType}, undefined}] ->
|
||||
return({ok, [{imei, IMEI},
|
||||
{'msgType', IMEI},
|
||||
{'code', <<"6.01">>},
|
||||
{'codeMsg', <<"reply_not_received">>},
|
||||
{'path', Path}]});
|
||||
[{{IMEI, Path, MsgType}, {Code, CodeMsg, Content}}] ->
|
||||
Payload1 = format_cmd_content(Content, MsgType),
|
||||
return({ok, [{imei, IMEI},
|
||||
{'msgType', IMEI},
|
||||
{'code', Code},
|
||||
{'codeMsg', CodeMsg},
|
||||
{'path', Path}] ++ Payload1})
|
||||
end.
|
||||
|
||||
rpc_call(Node, Fun, Args) ->
|
||||
case rpc:call(Node, ?MODULE, Fun, Args) of
|
||||
{badrpc, Reason} -> {error, Reason};
|
||||
Res -> Res
|
||||
end.
|
||||
|
||||
format(Channels) ->
|
||||
lists:map(fun({IMEI, #{lifetime := LifeTime,
|
||||
peername := Peername,
|
||||
version := Version,
|
||||
reg_info := RegInfo}}) ->
|
||||
ObjectList = lists:map(fun(Path) ->
|
||||
[ObjId | _] = path_list(Path),
|
||||
case emqx_lwm2m_xml_object:get_obj_def(binary_to_integer(ObjId), true) of
|
||||
{error, _} ->
|
||||
{Path, Path};
|
||||
ObjDefinition ->
|
||||
ObjectName = emqx_lwm2m_xml_object:get_object_name(ObjDefinition),
|
||||
{Path, list_to_binary(ObjectName)}
|
||||
end
|
||||
end, maps:get(<<"objectList">>, RegInfo)),
|
||||
{IpAddr, Port} = Peername,
|
||||
[{imei, IMEI},
|
||||
{lifetime, LifeTime},
|
||||
{ip_address, iolist_to_binary(ntoa(IpAddr))},
|
||||
{port, Port},
|
||||
{version, Version},
|
||||
{'objectList', ObjectList}]
|
||||
end, Channels).
|
||||
|
||||
format_cmd_content(undefined, _MsgType) -> [];
|
||||
format_cmd_content(Content, <<"discover">>) ->
|
||||
[H | Content1] = Content,
|
||||
{_, [HObjId]} = emqx_lwm2m_coap_resource:parse_object_list(H),
|
||||
[ObjId | _]= path_list(HObjId),
|
||||
ObjectList = case Content1 of
|
||||
[Content2 | _] ->
|
||||
{_, ObjL} = emqx_lwm2m_coap_resource:parse_object_list(Content2),
|
||||
ObjL;
|
||||
[] -> []
|
||||
end,
|
||||
R = case emqx_lwm2m_xml_object:get_obj_def(binary_to_integer(ObjId), true) of
|
||||
{error, _} ->
|
||||
lists:map(fun(Object) -> {Object, Object} end, ObjectList);
|
||||
ObjDefinition ->
|
||||
lists:map(fun(Object) ->
|
||||
[_, _, ResId| _] = path_list(Object),
|
||||
Operations = case emqx_lwm2m_xml_object:get_resource_operations(binary_to_integer(ResId), ObjDefinition) of
|
||||
"E" -> [{operations, list_to_binary("E")}];
|
||||
Oper -> [{'dataType', list_to_binary(emqx_lwm2m_xml_object:get_resource_type(binary_to_integer(ResId), ObjDefinition))},
|
||||
{operations, list_to_binary(Oper)}]
|
||||
end,
|
||||
[{path, Object},
|
||||
{name, list_to_binary(emqx_lwm2m_xml_object:get_resource_name(binary_to_integer(ResId), ObjDefinition))}
|
||||
] ++ Operations
|
||||
end, ObjectList)
|
||||
end,
|
||||
[{content, R}];
|
||||
format_cmd_content(Content, _) ->
|
||||
[{content, Content}].
|
||||
|
||||
ntoa({0,0,0,0,0,16#ffff,AB,CD}) ->
|
||||
inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256});
|
||||
ntoa(IP) ->
|
||||
inet_parse:ntoa(IP).
|
||||
|
||||
path_list(Path) ->
|
||||
case binary:split(binary_util:trim(Path, $/), [<<$/>>], [global]) of
|
||||
[ObjId, ObjInsId, ResId, ResInstId] -> [ObjId, ObjInsId, ResId, ResInstId];
|
||||
[ObjId, ObjInsId, ResId] -> [ObjId, ObjInsId, ResId];
|
||||
[ObjId, ObjInsId] -> [ObjId, ObjInsId];
|
||||
[ObjId] -> [ObjId]
|
||||
end.
|
|
@ -0,0 +1,153 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_lwm2m_cm).
|
||||
|
||||
-export([start_link/0]).
|
||||
|
||||
-export([ register_channel/5
|
||||
, update_reg_info/2
|
||||
, unregister_channel/1
|
||||
]).
|
||||
|
||||
-export([ lookup_channel/1
|
||||
, all_channels/0
|
||||
]).
|
||||
|
||||
-export([ register_cmd/3
|
||||
, register_cmd/4
|
||||
, lookup_cmd/3
|
||||
, lookup_cmd_by_imei/1
|
||||
]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([ init/1
|
||||
, handle_call/3
|
||||
, handle_cast/2
|
||||
, handle_info/2
|
||||
, terminate/2
|
||||
, code_change/3
|
||||
]).
|
||||
|
||||
-define(LOG(Level, Format, Args), logger:Level("LWM2M-CM: " ++ Format, Args)).
|
||||
|
||||
%% Server name
|
||||
-define(CM, ?MODULE).
|
||||
|
||||
-define(LWM2M_CHANNEL_TAB, emqx_lwm2m_channel).
|
||||
-define(LWM2M_CMD_TAB, emqx_lwm2m_cmd).
|
||||
|
||||
%% Batch drain
|
||||
-define(BATCH_SIZE, 100000).
|
||||
|
||||
%% @doc Start the channel manager.
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?CM}, ?MODULE, [], []).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% API
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
register_channel(IMEI, RegInfo, LifeTime, Ver, Peername) ->
|
||||
Info = #{
|
||||
reg_info => RegInfo,
|
||||
lifetime => LifeTime,
|
||||
version => Ver,
|
||||
peername => Peername
|
||||
},
|
||||
true = ets:insert(?LWM2M_CHANNEL_TAB, {IMEI, Info}),
|
||||
cast({registered, {IMEI, self()}}).
|
||||
|
||||
update_reg_info(IMEI, RegInfo) ->
|
||||
case lookup_channel(IMEI) of
|
||||
[{_, RegInfo0}] ->
|
||||
true = ets:insert(?LWM2M_CHANNEL_TAB, {IMEI, RegInfo0#{reg_info => RegInfo}}),
|
||||
ok;
|
||||
[] ->
|
||||
ok
|
||||
end.
|
||||
|
||||
unregister_channel(IMEI) when is_binary(IMEI) ->
|
||||
true = ets:delete(?LWM2M_CHANNEL_TAB, IMEI),
|
||||
ok.
|
||||
|
||||
lookup_channel(IMEI) ->
|
||||
ets:lookup(?LWM2M_CHANNEL_TAB, IMEI).
|
||||
|
||||
all_channels() ->
|
||||
ets:tab2list(?LWM2M_CHANNEL_TAB).
|
||||
|
||||
register_cmd(IMEI, Path, Type) ->
|
||||
true = ets:insert(?LWM2M_CMD_TAB, {{IMEI, Path, Type}, undefined}).
|
||||
|
||||
register_cmd(_IMEI, undefined, _Type, _Result) ->
|
||||
ok;
|
||||
register_cmd(IMEI, Path, Type, Result) ->
|
||||
true = ets:insert(?LWM2M_CMD_TAB, {{IMEI, Path, Type}, Result}).
|
||||
|
||||
lookup_cmd(IMEI, Path, Type) ->
|
||||
ets:lookup(?LWM2M_CMD_TAB, {IMEI, Path, Type}).
|
||||
|
||||
lookup_cmd_by_imei(IMEI) ->
|
||||
ets:select(?LWM2M_CHANNEL_TAB, [{{{IMEI, '_', '_'}, '$1'}, [], ['$_']}]).
|
||||
|
||||
%% @private
|
||||
cast(Msg) -> gen_server:cast(?CM, Msg).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
init([]) ->
|
||||
TabOpts = [public, {write_concurrency, true}, {read_concurrency, true}],
|
||||
ok = emqx_tables:new(?LWM2M_CHANNEL_TAB, [set, compressed | TabOpts]),
|
||||
ok = emqx_tables:new(?LWM2M_CMD_TAB, [set, compressed | TabOpts]),
|
||||
{ok, #{chan_pmon => emqx_pmon:new()}}.
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
?LOG(error, "Unexpected call: ~p", [Req]),
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast({registered, {IMEI, ChanPid}}, State = #{chan_pmon := PMon}) ->
|
||||
PMon1 = emqx_pmon:monitor(ChanPid, IMEI, PMon),
|
||||
{noreply, State#{chan_pmon := PMon1}};
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
?LOG(error, "Unexpected cast: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
|
||||
ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
|
||||
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
|
||||
ok = emqx_pool:async_submit(fun lists:foreach/2, [fun clean_down/1, Items]),
|
||||
{noreply, State#{chan_pmon := PMon1}};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
?LOG(error, "Unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
emqx_stats:cancel_update(chan_stats).
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
clean_down({_ChanPid, IMEI}) ->
|
||||
unregister_channel(IMEI).
|
|
@ -0,0 +1,41 @@
|
|||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_lwm2m_cm_sup).
|
||||
|
||||
-behaviour(supervisor).
|
||||
|
||||
-export([start_link/0]).
|
||||
|
||||
-export([init/1]).
|
||||
|
||||
start_link() ->
|
||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||
|
||||
init([]) ->
|
||||
CM = #{id => emqx_lwm2m_cm,
|
||||
start => {emqx_lwm2m_cm, start_link, []},
|
||||
restart => permanent,
|
||||
shutdown => 5000,
|
||||
type => worker,
|
||||
modules => [emqx_lwm2m_cm]},
|
||||
SupFlags = #{strategy => one_for_one,
|
||||
intensity => 100,
|
||||
period => 10
|
||||
},
|
||||
{ok, {SupFlags, [CM]}}.
|
||||
|
|
@ -29,4 +29,11 @@ start_link() ->
|
|||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||
|
||||
init(_Args) ->
|
||||
{ok, { {one_for_all, 10, 3600}, [?CHILD(emqx_lwm2m_xml_object_db)] }}.
|
||||
CmSup = #{id => emqx_lwm2m_cm_sup,
|
||||
start => {emqx_lwm2m_cm_sup, start_link, []},
|
||||
restart => permanent,
|
||||
shutdown => infinity,
|
||||
type => supervisor,
|
||||
modules => [emqx_lwm2m_cm_sup]
|
||||
},
|
||||
{ok, { {one_for_all, 10, 3600}, [?CHILD(emqx_lwm2m_xml_object_db), CmSup] }}.
|
||||
|
|
|
@ -21,9 +21,11 @@
|
|||
|
||||
-export([ get_obj_def/2
|
||||
, get_object_id/1
|
||||
, get_object_name/1
|
||||
, get_object_and_resource_id/2
|
||||
, get_resource_type/2
|
||||
, get_resource_name/2
|
||||
, get_resource_operations/2
|
||||
]).
|
||||
|
||||
-define(LOG(Level, Format, Args),
|
||||
|
@ -42,6 +44,10 @@ get_object_id(ObjDefinition) ->
|
|||
[#xmlText{value=ObjectId}] = xmerl_xpath:string("ObjectID/text()", ObjDefinition),
|
||||
ObjectId.
|
||||
|
||||
get_object_name(ObjDefinition) ->
|
||||
[#xmlText{value=ObjectName}] = xmerl_xpath:string("Name/text()", ObjDefinition),
|
||||
ObjectName.
|
||||
|
||||
|
||||
get_object_and_resource_id(ResourceNameBinary, ObjDefinition) ->
|
||||
ResourceNameString = binary_to_list(ResourceNameBinary),
|
||||
|
@ -60,3 +66,8 @@ get_resource_name(ResourceIdInt, ObjDefinition) ->
|
|||
ResourceIdString = integer_to_list(ResourceIdInt),
|
||||
[#xmlText{value=Name}] = xmerl_xpath:string("Resources/Item[@ID=\""++ResourceIdString++"\"]/Name/text()", ObjDefinition),
|
||||
Name.
|
||||
|
||||
get_resource_operations(ResourceIdInt, ObjDefinition) ->
|
||||
ResourceIdString = integer_to_list(ResourceIdInt),
|
||||
[#xmlText{value=Operations}] = xmerl_xpath:string("Resources/Item[@ID=\""++ResourceIdString++"\"]/Operations/text()", ObjDefinition),
|
||||
Operations.
|
||||
|
|
|
@ -58,7 +58,7 @@ find_objectid(ObjectId) ->
|
|||
false -> ObjectId
|
||||
end,
|
||||
case ets:lookup(?LWM2M_OBJECT_DEF_TAB, ObjectIdInt) of
|
||||
[] -> error(no_xml_definition);
|
||||
[] -> {error, no_xml_definition};
|
||||
[{ObjectId, Xml}] -> Xml
|
||||
end.
|
||||
|
||||
|
@ -121,8 +121,10 @@ load(BaseDir) ->
|
|||
true -> BaseDir++"*.xml";
|
||||
false -> BaseDir++"/*.xml"
|
||||
end,
|
||||
AllXmlFiles = filelib:wildcard(Wild),
|
||||
load_loop(AllXmlFiles).
|
||||
case filelib:wildcard(Wild) of
|
||||
[] -> error(no_xml_files_found, BaseDir);
|
||||
AllXmlFiles -> load_loop(AllXmlFiles)
|
||||
end.
|
||||
|
||||
load_loop([]) ->
|
||||
ok;
|
||||
|
|
Loading…
Reference in New Issue