fix(hooks): return error on hook-removed subscription rather than deleting it
Following https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901178
This commit is contained in:
parent
905ae0dcad
commit
5c89998049
|
@ -171,7 +171,7 @@ on_client_subscribe(#{clientid := ClientId, username := Username}, _Properties,
|
||||||
case on_client_subscribe_single(ClientId, Username, TopicFilter, LuaState) of
|
case on_client_subscribe_single(ClientId, Username, TopicFilter, LuaState) of
|
||||||
false ->
|
false ->
|
||||||
{Topic, Opts} = TopicFilter,
|
{Topic, Opts} = TopicFilter,
|
||||||
[{Topic, Opts#{delete => true}} | Acc];
|
[{Topic, Opts#{deny_subscription => true}} | Acc];
|
||||||
NewTopicFilter ->
|
NewTopicFilter ->
|
||||||
[NewTopicFilter | Acc]
|
[NewTopicFilter | Acc]
|
||||||
end
|
end
|
||||||
|
|
|
@ -711,7 +711,7 @@ t_stop_sub(_Config) ->
|
||||||
OriginalTopicFilters = [{Topic = <<"u">>,
|
OriginalTopicFilters = [{Topic = <<"u">>,
|
||||||
Opts = #{nl => 0,qos => 0,rap => 0,rh => 0}}],
|
Opts = #{nl => 0,qos => 0,rap => 0,rh => 0}}],
|
||||||
Props = #{},
|
Props = #{},
|
||||||
Expected = [{Topic, Opts#{delete => true}}],
|
Expected = [{Topic, Opts#{deny_subscription => true}}],
|
||||||
?assertEqual(Expected, emqx_hooks:run_fold('client.subscribe',
|
?assertEqual(Expected, emqx_hooks:run_fold('client.subscribe',
|
||||||
[ClientInfo, Props],
|
[ClientInfo, Props],
|
||||||
OriginalTopicFilters)).
|
OriginalTopicFilters)).
|
||||||
|
|
|
@ -425,9 +425,9 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
||||||
Channel),
|
Channel),
|
||||||
TupleTopicFilters2 =
|
TupleTopicFilters2 =
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun({{Topic, Opts = #{delete := true}}, _QoS}, Acc) ->
|
fun({{Topic, Opts = #{deny_subscription := true}}, _ReturnCode}, Acc) ->
|
||||||
Key = {Topic, maps:without([delete], Opts)},
|
Key = {Topic, maps:without([deny_subscription], Opts)},
|
||||||
lists:keydelete(Key, 1, Acc);
|
lists:keyreplace(Key, 1, Acc, {Key, ?RC_UNSPECIFIED_ERROR});
|
||||||
(Tuple = {Key, _Value}, Acc) ->
|
(Tuple = {Key, _Value}, Acc) ->
|
||||||
lists:keyreplace(Key, 1, Acc, Tuple)
|
lists:keyreplace(Key, 1, Acc, Tuple)
|
||||||
end,
|
end,
|
||||||
|
|
|
@ -315,7 +315,7 @@ t_handle_in_empty_client_subscribe_hook({'end', _Config}) ->
|
||||||
ok;
|
ok;
|
||||||
t_handle_in_empty_client_subscribe_hook(Config) when is_list(Config) ->
|
t_handle_in_empty_client_subscribe_hook(Config) when is_list(Config) ->
|
||||||
Hook = fun(_ClientInfo, _Username, TopicFilter) ->
|
Hook = fun(_ClientInfo, _Username, TopicFilter) ->
|
||||||
EmptyFilters = [{T, Opts#{delete => true}} || {T, Opts} <- TopicFilter],
|
EmptyFilters = [{T, Opts#{deny_subscription => true}} || {T, Opts} <- TopicFilter],
|
||||||
{stop, EmptyFilters}
|
{stop, EmptyFilters}
|
||||||
end,
|
end,
|
||||||
ok = emqx:hook('client.subscribe', Hook, []),
|
ok = emqx:hook('client.subscribe', Hook, []),
|
||||||
|
@ -323,7 +323,7 @@ t_handle_in_empty_client_subscribe_hook(Config) when is_list(Config) ->
|
||||||
{ok, C} = emqtt:start_link(),
|
{ok, C} = emqtt:start_link(),
|
||||||
{ok, _} = emqtt:connect(C),
|
{ok, _} = emqtt:connect(C),
|
||||||
{ok, _, RCs} = emqtt:subscribe(C, <<"t">>),
|
{ok, _, RCs} = emqtt:subscribe(C, <<"t">>),
|
||||||
?assertEqual([], RCs),
|
?assertEqual([?RC_UNSPECIFIED_ERROR], RCs),
|
||||||
ok
|
ok
|
||||||
after
|
after
|
||||||
ok = emqx:unhook('client.subscribe', Hook)
|
ok = emqx:unhook('client.subscribe', Hook)
|
||||||
|
|
Loading…
Reference in New Issue