From 5682dcb72ec1e5fd50db69059c958d14ff3b8d85 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 21 Mar 2022 12:01:22 +0800 Subject: [PATCH] fix(gw): enrich conninfo for coap&lwm2m --- .../src/coap/emqx_coap_channel.erl | 32 ++++++++++++++----- .../src/lwm2m/emqx_lwm2m_channel.erl | 28 +++++++++++++--- .../emqx_rule_engine/src/emqx_rule_events.erl | 2 +- 3 files changed, 48 insertions(+), 14 deletions(-) diff --git a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl index cc5732ee2..7fa948a47 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl @@ -119,10 +119,10 @@ stats(_) -> []. -spec init(map(), map()) -> channel(). -init(ConnInfoT = #{peername := {PeerHost, _}, - sockname := {_, SockPort}}, +init(ConnInfo = #{peername := {PeerHost, _}, + sockname := {_, SockPort}}, #{ctx := Ctx} = Config) -> - Peercert = maps:get(peercert, ConnInfoT, undefined), + Peercert = maps:get(peercert, ConnInfo, undefined), Mountpoint = maps:get(mountpoint, Config, <<>>), ListenerId = case maps:get(listener, Config, undefined) of undefined -> undefined; @@ -144,10 +144,6 @@ init(ConnInfoT = #{peername := {PeerHost, _}, } ), - %% because it is possible to disconnect after init, and then trigger the - %% $event.disconnected hook and these two fields are required in the hook - ConnInfo = ConnInfoT#{proto_name => <<"CoAP">>, proto_ver => <<"1">>}, - Heartbeat = ?GET_IDLE_TIME(Config), #channel{ ctx = Ctx , conninfo = ConnInfo @@ -405,6 +401,25 @@ run_conn_hooks(Input, Channel = #channel{ctx = Ctx, {ok, Input, Channel} end. +enrich_conninfo({Queries, _Msg}, + Channel = #channel{ + keepalive = KeepAlive, + conninfo = ConnInfo}) -> + case Queries of + #{<<"clientid">> := ClientId} -> + Interval = maps:get(interval, emqx_keepalive:info(KeepAlive)), + NConnInfo = ConnInfo#{ clientid => ClientId + , proto_name => <<"CoAP">> + , proto_ver => <<"1">> + , clean_start => true + , keepalive => Interval + , expiry_interval => 0 + }, + {ok, Channel#channel{conninfo = NConnInfo}}; + _ -> + {error, "invalid queries", Channel} + end. + enrich_clientinfo({Queries, Msg}, Channel = #channel{clientinfo = ClientInfo0}) -> case Queries of @@ -583,7 +598,8 @@ process_nothing(_, _, Channel) -> process_connection({open, Req}, Result, Channel, Iter) -> Queries = emqx_coap_message:get_option(uri_query, Req), case emqx_misc:pipeline( - [ fun run_conn_hooks/2 + [ fun enrich_conninfo/2 + , fun run_conn_hooks/2 , fun enrich_clientinfo/2 , fun set_log_meta/2 , fun auth_connect/2 diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl index c53ca0528..0ff3dad27 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl @@ -108,10 +108,10 @@ info(ctx, #channel{ctx = Ctx}) -> stats(_) -> []. -init(ConnInfoT = #{peername := {PeerHost, _}, - sockname := {_, SockPort}}, +init(ConnInfo = #{peername := {PeerHost, _}, + sockname := {_, SockPort}}, #{ctx := Ctx} = Config) -> - Peercert = maps:get(peercert, ConnInfoT, undefined), + Peercert = maps:get(peercert, ConnInfo, undefined), Mountpoint = maps:get(mountpoint, Config, undefined), ListenerId = case maps:get(listener, Config, undefined) of undefined -> undefined; @@ -133,8 +133,6 @@ init(ConnInfoT = #{peername := {PeerHost, _}, } ), - ConnInfo = ConnInfoT#{proto_name => <<"LwM2M">>, proto_ver => <<"0.0">>}, - #channel{ ctx = Ctx , conninfo = ConnInfo , clientinfo = ClientInfo @@ -369,6 +367,7 @@ do_takeover(_DesireId, Msg, Channel) -> do_connect(Req, Result, Channel, Iter) -> case emqx_misc:pipeline( [ fun check_lwm2m_version/2 + , fun enrich_conninfo/2 , fun run_conn_hooks/2 , fun enrich_clientinfo/2 , fun set_log_meta/2 @@ -427,6 +426,25 @@ run_conn_hooks(Input, Channel = #channel{ctx = Ctx, {ok, Input, Channel} end. +enrich_conninfo(#coap_message{options = Options}, + Channel = #channel{ + conninfo = ConnInfo}) -> + Query = maps:get(uri_query, Options, #{}), + case Query of + #{<<"ep">> := Epn, <<"lt">> := Lifetime} -> + ClientId = maps:get(<<"device_id">>, Query, Epn), + NConnInfo = ConnInfo#{ clientid => ClientId + , proto_name => <<"LwM2M">> + , proto_ver => <<"1.0.1">> + , clean_start => true + , keepalive => binary_to_integer(Lifetime) + , expiry_interval => 0 + }, + {ok, Channel#channel{conninfo = NConnInfo}}; + _ -> + {error, "invalid queries", Channel} + end. + enrich_clientinfo(#coap_message{options = Options} = Msg, Channel = #channel{clientinfo = ClientInfo0}) -> Query = maps:get(uri_query, Options, #{}), diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index de3af52ac..2cd43e691 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -185,7 +185,7 @@ eventmsg_connected(_ClientInfo = #{ is_bridge := IsBridge, mountpoint := Mountpoint }, - _ConnInfo = #{ + ConnInfo = #{ peername := PeerName, sockname := SockName, clean_start := CleanStart,