Merge pull request #8288 from thalesmg/fix-client-sub-hook

fix(hooks): use hook response to stop subscriptions
This commit is contained in:
Xinyu Liu 2022-06-23 11:42:42 +08:00 committed by GitHub
commit 14a37bbc89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 80 additions and 20 deletions

View File

@ -27,6 +27,8 @@ File format:
3.1. [#8177]
- Do not match ACL rules containing placeholders if there's no
information to fill them. [#8280]
- Fixed issue in Lua hook that didn't prevent a topic from being
subscribed to. [#8288]
## v4.3.15

View File

@ -1,6 +1,6 @@
{application, emqx_lua_hook,
[{description, "EMQ X Lua Hooks"},
{vsn, "4.3.0"}, % strict semver, bump manually!
{vsn, "4.3.1"}, % strict semver, bump manually!
{modules, []},
{registered, []},
{applications, [kernel,stdlib]},

View File

@ -0,0 +1,5 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.3.0",[{load_module,emqx_lua_script,brutal_purge,soft_purge,[]}]}],
[{"4.3.0",[{load_module,emqx_lua_script,brutal_purge,soft_purge,[]}]}]}.

View File

@ -169,8 +169,11 @@ on_client_subscribe(#{clientid := ClientId, username := Username}, _Properties,
NewTopicFilters =
lists:foldr(fun(TopicFilter, Acc) ->
case on_client_subscribe_single(ClientId, Username, TopicFilter, LuaState) of
false -> Acc;
NewTopicFilter -> [NewTopicFilter | Acc]
false ->
{Topic, Opts} = TopicFilter,
[{Topic, Opts#{delete => true}} | Acc];
NewTopicFilter ->
[NewTopicFilter | Acc]
end
end, [], TopicFilters),
case NewTopicFilters of

View File

@ -37,7 +37,8 @@ all() ->
case101,
case110, case111, case112, case113, case114, case115,
case201, case202, case203, case204, case205,
case301, case302
case301, case302,
t_stop_sub
].
init_per_suite(Config) ->
@ -214,8 +215,8 @@ case31(_Config) ->
"\n return \"on_client_connected\""
"\nend",
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
?assertEqual(ok,
emqx_hooks:run('client.connected',
?assertEqual(ok,
emqx_hooks:run('client.connected',
[#{clientid => <<"myclient">>, username => <<"tester">>}, #{}])).
case32(_Config) ->
@ -228,8 +229,8 @@ case32(_Config) ->
"\n return \"on_client_connected\""
"\nend",
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
?assertEqual(ok,
emqx_hooks:run('client.connected',
?assertEqual(ok,
emqx_hooks:run('client.connected',
[#{clientid => <<"myclient">>, username => <<"tester">>}, #{}])).
case41(_Config) ->
@ -336,8 +337,8 @@ case61(_Config) ->
"\nend",
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
?assertEqual(ok,
emqx_hooks:run('client.disconnected',
?assertEqual(ok,
emqx_hooks:run('client.disconnected',
[#{clientid => <<"myclient">>, username => <<"tester">>}, 0])).
case62(_Config) ->
@ -351,8 +352,8 @@ case62(_Config) ->
"\nend",
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
?assertEqual(ok,
emqx_hooks:run('client.disconnected',
?assertEqual(ok,
emqx_hooks:run('client.disconnected',
[#{clientid => <<"myclient">>, username => <<"tester">>}, 0])).
case71(_Config) ->
@ -691,3 +692,26 @@ case302(_Config) ->
},
?assertEqual(allow, emqx_hooks:run_fold('client.check_acl',
[ClientInfo, publish, <<"mytopic">>], deny)).
t_stop_sub(_Config) ->
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
Code = "function on_client_subscribe(clientid, username, topic)"
"\n return false"
"\nend"
"\n"
"\nfunction register_hook()"
"\n return \"on_client_subscribe\""
"\nend",
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
ClientInfo = #{clientid => undefined,
username => <<"test">>,
peerhost => {127, 0, 0, 1},
password => <<"mqtt">>
},
OriginalTopicFilters = [{Topic = <<"u">>,
Opts = #{nl => 0,qos => 0,rap => 0,rh => 0}}],
Props = #{},
Expected = [{Topic, Opts#{delete => true}}],
?assertEqual(Expected, emqx_hooks:run_fold('client.subscribe',
[ClientInfo, Props],
OriginalTopicFilters)).

View File

@ -2,7 +2,8 @@
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.3.16",
[{load_module,emqx_app,brutal_purge,soft_purge,[]},
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]}]},
@ -562,7 +563,8 @@
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.3.16",
[{load_module,emqx_app,brutal_purge,soft_purge,[]},
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]}]},

View File

@ -416,11 +416,6 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
end, TupleTopicFilters0) of
true -> handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel);
false ->
Replace = fun
_Fun(TupleList, [ Tuple = {Key, _Value} | More]) ->
_Fun(lists:keyreplace(Key, 1, TupleList, Tuple), More);
_Fun(TupleList, []) -> TupleList
end,
TopicFilters2 = [ TopicFilter || {TopicFilter, 0} <- TupleTopicFilters0],
TopicFilters3 = run_hooks('client.subscribe',
[ClientInfo, Properties],
@ -428,7 +423,16 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
{TupleTopicFilters1, NChannel} = process_subscribe(TopicFilters3,
Properties,
Channel),
TupleTopicFilters2 = Replace(TupleTopicFilters0, TupleTopicFilters1),
TupleTopicFilters2 =
lists:foldl(
fun({{Topic, Opts = #{delete := true}}, _QoS}, Acc) ->
Key = {Topic, maps:without([delete], Opts)},
lists:keydelete(Key, 1, Acc);
(Tuple = {Key, _Value}, Acc) ->
lists:keyreplace(Key, 1, Acc, Tuple)
end,
TupleTopicFilters0,
TupleTopicFilters1),
ReasonCodes2 = [ ReasonCode
|| {_TopicFilter, ReasonCode} <- TupleTopicFilters2],
handle_out(suback, {PacketId, ReasonCodes2}, NChannel)

View File

@ -309,6 +309,26 @@ t_connack_auth_error(Config) when is_list(Config) ->
?assertEqual(2, emqx_metrics:val('packets.connack.auth_error')),
ok.
t_handle_in_empty_client_subscribe_hook({init, Config}) ->
Config;
t_handle_in_empty_client_subscribe_hook({'end', _Config}) ->
ok;
t_handle_in_empty_client_subscribe_hook(Config) when is_list(Config) ->
Hook = fun(_ClientInfo, _Username, TopicFilter) ->
EmptyFilters = [{T, Opts#{delete => true}} || {T, Opts} <- TopicFilter],
{stop, EmptyFilters}
end,
ok = emqx:hook('client.subscribe', Hook, []),
try
{ok, C} = emqtt:start_link(),
{ok, _} = emqtt:connect(C),
{ok, _, RCs} = emqtt:subscribe(C, <<"t">>),
?assertEqual([], RCs),
ok
after
ok = emqx:unhook('client.subscribe', Hook)
end.
recv_msgs(Count) ->
recv_msgs(Count, []).