fix(hooks): use hook response to stop subscriptions

Fixes https://github.com/emqx/emqx-lua-hook/issues/118

1) If the Lua hook returns an empty list, it should replace the final
topic filter list.
2) The subscribed topics after running the 'client.subscribe' hook
should not contain topics filtered out by the hooks.
This commit is contained in:
Thales Macedo Garitezi 2022-06-21 13:58:53 -03:00
parent 117e040174
commit 4bda62b195
8 changed files with 80 additions and 20 deletions

View File

@ -25,6 +25,8 @@ File format:
3.1. [#8177]
- Do not match ACL rules containing placeholders if there's no
information to fill them. [#8280]
- Fixed issue in Lua hook that didn't prevent a topic from being
subscribed to. [#8288]
## v4.3.15

View File

@ -1,6 +1,6 @@
{application, emqx_lua_hook,
[{description, "EMQ X Lua Hooks"},
{vsn, "4.3.0"}, % strict semver, bump manually!
{vsn, "4.3.1"}, % strict semver, bump manually!
{modules, []},
{registered, []},
{applications, [kernel,stdlib]},

View File

@ -0,0 +1,5 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.3.0",[{load_module,emqx_lua_script,brutal_purge,soft_purge,[]}]}],
[{"4.3.0",[{load_module,emqx_lua_script,brutal_purge,soft_purge,[]}]}]}.

View File

@ -169,8 +169,11 @@ on_client_subscribe(#{clientid := ClientId, username := Username}, _Properties,
NewTopicFilters =
lists:foldr(fun(TopicFilter, Acc) ->
case on_client_subscribe_single(ClientId, Username, TopicFilter, LuaState) of
false -> Acc;
NewTopicFilter -> [NewTopicFilter | Acc]
false ->
{Topic, Opts} = TopicFilter,
[{Topic, Opts#{delete => true}} | Acc];
NewTopicFilter ->
[NewTopicFilter | Acc]
end
end, [], TopicFilters),
case NewTopicFilters of

View File

@ -37,7 +37,8 @@ all() ->
case101,
case110, case111, case112, case113, case114, case115,
case201, case202, case203, case204, case205,
case301, case302
case301, case302,
t_stop_sub
].
init_per_suite(Config) ->
@ -214,8 +215,8 @@ case31(_Config) ->
"\n return \"on_client_connected\""
"\nend",
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
?assertEqual(ok,
emqx_hooks:run('client.connected',
?assertEqual(ok,
emqx_hooks:run('client.connected',
[#{clientid => <<"myclient">>, username => <<"tester">>}, #{}])).
case32(_Config) ->
@ -228,8 +229,8 @@ case32(_Config) ->
"\n return \"on_client_connected\""
"\nend",
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
?assertEqual(ok,
emqx_hooks:run('client.connected',
?assertEqual(ok,
emqx_hooks:run('client.connected',
[#{clientid => <<"myclient">>, username => <<"tester">>}, #{}])).
case41(_Config) ->
@ -336,8 +337,8 @@ case61(_Config) ->
"\nend",
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
?assertEqual(ok,
emqx_hooks:run('client.disconnected',
?assertEqual(ok,
emqx_hooks:run('client.disconnected',
[#{clientid => <<"myclient">>, username => <<"tester">>}, 0])).
case62(_Config) ->
@ -351,8 +352,8 @@ case62(_Config) ->
"\nend",
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
?assertEqual(ok,
emqx_hooks:run('client.disconnected',
?assertEqual(ok,
emqx_hooks:run('client.disconnected',
[#{clientid => <<"myclient">>, username => <<"tester">>}, 0])).
case71(_Config) ->
@ -691,3 +692,26 @@ case302(_Config) ->
},
?assertEqual(allow, emqx_hooks:run_fold('client.check_acl',
[ClientInfo, publish, <<"mytopic">>], deny)).
t_stop_sub(_Config) ->
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
Code = "function on_client_subscribe(clientid, username, topic)"
"\n return false"
"\nend"
"\n"
"\nfunction register_hook()"
"\n return \"on_client_subscribe\""
"\nend",
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
ClientInfo = #{clientid => undefined,
username => <<"test">>,
peerhost => {127, 0, 0, 1},
password => <<"mqtt">>
},
OriginalTopicFilters = [{Topic = <<"u">>,
Opts = #{nl => 0,qos => 0,rap => 0,rh => 0}}],
Props = #{},
Expected = [{Topic, Opts#{delete => true}}],
?assertEqual(Expected, emqx_hooks:run_fold('client.subscribe',
[ClientInfo, Props],
OriginalTopicFilters)).

View File

@ -2,7 +2,8 @@
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.3.16",
[{load_module,emqx_app,brutal_purge,soft_purge,[]},
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]}]},
@ -562,7 +563,8 @@
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.3.16",
[{load_module,emqx_app,brutal_purge,soft_purge,[]},
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]}]},

View File

@ -416,11 +416,6 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
end, TupleTopicFilters0) of
true -> handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel);
false ->
Replace = fun
_Fun(TupleList, [ Tuple = {Key, _Value} | More]) ->
_Fun(lists:keyreplace(Key, 1, TupleList, Tuple), More);
_Fun(TupleList, []) -> TupleList
end,
TopicFilters2 = [ TopicFilter || {TopicFilter, 0} <- TupleTopicFilters0],
TopicFilters3 = run_hooks('client.subscribe',
[ClientInfo, Properties],
@ -428,7 +423,16 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
{TupleTopicFilters1, NChannel} = process_subscribe(TopicFilters3,
Properties,
Channel),
TupleTopicFilters2 = Replace(TupleTopicFilters0, TupleTopicFilters1),
TupleTopicFilters2 =
lists:foldl(
fun({{Topic, Opts = #{delete := true}}, _QoS}, Acc) ->
Key = {Topic, maps:without([delete], Opts)},
lists:keydelete(Key, 1, Acc);
(Tuple = {Key, _Value}, Acc) ->
lists:keyreplace(Key, 1, Acc, Tuple)
end,
TupleTopicFilters0,
TupleTopicFilters1),
ReasonCodes2 = [ ReasonCode
|| {_TopicFilter, ReasonCode} <- TupleTopicFilters2],
handle_out(suback, {PacketId, ReasonCodes2}, NChannel)

View File

@ -309,6 +309,26 @@ t_connack_auth_error(Config) when is_list(Config) ->
?assertEqual(2, emqx_metrics:val('packets.connack.auth_error')),
ok.
t_handle_in_empty_client_subscribe_hook({init, Config}) ->
Config;
t_handle_in_empty_client_subscribe_hook({'end', _Config}) ->
ok;
t_handle_in_empty_client_subscribe_hook(Config) when is_list(Config) ->
Hook = fun(_ClientInfo, _Username, TopicFilter) ->
EmptyFilters = [{T, Opts#{delete => true}} || {T, Opts} <- TopicFilter],
{stop, EmptyFilters}
end,
ok = emqx:hook('client.subscribe', Hook, []),
try
{ok, C} = emqtt:start_link(),
{ok, _} = emqtt:connect(C),
{ok, _, RCs} = emqtt:subscribe(C, <<"t">>),
?assertEqual([], RCs),
ok
after
ok = emqx:unhook('client.subscribe', Hook)
end.
recv_msgs(Count) ->
recv_msgs(Count, []).