fix(exhook): ignore hook exection for `no_matched` topic

For message hooks, if the topics do not match, the hook should continue to be executed.
This commit is contained in:
JimMoen 2022-08-25 18:18:14 +08:00
parent 074e25f433
commit d0f88b1af9
3 changed files with 64 additions and 38 deletions

View File

@ -71,44 +71,59 @@ cast(Hookpoint, Req) ->
cast(_, _, []) -> cast(_, _, []) ->
ok; ok;
cast(Hookpoint, Req, [ServerName|More]) -> cast(Hookpoint, Req, [ServerName|More]) ->
%% XXX: Need a real asynchronous running %% TODO: Need a real asynchronous running
_ = emqx_exhook_server:call(Hookpoint, Req, _ = emqx_exhook_server:call(Hookpoint, Req,
emqx_exhook_mngr:server(ServerName)), emqx_exhook_mngr:server(ServerName)),
cast(Hookpoint, Req, More). cast(Hookpoint, Req, More).
-spec call_fold(atom(), term(), function()) -spec call_fold(atom(), term(), function())
-> {ok, term()} -> {ok, term()}
| {stop, term()}. | {stop, term()}
| ignore.
call_fold(Hookpoint, Req, AccFun) -> call_fold(Hookpoint, Req, AccFun) ->
FailedAction = emqx_exhook_mngr:get_request_failed_action(), FailedAction = emqx_exhook_mngr:get_request_failed_action(),
ServerNames = emqx_exhook_mngr:running(), ServerNames = emqx_exhook_mngr:running(),
case ServerNames == [] andalso FailedAction == deny of case ServerNames == [] andalso FailedAction == deny of
true -> true ->
?LOG(warning, "No available Server for hook: ~p . Stop hook chain execution with `request_failed_action=deny`.", [Hookpoint]),
{stop, deny_action_result(Hookpoint, Req)}; {stop, deny_action_result(Hookpoint, Req)};
_ -> _ ->
%% `Req` (includede message.. etc.) as `InitAcc` for `emqx_hook`
call_fold(Hookpoint, Req, FailedAction, AccFun, ServerNames) call_fold(Hookpoint, Req, FailedAction, AccFun, ServerNames)
end. end.
call_fold(_, Req, _, _, []) -> -define(LOG_CALL_RESULT(ServerName, Res, Fmt),
{ok, Req}; ?LOG(debug, "ExHook server: ~p respond type: ~p. " ++ Fmt, [ServerName, Resp])).
call_fold(Hookpoint, Req, FailedAction, AccFun, [ServerName|More]) ->
call_fold(_, Acc, _, _, []) ->
{ok, Acc};
call_fold(Hookpoint, Acc, FailedAction, MergeAccFun, [ServerName | More]) ->
Server = emqx_exhook_mngr:server(ServerName), Server = emqx_exhook_mngr:server(ServerName),
case emqx_exhook_server:call(Hookpoint, Req, Server) of case emqx_exhook_server:call(Hookpoint, Acc, Server) of
ignore ->
%% Server is not mounted / or does not care about this hook
%% See emqx_exhook_server:need_call/3
ignore;
{ok, Resp} -> {ok, Resp} ->
case AccFun(Req, Resp) of case MergeAccFun(Acc, Resp) of
{stop, NReq} -> {stop, NewAcc} ->
{stop, NReq}; ?LOG_CALL_RESULT(ServerName, "'STOP_AND_RETURN'", "Stop hook chain execution"),
{ok, NReq} -> {stop, NewAcc};
call_fold(Hookpoint, NReq, FailedAction, AccFun, More); {ok, NewAccAsNReq} ->
_ -> ?LOG_CALL_RESULT(ServerName, "'CONTINUE'", "Continue calling remaining ExHook servers."),
call_fold(Hookpoint, Req, FailedAction, AccFun, More) call_fold(Hookpoint, NewAccAsNReq, FailedAction, MergeAccFun, More);
ignore ->
?LOG_CALL_RESULT(ServerName, "'IGNORE'", "Continue calling remaining ExHook servers."),
call_fold(Hookpoint, Acc, FailedAction, MergeAccFun, More)
end; end;
_ -> {error, _Reason} ->
case FailedAction of case FailedAction of
deny -> deny ->
{stop, deny_action_result(Hookpoint, Req)}; ?LOG(error, "Call server: ~p for hook: ~p failed. Stop hook chain execution with `request_failed_action=deny`.",
[ServerName, Hookpoint]),
{stop, deny_action_result(Hookpoint, Acc)};
_ -> _ ->
call_fold(Hookpoint, Req, FailedAction, AccFun, More) call_fold(Hookpoint, Acc, FailedAction, MergeAccFun, More)
end end
end. end.

View File

