574 lines
22 KiB
Erlang
574 lines
22 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_lwm2m_protocol).
|
|
|
|
-include("emqx_lwm2m.hrl").
|
|
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
|
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
|
|
%% API.
|
|
-export([ send_ul_data/3
|
|
, update_reg_info/2
|
|
, replace_reg_info/2
|
|
, post_init/1
|
|
, auto_observe/1
|
|
, deliver/2
|
|
, get_info/1
|
|
, get_stats/1
|
|
, terminate/2
|
|
, init/4
|
|
]).
|
|
|
|
%% For Mgmt
|
|
-export([ call/2
|
|
, call/3
|
|
]).
|
|
|
|
-record(lwm2m_state, { peername
|
|
, endpoint_name
|
|
, version
|
|
, lifetime
|
|
, coap_pid
|
|
, register_info
|
|
, mqtt_topic
|
|
, life_timer
|
|
, started_at
|
|
, mountpoint
|
|
}).
|
|
|
|
-define(DEFAULT_KEEP_ALIVE_DURATION, 60*2).
|
|
|
|
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
|
|
|
|
-define(SUBOPTS, #{rh => 0, rap => 0, nl => 0, qos => 0, is_new => true}).
|
|
|
|
-define(LOG(Level, Format, Args), logger:Level("LWM2M-PROTO: " ++ Format, Args)).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% APIs
|
|
%%--------------------------------------------------------------------
|
|
|
|
call(Pid, Msg) ->
|
|
call(Pid, Msg, 5000).
|
|
|
|
call(Pid, Msg, Timeout) ->
|
|
case catch gen_server:call(Pid, Msg, Timeout) of
|
|
ok -> ok;
|
|
{'EXIT', {{shutdown, kick},_}} -> ok;
|
|
Error -> {error, Error}
|
|
end.
|
|
|
|
init(CoapPid, EndpointName, Peername = {_Peerhost, _Port},
|
|
RegInfo = #{<<"lt">> := LifeTime, <<"lwm2m">> := Ver}) ->
|
|
Mountpoint = proplists:get_value(mountpoint, lwm2m_coap_responder:options(), ""),
|
|
Lwm2mState = #lwm2m_state{peername = Peername,
|
|
endpoint_name = EndpointName,
|
|
version = Ver,
|
|
lifetime = LifeTime,
|
|
coap_pid = CoapPid,
|
|
register_info = RegInfo,
|
|
mountpoint = Mountpoint},
|
|
ClientInfo = clientinfo(Lwm2mState),
|
|
_ = run_hooks('client.connect', [conninfo(Lwm2mState)], undefined),
|
|
case emqx_access_control:authenticate(ClientInfo) of
|
|
{ok, AuthResult} ->
|
|
_ = run_hooks('client.connack', [conninfo(Lwm2mState), success], undefined),
|
|
|
|
ClientInfo1 = maps:merge(ClientInfo, AuthResult),
|
|
Sockport = proplists:get_value(port, lwm2m_coap_responder:options(), 5683),
|
|
ClientInfo2 = maps:put(sockport, Sockport, ClientInfo1),
|
|
Lwm2mState1 = Lwm2mState#lwm2m_state{started_at = time_now(),
|
|
mountpoint = maps:get(mountpoint, ClientInfo2)},
|
|
run_hooks('client.connected', [ClientInfo2, conninfo(Lwm2mState1)]),
|
|
|
|
erlang:send(CoapPid, post_init),
|
|
erlang:send_after(2000, CoapPid, auto_observe),
|
|
|
|
_ = emqx_cm_locker:trans(EndpointName, fun(_) ->
|
|
emqx_cm:register_channel(EndpointName, CoapPid, conninfo(Lwm2mState1))
|
|
end),
|
|
emqx_cm:insert_channel_info(EndpointName, info(Lwm2mState1), stats(Lwm2mState1)),
|
|
emqx_lwm2m_cm:register_channel(EndpointName, RegInfo, LifeTime, Ver, Peername),
|
|
|
|
NTimer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired}),
|
|
{ok, Lwm2mState1#lwm2m_state{life_timer = NTimer}};
|
|
{error, Error} ->
|
|
_ = run_hooks('client.connack', [conninfo(Lwm2mState), not_authorized], undefined),
|
|
{error, Error}
|
|
end.
|
|
|
|
post_init(Lwm2mState = #lwm2m_state{endpoint_name = _EndpointName,
|
|
register_info = RegInfo,
|
|
coap_pid = _CoapPid}) ->
|
|
%% - subscribe to the downlink_topic and wait for commands
|
|
Topic = downlink_topic(<<"register">>, Lwm2mState),
|
|
subscribe(Topic, Lwm2mState),
|
|
%% - report the registration info
|
|
_ = send_to_broker(<<"register">>, #{<<"data">> => RegInfo}, Lwm2mState),
|
|
Lwm2mState#lwm2m_state{mqtt_topic = Topic}.
|
|
|
|
update_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, register_info = RegInfo,
|
|
coap_pid = CoapPid, endpoint_name = Epn}) ->
|
|
UpdatedRegInfo = maps:merge(RegInfo, NewRegInfo),
|
|
|
|
_ = case proplists:get_value(update_msg_publish_condition,
|
|
lwm2m_coap_responder:options(), contains_object_list) of
|
|
always ->
|
|
send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState);
|
|
contains_object_list ->
|
|
%% - report the registration info update, but only when objectList is updated.
|
|
case NewRegInfo of
|
|
#{<<"objectList">> := _} ->
|
|
emqx_lwm2m_cm:update_reg_info(Epn, NewRegInfo),
|
|
send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState);
|
|
_ -> ok
|
|
end
|
|
end,
|
|
|
|
%% - flush cached donwlink commands
|
|
_ = flush_cached_downlink_messages(CoapPid),
|
|
|
|
%% - update the life timer
|
|
UpdatedLifeTimer = emqx_lwm2m_timer:refresh_timer(
|
|
maps:get(<<"lt">>, UpdatedRegInfo), LifeTimer),
|
|
|
|
?LOG(debug, "Update RegInfo to: ~p", [UpdatedRegInfo]),
|
|
Lwm2mState#lwm2m_state{life_timer = UpdatedLifeTimer,
|
|
register_info = UpdatedRegInfo}.
|
|
|
|
replace_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer,
|
|
coap_pid = CoapPid,
|
|
endpoint_name = EndpointName}) ->
|
|
_ = send_to_broker(<<"register">>, #{<<"data">> => NewRegInfo}, Lwm2mState),
|
|
|
|
%% - flush cached donwlink commands
|
|
_ = flush_cached_downlink_messages(CoapPid),
|
|
|
|
%% - update the life timer
|
|
UpdatedLifeTimer = emqx_lwm2m_timer:refresh_timer(
|
|
maps:get(<<"lt">>, NewRegInfo), LifeTimer),
|
|
|
|
_ = send_auto_observe(CoapPid, NewRegInfo, EndpointName),
|
|
|
|
?LOG(debug, "Replace RegInfo to: ~p", [NewRegInfo]),
|
|
Lwm2mState#lwm2m_state{life_timer = UpdatedLifeTimer,
|
|
register_info = NewRegInfo}.
|
|
|
|
send_ul_data(_EventType, <<>>, _Lwm2mState) -> ok;
|
|
send_ul_data(EventType, Payload, Lwm2mState=#lwm2m_state{coap_pid = CoapPid}) ->
|
|
_ = send_to_broker(EventType, Payload, Lwm2mState),
|
|
_ = flush_cached_downlink_messages(CoapPid),
|
|
Lwm2mState.
|
|
|
|
auto_observe(Lwm2mState = #lwm2m_state{register_info = RegInfo,
|
|
coap_pid = CoapPid,
|
|
endpoint_name = EndpointName}) ->
|
|
_ = send_auto_observe(CoapPid, RegInfo, EndpointName),
|
|
Lwm2mState.
|
|
|
|
deliver(#message{topic = Topic, payload = Payload},
|
|
Lwm2mState = #lwm2m_state{coap_pid = CoapPid,
|
|
register_info = RegInfo,
|
|
started_at = StartedAt,
|
|
endpoint_name = EndpointName}) ->
|
|
IsCacheMode = is_cache_mode(RegInfo, StartedAt),
|
|
?LOG(debug, "Get MQTT message from broker, IsCacheModeNow?: ~p, "
|
|
"Topic: ~p, Payload: ~p", [IsCacheMode, Topic, Payload]),
|
|
AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
|
|
deliver_to_coap(AlternatePath, Payload, CoapPid, IsCacheMode, EndpointName),
|
|
Lwm2mState.
|
|
|
|
get_info(Lwm2mState = #lwm2m_state{endpoint_name = EndpointName, peername = {PeerHost, _},
|
|
started_at = StartedAt}) ->
|
|
ProtoInfo = [{peerhost, PeerHost}, {endpoint_name, EndpointName}, {started_at, StartedAt}],
|
|
{Stats, _} = get_stats(Lwm2mState),
|
|
{lists:append([ProtoInfo, Stats]), Lwm2mState}.
|
|
|
|
get_stats(Lwm2mState) ->
|
|
Stats = emqx_misc:proc_stats(),
|
|
{Stats, Lwm2mState}.
|
|
|
|
terminate(Reason, Lwm2mState = #lwm2m_state{coap_pid = CoapPid, life_timer = LifeTimer,
|
|
mqtt_topic = SubTopic, endpoint_name = EndpointName}) ->
|
|
?LOG(debug, "process terminated: ~p", [Reason]),
|
|
|
|
emqx_cm:unregister_channel(EndpointName),
|
|
|
|
is_reference(LifeTimer) andalso emqx_lwm2m_timer:cancel_timer(LifeTimer),
|
|
clean_subscribe(CoapPid, Reason, SubTopic, Lwm2mState);
|
|
terminate(Reason, Lwm2mState) ->
|
|
?LOG(error, "process terminated: ~p, lwm2m_state: ~p", [Reason, Lwm2mState]).
|
|
|
|
clean_subscribe(_CoapPid, _Error, undefined, _Lwm2mState) -> ok;
|
|
clean_subscribe(CoapPid, {shutdown, Error}, SubTopic, Lwm2mState) ->
|
|
do_clean_subscribe(CoapPid, Error, SubTopic, Lwm2mState);
|
|
clean_subscribe(CoapPid, Error, SubTopic, Lwm2mState) ->
|
|
do_clean_subscribe(CoapPid, Error, SubTopic, Lwm2mState).
|
|
|
|
do_clean_subscribe(_CoapPid, Error, SubTopic, Lwm2mState) ->
|
|
?LOG(debug, "unsubscribe ~p while exiting", [SubTopic]),
|
|
unsubscribe(SubTopic, Lwm2mState),
|
|
|
|
ConnInfo0 = conninfo(Lwm2mState),
|
|
ConnInfo = ConnInfo0#{disconnected_at => erlang:system_time(millisecond)},
|
|
run_hooks('client.disconnected', [clientinfo(Lwm2mState), Error, ConnInfo]).
|
|
|
|
subscribe(Topic, Lwm2mState = #lwm2m_state{endpoint_name = EndpointName}) ->
|
|
emqx_broker:subscribe(Topic, EndpointName, ?SUBOPTS),
|
|
emqx_hooks:run('session.subscribed', [clientinfo(Lwm2mState), Topic, ?SUBOPTS]).
|
|
|
|
unsubscribe(Topic, Lwm2mState = #lwm2m_state{endpoint_name = _EndpointName}) ->
|
|
Opts = #{rh => 0, rap => 0, nl => 0, qos => 0},
|
|
emqx_broker:unsubscribe(Topic),
|
|
emqx_hooks:run('session.unsubscribed', [clientinfo(Lwm2mState), Topic, Opts]).
|
|
|
|
publish(Topic, Payload, Qos,
|
|
#lwm2m_state{
|
|
version = ProtoVer,
|
|
peername = {PeerHost, _},
|
|
endpoint_name = EndpointName}) ->
|
|
Message = emqx_message:set_flag(
|
|
retain, false,
|
|
emqx_message:make(EndpointName, Qos, Topic, Payload)
|
|
),
|
|
NMessage = emqx_message:set_headers(
|
|
#{proto_ver => ProtoVer,
|
|
protocol => lwm2m,
|
|
peerhost => PeerHost}, Message),
|
|
emqx_broker:publish(NMessage).
|
|
|
|
time_now() -> erlang:system_time(millisecond).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Deliver downlink message to coap
|
|
%%--------------------------------------------------------------------
|
|
|
|
deliver_to_coap(AlternatePath, JsonData,
|
|
CoapPid, CacheMode, EndpointName) when is_binary(JsonData)->
|
|
try
|
|
TermData = emqx_json:decode(JsonData, [return_maps]),
|
|
deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName)
|
|
catch
|
|
C:R:Stack ->
|
|
?LOG(error, "deliver_to_coap - Invalid JSON: ~p, Exception: ~p, stacktrace: ~p",
|
|
[JsonData, {C, R}, Stack])
|
|
end;
|
|
|
|
deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName) when is_map(TermData) ->
|
|
?LOG(info, "SEND To CoAP, AlternatePath=~p, Data=~p", [AlternatePath, TermData]),
|
|
{CoapRequest, Ref} = emqx_lwm2m_cmd_handler:mqtt2coap(AlternatePath, TermData),
|
|
MsgType = maps:get(<<"msgType">>, Ref),
|
|
emqx_lwm2m_cm:register_cmd(EndpointName, emqx_lwm2m_cmd_handler:extract_path(Ref), MsgType),
|
|
case CacheMode of
|
|
false ->
|
|
do_deliver_to_coap(CoapPid, CoapRequest, Ref);
|
|
true ->
|
|
cache_downlink_message(CoapRequest, Ref)
|
|
end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Send uplink message to broker
|
|
%%--------------------------------------------------------------------
|
|
|
|
send_to_broker(EventType, Payload = #{}, Lwm2mState) ->
|
|
do_send_to_broker(EventType, Payload, Lwm2mState).
|
|
|
|
do_send_to_broker(EventType, #{<<"data">> := Data} = Payload,
|
|
#lwm2m_state{endpoint_name = EndpointName} = Lwm2mState) ->
|
|
ReqPath = maps:get(<<"reqPath">>, Data, undefined),
|
|
Code = maps:get(<<"code">>, Data, undefined),
|
|
CodeMsg = maps:get(<<"codeMsg">>, Data, undefined),
|
|
Content = maps:get(<<"content">>, Data, undefined),
|
|
emqx_lwm2m_cm:register_cmd(EndpointName, ReqPath, EventType, {Code, CodeMsg, Content}),
|
|
NewPayload = maps:put(<<"msgType">>, EventType, Payload),
|
|
Topic = uplink_topic(EventType, Lwm2mState),
|
|
publish(Topic, emqx_json:encode(NewPayload), _Qos = 0, Lwm2mState).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Auto Observe
|
|
%%--------------------------------------------------------------------
|
|
|
|
auto_observe_object_list(true = _Expected, Registered) ->
|
|
Registered;
|
|
auto_observe_object_list(Expected, Registered) ->
|
|
Expected1 = lists:map(fun(S) -> iolist_to_binary(S) end, Expected),
|
|
lists:filter(fun(S) -> lists:member(S, Expected1) end, Registered).
|
|
|
|
send_auto_observe(CoapPid, RegInfo, EndpointName) ->
|
|
%% - auto observe the objects
|
|
case proplists:get_value(auto_observe, lwm2m_coap_responder:options(), false) of
|
|
false ->
|
|
?LOG(info, "Auto Observe Disabled", []);
|
|
TrueOrObjList ->
|
|
Objectlists = auto_observe_object_list(
|
|
TrueOrObjList,
|
|
maps:get(<<"objectList">>, RegInfo, [])
|
|
),
|
|
AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
|
|
auto_observe(AlternatePath, Objectlists, CoapPid, EndpointName)
|
|
end.
|
|
|
|
auto_observe(AlternatePath, ObjectList, CoapPid, EndpointName) ->
|
|
?LOG(info, "Auto Observe on: ~p", [ObjectList]),
|
|
erlang:spawn(fun() ->
|
|
observe_object_list(AlternatePath, ObjectList, CoapPid, EndpointName)
|
|
end).
|
|
|
|
observe_object_list(AlternatePath, ObjectList, CoapPid, EndpointName) ->
|
|
lists:foreach(fun(ObjectPath) ->
|
|
[ObjId | LastPath] = emqx_lwm2m_cmd_handler:path_list(ObjectPath),
|
|
case ObjId of
|
|
<<"19">> ->
|
|
[ObjInsId | _LastPath1] = LastPath,
|
|
case ObjInsId of
|
|
<<"0">> ->
|
|
observe_object_slowly(
|
|
AlternatePath, <<"/19/0/0">>,
|
|
CoapPid, 100, EndpointName
|
|
);
|
|
_ ->
|
|
observe_object_slowly(
|
|
AlternatePath, ObjectPath,
|
|
CoapPid, 100, EndpointName
|
|
)
|
|
end;
|
|
_ ->
|
|
observe_object_slowly(
|
|
AlternatePath, ObjectPath,
|
|
CoapPid, 100, EndpointName
|
|
)
|
|
end
|
|
end, ObjectList).
|
|
|
|
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, Interval, EndpointName) ->
|
|
observe_object(AlternatePath, ObjectPath, CoapPid, EndpointName),
|
|
timer:sleep(Interval).
|
|
|
|
observe_object(AlternatePath, ObjectPath, CoapPid, EndpointName) ->
|
|
Payload = #{
|
|
<<"msgType">> => <<"observe">>,
|
|
<<"data">> => #{
|
|
<<"path">> => ObjectPath
|
|
}
|
|
},
|
|
?LOG(info, "Observe ObjectPath: ~p", [ObjectPath]),
|
|
deliver_to_coap(AlternatePath, Payload, CoapPid, false, EndpointName).
|
|
|
|
do_deliver_to_coap_slowly(CoapPid, CoapRequestList, Interval) ->
|
|
erlang:spawn(fun() ->
|
|
lists:foreach(fun({CoapRequest, Ref}) ->
|
|
_ = do_deliver_to_coap(CoapPid, CoapRequest, Ref),
|
|
timer:sleep(Interval)
|
|
end, lists:reverse(CoapRequestList))
|
|
end).
|
|
|
|
do_deliver_to_coap(CoapPid, CoapRequest, Ref) ->
|
|
?LOG(debug, "Deliver To CoAP(~p), CoapRequest: ~p", [CoapPid, CoapRequest]),
|
|
CoapPid ! {deliver_to_coap, CoapRequest, Ref}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Queue Mode
|
|
%%--------------------------------------------------------------------
|
|
|
|
cache_downlink_message(CoapRequest, Ref) ->
|
|
?LOG(debug, "Cache downlink coap request: ~p, Ref: ~p", [CoapRequest, Ref]),
|
|
put(dl_msg_cache, [{CoapRequest, Ref} | get_cached_downlink_messages()]).
|
|
|
|
flush_cached_downlink_messages(CoapPid) ->
|
|
case erase(dl_msg_cache) of
|
|
CachedMessageList when is_list(CachedMessageList)->
|
|
do_deliver_to_coap_slowly(CoapPid, CachedMessageList, 100);
|
|
undefined -> ok
|
|
end.
|
|
|
|
get_cached_downlink_messages() ->
|
|
case get(dl_msg_cache) of
|
|
undefined -> [];
|
|
CachedMessageList -> CachedMessageList
|
|
end.
|
|
|
|
is_cache_mode(RegInfo, StartedAt) ->
|
|
case is_psm(RegInfo) orelse is_qmode(RegInfo) of
|
|
true ->
|
|
QModeTimeWind = proplists:get_value(
|
|
qmode_time_window,
|
|
lwm2m_coap_responder:options(),
|
|
22
|
|
),
|
|
(time_now() - StartedAt) >= QModeTimeWind;
|
|
false -> false
|
|
end.
|
|
|
|
is_psm(_) -> false.
|
|
|
|
is_qmode(#{<<"b">> := Binding}) when Binding =:= <<"UQ">>;
|
|
Binding =:= <<"SQ">>;
|
|
Binding =:= <<"UQS">>
|
|
-> true;
|
|
is_qmode(_) -> false.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Construct downlink and uplink topics
|
|
%%--------------------------------------------------------------------
|
|
|
|
downlink_topic(EventType, Lwm2mState = #lwm2m_state{mountpoint = Mountpoint}) ->
|
|
Topics = proplists:get_value(topics, lwm2m_coap_responder:options(), []),
|
|
DnTopic = proplists:get_value(downlink_topic_key(EventType), Topics,
|
|
default_downlink_topic(EventType)),
|
|
take_place(mountpoint(iolist_to_binary(DnTopic), Mountpoint), Lwm2mState).
|
|
|
|
uplink_topic(EventType, Lwm2mState = #lwm2m_state{mountpoint = Mountpoint}) ->
|
|
Topics = proplists:get_value(topics, lwm2m_coap_responder:options(), []),
|
|
UpTopic = proplists:get_value(uplink_topic_key(EventType), Topics,
|
|
default_uplink_topic(EventType)),
|
|
take_place(mountpoint(iolist_to_binary(UpTopic), Mountpoint), Lwm2mState).
|
|
|
|
downlink_topic_key(EventType) when is_binary(EventType) ->
|
|
command.
|
|
|
|
uplink_topic_key(<<"notify">>) -> notify;
|
|
uplink_topic_key(<<"register">>) -> register;
|
|
uplink_topic_key(<<"update">>) -> update;
|
|
uplink_topic_key(EventType) when is_binary(EventType) ->
|
|
response.
|
|
|
|
default_downlink_topic(Type) when is_binary(Type)->
|
|
<<"dn/#">>.
|
|
|
|
default_uplink_topic(<<"notify">>) ->
|
|
<<"up/notify">>;
|
|
default_uplink_topic(Type) when is_binary(Type) ->
|
|
<<"up/resp">>.
|
|
|
|
take_place(Text, Lwm2mState) ->
|
|
{IPAddr, _} = Lwm2mState#lwm2m_state.peername,
|
|
IPAddrBin = iolist_to_binary(inet:ntoa(IPAddr)),
|
|
take_place(take_place(Text, <<"%a">>, IPAddrBin),
|
|
<<"%e">>, Lwm2mState#lwm2m_state.endpoint_name).
|
|
|
|
take_place(Text, Placeholder, Value) ->
|
|
binary:replace(Text, Placeholder, Value, [global]).
|
|
|
|
clientinfo(#lwm2m_state{peername = {PeerHost, _},
|
|
endpoint_name = EndpointName,
|
|
mountpoint = Mountpoint}) ->
|
|
#{zone => undefined,
|
|
protocol => lwm2m,
|
|
peerhost => PeerHost,
|
|
sockport => 5683, %% FIXME:
|
|
clientid => EndpointName,
|
|
username => undefined,
|
|
password => undefined,
|
|
peercert => nossl,
|
|
is_bridge => false,
|
|
is_superuser => false,
|
|
mountpoint => Mountpoint,
|
|
ws_cookie => undefined
|
|
}.
|
|
|
|
mountpoint(Topic, <<>>) ->
|
|
Topic;
|
|
mountpoint(Topic, Mountpoint) ->
|
|
<<Mountpoint/binary, Topic/binary>>.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Helper funcs
|
|
|
|
-compile({inline, [run_hooks/2, run_hooks/3]}).
|
|
run_hooks(Name, Args) ->
|
|
ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args).
|
|
|
|
run_hooks(Name, Args, Acc) ->
|
|
ok = emqx_metrics:inc(Name), emqx_hooks:run_fold(Name, Args, Acc).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Info & Stats
|
|
|
|
info(State) ->
|
|
ChannInfo = chann_info(State),
|
|
ChannInfo#{sockinfo => sockinfo(State)}.
|
|
|
|
%% copies from emqx_connection:info/1
|
|
sockinfo(#lwm2m_state{peername = Peername}) ->
|
|
#{socktype => udp,
|
|
peername => Peername,
|
|
sockname => {{127,0,0,1}, 5683}, %% FIXME: Sock?
|
|
sockstate => running,
|
|
active_n => 1
|
|
}.
|
|
|
|
%% copies from emqx_channel:info/1
|
|
chann_info(State) ->
|
|
#{conninfo => conninfo(State),
|
|
conn_state => connected,
|
|
clientinfo => clientinfo(State),
|
|
session => maps:from_list(session_info(State)),
|
|
will_msg => undefined
|
|
}.
|
|
|
|
conninfo(#lwm2m_state{peername = Peername,
|
|
version = Ver,
|
|
started_at = StartedAt,
|
|
endpoint_name = Epn}) ->
|
|
#{socktype => udp,
|
|
sockname => {{127,0,0,1}, 5683},
|
|
peername => Peername,
|
|
peercert => nossl, %% TODO: dtls
|
|
conn_mod => ?MODULE,
|
|
proto_name => <<"LwM2M">>,
|
|
proto_ver => Ver,
|
|
clean_start => true,
|
|
clientid => Epn,
|
|
username => undefined,
|
|
conn_props => undefined,
|
|
connected => true,
|
|
connected_at => StartedAt,
|
|
keepalive => 0,
|
|
receive_maximum => 0,
|
|
expiry_interval => 0
|
|
}.
|
|
|
|
%% copies from emqx_session:info/1
|
|
session_info(#lwm2m_state{mqtt_topic = SubTopic, started_at = StartedAt}) ->
|
|
[{subscriptions, #{SubTopic => ?SUBOPTS}},
|
|
{upgrade_qos, false},
|
|
{retry_interval, 0},
|
|
{await_rel_timeout, 0},
|
|
{created_at, StartedAt}
|
|
].
|
|
|
|
%% The stats keys copied from emqx_connection:stats/1
|
|
stats(_State) ->
|
|
SockStats = [{recv_oct,0}, {recv_cnt,0}, {send_oct,0}, {send_cnt,0}, {send_pend,0}],
|
|
ConnStats = emqx_pd:get_counters(?CONN_STATS),
|
|
ChanStats = [{subscriptions_cnt, 1},
|
|
{subscriptions_max, 1},
|
|
{inflight_cnt, 0},
|
|
{inflight_max, 0},
|
|
{mqueue_len, 0},
|
|
{mqueue_max, 0},
|
|
{mqueue_dropped, 0},
|
|
{next_pkt_id, 0},
|
|
{awaiting_rel_cnt, 0},
|
|
{awaiting_rel_max, 0}
|
|
],
|
|
ProcStats = emqx_misc:proc_stats(),
|
|
lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
|
|
|