From d0f88b1af9aec7b5838278c7e94b443e042c397f Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 25 Aug 2022 18:18:14 +0800 Subject: [PATCH] fix(exhook): ignore hook exection for `no_matched` topic For message hooks, if the topics do not match, the hook should continue to be executed. --- apps/emqx_exhook/src/emqx_exhook.erl | 47 +++++++++++++------- apps/emqx_exhook/src/emqx_exhook_handler.erl | 19 +++++--- apps/emqx_exhook/src/emqx_exhook_server.erl | 36 ++++++++------- 3 files changed, 64 insertions(+), 38 deletions(-) diff --git a/apps/emqx_exhook/src/emqx_exhook.erl b/apps/emqx_exhook/src/emqx_exhook.erl index 551d0a126..ca8b37ec5 100644 --- a/apps/emqx_exhook/src/emqx_exhook.erl +++ b/apps/emqx_exhook/src/emqx_exhook.erl @@ -71,44 +71,59 @@ cast(Hookpoint, Req) -> cast(_, _, []) -> ok; cast(Hookpoint, Req, [ServerName|More]) -> - %% XXX: Need a real asynchronous running + %% TODO: Need a real asynchronous running _ = emqx_exhook_server:call(Hookpoint, Req, emqx_exhook_mngr:server(ServerName)), cast(Hookpoint, Req, More). -spec call_fold(atom(), term(), function()) -> {ok, term()} - | {stop, term()}. + | {stop, term()} + | ignore. call_fold(Hookpoint, Req, AccFun) -> FailedAction = emqx_exhook_mngr:get_request_failed_action(), ServerNames = emqx_exhook_mngr:running(), case ServerNames == [] andalso FailedAction == deny of true -> + ?LOG(warning, "No available Server for hook: ~p . Stop hook chain execution with `request_failed_action=deny`.", [Hookpoint]), {stop, deny_action_result(Hookpoint, Req)}; _ -> + %% `Req` (includede message.. etc.) as `InitAcc` for `emqx_hook` call_fold(Hookpoint, Req, FailedAction, AccFun, ServerNames) end. -call_fold(_, Req, _, _, []) -> - {ok, Req}; -call_fold(Hookpoint, Req, FailedAction, AccFun, [ServerName|More]) -> +-define(LOG_CALL_RESULT(ServerName, Res, Fmt), + ?LOG(debug, "ExHook server: ~p respond type: ~p. " ++ Fmt, [ServerName, Resp])). + +call_fold(_, Acc, _, _, []) -> + {ok, Acc}; +call_fold(Hookpoint, Acc, FailedAction, MergeAccFun, [ServerName | More]) -> Server = emqx_exhook_mngr:server(ServerName), - case emqx_exhook_server:call(Hookpoint, Req, Server) of + case emqx_exhook_server:call(Hookpoint, Acc, Server) of + ignore -> + %% Server is not mounted / or does not care about this hook + %% See emqx_exhook_server:need_call/3 + ignore; {ok, Resp} -> - case AccFun(Req, Resp) of - {stop, NReq} -> - {stop, NReq}; - {ok, NReq} -> - call_fold(Hookpoint, NReq, FailedAction, AccFun, More); - _ -> - call_fold(Hookpoint, Req, FailedAction, AccFun, More) + case MergeAccFun(Acc, Resp) of + {stop, NewAcc} -> + ?LOG_CALL_RESULT(ServerName, "'STOP_AND_RETURN'", "Stop hook chain execution"), + {stop, NewAcc}; + {ok, NewAccAsNReq} -> + ?LOG_CALL_RESULT(ServerName, "'CONTINUE'", "Continue calling remaining ExHook servers."), + call_fold(Hookpoint, NewAccAsNReq, FailedAction, MergeAccFun, More); + ignore -> + ?LOG_CALL_RESULT(ServerName, "'IGNORE'", "Continue calling remaining ExHook servers."), + call_fold(Hookpoint, Acc, FailedAction, MergeAccFun, More) end; - _ -> + {error, _Reason} -> case FailedAction of deny -> - {stop, deny_action_result(Hookpoint, Req)}; + ?LOG(error, "Call server: ~p for hook: ~p failed. Stop hook chain execution with `request_failed_action=deny`.", + [ServerName, Hookpoint]), + {stop, deny_action_result(Hookpoint, Acc)}; _ -> - call_fold(Hookpoint, Req, FailedAction, AccFun, More) + call_fold(Hookpoint, Acc, FailedAction, MergeAccFun, More) end end. diff --git a/apps/emqx_exhook/src/emqx_exhook_handler.erl b/apps/emqx_exhook/src/emqx_exhook_handler.erl index 1809d81ee..6ec21e033 100644 --- a/apps/emqx_exhook/src/emqx_exhook_handler.erl +++ b/apps/emqx_exhook/src/emqx_exhook_handler.erl @@ -62,6 +62,9 @@ , call_fold/3 ]). +-define(STOP_OR_OK(Res), + (Res =:= ok orelse Res =:= stop)). + %%-------------------------------------------------------------------- %% Clients %%-------------------------------------------------------------------- @@ -102,10 +105,11 @@ on_client_authenticate(ClientInfo, AuthResult) -> case call_fold('client.authenticate', Req, fun merge_responsed_bool/2) of - {StopOrOk, #{result := Result0}} when is_boolean(Result0) -> + {StopOrOk, #{result := Result0}} + when is_boolean(Result0) andalso ?STOP_OR_OK(StopOrOk) -> Result = case Result0 of true -> success; _ -> not_authorized end, {StopOrOk, AuthResult#{auth_result => Result, anonymous => false}}; - _ -> + ignore -> {ok, AuthResult} end. @@ -122,10 +126,11 @@ on_client_check_acl(ClientInfo, PubSub, Topic, Result) -> }, case call_fold('client.check_acl', Req, fun merge_responsed_bool/2) of - {StopOrOk, #{result := Result0}} when is_boolean(Result0) -> + {StopOrOk, #{result := Result0}} + when is_boolean(Result0) andalso ?STOP_OR_OK(StopOrOk) -> NResult = case Result0 of true -> allow; _ -> deny end, {StopOrOk, NResult}; - _ -> {ok, Result} + ignore -> {ok, Result} end. on_client_subscribe(ClientInfo, Props, TopicFilters) -> @@ -190,9 +195,11 @@ on_message_publish(Message) -> Req = #{message => message(Message)}, case call_fold('message.publish', Req, fun emqx_exhook_handler:merge_responsed_message/2) of - {StopOrOk, #{message := NMessage}} -> + {StopOrOk, #{message := NMessage}} + when ?STOP_OR_OK(StopOrOk) -> {StopOrOk, assign_to_message(NMessage, Message)}; - _ -> {ok, Message} + ignore -> + {ok, Message} end. on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason) -> diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index 5ce602535..9a8468700 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -240,22 +240,12 @@ name(#server{name = Name}) -> | {error, term()}. call(Hookpoint, Req, #server{name = ChannName, options = ReqOpts, hookspec = Hooks, prefix = Prefix}) -> - GrpcFunc = hk2func(Hookpoint), - case maps:get(Hookpoint, Hooks, undefined) of - undefined -> ignore; - Opts -> - NeedCall = case lists:member(Hookpoint, message_hooks()) of - false -> true; - _ -> - #{message := #{topic := Topic}} = Req, - match_topic_filter(Topic, maps:get(topics, Opts, [])) - end, - case NeedCall of - false -> ignore; - _ -> - inc_metrics(Prefix, Hookpoint), - do_call(ChannName, GrpcFunc, Req, ReqOpts) - end + case need_call(Hookpoint, Req, Hooks) of + true -> + inc_metrics(Prefix, Hookpoint), + do_call(ChannName, hk2func(Hookpoint), Req, ReqOpts); + false -> + ignore end. %% @private @@ -266,6 +256,20 @@ inc_metrics(IncFun, Name) when is_function(IncFun) -> inc_metrics(Prefix, Name) when is_list(Prefix) -> emqx_metrics:inc(list_to_atom(Prefix ++ atom_to_list(Name))). +need_call(Hookpoint, Req, Hooks) -> + case maps:get(Hookpoint, Hooks, undefined) of + undefined -> + false; %% Hookpoint is not mounted on this server + Opts -> + case lists:member(Hookpoint, message_hooks()) of + false -> + true; + _ -> + #{message := #{topic := Topic}} = Req, + match_topic_filter(Topic, maps:get(topics, Opts, [])) + end + end. + -compile({inline, [match_topic_filter/2]}). match_topic_filter(_, []) -> true;