feat(exhook): subopts in on_client_subscribe/on_client_unsubscribe
This commit is contained in:
parent
46201a8796
commit
cfe3b2dcee
|
@ -148,6 +148,9 @@ message ClientAuthorizeRequest {
|
||||||
|
|
||||||
AuthorizeReqType type = 2;
|
AuthorizeReqType type = 2;
|
||||||
|
|
||||||
|
// In ClientAuthorizeRequest.
|
||||||
|
// Only "real-topic" will be serialized in gRPC request when shared-sub.
|
||||||
|
// For example, when client subscribes to `$share/group/t/1`, the real topic is `t/1`.
|
||||||
string topic = 3;
|
string topic = 3;
|
||||||
|
|
||||||
bool result = 4;
|
bool result = 4;
|
||||||
|
@ -456,7 +459,14 @@ message TopicFilter {
|
||||||
|
|
||||||
string name = 1;
|
string name = 1;
|
||||||
|
|
||||||
uint32 qos = 2;
|
// Deprecated
|
||||||
|
// Since EMQX 5.4.0, we have deprecated the 'qos' field in the `TopicFilter` structure.
|
||||||
|
// A new field named 'subopts,' has been added to encompass all subscription options.
|
||||||
|
// Please see the `SubOpts` structure for details.
|
||||||
|
reserved 2;
|
||||||
|
reserved "qos";
|
||||||
|
|
||||||
|
SubOpts subopts = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
message SubOpts {
|
message SubOpts {
|
||||||
|
@ -464,11 +474,20 @@ message SubOpts {
|
||||||
// The QoS level
|
// The QoS level
|
||||||
uint32 qos = 1;
|
uint32 qos = 1;
|
||||||
|
|
||||||
// deprecated
|
// Deprecated
|
||||||
reserved 2;
|
reserved 2;
|
||||||
reserved "share";
|
reserved "share";
|
||||||
// The group name for shared subscription
|
// Since EMQX 5.4.0, we have deprecated the 'share' field in the `SubOpts` structure.
|
||||||
// string share = 2;
|
// The group name of shared subscription will be serialized with topic.
|
||||||
|
// hooks:
|
||||||
|
// "client.subscribe":
|
||||||
|
// ClientSubscribeRequest.TopicFilter.name = "$share/group/topic/1"
|
||||||
|
// "client.unsubscribe":
|
||||||
|
// ClientUnsubscribeRequest.TopicFilter.name = "$share/group/topic/1"
|
||||||
|
// "session.subscribed":
|
||||||
|
// SessionSubscribedRequest.topic = "$share/group/topic/1"
|
||||||
|
// "session.unsubscribed":
|
||||||
|
// SessionUnsubscribedRequest.topic = "$share/group/topic/1"
|
||||||
|
|
||||||
// The Retain Handling option (MQTT v5.0)
|
// The Retain Handling option (MQTT v5.0)
|
||||||
//
|
//
|
||||||
|
|
|
@ -192,7 +192,7 @@ on_session_subscribed(ClientInfo, Topic, SubOpts) ->
|
||||||
Req = #{
|
Req = #{
|
||||||
clientinfo => clientinfo(ClientInfo),
|
clientinfo => clientinfo(ClientInfo),
|
||||||
topic => emqx_topic:maybe_format_share(Topic),
|
topic => emqx_topic:maybe_format_share(Topic),
|
||||||
subopts => maps:with([qos, rh, rap, nl], SubOpts)
|
subopts => subopts(SubOpts)
|
||||||
},
|
},
|
||||||
cast('session.subscribed', Req).
|
cast('session.subscribed', Req).
|
||||||
|
|
||||||
|
@ -200,6 +200,7 @@ on_session_unsubscribed(ClientInfo, Topic, _SubOpts) ->
|
||||||
Req = #{
|
Req = #{
|
||||||
clientinfo => clientinfo(ClientInfo),
|
clientinfo => clientinfo(ClientInfo),
|
||||||
topic => emqx_topic:maybe_format_share(Topic)
|
topic => emqx_topic:maybe_format_share(Topic)
|
||||||
|
%% no subopts when unsub
|
||||||
},
|
},
|
||||||
cast('session.unsubscribed', Req).
|
cast('session.unsubscribed', Req).
|
||||||
|
|
||||||
|
@ -416,14 +417,19 @@ enrich_header(Headers, Message) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
topicfilters(Tfs) when is_list(Tfs) ->
|
topicfilters(Tfs) when is_list(Tfs) ->
|
||||||
GetQos = fun(SubOpts) ->
|
|
||||||
maps:get(qos, SubOpts, 0)
|
|
||||||
end,
|
|
||||||
[
|
[
|
||||||
#{name => emqx_topic:maybe_format_share(Topic), qos => GetQos(SubOpts)}
|
#{name => emqx_topic:maybe_format_share(Topic), subopts => subopts(SubOpts)}
|
||||||
|| {Topic, SubOpts} <- Tfs
|
|| {Topic, SubOpts} <- Tfs
|
||||||
].
|
].
|
||||||
|
|
||||||
|
subopts(SubOpts) ->
|
||||||
|
#{
|
||||||
|
qos => maps:get(qos, SubOpts, 0),
|
||||||
|
rh => maps:get(rh, SubOpts, 0),
|
||||||
|
rap => maps:get(rap, SubOpts, 0),
|
||||||
|
nl => maps:get(nl, SubOpts, 0)
|
||||||
|
}.
|
||||||
|
|
||||||
ntoa({0, 0, 0, 0, 0, 16#ffff, AB, CD}) ->
|
ntoa({0, 0, 0, 0, 0, 16#ffff, AB, CD}) ->
|
||||||
list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}));
|
list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}));
|
||||||
ntoa(IP) ->
|
ntoa(IP) ->
|
||||||
|
|
|
@ -530,7 +530,10 @@ properties(M) when is_map(M) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
topicfilters(Tfs) when is_list(Tfs) ->
|
topicfilters(Tfs) when is_list(Tfs) ->
|
||||||
[#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs].
|
[
|
||||||
|
#{name => emqx_topic:maybe_format_share(Topic), subopts => subopts(SubOpts)}
|
||||||
|
|| {Topic, SubOpts} <- Tfs
|
||||||
|
].
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
stringfy(Term) when is_binary(Term) ->
|
stringfy(Term) when is_binary(Term) ->
|
||||||
|
|
Loading…
Reference in New Issue