diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 4662eaee5..92715cf80 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -213,7 +213,7 @@ caps(#channel{clientinfo = #{zone := Zone}}) -> -spec init(emqx_types:conninfo(), opts()) -> channel(). init( ConnInfo = #{ - peername := {PeerHost, _Port}, + peername := {PeerHost, PeerPort}, sockname := {_Host, SockPort} }, #{ @@ -237,6 +237,7 @@ init( listener => ListenerId, protocol => Protocol, peerhost => PeerHost, + peerport => PeerPort, sockport => SockPort, clientid => undefined, username => undefined, diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index c6b4c0518..ca038ac85 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -72,6 +72,22 @@ t_chan_info(_) -> conn_state := connected, clientinfo := ClientInfo } = emqx_channel:info(channel()), + ?assertMatch( + #{ + zone := default, + listener := {tcp, default}, + protocol := mqtt, + peerhost := {127, 0, 0, 1}, + peerport := 3456, + sockport := 1883, + clientid := <<"clientid">>, + username := <<"username">>, + is_superuser := false, + is_bridge := false, + mountpoint := undefined + }, + ClientInfo + ), ?assertEqual(clientinfo(), ClientInfo). t_chan_caps(_) -> @@ -1063,7 +1079,8 @@ clientinfo(InitProps) -> listener => {tcp, default}, protocol => mqtt, peerhost => {127, 0, 0, 1}, - sockport => 3456, + peerport => 3456, + sockport => 1883, clientid => <<"clientid">>, username => <<"username">>, is_superuser => false, diff --git a/apps/emqx/test/emqx_proper_types.erl b/apps/emqx/test/emqx_proper_types.erl index 6c2ad56f9..243a39007 100644 --- a/apps/emqx/test/emqx_proper_types.erl +++ b/apps/emqx/test/emqx_proper_types.erl @@ -108,6 +108,7 @@ clientinfo() -> {zone, zone()}, {protocol, protocol()}, {peerhost, ip()}, + {peerport, port()}, {sockport, port()}, {clientid, clientid()}, {username, username()}, diff --git a/apps/emqx_exhook/priv/protos/exhook.proto b/apps/emqx_exhook/priv/protos/exhook.proto index e5d7b3606..27ce25661 100644 --- a/apps/emqx_exhook/priv/protos/exhook.proto +++ b/apps/emqx_exhook/priv/protos/exhook.proto @@ -148,6 +148,9 @@ message ClientAuthorizeRequest { AuthorizeReqType type = 2; + // In ClientAuthorizeRequest. + // Only "real-topic" will be serialized in gRPC request when shared-sub. + // For example, when client subscribes to `$share/group/t/1`, the real topic is `t/1`. string topic = 3; bool result = 4; @@ -368,6 +371,8 @@ message ConnInfo { string proto_ver = 7; uint32 keepalive = 8; + + uint32 peerport = 9; } message ClientInfo { @@ -397,6 +402,8 @@ message ClientInfo { // subject of client TLS cert string dn = 12; + + uint32 peerport = 13; } message Message { @@ -452,7 +459,14 @@ message TopicFilter { string name = 1; - uint32 qos = 2; + // Deprecated + // Since EMQX 5.4.0, we have deprecated the 'qos' field in the `TopicFilter` structure. + // A new field named 'subopts,' has been added to encompass all subscription options. + // Please see the `SubOpts` structure for details. + reserved 2; + reserved "qos"; + + SubOpts subopts = 3; } message SubOpts { @@ -460,11 +474,20 @@ message SubOpts { // The QoS level uint32 qos = 1; - // deprecated + // Deprecated reserved 2; reserved "share"; - // The group name for shared subscription - // string share = 2; + // Since EMQX 5.4.0, we have deprecated the 'share' field in the `SubOpts` structure. + // The group name of shared subscription will be serialized with topic. + // hooks: + // "client.subscribe": + // ClientSubscribeRequest.TopicFilter.name = "$share/group/topic/1" + // "client.unsubscribe": + // ClientUnsubscribeRequest.TopicFilter.name = "$share/group/topic/1" + // "session.subscribed": + // SessionSubscribedRequest.topic = "$share/group/topic/1" + // "session.unsubscribed": + // SessionUnsubscribedRequest.topic = "$share/group/topic/1" // The Retain Handling option (MQTT v5.0) // diff --git a/apps/emqx_exhook/src/emqx_exhook_handler.erl b/apps/emqx_exhook/src/emqx_exhook_handler.erl index 2bcb91b12..5c1d18c17 100644 --- a/apps/emqx_exhook/src/emqx_exhook_handler.erl +++ b/apps/emqx_exhook/src/emqx_exhook_handler.erl @@ -143,7 +143,7 @@ on_client_authorize(ClientInfo, Action, Topic, Result) -> Req = #{ clientinfo => clientinfo(ClientInfo), type => Type, - topic => emqx_topic:maybe_format_share(Topic), + topic => emqx_topic:get_shared_real_topic(Topic), result => Bool }, case @@ -192,7 +192,7 @@ on_session_subscribed(ClientInfo, Topic, SubOpts) -> Req = #{ clientinfo => clientinfo(ClientInfo), topic => emqx_topic:maybe_format_share(Topic), - subopts => maps:with([qos, rh, rap, nl], SubOpts) + subopts => subopts(SubOpts) }, cast('session.subscribed', Req). @@ -200,6 +200,7 @@ on_session_unsubscribed(ClientInfo, Topic, _SubOpts) -> Req = #{ clientinfo => clientinfo(ClientInfo), topic => emqx_topic:maybe_format_share(Topic) + %% no subopts when unsub }, cast('session.unsubscribed', Req). @@ -294,7 +295,7 @@ conninfo( ConnInfo = #{ clientid := ClientId, - peername := {Peerhost, _}, + peername := {Peerhost, PeerPort}, sockname := {_, SockPort} } ) -> @@ -307,6 +308,7 @@ conninfo( clientid => ClientId, username => maybe(Username), peerhost => ntoa(Peerhost), + peerport => PeerPort, sockport => SockPort, proto_name => ProtoName, proto_ver => stringfy(ProtoVer), @@ -319,6 +321,7 @@ clientinfo( clientid := ClientId, username := Username, peerhost := PeerHost, + peerport := PeerPort, sockport := SockPort, protocol := Protocol, mountpoint := Mountpoiont @@ -330,6 +333,7 @@ clientinfo( username => maybe(Username), password => maybe(maps:get(password, ClientInfo, undefined)), peerhost => ntoa(PeerHost), + peerport => PeerPort, sockport => SockPort, protocol => stringfy(Protocol), mountpoint => maybe(Mountpoiont), @@ -413,14 +417,19 @@ enrich_header(Headers, Message) -> end. topicfilters(Tfs) when is_list(Tfs) -> - GetQos = fun(SubOpts) -> - maps:get(qos, SubOpts, 0) - end, [ - #{name => emqx_topic:maybe_format_share(Topic), qos => GetQos(SubOpts)} + #{name => emqx_topic:maybe_format_share(Topic), subopts => subopts(SubOpts)} || {Topic, SubOpts} <- Tfs ]. +subopts(SubOpts) -> + #{ + qos => maps:get(qos, SubOpts, 0), + rh => maps:get(rh, SubOpts, 0), + rap => maps:get(rap, SubOpts, 0), + nl => maps:get(nl, SubOpts, 0) + }. + ntoa({0, 0, 0, 0, 0, 16#ffff, AB, CD}) -> list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256})); ntoa(IP) -> diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl index 3da73c11a..ffe932449 100644 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -118,6 +118,7 @@ t_access_failed_if_no_server_running(Config) -> clientid => <<"user-id-1">>, username => <<"usera">>, peerhost => {127, 0, 0, 1}, + peerport => 3456, sockport => 1883, protocol => mqtt, mountpoint => undefined @@ -301,6 +302,7 @@ t_simulated_handler(_) -> clientid => <<"user-id-1">>, username => <<"usera">>, peerhost => {127, 0, 0, 1}, + peerport => 3456, sockport => 1883, protocol => mqtt, mountpoint => undefined diff --git a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl index cf48fff80..041091e27 100644 --- a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl +++ b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl @@ -496,6 +496,9 @@ nodestr() -> peerhost(#{peername := {Host, _}}) -> ntoa(Host). +peerport(#{peername := {_, Port}}) -> + Port. + sockport(#{sockname := {_, Port}}) -> Port. @@ -527,7 +530,10 @@ properties(M) when is_map(M) -> ). topicfilters(Tfs) when is_list(Tfs) -> - [#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs]. + [ + #{name => emqx_topic:maybe_format_share(Topic), subopts => subopts(SubOpts)} + || {Topic, SubOpts} <- Tfs + ]. %% @private stringfy(Term) when is_binary(Term) -> @@ -564,6 +570,7 @@ from_conninfo(ConnInfo) -> clientid => maps:get(clientid, ConnInfo), username => maybe(maps:get(username, ConnInfo, <<>>)), peerhost => peerhost(ConnInfo), + peerport => peerport(ConnInfo), sockport => sockport(ConnInfo), proto_name => maps:get(proto_name, ConnInfo), proto_ver => stringfy(maps:get(proto_ver, ConnInfo)), @@ -577,6 +584,7 @@ from_clientinfo(ClientInfo) -> username => maybe(maps:get(username, ClientInfo, <<>>)), password => maybe(maps:get(password, ClientInfo, <<>>)), peerhost => ntoa(maps:get(peerhost, ClientInfo)), + peerport => maps:get(peerport, ClientInfo), sockport => maps:get(sockport, ClientInfo), protocol => stringfy(maps:get(protocol, ClientInfo)), mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)), diff --git a/changes/feat-12114.en.md b/changes/feat-12114.en.md new file mode 100644 index 000000000..7e1b5536b --- /dev/null +++ b/changes/feat-12114.en.md @@ -0,0 +1,6 @@ +Added the `peerport` field to ClientInfo. +Added the `peerport` field to the messages `ClientInfo` and `ConnInfo` in ExHook. + +## Breaking changes +* ExHook Proto changed. The `qos` field in message `TopicFilter` was deprecated. + ExHook Server will now receive full subscription options: `qos`, `rh`, `rap`, `nl` in message `SubOpts`