From 2092bedb121597f5494539fc060ff07063092ac2 Mon Sep 17 00:00:00 2001 From: Turtle Date: Wed, 23 Jun 2021 16:23:31 +0800 Subject: [PATCH] feat(lwm2m): fix check dialyzer fail --- .../emqx_lwm2m/src/emqx_lwm2m_cmd_handler.erl | 1 + apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl | 74 ++++++++++++------- 2 files changed, 50 insertions(+), 25 deletions(-) diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m_cmd_handler.erl b/apps/emqx_lwm2m/src/emqx_lwm2m_cmd_handler.erl index 2d208bdb4..b3251a275 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m_cmd_handler.erl +++ b/apps/emqx_lwm2m/src/emqx_lwm2m_cmd_handler.erl @@ -23,6 +23,7 @@ -export([ mqtt2coap/2 , coap2mqtt/4 , ack2mqtt/1 + , extract_path/1 ]). -export([path_list/1]). diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl b/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl index 55f992da6..34c72dcca 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl +++ b/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl @@ -103,6 +103,7 @@ init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, RegInfo = #{<<"lt">> emqx_cm:register_channel(EndpointName, CoapPid, conninfo(Lwm2mState1)) end), emqx_cm:insert_channel_info(EndpointName, info(Lwm2mState1), stats(Lwm2mState1)), + emqx_lwm2m_cm:register_channel(EndpointName, RegInfo, LifeTime, Ver, Peername), {ok, Lwm2mState1#lwm2m_state{life_timer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired})}}; {error, Error} -> @@ -120,10 +121,8 @@ post_init(Lwm2mState = #lwm2m_state{endpoint_name = _EndpointName, _ = send_to_broker(<<"register">>, #{<<"data">> => RegInfo}, Lwm2mState), Lwm2mState#lwm2m_state{mqtt_topic = Topic}. -update_reg_info(NewRegInfo, Lwm2mState = #lwm2m_state{ - life_timer = LifeTimer, register_info = RegInfo, - coap_pid = CoapPid}) -> - +update_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, register_info = RegInfo, + coap_pid = CoapPid, endpoint_name = Epn}) -> UpdatedRegInfo = maps:merge(RegInfo, NewRegInfo), _ = case proplists:get_value(update_msg_publish_condition, @@ -134,6 +133,7 @@ update_reg_info(NewRegInfo, Lwm2mState = #lwm2m_state{ %% - report the registration info update, but only when objectList is updated. case NewRegInfo of #{<<"objectList">> := _} -> + emqx_lwm2m_cm:update_reg_info(Epn, NewRegInfo), send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState); _ -> ok end @@ -151,7 +151,8 @@ update_reg_info(NewRegInfo, Lwm2mState = #lwm2m_state{ register_info = UpdatedRegInfo}. replace_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, - coap_pid = CoapPid}) -> + coap_pid = CoapPid, + endpoint_name = EndpointName}) -> _ = send_to_broker(<<"register">>, #{<<"data">> => NewRegInfo}, Lwm2mState), %% - flush cached donwlink commands @@ -161,7 +162,7 @@ replace_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, UpdatedLifeTimer = emqx_lwm2m_timer:refresh_timer( maps:get(<<"lt">>, NewRegInfo), LifeTimer), - _ = send_auto_observe(CoapPid, NewRegInfo), + _ = send_auto_observe(CoapPid, NewRegInfo, EndpointName), ?LOG(debug, "Replace RegInfo to: ~p", [NewRegInfo]), Lwm2mState#lwm2m_state{life_timer = UpdatedLifeTimer, @@ -174,15 +175,20 @@ send_ul_data(EventType, Payload, Lwm2mState=#lwm2m_state{coap_pid = CoapPid}) -> Lwm2mState. auto_observe(Lwm2mState = #lwm2m_state{register_info = RegInfo, - coap_pid = CoapPid}) -> - _ = send_auto_observe(CoapPid, RegInfo), + coap_pid = CoapPid, + endpoint_name = EndpointName}) -> + _ = send_auto_observe(CoapPid, RegInfo, EndpointName), Lwm2mState. -deliver(#message{topic = Topic, payload = Payload}, Lwm2mState = #lwm2m_state{coap_pid = CoapPid, register_info = RegInfo, started_at = StartedAt}) -> +deliver(#message{topic = Topic, payload = Payload}, + Lwm2mState = #lwm2m_state{coap_pid = CoapPid, + register_info = RegInfo, + started_at = StartedAt, + endpoint_name = EndpointName}) -> IsCacheMode = is_cache_mode(RegInfo, StartedAt), ?LOG(debug, "Get MQTT message from broker, IsCacheModeNow?: ~p, Topic: ~p, Payload: ~p", [IsCacheMode, Topic, Payload]), AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>), - deliver_to_coap(AlternatePath, Payload, CoapPid, IsCacheMode), + deliver_to_coap(AlternatePath, Payload, CoapPid, IsCacheMode, EndpointName), Lwm2mState. get_info(Lwm2mState = #lwm2m_state{endpoint_name = EndpointName, peername = {PeerHost, _}, @@ -238,20 +244,21 @@ time_now() -> erlang:system_time(millisecond). %% Deliver downlink message to coap %%-------------------------------------------------------------------- -deliver_to_coap(AlternatePath, JsonData, CoapPid, CacheMode) when is_binary(JsonData)-> +deliver_to_coap(AlternatePath, JsonData, CoapPid, CacheMode, EndpointName) when is_binary(JsonData)-> try TermData = emqx_json:decode(JsonData, [return_maps]), - deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode) + deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName) catch C:R:Stack -> ?LOG(error, "deliver_to_coap - Invalid JSON: ~p, Exception: ~p, stacktrace: ~p", [JsonData, {C, R}, Stack]) end; -deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode) when is_map(TermData) -> +deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName) when is_map(TermData) -> ?LOG(info, "SEND To CoAP, AlternatePath=~p, Data=~p", [AlternatePath, TermData]), {CoapRequest, Ref} = emqx_lwm2m_cmd_handler:mqtt2coap(AlternatePath, TermData), - + MsgType = maps:get(<<"msgType">>, Ref), + emqx_lwm2m_cm:register_cmd(EndpointName, emqx_lwm2m_cmd_handler:extract_path(Ref), MsgType), case CacheMode of false -> do_deliver_to_coap(CoapPid, CoapRequest, Ref); @@ -266,7 +273,12 @@ deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode) when is_map(TermDat send_to_broker(EventType, Payload = #{}, Lwm2mState) -> do_send_to_broker(EventType, Payload, Lwm2mState). -do_send_to_broker(EventType, Payload, Lwm2mState) -> +do_send_to_broker(EventType, #{<<"data">> := Data} = Payload, #lwm2m_state{endpoint_name = EndpointName} = Lwm2mState) -> + ReqPath = maps:get(<<"reqPath">>, Data, undefined), + Code = maps:get(<<"code">>, Data, undefined), + CodeMsg = maps:get(<<"codeMsg">>, Data, undefined), + Content = maps:get(<<"content">>, Data, undefined), + emqx_lwm2m_cm:register_cmd(EndpointName, ReqPath, EventType, {Code, CodeMsg, Content}), NewPayload = maps:put(<<"msgType">>, EventType, Payload), Topic = uplink_topic(EventType, Lwm2mState), publish(Topic, emqx_json:encode(NewPayload), _Qos = 0, Lwm2mState#lwm2m_state.endpoint_name). @@ -281,7 +293,7 @@ auto_observe_object_list(Expected, Registered) -> Expected1 = lists:map(fun(S) -> iolist_to_binary(S) end, Expected), lists:filter(fun(S) -> lists:member(S, Expected1) end, Registered). -send_auto_observe(CoapPid, RegInfo) -> +send_auto_observe(CoapPid, RegInfo, EndpointName) -> %% - auto observe the objects case proplists:get_value(auto_observe, lwm2m_coap_responder:options(), false) of false -> @@ -292,25 +304,37 @@ send_auto_observe(CoapPid, RegInfo) -> maps:get(<<"objectList">>, RegInfo, []) ), AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>), - auto_observe(AlternatePath, Objectlists, CoapPid) + auto_observe(AlternatePath, Objectlists, CoapPid, EndpointName) end. -auto_observe(AlternatePath, ObjectList, CoapPid) -> +auto_observe(AlternatePath, ObjectList, CoapPid, EndpointName) -> ?LOG(info, "Auto Observe on: ~p", [ObjectList]), erlang:spawn(fun() -> - observe_object_list(AlternatePath, ObjectList, CoapPid) + observe_object_list(AlternatePath, ObjectList, CoapPid, EndpointName) end). -observe_object_list(AlternatePath, ObjectList, CoapPid) -> +observe_object_list(AlternatePath, ObjectList, CoapPid, EndpointName) -> lists:foreach(fun(ObjectPath) -> - observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100) + [ObjId| LastPath] = emqx_lwm2m_cmd_handler:path_list(ObjectPath), + case ObjId of + <<"19">> -> + [ObjInsId | _LastPath1] = LastPath, + case ObjInsId of + <<"0">> -> + observe_object_slowly(AlternatePath, <<"/19/0/0">>, CoapPid, 100, EndpointName); + _ -> + observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName) + end; + _ -> + observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName) + end end, ObjectList). -observe_object_slowly(AlternatePath, ObjectPath, CoapPid, Interval) -> - observe_object(AlternatePath, ObjectPath, CoapPid), +observe_object_slowly(AlternatePath, ObjectPath, CoapPid, Interval, EndpointName) -> + observe_object(AlternatePath, ObjectPath, CoapPid, EndpointName), timer:sleep(Interval). -observe_object(AlternatePath, ObjectPath, CoapPid) -> +observe_object(AlternatePath, ObjectPath, CoapPid, EndpointName) -> Payload = #{ <<"msgType">> => <<"observe">>, <<"data">> => #{ @@ -318,7 +342,7 @@ observe_object(AlternatePath, ObjectPath, CoapPid) -> } }, ?LOG(info, "Observe ObjectPath: ~p", [ObjectPath]), - deliver_to_coap(AlternatePath, Payload, CoapPid, false). + deliver_to_coap(AlternatePath, Payload, CoapPid, false, EndpointName). do_deliver_to_coap_slowly(CoapPid, CoapRequestList, Interval) -> erlang:spawn(fun() ->