diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index ec7c5b7a9..1e68d74d6 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -235,7 +235,7 @@ caps(#channel{clientinfo = #{zone := Zone}}) -> -spec init(emqx_types:conninfo(), opts()) -> channel(). init( ConnInfo = #{ - peername := {PeerHost, PeerPort}, + peername := {PeerHost, PeerPort} = PeerName, sockname := {_Host, SockPort} }, #{ @@ -259,6 +259,9 @@ init( listener => ListenerId, protocol => Protocol, peerhost => PeerHost, + %% We copy peername to clientinfo because some event contexts only have access + %% to client info (e.g.: authn/authz). + peername => PeerName, peerport => PeerPort, sockport => SockPort, clientid => undefined, diff --git a/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl b/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl index 20ee129f3..ac8db26db 100644 --- a/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl +++ b/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl @@ -308,6 +308,7 @@ clientinfo(OldClientInfo) -> zone, protocol, peerhost, + peername, sockport, clientid, username, diff --git a/apps/emqx_gateway_coap/src/emqx_coap_channel.erl b/apps/emqx_gateway_coap/src/emqx_coap_channel.erl index b2aac2a4c..06ffdd1c9 100644 --- a/apps/emqx_gateway_coap/src/emqx_coap_channel.erl +++ b/apps/emqx_gateway_coap/src/emqx_coap_channel.erl @@ -120,7 +120,7 @@ stats(#channel{session = Session}) -> -spec init(map(), map()) -> channel(). init( ConnInfo = #{ - peername := {PeerHost, _}, + peername := {PeerHost, _} = PeerName, sockname := {_, SockPort} }, #{ctx := Ctx} = Config @@ -140,6 +140,7 @@ init( listener => ListenerId, protocol => 'coap', peerhost => PeerHost, + peername => PeerName, sockport => SockPort, clientid => emqx_guid:to_base62(emqx_guid:gen()), username => undefined, diff --git a/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl b/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl index 93646acbf..4847fabb3 100644 --- a/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl @@ -802,7 +802,7 @@ default_conninfo(ConnInfo) -> }. default_clientinfo(#{ - peername := {PeerHost, _}, + peername := {PeerHost, _} = PeerName, sockname := {_, SockPort}, clientid := ClientId }) -> @@ -810,6 +810,7 @@ default_clientinfo(#{ zone => default, protocol => exproto, peerhost => PeerHost, + peername => PeerName, sockport => SockPort, clientid => ClientId, username => undefined, diff --git a/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl index 809a79f7d..88babdb30 100644 --- a/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl +++ b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl @@ -138,7 +138,7 @@ set_conn_state(ConnState, Channel) -> init( ConnInfo = #{ - peername := {PeerHost, _Port}, + peername := {PeerHost, _Port} = PeerName, sockname := {_Host, SockPort} }, Options @@ -160,6 +160,7 @@ init( listener => ListenerId, protocol => gbt32960, peerhost => PeerHost, + peername => PeerName, sockport => SockPort, clientid => undefined, username => undefined, diff --git a/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl b/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl index a74214a1c..588da31ff 100644 --- a/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl +++ b/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl @@ -147,7 +147,7 @@ stats(#channel{inflight = Inflight, mqueue = Queue}) -> -spec init(emqx_types:conninfo(), map()) -> channel(). init( ConnInfo = #{ - peername := {PeerHost, _Port}, + peername := {PeerHost, _Port} = PeerName, sockname := {_Host, SockPort} }, Options = #{ @@ -171,6 +171,7 @@ init( listener => ListenerId, protocol => jt808, peerhost => PeerHost, + peername => PeerName, sockport => SockPort, clientid => undefined, username => undefined, diff --git a/apps/emqx_gateway_lwm2m/src/emqx_gateway_lwm2m.app.src b/apps/emqx_gateway_lwm2m/src/emqx_gateway_lwm2m.app.src index 66b2db041..811b3c8a7 100644 --- a/apps/emqx_gateway_lwm2m/src/emqx_gateway_lwm2m.app.src +++ b/apps/emqx_gateway_lwm2m/src/emqx_gateway_lwm2m.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_lwm2m, [ {description, "LwM2M Gateway"}, - {vsn, "0.1.6"}, + {vsn, "0.1.7"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway, emqx_gateway_coap, xmerl]}, {env, []}, diff --git a/apps/emqx_gateway_lwm2m/src/emqx_lwm2m_channel.erl b/apps/emqx_gateway_lwm2m/src/emqx_lwm2m_channel.erl index 595041c53..31497c9f1 100644 --- a/apps/emqx_gateway_lwm2m/src/emqx_lwm2m_channel.erl +++ b/apps/emqx_gateway_lwm2m/src/emqx_lwm2m_channel.erl @@ -119,7 +119,7 @@ stats(#channel{session = Session}) -> init( ConnInfo = #{ - peername := {PeerHost, _}, + peername := {PeerHost, _} = PeerName, sockname := {_, SockPort} }, #{ctx := Ctx} = Config @@ -139,6 +139,7 @@ init( listener => ListenerId, protocol => lwm2m, peerhost => PeerHost, + peername => PeerName, sockport => SockPort, username => undefined, clientid => undefined, diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl index c9e109c3f..bb2d5f332 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl @@ -130,7 +130,7 @@ %% @doc Init protocol init( ConnInfo = #{ - peername := {PeerHost, _}, + peername := {PeerHost, _} = PeerName, sockname := {_, SockPort} }, Option @@ -152,6 +152,7 @@ init( listener => ListenerId, protocol => 'mqtt-sn', peerhost => PeerHost, + peername => PeerName, sockport => SockPort, clientid => undefined, username => undefined, diff --git a/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl b/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl index 8473c9978..f9c791613 100644 --- a/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl +++ b/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl @@ -209,7 +209,7 @@ stats(#channel{mqueue = MQueue}) -> -spec init(emqx_types:conninfo(), map()) -> channel(). init( ConnInfo = #{ - peername := {PeerHost, _Port}, + peername := {PeerHost, _Port} = PeerName, sockname := {_Host, SockPort} }, Options @@ -230,6 +230,7 @@ init( listener => ListenerId, protocol => ocpp, peerhost => PeerHost, + peername => PeerName, sockport => SockPort, clientid => undefined, username => undefined, @@ -325,9 +326,9 @@ enrich_client( set_log_meta(#channel{ clientinfo = #{clientid := ClientId}, - conninfo = #{peername := Peername} + conninfo = #{peername := PeerName} }) -> - emqx_logger:set_metadata_peername(esockd:format(Peername)), + emqx_logger:set_metadata_peername(esockd:format(PeerName)), emqx_logger:set_metadata_clientid(ClientId). run_conn_hooks(_UserInfo, Channel = #channel{conninfo = ConnInfo}) -> diff --git a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl index 4b7f2b06a..1fb0a498f 100644 --- a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl +++ b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl @@ -117,7 +117,7 @@ %% @doc Init protocol init( ConnInfo = #{ - peername := {PeerHost, _}, + peername := {PeerHost, _} = PeerName, sockname := {_, SockPort} }, Option @@ -137,6 +137,7 @@ init( listener => ListenerId, protocol => stomp, peerhost => PeerHost, + peername => PeerName, sockport => SockPort, clientid => undefined, username => undefined, diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index a6ca2f7be..a51f2bed5 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -329,6 +329,7 @@ eventmsg_publish( username => emqx_message:get_header(username, Message, undefined), payload => Payload, peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined)), + peername => ntoa(emqx_message:get_header(peername, Message, undefined)), topic => Topic, qos => QoS, flags => Flags, @@ -446,7 +447,8 @@ eventmsg_check_authz_complete( _ClientInfo = #{ clientid := ClientId, username := Username, - peerhost := PeerHost + peerhost := PeerHost, + peername := PeerName }, PubSub, Topic, @@ -458,6 +460,7 @@ eventmsg_check_authz_complete( #{ clientid => ClientId, username => Username, + peername => ntoa(PeerName), peerhost => ntoa(PeerHost), topic => Topic, action => PubSub, @@ -471,8 +474,7 @@ eventmsg_check_authn_complete( _ClientInfo = #{ clientid := ClientId, username := Username, - peerhost := PeerHost, - peerport := PeerPort + peername := PeerName }, Result ) -> @@ -488,7 +490,7 @@ eventmsg_check_authn_complete( #{ clientid => ClientId, username => Username, - peername => ntoa({PeerHost, PeerPort}), + peername => ntoa(PeerName), reason_code => force_to_bin(Reason), is_anonymous => IsAnonymous, is_superuser => IsSuperuser @@ -501,7 +503,8 @@ eventmsg_sub_or_unsub( _ClientInfo = #{ clientid := ClientId, username := Username, - peerhost := PeerHost + peerhost := PeerHost, + peername := PeerName }, Topic, SubOpts = #{qos := QoS} @@ -513,6 +516,7 @@ eventmsg_sub_or_unsub( clientid => ClientId, username => Username, peerhost => ntoa(PeerHost), + peername => ntoa(PeerName), PropKey => printable_maps(maps:get(PropKey, SubOpts, #{})), topic => Topic, qos => QoS @@ -542,6 +546,7 @@ eventmsg_dropped( username => emqx_message:get_header(username, Message, undefined), payload => Payload, peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined)), + peername => ntoa(emqx_message:get_header(peername, Message, undefined)), topic => Topic, qos => QoS, flags => Flags, @@ -606,6 +611,7 @@ eventmsg_validation_failed( username => emqx_message:get_header(username, Message, undefined), payload => Payload, peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined)), + peername => ntoa(emqx_message:get_header(peername, Message, undefined)), topic => Topic, qos => QoS, flags => Flags, @@ -618,6 +624,7 @@ eventmsg_validation_failed( eventmsg_delivered( _ClientInfo = #{ peerhost := PeerHost, + peername := PeerName, clientid := ReceiverCId, username := ReceiverUsername }, @@ -642,6 +649,7 @@ eventmsg_delivered( username => ReceiverUsername, payload => Payload, peerhost => ntoa(PeerHost), + peername => ntoa(PeerName), topic => Topic, qos => QoS, flags => Flags, @@ -654,6 +662,7 @@ eventmsg_delivered( eventmsg_acked( _ClientInfo = #{ peerhost := PeerHost, + peername := PeerName, clientid := ReceiverCId, username := ReceiverUsername }, @@ -678,6 +687,7 @@ eventmsg_acked( username => ReceiverUsername, payload => Payload, peerhost => ntoa(PeerHost), + peername => ntoa(PeerName), topic => Topic, qos => QoS, flags => Flags, @@ -691,6 +701,7 @@ eventmsg_acked( eventmsg_delivery_dropped( _ClientInfo = #{ peerhost := PeerHost, + peername := PeerName, clientid := ReceiverCId, username := ReceiverUsername }, @@ -717,6 +728,7 @@ eventmsg_delivery_dropped( username => ReceiverUsername, payload => Payload, peerhost => ntoa(PeerHost), + peername => ntoa(PeerName), topic => Topic, qos => QoS, flags => Flags, @@ -1009,6 +1021,7 @@ columns_with_exam('message.publish') -> {<<"username">>, <<"u_emqx">>}, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}, {<<"peerhost">>, <<"192.168.0.10">>}, + {<<"peername">>, <<"192.168.0.10:32781">>}, {<<"topic">>, <<"t/a">>}, {<<"qos">>, 1}, {<<"flags">>, #{}}, @@ -1031,6 +1044,7 @@ columns_with_exam('message.dropped') -> {<<"username">>, <<"u_emqx">>}, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}, {<<"peerhost">>, <<"192.168.0.10">>}, + {<<"peername">>, <<"192.168.0.10:32781">>}, {<<"topic">>, <<"t/a">>}, {<<"qos">>, 1}, {<<"flags">>, #{}}, @@ -1048,6 +1062,7 @@ columns_with_exam('schema.validation_failed') -> {<<"username">>, <<"u_emqx">>}, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}, {<<"peerhost">>, <<"192.168.0.10">>}, + {<<"peername">>, <<"192.168.0.10:32781">>}, {<<"topic">>, <<"t/a">>}, {<<"qos">>, 1}, {<<"flags">>, #{}}, @@ -1084,6 +1099,7 @@ columns_with_exam('delivery.dropped') -> {<<"username">>, <<"u_emqx_2">>}, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}, {<<"peerhost">>, <<"192.168.0.10">>}, + {<<"peername">>, <<"192.168.0.10:32781">>}, {<<"topic">>, <<"t/a">>}, {<<"qos">>, 1}, {<<"flags">>, #{}}, @@ -1150,6 +1166,7 @@ columns_with_exam('client.check_authz_complete') -> {<<"clientid">>, <<"c_emqx">>}, {<<"username">>, <<"u_emqx">>}, {<<"peerhost">>, <<"192.168.0.10">>}, + {<<"peername">>, <<"192.168.0.10:32781">>}, {<<"topic">>, <<"t/a">>}, {<<"action">>, <<"publish">>}, {<<"authz_source">>, <<"cache">>}, @@ -1197,6 +1214,7 @@ columns_message_sub_unsub(EventName) -> {<<"clientid">>, <<"c_emqx">>}, {<<"username">>, <<"u_emqx">>}, {<<"peerhost">>, <<"192.168.0.10">>}, + {<<"peername">>, <<"192.168.0.10:32781">>}, {<<"topic">>, <<"t/a">>}, {<<"qos">>, 1}, {<<"timestamp">>, erlang:system_time(millisecond)}, @@ -1213,6 +1231,7 @@ columns_message_ack_delivered(EventName) -> {<<"username">>, <<"u_emqx_2">>}, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}, {<<"peerhost">>, <<"192.168.0.10">>}, + {<<"peername">>, <<"192.168.0.10:32781">>}, {<<"topic">>, <<"t/a">>}, {<<"qos">>, 1}, {<<"flags">>, #{}}, diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 4dca243d1..3d33c1a98 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -3920,6 +3920,7 @@ verify_event_fields('message.publish', Fields) -> username := Username, payload := Payload, peerhost := PeerHost, + peername := PeerName, topic := Topic, qos := QoS, flags := Flags, @@ -3934,6 +3935,7 @@ verify_event_fields('message.publish', Fields) -> ?assertEqual(<<"c_event">>, ClientId), ?assertEqual(<<"u_event">>, Username), ?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload), + verify_peername(PeerName), verify_ipaddr(PeerHost), ?assertEqual(<<"t1">>, Topic), ?assertEqual(1, QoS), @@ -4008,6 +4010,7 @@ verify_event_fields(SubUnsub, Fields) when clientid := ClientId, username := Username, peerhost := PeerHost, + peername := PeerName, topic := Topic, qos := QoS, timestamp := Timestamp @@ -4017,6 +4020,7 @@ verify_event_fields(SubUnsub, Fields) when ?assert(is_atom(reason)), ?assertEqual(<<"c_event2">>, ClientId), ?assertEqual(<<"u_event2">>, Username), + verify_peername(PeerName), verify_ipaddr(PeerHost), ?assertEqual(<<"t1">>, Topic), ?assertEqual(1, QoS), @@ -4043,6 +4047,7 @@ verify_event_fields('delivery.dropped', Fields) -> node := Node, payload := Payload, peerhost := PeerHost, + peername := PeerName, pub_props := Properties, publish_received_at := EventAt, qos := QoS, @@ -4062,6 +4067,7 @@ verify_event_fields('delivery.dropped', Fields) -> ?assertEqual(<<"c_event">>, FromClientId), ?assertEqual(<<"u_event">>, FromUsername), ?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload), + verify_peername(PeerName), verify_ipaddr(PeerHost), ?assertEqual(<<"t1">>, Topic), ?assertEqual(1, QoS), @@ -4078,6 +4084,7 @@ verify_event_fields('message.dropped', Fields) -> username := Username, payload := Payload, peerhost := PeerHost, + peername := PeerName, topic := Topic, qos := QoS, flags := Flags, @@ -4093,6 +4100,7 @@ verify_event_fields('message.dropped', Fields) -> ?assertEqual(<<"c_event">>, ClientId), ?assertEqual(<<"u_event">>, Username), ?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload), + verify_peername(PeerName), verify_ipaddr(PeerHost), ?assertEqual(<<"t1">>, Topic), ?assertEqual(1, QoS), @@ -4110,6 +4118,7 @@ verify_event_fields('message.delivered', Fields) -> from_username := FromUsername, payload := Payload, peerhost := PeerHost, + peername := PeerName, topic := Topic, qos := QoS, flags := Flags, @@ -4126,6 +4135,7 @@ verify_event_fields('message.delivered', Fields) -> ?assertEqual(<<"c_event">>, FromClientId), ?assertEqual(<<"u_event">>, FromUsername), ?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload), + verify_peername(PeerName), verify_ipaddr(PeerHost), ?assertEqual(<<"t1">>, Topic), ?assertEqual(1, QoS), @@ -4143,6 +4153,7 @@ verify_event_fields('message.acked', Fields) -> from_username := FromUsername, payload := Payload, peerhost := PeerHost, + peername := PeerName, topic := Topic, qos := QoS, flags := Flags, @@ -4160,6 +4171,7 @@ verify_event_fields('message.acked', Fields) -> ?assertEqual(<<"c_event">>, FromClientId), ?assertEqual(<<"u_event">>, FromUsername), ?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload), + verify_peername(PeerName), verify_ipaddr(PeerHost), ?assertEqual(<<"t1">>, Topic), ?assertEqual(1, QoS), @@ -4203,6 +4215,7 @@ verify_event_fields('client.check_authz_complete', Fields) -> clientid := ClientId, action := Action, result := Result, + peername := PeerName, topic := Topic, authz_source := AuthzSource, username := Username @@ -4210,6 +4223,7 @@ verify_event_fields('client.check_authz_complete', Fields) -> ?assertEqual(<<"t1">>, Topic), ?assert(lists:member(Action, [subscribe, publish])), ?assert(lists:member(Result, [allow, deny])), + verify_peername(PeerName), ?assert( lists:member(AuthzSource, [ cache, @@ -4228,10 +4242,12 @@ verify_event_fields('client.check_authz_complete', Fields) -> verify_event_fields('client.check_authn_complete', Fields) -> #{ clientid := ClientId, + peername := PeerName, username := Username, is_anonymous := IsAnonymous, is_superuser := IsSuperuser } = Fields, + verify_peername(PeerName), ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])), ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])), ?assert(erlang:is_boolean(IsAnonymous)), diff --git a/changes/ce/feat-13506.en.md b/changes/ce/feat-13506.en.md new file mode 100644 index 000000000..0ebc701ca --- /dev/null +++ b/changes/ce/feat-13506.en.md @@ -0,0 +1 @@ +Added the `peername` field to all rule engine events that already contained the `peerhost` field. `peername` is a string and has the `IP:PORT` format.