From 68da627b4dc80fe1e67b12a80cfe959e92c133e1 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 6 Dec 2023 15:28:28 +0800 Subject: [PATCH 1/5] feat(channel): add peerport field in ClientInfo --- apps/emqx/src/emqx_channel.erl | 3 ++- apps/emqx/test/emqx_channel_SUITE.erl | 19 ++++++++++++++++++- apps/emqx/test/emqx_proper_types.erl | 1 + 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 4662eaee5..92715cf80 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -213,7 +213,7 @@ caps(#channel{clientinfo = #{zone := Zone}}) -> -spec init(emqx_types:conninfo(), opts()) -> channel(). init( ConnInfo = #{ - peername := {PeerHost, _Port}, + peername := {PeerHost, PeerPort}, sockname := {_Host, SockPort} }, #{ @@ -237,6 +237,7 @@ init( listener => ListenerId, protocol => Protocol, peerhost => PeerHost, + peerport => PeerPort, sockport => SockPort, clientid => undefined, username => undefined, diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index c6b4c0518..ca038ac85 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -72,6 +72,22 @@ t_chan_info(_) -> conn_state := connected, clientinfo := ClientInfo } = emqx_channel:info(channel()), + ?assertMatch( + #{ + zone := default, + listener := {tcp, default}, + protocol := mqtt, + peerhost := {127, 0, 0, 1}, + peerport := 3456, + sockport := 1883, + clientid := <<"clientid">>, + username := <<"username">>, + is_superuser := false, + is_bridge := false, + mountpoint := undefined + }, + ClientInfo + ), ?assertEqual(clientinfo(), ClientInfo). t_chan_caps(_) -> @@ -1063,7 +1079,8 @@ clientinfo(InitProps) -> listener => {tcp, default}, protocol => mqtt, peerhost => {127, 0, 0, 1}, - sockport => 3456, + peerport => 3456, + sockport => 1883, clientid => <<"clientid">>, username => <<"username">>, is_superuser => false, diff --git a/apps/emqx/test/emqx_proper_types.erl b/apps/emqx/test/emqx_proper_types.erl index 6c2ad56f9..243a39007 100644 --- a/apps/emqx/test/emqx_proper_types.erl +++ b/apps/emqx/test/emqx_proper_types.erl @@ -108,6 +108,7 @@ clientinfo() -> {zone, zone()}, {protocol, protocol()}, {peerhost, ip()}, + {peerport, port()}, {sockport, port()}, {clientid, clientid()}, {username, username()}, From 46201a8796d4ce8e333c8f7d4e360157f7f430e3 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 6 Dec 2023 10:54:55 +0800 Subject: [PATCH 2/5] feat(exhook): provide the `peerport` field - both in `ConnInfo` and `ClientInfo` --- apps/emqx_exhook/priv/protos/exhook.proto | 4 ++++ apps/emqx_exhook/src/emqx_exhook_handler.erl | 5 ++++- apps/emqx_exhook/test/emqx_exhook_SUITE.erl | 2 ++ apps/emqx_exhook/test/props/prop_exhook_hooks.erl | 5 +++++ 4 files changed, 15 insertions(+), 1 deletion(-) diff --git a/apps/emqx_exhook/priv/protos/exhook.proto b/apps/emqx_exhook/priv/protos/exhook.proto index e5d7b3606..6f6b860af 100644 --- a/apps/emqx_exhook/priv/protos/exhook.proto +++ b/apps/emqx_exhook/priv/protos/exhook.proto @@ -368,6 +368,8 @@ message ConnInfo { string proto_ver = 7; uint32 keepalive = 8; + + uint32 peerport = 9; } message ClientInfo { @@ -397,6 +399,8 @@ message ClientInfo { // subject of client TLS cert string dn = 12; + + uint32 peerport = 13; } message Message { diff --git a/apps/emqx_exhook/src/emqx_exhook_handler.erl b/apps/emqx_exhook/src/emqx_exhook_handler.erl index 2bcb91b12..f3dfa111c 100644 --- a/apps/emqx_exhook/src/emqx_exhook_handler.erl +++ b/apps/emqx_exhook/src/emqx_exhook_handler.erl @@ -294,7 +294,7 @@ conninfo( ConnInfo = #{ clientid := ClientId, - peername := {Peerhost, _}, + peername := {Peerhost, PeerPort}, sockname := {_, SockPort} } ) -> @@ -307,6 +307,7 @@ conninfo( clientid => ClientId, username => maybe(Username), peerhost => ntoa(Peerhost), + peerport => PeerPort, sockport => SockPort, proto_name => ProtoName, proto_ver => stringfy(ProtoVer), @@ -319,6 +320,7 @@ clientinfo( clientid := ClientId, username := Username, peerhost := PeerHost, + peerport := PeerPort, sockport := SockPort, protocol := Protocol, mountpoint := Mountpoiont @@ -330,6 +332,7 @@ clientinfo( username => maybe(Username), password => maybe(maps:get(password, ClientInfo, undefined)), peerhost => ntoa(PeerHost), + peerport => PeerPort, sockport => SockPort, protocol => stringfy(Protocol), mountpoint => maybe(Mountpoiont), diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl index 3da73c11a..ffe932449 100644 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -118,6 +118,7 @@ t_access_failed_if_no_server_running(Config) -> clientid => <<"user-id-1">>, username => <<"usera">>, peerhost => {127, 0, 0, 1}, + peerport => 3456, sockport => 1883, protocol => mqtt, mountpoint => undefined @@ -301,6 +302,7 @@ t_simulated_handler(_) -> clientid => <<"user-id-1">>, username => <<"usera">>, peerhost => {127, 0, 0, 1}, + peerport => 3456, sockport => 1883, protocol => mqtt, mountpoint => undefined diff --git a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl index cf48fff80..827205d1d 100644 --- a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl +++ b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl @@ -496,6 +496,9 @@ nodestr() -> peerhost(#{peername := {Host, _}}) -> ntoa(Host). +peerport(#{peername := {_, Port}}) -> + Port. + sockport(#{sockname := {_, Port}}) -> Port. @@ -564,6 +567,7 @@ from_conninfo(ConnInfo) -> clientid => maps:get(clientid, ConnInfo), username => maybe(maps:get(username, ConnInfo, <<>>)), peerhost => peerhost(ConnInfo), + peerport => peerport(ConnInfo), sockport => sockport(ConnInfo), proto_name => maps:get(proto_name, ConnInfo), proto_ver => stringfy(maps:get(proto_ver, ConnInfo)), @@ -577,6 +581,7 @@ from_clientinfo(ClientInfo) -> username => maybe(maps:get(username, ClientInfo, <<>>)), password => maybe(maps:get(password, ClientInfo, <<>>)), peerhost => ntoa(maps:get(peerhost, ClientInfo)), + peerport => maps:get(peerport, ClientInfo), sockport => maps:get(sockport, ClientInfo), protocol => stringfy(maps:get(protocol, ClientInfo)), mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)), From cfe3b2dcee6a878c2bdb7f5d7d0eb200aa5f05e3 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 6 Dec 2023 18:02:06 +0800 Subject: [PATCH 3/5] feat(exhook): subopts in on_client_subscribe/on_client_unsubscribe --- apps/emqx_exhook/priv/protos/exhook.proto | 27 ++++++++++++++++--- apps/emqx_exhook/src/emqx_exhook_handler.erl | 16 +++++++---- .../test/props/prop_exhook_hooks.erl | 5 +++- 3 files changed, 38 insertions(+), 10 deletions(-) diff --git a/apps/emqx_exhook/priv/protos/exhook.proto b/apps/emqx_exhook/priv/protos/exhook.proto index 6f6b860af..27ce25661 100644 --- a/apps/emqx_exhook/priv/protos/exhook.proto +++ b/apps/emqx_exhook/priv/protos/exhook.proto @@ -148,6 +148,9 @@ message ClientAuthorizeRequest { AuthorizeReqType type = 2; + // In ClientAuthorizeRequest. + // Only "real-topic" will be serialized in gRPC request when shared-sub. + // For example, when client subscribes to `$share/group/t/1`, the real topic is `t/1`. string topic = 3; bool result = 4; @@ -456,7 +459,14 @@ message TopicFilter { string name = 1; - uint32 qos = 2; + // Deprecated + // Since EMQX 5.4.0, we have deprecated the 'qos' field in the `TopicFilter` structure. + // A new field named 'subopts,' has been added to encompass all subscription options. + // Please see the `SubOpts` structure for details. + reserved 2; + reserved "qos"; + + SubOpts subopts = 3; } message SubOpts { @@ -464,11 +474,20 @@ message SubOpts { // The QoS level uint32 qos = 1; - // deprecated + // Deprecated reserved 2; reserved "share"; - // The group name for shared subscription - // string share = 2; + // Since EMQX 5.4.0, we have deprecated the 'share' field in the `SubOpts` structure. + // The group name of shared subscription will be serialized with topic. + // hooks: + // "client.subscribe": + // ClientSubscribeRequest.TopicFilter.name = "$share/group/topic/1" + // "client.unsubscribe": + // ClientUnsubscribeRequest.TopicFilter.name = "$share/group/topic/1" + // "session.subscribed": + // SessionSubscribedRequest.topic = "$share/group/topic/1" + // "session.unsubscribed": + // SessionUnsubscribedRequest.topic = "$share/group/topic/1" // The Retain Handling option (MQTT v5.0) // diff --git a/apps/emqx_exhook/src/emqx_exhook_handler.erl b/apps/emqx_exhook/src/emqx_exhook_handler.erl index f3dfa111c..fe6af653b 100644 --- a/apps/emqx_exhook/src/emqx_exhook_handler.erl +++ b/apps/emqx_exhook/src/emqx_exhook_handler.erl @@ -192,7 +192,7 @@ on_session_subscribed(ClientInfo, Topic, SubOpts) -> Req = #{ clientinfo => clientinfo(ClientInfo), topic => emqx_topic:maybe_format_share(Topic), - subopts => maps:with([qos, rh, rap, nl], SubOpts) + subopts => subopts(SubOpts) }, cast('session.subscribed', Req). @@ -200,6 +200,7 @@ on_session_unsubscribed(ClientInfo, Topic, _SubOpts) -> Req = #{ clientinfo => clientinfo(ClientInfo), topic => emqx_topic:maybe_format_share(Topic) + %% no subopts when unsub }, cast('session.unsubscribed', Req). @@ -416,14 +417,19 @@ enrich_header(Headers, Message) -> end. topicfilters(Tfs) when is_list(Tfs) -> - GetQos = fun(SubOpts) -> - maps:get(qos, SubOpts, 0) - end, [ - #{name => emqx_topic:maybe_format_share(Topic), qos => GetQos(SubOpts)} + #{name => emqx_topic:maybe_format_share(Topic), subopts => subopts(SubOpts)} || {Topic, SubOpts} <- Tfs ]. +subopts(SubOpts) -> + #{ + qos => maps:get(qos, SubOpts, 0), + rh => maps:get(rh, SubOpts, 0), + rap => maps:get(rap, SubOpts, 0), + nl => maps:get(nl, SubOpts, 0) + }. + ntoa({0, 0, 0, 0, 0, 16#ffff, AB, CD}) -> list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256})); ntoa(IP) -> diff --git a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl index 827205d1d..041091e27 100644 --- a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl +++ b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl @@ -530,7 +530,10 @@ properties(M) when is_map(M) -> ). topicfilters(Tfs) when is_list(Tfs) -> - [#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs]. + [ + #{name => emqx_topic:maybe_format_share(Topic), subopts => subopts(SubOpts)} + || {Topic, SubOpts} <- Tfs + ]. %% @private stringfy(Term) when is_binary(Term) -> From 47901c9fed09775e564de4057e4f8cac7d016551 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Sun, 10 Dec 2023 22:21:42 +0800 Subject: [PATCH 4/5] fix(exhook): `client.authorize` hook always uses real-topic See: emqx_channel:do_check_sub_authzs/3, line: 1895 --- apps/emqx_exhook/src/emqx_exhook_handler.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_exhook/src/emqx_exhook_handler.erl b/apps/emqx_exhook/src/emqx_exhook_handler.erl index fe6af653b..5c1d18c17 100644 --- a/apps/emqx_exhook/src/emqx_exhook_handler.erl +++ b/apps/emqx_exhook/src/emqx_exhook_handler.erl @@ -143,7 +143,7 @@ on_client_authorize(ClientInfo, Action, Topic, Result) -> Req = #{ clientinfo => clientinfo(ClientInfo), type => Type, - topic => emqx_topic:maybe_format_share(Topic), + topic => emqx_topic:get_shared_real_topic(Topic), result => Bool }, case From fa4d73835be1ce9fec7d7d9d9238cebaa0bda8d4 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Sun, 10 Dec 2023 23:15:55 +0800 Subject: [PATCH 5/5] chore: bump changes --- changes/feat-12114.en.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 changes/feat-12114.en.md diff --git a/changes/feat-12114.en.md b/changes/feat-12114.en.md new file mode 100644 index 000000000..7e1b5536b --- /dev/null +++ b/changes/feat-12114.en.md @@ -0,0 +1,6 @@ +Added the `peerport` field to ClientInfo. +Added the `peerport` field to the messages `ClientInfo` and `ConnInfo` in ExHook. + +## Breaking changes +* ExHook Proto changed. The `qos` field in message `TopicFilter` was deprecated. + ExHook Server will now receive full subscription options: `qos`, `rh`, `rap`, `nl` in message `SubOpts`