@ -62,6 +62,9 @@
, call_fold/3 , call_fold/3
]). ]).
-define(STOP_OR_OK(Res),
(Res =:= ok orelse Res =:= stop)).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Clients %% Clients
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -102,10 +105,11 @@ on_client_authenticate(ClientInfo, AuthResult) ->
case call_fold('client.authenticate', Req, case call_fold('client.authenticate', Req,
fun merge_responsed_bool/2) of fun merge_responsed_bool/2) of
{StopOrOk, #{result := Result0}} when is_boolean(Result0) -> {StopOrOk, #{result := Result0}}
when is_boolean(Result0) andalso ?STOP_OR_OK(StopOrOk) ->
Result = case Result0 of true -> success; _ -> not_authorized end, Result = case Result0 of true -> success; _ -> not_authorized end,
{StopOrOk, AuthResult#{auth_result => Result, anonymous => false}}; {StopOrOk, AuthResult#{auth_result => Result, anonymous => false}};
_ -> ignore ->
{ok, AuthResult} {ok, AuthResult}
end. end.
@ -122,10 +126,11 @@ on_client_check_acl(ClientInfo, PubSub, Topic, Result) ->
}, },
case call_fold('client.check_acl', Req, case call_fold('client.check_acl', Req,
fun merge_responsed_bool/2) of fun merge_responsed_bool/2) of
{StopOrOk, #{result := Result0}} when is_boolean(Result0) -> {StopOrOk, #{result := Result0}}
when is_boolean(Result0) andalso ?STOP_OR_OK(StopOrOk) ->
NResult = case Result0 of true -> allow; _ -> deny end, NResult = case Result0 of true -> allow; _ -> deny end,
{StopOrOk, NResult}; {StopOrOk, NResult};
_ -> {ok, Result} ignore -> {ok, Result}
end. end.
on_client_subscribe(ClientInfo, Props, TopicFilters) -> on_client_subscribe(ClientInfo, Props, TopicFilters) ->
@ -190,9 +195,11 @@ on_message_publish(Message) ->
Req = #{message => message(Message)}, Req = #{message => message(Message)},
case call_fold('message.publish', Req, case call_fold('message.publish', Req,
fun emqx_exhook_handler:merge_responsed_message/2) of fun emqx_exhook_handler:merge_responsed_message/2) of
{StopOrOk, #{message := NMessage}} -> {StopOrOk, #{message := NMessage}}
when ?STOP_OR_OK(StopOrOk) ->
{StopOrOk, assign_to_message(NMessage, Message)}; {StopOrOk, assign_to_message(NMessage, Message)};
_ -> {ok, Message} ignore ->
{ok, Message}
end. end.
on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason) -> on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason) ->

View File

@ -240,22 +240,12 @@ name(#server{name = Name}) ->
| {error, term()}. | {error, term()}.
call(Hookpoint, Req, #server{name = ChannName, options = ReqOpts, call(Hookpoint, Req, #server{name = ChannName, options = ReqOpts,
hookspec = Hooks, prefix = Prefix}) -> hookspec = Hooks, prefix = Prefix}) ->
GrpcFunc = hk2func(Hookpoint), case need_call(Hookpoint, Req, Hooks) of
case maps:get(Hookpoint, Hooks, undefined) of true ->
undefined -> ignore;
Opts ->
NeedCall = case lists:member(Hookpoint, message_hooks()) of
false -> true;
_ ->
#{message := #{topic := Topic}} = Req,
match_topic_filter(Topic, maps:get(topics, Opts, []))
end,
case NeedCall of
false -> ignore;
_ ->
inc_metrics(Prefix, Hookpoint), inc_metrics(Prefix, Hookpoint),
do_call(ChannName, GrpcFunc, Req, ReqOpts) do_call(ChannName, hk2func(Hookpoint), Req, ReqOpts);
end false ->
ignore
end. end.
%% @private %% @private
@ -266,6 +256,20 @@ inc_metrics(IncFun, Name) when is_function(IncFun) ->
inc_metrics(Prefix, Name) when is_list(Prefix) -> inc_metrics(Prefix, Name) when is_list(Prefix) ->
emqx_metrics:inc(list_to_atom(Prefix ++ atom_to_list(Name))). emqx_metrics:inc(list_to_atom(Prefix ++ atom_to_list(Name))).
need_call(Hookpoint, Req, Hooks) ->
case maps:get(Hookpoint, Hooks, undefined) of
undefined ->
false; %% Hookpoint is not mounted on this server
Opts ->
case lists:member(Hookpoint, message_hooks()) of
false ->
true;
_ ->
#{message := #{topic := Topic}} = Req,
match_topic_filter(Topic, maps:get(topics, Opts, []))
end
end.
-compile({inline, [match_topic_filter/2]}). -compile({inline, [match_topic_filter/2]}).
match_topic_filter(_, []) -> match_topic_filter(_, []) ->
true; true;