154 lines
4.6 KiB
Erlang
154 lines
4.6 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_lwm2m_cm).
|
|
|
|
-export([start_link/0]).
|
|
|
|
-export([ register_channel/5
|
|
, update_reg_info/2
|
|
, unregister_channel/1
|
|
]).
|
|
|
|
-export([ lookup_channel/1
|
|
, all_channels/0
|
|
]).
|
|
|
|
-export([ register_cmd/3
|
|
, register_cmd/4
|
|
, lookup_cmd/3
|
|
, lookup_cmd_by_imei/1
|
|
]).
|
|
|
|
%% gen_server callbacks
|
|
-export([ init/1
|
|
, handle_call/3
|
|
, handle_cast/2
|
|
, handle_info/2
|
|
, terminate/2
|
|
, code_change/3
|
|
]).
|
|
|
|
-define(LOG(Level, Format, Args), logger:Level("LWM2M-CM: " ++ Format, Args)).
|
|
|
|
%% Server name
|
|
-define(CM, ?MODULE).
|
|
|
|
-define(LWM2M_CHANNEL_TAB, emqx_lwm2m_channel).
|
|
-define(LWM2M_CMD_TAB, emqx_lwm2m_cmd).
|
|
|
|
%% Batch drain
|
|
-define(BATCH_SIZE, 100000).
|
|
|
|
%% @doc Start the channel manager.
|
|
start_link() ->
|
|
gen_server:start_link({local, ?CM}, ?MODULE, [], []).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% API
|
|
%%--------------------------------------------------------------------
|
|
|
|
register_channel(IMEI, RegInfo, LifeTime, Ver, Peername) ->
|
|
Info = #{
|
|
reg_info => RegInfo,
|
|
lifetime => LifeTime,
|
|
version => Ver,
|
|
peername => Peername
|
|
},
|
|
true = ets:insert(?LWM2M_CHANNEL_TAB, {IMEI, Info}),
|
|
cast({registered, {IMEI, self()}}).
|
|
|
|
update_reg_info(IMEI, RegInfo) ->
|
|
case lookup_channel(IMEI) of
|
|
[{_, RegInfo0}] ->
|
|
true = ets:insert(?LWM2M_CHANNEL_TAB, {IMEI, RegInfo0#{reg_info => RegInfo}}),
|
|
ok;
|
|
[] ->
|
|
ok
|
|
end.
|
|
|
|
unregister_channel(IMEI) when is_binary(IMEI) ->
|
|
true = ets:delete(?LWM2M_CHANNEL_TAB, IMEI),
|
|
ok.
|
|
|
|
lookup_channel(IMEI) ->
|
|
ets:lookup(?LWM2M_CHANNEL_TAB, IMEI).
|
|
|
|
all_channels() ->
|
|
ets:tab2list(?LWM2M_CHANNEL_TAB).
|
|
|
|
register_cmd(IMEI, Path, Type) ->
|
|
true = ets:insert(?LWM2M_CMD_TAB, {{IMEI, Path, Type}, undefined}).
|
|
|
|
register_cmd(_IMEI, undefined, _Type, _Result) ->
|
|
ok;
|
|
register_cmd(IMEI, Path, Type, Result) ->
|
|
true = ets:insert(?LWM2M_CMD_TAB, {{IMEI, Path, Type}, Result}).
|
|
|
|
lookup_cmd(IMEI, Path, Type) ->
|
|
ets:lookup(?LWM2M_CMD_TAB, {IMEI, Path, Type}).
|
|
|
|
lookup_cmd_by_imei(IMEI) ->
|
|
ets:select(?LWM2M_CHANNEL_TAB, [{{{IMEI, '_', '_'}, '$1'}, [], ['$_']}]).
|
|
|
|
%% @private
|
|
cast(Msg) -> gen_server:cast(?CM, Msg).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% gen_server callbacks
|
|
%%--------------------------------------------------------------------
|
|
|
|
init([]) ->
|
|
TabOpts = [public, {write_concurrency, true}, {read_concurrency, true}],
|
|
ok = emqx_tables:new(?LWM2M_CHANNEL_TAB, [set, compressed | TabOpts]),
|
|
ok = emqx_tables:new(?LWM2M_CMD_TAB, [set, compressed | TabOpts]),
|
|
{ok, #{chan_pmon => emqx_pmon:new()}}.
|
|
|
|
handle_call(Req, _From, State) ->
|
|
?LOG(error, "Unexpected call: ~p", [Req]),
|
|
{reply, ignored, State}.
|
|
|
|
handle_cast({registered, {IMEI, ChanPid}}, State = #{chan_pmon := PMon}) ->
|
|
PMon1 = emqx_pmon:monitor(ChanPid, IMEI, PMon),
|
|
{noreply, State#{chan_pmon := PMon1}};
|
|
|
|
handle_cast(Msg, State) ->
|
|
?LOG(error, "Unexpected cast: ~p", [Msg]),
|
|
{noreply, State}.
|
|
|
|
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
|
|
ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
|
|
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
|
|
ok = emqx_pool:async_submit(fun lists:foreach/2, [fun clean_down/1, Items]),
|
|
{noreply, State#{chan_pmon := PMon1}};
|
|
|
|
handle_info(Info, State) ->
|
|
?LOG(error, "Unexpected info: ~p", [Info]),
|
|
{noreply, State}.
|
|
|
|
terminate(_Reason, _State) ->
|
|
emqx_stats:cancel_update(chan_stats).
|
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
{ok, State}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Internal functions
|
|
%%--------------------------------------------------------------------
|
|
|
|
clean_down({_ChanPid, IMEI}) ->
|
|
unregister_channel(IMEI).
|