feat(lwm2m): fix check dialyzer fail

This commit is contained in:
Turtle 2021-06-23 16:23:31 +08:00 committed by turtleDeng
parent a6bd1c90d5
commit 2092bedb12
2 changed files with 50 additions and 25 deletions

View File

@ -23,6 +23,7 @@
-export([ mqtt2coap/2 -export([ mqtt2coap/2
, coap2mqtt/4 , coap2mqtt/4
, ack2mqtt/1 , ack2mqtt/1
, extract_path/1
]). ]).
-export([path_list/1]). -export([path_list/1]).

View File

@ -103,6 +103,7 @@ init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, RegInfo = #{<<"lt">>
emqx_cm:register_channel(EndpointName, CoapPid, conninfo(Lwm2mState1)) emqx_cm:register_channel(EndpointName, CoapPid, conninfo(Lwm2mState1))
end), end),
emqx_cm:insert_channel_info(EndpointName, info(Lwm2mState1), stats(Lwm2mState1)), emqx_cm:insert_channel_info(EndpointName, info(Lwm2mState1), stats(Lwm2mState1)),
emqx_lwm2m_cm:register_channel(EndpointName, RegInfo, LifeTime, Ver, Peername),
{ok, Lwm2mState1#lwm2m_state{life_timer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired})}}; {ok, Lwm2mState1#lwm2m_state{life_timer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired})}};
{error, Error} -> {error, Error} ->
@ -120,10 +121,8 @@ post_init(Lwm2mState = #lwm2m_state{endpoint_name = _EndpointName,
_ = send_to_broker(<<"register">>, #{<<"data">> => RegInfo}, Lwm2mState), _ = send_to_broker(<<"register">>, #{<<"data">> => RegInfo}, Lwm2mState),
Lwm2mState#lwm2m_state{mqtt_topic = Topic}. Lwm2mState#lwm2m_state{mqtt_topic = Topic}.
update_reg_info(NewRegInfo, Lwm2mState = #lwm2m_state{ update_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, register_info = RegInfo,
life_timer = LifeTimer, register_info = RegInfo, coap_pid = CoapPid, endpoint_name = Epn}) ->
coap_pid = CoapPid}) ->
UpdatedRegInfo = maps:merge(RegInfo, NewRegInfo), UpdatedRegInfo = maps:merge(RegInfo, NewRegInfo),
_ = case proplists:get_value(update_msg_publish_condition, _ = case proplists:get_value(update_msg_publish_condition,
@ -134,6 +133,7 @@ update_reg_info(NewRegInfo, Lwm2mState = #lwm2m_state{
%% - report the registration info update, but only when objectList is updated. %% - report the registration info update, but only when objectList is updated.
case NewRegInfo of case NewRegInfo of
#{<<"objectList">> := _} -> #{<<"objectList">> := _} ->
emqx_lwm2m_cm:update_reg_info(Epn, NewRegInfo),
send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState); send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState);
_ -> ok _ -> ok
end end
@ -151,7 +151,8 @@ update_reg_info(NewRegInfo, Lwm2mState = #lwm2m_state{
register_info = UpdatedRegInfo}. register_info = UpdatedRegInfo}.
replace_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, replace_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer,
coap_pid = CoapPid}) -> coap_pid = CoapPid,
endpoint_name = EndpointName}) ->
_ = send_to_broker(<<"register">>, #{<<"data">> => NewRegInfo}, Lwm2mState), _ = send_to_broker(<<"register">>, #{<<"data">> => NewRegInfo}, Lwm2mState),
%% - flush cached donwlink commands %% - flush cached donwlink commands
@ -161,7 +162,7 @@ replace_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer,
UpdatedLifeTimer = emqx_lwm2m_timer:refresh_timer( UpdatedLifeTimer = emqx_lwm2m_timer:refresh_timer(
maps:get(<<"lt">>, NewRegInfo), LifeTimer), maps:get(<<"lt">>, NewRegInfo), LifeTimer),
_ = send_auto_observe(CoapPid, NewRegInfo), _ = send_auto_observe(CoapPid, NewRegInfo, EndpointName),
?LOG(debug, "Replace RegInfo to: ~p", [NewRegInfo]), ?LOG(debug, "Replace RegInfo to: ~p", [NewRegInfo]),
Lwm2mState#lwm2m_state{life_timer = UpdatedLifeTimer, Lwm2mState#lwm2m_state{life_timer = UpdatedLifeTimer,
@ -174,15 +175,20 @@ send_ul_data(EventType, Payload, Lwm2mState=#lwm2m_state{coap_pid = CoapPid}) ->
Lwm2mState. Lwm2mState.
auto_observe(Lwm2mState = #lwm2m_state{register_info = RegInfo, auto_observe(Lwm2mState = #lwm2m_state{register_info = RegInfo,
coap_pid = CoapPid}) -> coap_pid = CoapPid,
_ = send_auto_observe(CoapPid, RegInfo), endpoint_name = EndpointName}) ->
_ = send_auto_observe(CoapPid, RegInfo, EndpointName),
Lwm2mState. Lwm2mState.
deliver(#message{topic = Topic, payload = Payload}, Lwm2mState = #lwm2m_state{coap_pid = CoapPid, register_info = RegInfo, started_at = StartedAt}) -> deliver(#message{topic = Topic, payload = Payload},
Lwm2mState = #lwm2m_state{coap_pid = CoapPid,
register_info = RegInfo,
started_at = StartedAt,
endpoint_name = EndpointName}) ->
IsCacheMode = is_cache_mode(RegInfo, StartedAt), IsCacheMode = is_cache_mode(RegInfo, StartedAt),
?LOG(debug, "Get MQTT message from broker, IsCacheModeNow?: ~p, Topic: ~p, Payload: ~p", [IsCacheMode, Topic, Payload]), ?LOG(debug, "Get MQTT message from broker, IsCacheModeNow?: ~p, Topic: ~p, Payload: ~p", [IsCacheMode, Topic, Payload]),
AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>), AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
deliver_to_coap(AlternatePath, Payload, CoapPid, IsCacheMode), deliver_to_coap(AlternatePath, Payload, CoapPid, IsCacheMode, EndpointName),
Lwm2mState. Lwm2mState.
get_info(Lwm2mState = #lwm2m_state{endpoint_name = EndpointName, peername = {PeerHost, _}, get_info(Lwm2mState = #lwm2m_state{endpoint_name = EndpointName, peername = {PeerHost, _},
@ -238,20 +244,21 @@ time_now() -> erlang:system_time(millisecond).
%% Deliver downlink message to coap %% Deliver downlink message to coap
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
deliver_to_coap(AlternatePath, JsonData, CoapPid, CacheMode) when is_binary(JsonData)-> deliver_to_coap(AlternatePath, JsonData, CoapPid, CacheMode, EndpointName) when is_binary(JsonData)->
try try
TermData = emqx_json:decode(JsonData, [return_maps]), TermData = emqx_json:decode(JsonData, [return_maps]),
deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode) deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName)
catch catch
C:R:Stack -> C:R:Stack ->
?LOG(error, "deliver_to_coap - Invalid JSON: ~p, Exception: ~p, stacktrace: ~p", ?LOG(error, "deliver_to_coap - Invalid JSON: ~p, Exception: ~p, stacktrace: ~p",
[JsonData, {C, R}, Stack]) [JsonData, {C, R}, Stack])
end; end;
deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode) when is_map(TermData) -> deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName) when is_map(TermData) ->
?LOG(info, "SEND To CoAP, AlternatePath=~p, Data=~p", [AlternatePath, TermData]), ?LOG(info, "SEND To CoAP, AlternatePath=~p, Data=~p", [AlternatePath, TermData]),
{CoapRequest, Ref} = emqx_lwm2m_cmd_handler:mqtt2coap(AlternatePath, TermData), {CoapRequest, Ref} = emqx_lwm2m_cmd_handler:mqtt2coap(AlternatePath, TermData),
MsgType = maps:get(<<"msgType">>, Ref),
emqx_lwm2m_cm:register_cmd(EndpointName, emqx_lwm2m_cmd_handler:extract_path(Ref), MsgType),
case CacheMode of case CacheMode of
false -> false ->
do_deliver_to_coap(CoapPid, CoapRequest, Ref); do_deliver_to_coap(CoapPid, CoapRequest, Ref);
@ -266,7 +273,12 @@ deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode) when is_map(TermDat
send_to_broker(EventType, Payload = #{}, Lwm2mState) -> send_to_broker(EventType, Payload = #{}, Lwm2mState) ->
do_send_to_broker(EventType, Payload, Lwm2mState). do_send_to_broker(EventType, Payload, Lwm2mState).
do_send_to_broker(EventType, Payload, Lwm2mState) -> do_send_to_broker(EventType, #{<<"data">> := Data} = Payload, #lwm2m_state{endpoint_name = EndpointName} = Lwm2mState) ->
ReqPath = maps:get(<<"reqPath">>, Data, undefined),
Code = maps:get(<<"code">>, Data, undefined),
CodeMsg = maps:get(<<"codeMsg">>, Data, undefined),
Content = maps:get(<<"content">>, Data, undefined),
emqx_lwm2m_cm:register_cmd(EndpointName, ReqPath, EventType, {Code, CodeMsg, Content}),
NewPayload = maps:put(<<"msgType">>, EventType, Payload), NewPayload = maps:put(<<"msgType">>, EventType, Payload),
Topic = uplink_topic(EventType, Lwm2mState), Topic = uplink_topic(EventType, Lwm2mState),
publish(Topic, emqx_json:encode(NewPayload), _Qos = 0, Lwm2mState#lwm2m_state.endpoint_name). publish(Topic, emqx_json:encode(NewPayload), _Qos = 0, Lwm2mState#lwm2m_state.endpoint_name).
@ -281,7 +293,7 @@ auto_observe_object_list(Expected, Registered) ->
Expected1 = lists:map(fun(S) -> iolist_to_binary(S) end, Expected), Expected1 = lists:map(fun(S) -> iolist_to_binary(S) end, Expected),
lists:filter(fun(S) -> lists:member(S, Expected1) end, Registered). lists:filter(fun(S) -> lists:member(S, Expected1) end, Registered).
send_auto_observe(CoapPid, RegInfo) -> send_auto_observe(CoapPid, RegInfo, EndpointName) ->
%% - auto observe the objects %% - auto observe the objects
case proplists:get_value(auto_observe, lwm2m_coap_responder:options(), false) of case proplists:get_value(auto_observe, lwm2m_coap_responder:options(), false) of
false -> false ->
@ -292,25 +304,37 @@ send_auto_observe(CoapPid, RegInfo) ->
maps:get(<<"objectList">>, RegInfo, []) maps:get(<<"objectList">>, RegInfo, [])
), ),
AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>), AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
auto_observe(AlternatePath, Objectlists, CoapPid) auto_observe(AlternatePath, Objectlists, CoapPid, EndpointName)
end. end.
auto_observe(AlternatePath, ObjectList, CoapPid) -> auto_observe(AlternatePath, ObjectList, CoapPid, EndpointName) ->
?LOG(info, "Auto Observe on: ~p", [ObjectList]), ?LOG(info, "Auto Observe on: ~p", [ObjectList]),
erlang:spawn(fun() -> erlang:spawn(fun() ->
observe_object_list(AlternatePath, ObjectList, CoapPid) observe_object_list(AlternatePath, ObjectList, CoapPid, EndpointName)
end). end).
observe_object_list(AlternatePath, ObjectList, CoapPid) -> observe_object_list(AlternatePath, ObjectList, CoapPid, EndpointName) ->
lists:foreach(fun(ObjectPath) -> lists:foreach(fun(ObjectPath) ->
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100) [ObjId| LastPath] = emqx_lwm2m_cmd_handler:path_list(ObjectPath),
case ObjId of
<<"19">> ->
[ObjInsId | _LastPath1] = LastPath,
case ObjInsId of
<<"0">> ->
observe_object_slowly(AlternatePath, <<"/19/0/0">>, CoapPid, 100, EndpointName);
_ ->
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName)
end;
_ ->
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName)
end
end, ObjectList). end, ObjectList).
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, Interval) -> observe_object_slowly(AlternatePath, ObjectPath, CoapPid, Interval, EndpointName) ->
observe_object(AlternatePath, ObjectPath, CoapPid), observe_object(AlternatePath, ObjectPath, CoapPid, EndpointName),
timer:sleep(Interval). timer:sleep(Interval).
observe_object(AlternatePath, ObjectPath, CoapPid) -> observe_object(AlternatePath, ObjectPath, CoapPid, EndpointName) ->
Payload = #{ Payload = #{
<<"msgType">> => <<"observe">>, <<"msgType">> => <<"observe">>,
<<"data">> => #{ <<"data">> => #{
@ -318,7 +342,7 @@ observe_object(AlternatePath, ObjectPath, CoapPid) ->
} }
}, },
?LOG(info, "Observe ObjectPath: ~p", [ObjectPath]), ?LOG(info, "Observe ObjectPath: ~p", [ObjectPath]),
deliver_to_coap(AlternatePath, Payload, CoapPid, false). deliver_to_coap(AlternatePath, Payload, CoapPid, false, EndpointName).
do_deliver_to_coap_slowly(CoapPid, CoapRequestList, Interval) -> do_deliver_to_coap_slowly(CoapPid, CoapRequestList, Interval) ->
erlang:spawn(fun() -> erlang:spawn(fun() ->