Merge pull request #8807 from JimMoen/fix-exhook-ignore-message-hook
Fix exhook ignore message hook
This commit is contained in:
commit
d6ddb85a13
|
@ -14,13 +14,14 @@ File format:
|
|||
|
||||
### Enhancements
|
||||
|
||||
- Improve error message for LwM2M plugin when object ID is not valid [#8654](https://github.com/emqx/emqx/pull/8654).
|
||||
- Improve error message for LwM2M plugin when object ID is not valid. [#8654](https://github.com/emqx/emqx/pull/8654).
|
||||
- Add tzdata apk package to alpine docker image. [#8671](https://github.com/emqx/emqx/pull/8671)
|
||||
- Add node evacuation and cluster rebalancing features [#8597](https://github.com/emqx/emqx/pull/8597)
|
||||
- Add node evacuation and cluster rebalancing features. [#8597](https://github.com/emqx/emqx/pull/8597)
|
||||
- Refine Rule Engine error log. RuleId will be logged when take action failed. [#8737](https://github.com/emqx/emqx/pull/8737)
|
||||
- Increases the latency interval for MQTT Bridge test connections to improve compatibility in high-latency environments. [#8745](https://github.com/emqx/emqx/pull/8745)
|
||||
- Close ExProto client process immediately if it's keepalive timeouted. [#8725](https://github.com/emqx/emqx/pull/8725)
|
||||
- Upgrade grpc-erl driver to 0.6.7 to support batch operation in sending stream. [#8725](https://github.com/emqx/emqx/pull/8725)
|
||||
- Improved jwt authentication module initialization process.[#8736](https://github.com/emqx/emqx/pull/8736)
|
||||
- Improved jwt authentication module initialization process. [#8736](https://github.com/emqx/emqx/pull/8736)
|
||||
|
||||
### Bug fixes
|
||||
|
||||
|
@ -30,7 +31,8 @@ File format:
|
|||
The `foo` variable is a null value, so `clientid != foo` should be evaluated as true.
|
||||
- Fix GET `/auth_clientid` and `/auth_username` counts. [#8655](https://github.com/emqx/emqx/pull/8655)
|
||||
- Add an idle timer for ExProto UDP client to avoid client leaking [#8628](https://github.com/emqx/emqx/pull/8628)
|
||||
- Fix ExHook can't be un-hooked if the grpc service stop first. [#8725](//github.com/emqx/emqx/pull/8725)
|
||||
- Fix ExHook can't be un-hooked if the grpc service stop first. [#8725](https://github.com/emqx/emqx/pull/8725)
|
||||
- Fix the problem that ExHook cannot continue hook chains execution for mismatched topics. [#8807](https://github.com/emqx/emqx/pull/8807)
|
||||
- Fix GET `/listeners/` crashes when listener is not ready. [#8752](https://github.com/emqx/emqx/pull/8752)
|
||||
|
||||
|
||||
|
|
|
@ -1,30 +1,31 @@
|
|||
%% -*- mode: erlang -*-
|
||||
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||
{VSN,
|
||||
[{<<"4\\.3\\.[5-6]">>,
|
||||
[{load_module,emqx_exhook_server,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook_mngr,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.4",
|
||||
[{load_module,emqx_exhook_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook_server,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_exhook_mngr,{advanced,["4.3.4"]}}]},
|
||||
{<<"4\\.3\\.[0-3]">>,[{restart_application,emqx_exhook}]},
|
||||
{<<".*">>,[]}],
|
||||
[{<<"4\\.3\\.[5-6]">>,
|
||||
[{load_module,emqx_exhook_server,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook_mngr,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.5",
|
||||
[{load_module,emqx_exhook_mngr,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook_server,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.4",
|
||||
[{load_module,emqx_exhook_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook_server,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_exhook_mngr,{advanced,["4.3.4"]}}]},
|
||||
{<<"4\\.3\\.[0-3]">>,[{restart_application,emqx_exhook}]},
|
||||
{<<".*">>,[]}]}.
|
||||
[{<<"4\\.3\\.[5-6]">>,
|
||||
[{load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook_server,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook_mngr,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.4",
|
||||
[{load_module,emqx_exhook_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook_server,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_exhook_mngr,{advanced,["4.3.4"]}}]},
|
||||
{<<"4\\.3\\.[0-3]">>,[{restart_application,emqx_exhook}]},
|
||||
{<<".*">>,[]}],
|
||||
[{<<"4\\.3\\.[5-6]">>,
|
||||
[{load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook_mngr,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook_server,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.4",
|
||||
[{load_module,emqx_exhook_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook_server,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_exhook_mngr,{advanced,["4.3.4"]}}]},
|
||||
{<<"4\\.3\\.[0-3]">>,[{restart_application,emqx_exhook}]},
|
||||
{<<".*">>,[]}]}.
|
||||
|
|
|
@ -71,44 +71,59 @@ cast(Hookpoint, Req) ->
|
|||
cast(_, _, []) ->
|
||||
ok;
|
||||
cast(Hookpoint, Req, [ServerName|More]) ->
|
||||
%% XXX: Need a real asynchronous running
|
||||
%% TODO: Need a real asynchronous running
|
||||
_ = emqx_exhook_server:call(Hookpoint, Req,
|
||||
emqx_exhook_mngr:server(ServerName)),
|
||||
cast(Hookpoint, Req, More).
|
||||
|
||||
-spec call_fold(atom(), term(), function())
|
||||
-> {ok, term()}
|
||||
| {stop, term()}.
|
||||
| {stop, term()}
|
||||
| ignore.
|
||||
call_fold(Hookpoint, Req, AccFun) ->
|
||||
FailedAction = emqx_exhook_mngr:get_request_failed_action(),
|
||||
ServerNames = emqx_exhook_mngr:running(),
|
||||
case ServerNames == [] andalso FailedAction == deny of
|
||||
true ->
|
||||
?LOG(warning, "No available Server for hook: ~p . Stop hook chain execution with `request_failed_action=deny`.", [Hookpoint]),
|
||||
{stop, deny_action_result(Hookpoint, Req)};
|
||||
_ ->
|
||||
%% `Req` (includede message.. etc.) as `InitAcc` for `emqx_hook`
|
||||
call_fold(Hookpoint, Req, FailedAction, AccFun, ServerNames)
|
||||
end.
|
||||
|
||||
call_fold(_, Req, _, _, []) ->
|
||||
{ok, Req};
|
||||
call_fold(Hookpoint, Req, FailedAction, AccFun, [ServerName|More]) ->
|
||||
-define(LOG_CALL_RESULT(ServerName, Res, Fmt),
|
||||
?LOG(debug, "ExHook server: ~p respond type: ~p. " ++ Fmt, [ServerName, Resp])).
|
||||
|
||||
call_fold(_, Acc, _, _, []) ->
|
||||
{ok, Acc};
|
||||
call_fold(Hookpoint, Acc, FailedAction, MergeAccFun, [ServerName | More]) ->
|
||||
Server = emqx_exhook_mngr:server(ServerName),
|
||||
case emqx_exhook_server:call(Hookpoint, Req, Server) of
|
||||
case emqx_exhook_server:call(Hookpoint, Acc, Server) of
|
||||
ignore ->
|
||||
%% Server is not mounted / or does not care about this hook
|
||||
%% See emqx_exhook_server:need_call/3
|
||||
ignore;
|
||||
{ok, Resp} ->
|
||||
case AccFun(Req, Resp) of
|
||||
{stop, NReq} ->
|
||||
{stop, NReq};
|
||||
{ok, NReq} ->
|
||||
call_fold(Hookpoint, NReq, FailedAction, AccFun, More);
|
||||
_ ->
|
||||
call_fold(Hookpoint, Req, FailedAction, AccFun, More)
|
||||
case MergeAccFun(Acc, Resp) of
|
||||
{stop, NewAcc} ->
|
||||
?LOG_CALL_RESULT(ServerName, "'STOP_AND_RETURN'", "Stop hook chain execution"),
|
||||
{stop, NewAcc};
|
||||
{ok, NewAccAsNReq} ->
|
||||
?LOG_CALL_RESULT(ServerName, "'CONTINUE'", "Continue calling remaining ExHook servers."),
|
||||
call_fold(Hookpoint, NewAccAsNReq, FailedAction, MergeAccFun, More);
|
||||
ignore ->
|
||||
?LOG_CALL_RESULT(ServerName, "'IGNORE'", "Continue calling remaining ExHook servers."),
|
||||
call_fold(Hookpoint, Acc, FailedAction, MergeAccFun, More)
|
||||
end;
|
||||
_ ->
|
||||
{error, _Reason} ->
|
||||
case FailedAction of
|
||||
deny ->
|
||||
{stop, deny_action_result(Hookpoint, Req)};
|
||||
?LOG(error, "Call server: ~p for hook: ~p failed. Stop hook chain execution with `request_failed_action=deny`.",
|
||||
[ServerName, Hookpoint]),
|
||||
{stop, deny_action_result(Hookpoint, Acc)};
|
||||
_ ->
|
||||
call_fold(Hookpoint, Req, FailedAction, AccFun, More)
|
||||
call_fold(Hookpoint, Acc, FailedAction, MergeAccFun, More)
|
||||
end
|
||||
end.
|
||||
|
||||
|
|
|
@ -62,6 +62,9 @@
|
|||
, call_fold/3
|
||||
]).
|
||||
|
||||
-define(STOP_OR_OK(Res),
|
||||
(Res =:= ok orelse Res =:= stop)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Clients
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -102,10 +105,11 @@ on_client_authenticate(ClientInfo, AuthResult) ->
|
|||
|
||||
case call_fold('client.authenticate', Req,
|
||||
fun merge_responsed_bool/2) of
|
||||
{StopOrOk, #{result := Result0}} when is_boolean(Result0) ->
|
||||
{StopOrOk, #{result := Result0}}
|
||||
when is_boolean(Result0) andalso ?STOP_OR_OK(StopOrOk) ->
|
||||
Result = case Result0 of true -> success; _ -> not_authorized end,
|
||||
{StopOrOk, AuthResult#{auth_result => Result, anonymous => false}};
|
||||
_ ->
|
||||
ignore ->
|
||||
{ok, AuthResult}
|
||||
end.
|
||||
|
||||
|
@ -122,10 +126,11 @@ on_client_check_acl(ClientInfo, PubSub, Topic, Result) ->
|
|||
},
|
||||
case call_fold('client.check_acl', Req,
|
||||
fun merge_responsed_bool/2) of
|
||||
{StopOrOk, #{result := Result0}} when is_boolean(Result0) ->
|
||||
{StopOrOk, #{result := Result0}}
|
||||
when is_boolean(Result0) andalso ?STOP_OR_OK(StopOrOk) ->
|
||||
NResult = case Result0 of true -> allow; _ -> deny end,
|
||||
{StopOrOk, NResult};
|
||||
_ -> {ok, Result}
|
||||
ignore -> {ok, Result}
|
||||
end.
|
||||
|
||||
on_client_subscribe(ClientInfo, Props, TopicFilters) ->
|
||||
|
@ -190,9 +195,11 @@ on_message_publish(Message) ->
|
|||
Req = #{message => message(Message)},
|
||||
case call_fold('message.publish', Req,
|
||||
fun emqx_exhook_handler:merge_responsed_message/2) of
|
||||
{StopOrOk, #{message := NMessage}} ->
|
||||
{StopOrOk, #{message := NMessage}}
|
||||
when ?STOP_OR_OK(StopOrOk) ->
|
||||
{StopOrOk, assign_to_message(NMessage, Message)};
|
||||
_ -> {ok, Message}
|
||||
ignore ->
|
||||
{ok, Message}
|
||||
end.
|
||||
|
||||
on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason) ->
|
||||
|
|
|
@ -240,22 +240,12 @@ name(#server{name = Name}) ->
|
|||
| {error, term()}.
|
||||
call(Hookpoint, Req, #server{name = ChannName, options = ReqOpts,
|
||||
hookspec = Hooks, prefix = Prefix}) ->
|
||||
GrpcFunc = hk2func(Hookpoint),
|
||||
case maps:get(Hookpoint, Hooks, undefined) of
|
||||
undefined -> ignore;
|
||||
Opts ->
|
||||
NeedCall = case lists:member(Hookpoint, message_hooks()) of
|
||||
false -> true;
|
||||
_ ->
|
||||
#{message := #{topic := Topic}} = Req,
|
||||
match_topic_filter(Topic, maps:get(topics, Opts, []))
|
||||
end,
|
||||
case NeedCall of
|
||||
false -> ignore;
|
||||
_ ->
|
||||
inc_metrics(Prefix, Hookpoint),
|
||||
do_call(ChannName, GrpcFunc, Req, ReqOpts)
|
||||
end
|
||||
case need_call(Hookpoint, Req, Hooks) of
|
||||
true ->
|
||||
inc_metrics(Prefix, Hookpoint),
|
||||
do_call(ChannName, hk2func(Hookpoint), Req, ReqOpts);
|
||||
false ->
|
||||
ignore
|
||||
end.
|
||||
|
||||
%% @private
|
||||
|
@ -266,6 +256,20 @@ inc_metrics(IncFun, Name) when is_function(IncFun) ->
|
|||
inc_metrics(Prefix, Name) when is_list(Prefix) ->
|
||||
emqx_metrics:inc(list_to_atom(Prefix ++ atom_to_list(Name))).
|
||||
|
||||
need_call(Hookpoint, Req, Hooks) ->
|
||||
case maps:get(Hookpoint, Hooks, undefined) of
|
||||
undefined ->
|
||||
false; %% Hookpoint is not mounted on this server
|
||||
Opts ->
|
||||
case lists:member(Hookpoint, message_hooks()) of
|
||||
false ->
|
||||
true;
|
||||
_ ->
|
||||
#{message := #{topic := Topic}} = Req,
|
||||
match_topic_filter(Topic, maps:get(topics, Opts, []))
|
||||
end
|
||||
end.
|
||||
|
||||
-compile({inline, [match_topic_filter/2]}).
|
||||
match_topic_filter(_, []) ->
|
||||
true;
|
||||
|
|
|
@ -27,6 +27,12 @@
|
|||
-define(OTHER_CLUSTER_NAME_ATOM, test_emqx_cluster).
|
||||
-define(OTHER_CLUSTER_NAME_STRING, "test_emqx_cluster").
|
||||
|
||||
-define(TEST_PUBLISH_FILTERS_ATOM, test_message_filters).
|
||||
-define(TEST_PUBLISH_FILTERS_STRING, "test_message_filters").
|
||||
|
||||
-define(BEFORE, <<"before_hardcoded">>).
|
||||
-define(AFTER, <<"after_hardcoded">>).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Setups
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -136,6 +142,100 @@ t_cluster_name(_) ->
|
|||
[Callback | _] = emqx_hooks:lookup('client.connected'),
|
||||
?assertEqual(1, emqx_hooks:callback_priority(Callback)).
|
||||
|
||||
t_message_topic_filters(_) ->
|
||||
%% ========== Prepare hooks
|
||||
Priority = 2,
|
||||
SetEnvFun =
|
||||
fun(emqx) ->
|
||||
set_special_cfgs(emqx),
|
||||
application:set_env(ekka, cluster_name, ?TEST_PUBLISH_FILTERS_ATOM);
|
||||
(emqx_exhook) ->
|
||||
application:set_env(emqx_exhook, hook_priority, Priority)
|
||||
end,
|
||||
|
||||
emqx_ct_helpers:stop_apps([emqx, emqx_exhook]),
|
||||
emqx_ct_helpers:start_apps([emqx, emqx_exhook], SetEnvFun),
|
||||
|
||||
?assertEqual(?TEST_PUBLISH_FILTERS_STRING, emqx_sys:cluster_name()),
|
||||
|
||||
emqx_exhook:disable(default),
|
||||
ok = emqx_exhook:enable(default),
|
||||
%% See emqx_exhook_demo_svr:on_provider_loaded/2
|
||||
|
||||
[Callback | _] = emqx_hooks:lookup('message.publish'),
|
||||
?assertEqual(Priority, emqx_hooks:callback_priority(Callback)),
|
||||
|
||||
%% ========== Test topic filters
|
||||
{ok, C1} = emqtt:start_link([{clientid, <<"client1">>}, {username, <<"gooduser">>}]), {ok, _} = emqtt:connect(C1),
|
||||
{ok, C2} = emqtt:start_link([{clientid, <<"test_filter_client">>}, {username, <<"gooduser">>}]), {ok, _} = emqtt:connect(C2),
|
||||
|
||||
{ok, _, _} = emqtt:subscribe(C1,[{<<"exhook/hardcoded">>, qos0},
|
||||
{<<"t/1">>, qos0},
|
||||
{<<"t/2">>, qos0},
|
||||
{<<"a/1">>, qos1},
|
||||
{<<"a/2">>, qos2},
|
||||
{<<"b/1">>, qos1},
|
||||
{<<"b/2">>, qos2},
|
||||
{<<"b/3/4">>, qos1},
|
||||
{<<"b/3/4/5">>, qos2}
|
||||
]),
|
||||
|
||||
%% server only handle topic `t/1`, rewrite all topic to `exhook/hardcoded`,
|
||||
%% rewrite all payload to <<"after_hardcoded">>
|
||||
%% See emqx_exhook_demo_svr:on_message_publish/2
|
||||
'test_t/1_topic'(C1, C2),
|
||||
'test_a/#_topic'(C1, C2),
|
||||
'test_b/+_topic'(C1, C2),
|
||||
|
||||
ok = emqtt:disconnect(C1),
|
||||
ok = emqtt:disconnect(C2).
|
||||
|
||||
|
||||
'test_t/1_topic'(_C1, C2) ->
|
||||
ok = emqtt:publish(C2, <<"t/1">>, ?BEFORE, 0),
|
||||
[Msg1 | _] = receive_messages(1),
|
||||
?assertEqual({ok, ?AFTER}, maps:find(payload, Msg1)),
|
||||
|
||||
ok = emqtt:publish(C2, <<"t/2">>, ?BEFORE, 0),
|
||||
[Msg2 | _] = receive_messages(1),
|
||||
?assertEqual({ok, ?BEFORE}, maps:find(payload, Msg2)).
|
||||
|
||||
|
||||
'test_a/#_topic'(_C1, C2) ->
|
||||
{ok, _} = emqtt:publish(C2, <<"a/1">>, ?BEFORE, 1),
|
||||
[Msg1 | _] = receive_messages(1),
|
||||
?assertEqual({ok, ?AFTER}, maps:find(payload, Msg1)),
|
||||
|
||||
{ok, _} = emqtt:publish(C2, <<"a/2">>, ?BEFORE, 1),
|
||||
[Msg2 | _] = receive_messages(1),
|
||||
?assertEqual({ok, ?AFTER}, maps:find(payload, Msg2)),
|
||||
|
||||
{ok, _} = emqtt:publish(C2, <<"a/3/4">>, ?BEFORE, 1),
|
||||
[Msg3 | _] = receive_messages(1),
|
||||
?assertEqual({ok, ?AFTER}, maps:find(payload, Msg3)),
|
||||
|
||||
{ok, _} = emqtt:publish(C2, <<"a/3/4/5">>, ?BEFORE, 1),
|
||||
[Msg4 | _] = receive_messages(1),
|
||||
?assertEqual({ok, ?AFTER}, maps:find(payload, Msg4)).
|
||||
|
||||
'test_b/+_topic'(_C1, C2) ->
|
||||
{ok, _} = emqtt:publish(C2, <<"b/1">>, ?BEFORE, 1),
|
||||
[Msg1 | _] = receive_messages(1),
|
||||
?assertEqual({ok, ?AFTER}, maps:find(payload, Msg1)),
|
||||
|
||||
{ok, _} = emqtt:publish(C2, <<"b/2">>, ?BEFORE, 1),
|
||||
[Msg2 | _] = receive_messages(1),
|
||||
?assertEqual({ok, ?AFTER}, maps:find(payload, Msg2)),
|
||||
|
||||
{ok, _} = emqtt:publish(C2, <<"b/3/4">>, ?BEFORE, 1),
|
||||
[Msg3 | _] = receive_messages(1),
|
||||
?assertEqual({ok, ?BEFORE}, maps:find(payload, Msg3)),
|
||||
|
||||
{ok, _} = emqtt:publish(C2, <<"b/3/4/5">>, ?BEFORE, 2),
|
||||
[Msg4 | _] = receive_messages(1),
|
||||
?assertEqual({ok, ?BEFORE}, maps:find(payload, Msg4)).
|
||||
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Utils
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -170,3 +270,18 @@ meck_print() ->
|
|||
|
||||
unmeck_print() ->
|
||||
meck:unload(emqx_ctl).
|
||||
|
||||
receive_messages(Count) ->
|
||||
receive_messages(Count, []).
|
||||
|
||||
receive_messages(0, Msgs) ->
|
||||
Msgs;
|
||||
receive_messages(Count, Msgs) ->
|
||||
receive
|
||||
{publish, Msg} ->
|
||||
receive_messages(Count-1, [Msg|Msgs]);
|
||||
_Other ->
|
||||
receive_messages(Count, Msgs)
|
||||
after 1000 ->
|
||||
Msgs
|
||||
end.
|
||||
|
|
|
@ -53,6 +53,8 @@
|
|||
-define(NAME, ?MODULE).
|
||||
-define(DEFAULT_CLUSTER_NAME, <<"emqxcl">>).
|
||||
-define(OTHER_CLUSTER_NAME_BIN, <<"test_emqx_cluster">>).
|
||||
-define(TEST_PUBLISH_FILTERS_BIN, <<"test_message_filters">>).
|
||||
-define(AFTER_HARDCODED_PAYLOAD, <<"after_hardcoded">>).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Server APIs
|
||||
|
@ -134,16 +136,21 @@ on_provider_loaded(#{meta := #{cluster_name := Name}} = Req, Md) ->
|
|||
#{name => <<"session.discarded">>},
|
||||
#{name => <<"session.takeovered">>},
|
||||
#{name => <<"session.terminated">>}],
|
||||
PublishWithFilter =
|
||||
[#{name => <<"message.publish">>, topics => [<<"t/1">>, <<"a/#">>, <<"b/+">>]}],
|
||||
PublishWithOutFilter =
|
||||
[#{name => <<"message.publish">>}],
|
||||
HooksMessage =
|
||||
[#{name => <<"message.publish">>},
|
||||
#{name => <<"message.delivered">>},
|
||||
[#{name => <<"message.delivered">>},
|
||||
#{name => <<"message.acked">>},
|
||||
#{name => <<"message.dropped">>}],
|
||||
case Name of
|
||||
?DEFAULT_CLUSTER_NAME ->
|
||||
{ok, #{hooks => HooksClient ++ HooksSession ++ HooksMessage}, Md};
|
||||
{ok, #{hooks => HooksClient ++ HooksSession ++ PublishWithOutFilter ++ HooksMessage}, Md};
|
||||
?OTHER_CLUSTER_NAME_BIN ->
|
||||
{ok, #{hooks => HooksClient}, Md}
|
||||
{ok, #{hooks => HooksClient}, Md};
|
||||
?TEST_PUBLISH_FILTERS_BIN ->
|
||||
{ok, #{hooks => HooksClient ++ HooksSession ++ PublishWithFilter ++ HooksMessage}, Md}
|
||||
end.
|
||||
-spec on_provider_unloaded(emqx_exhook_pb:provider_unloaded_request(), grpc:metadata())
|
||||
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
|
||||
|
@ -319,6 +326,12 @@ on_message_publish(#{message := #{from := From} = Msg} = Req, Md) ->
|
|||
payload => From},
|
||||
{ok, #{type => 'STOP_AND_RETURN',
|
||||
value => {message, NMsg}}, Md};
|
||||
<<"test_filter_client">> ->
|
||||
%% rewrite topic and payload
|
||||
NMsg = Msg#{topic => <<"exhook/hardcoded">>,
|
||||
payload => ?AFTER_HARDCODED_PAYLOAD},
|
||||
{ok, #{type => 'STOP_AND_RETURN',
|
||||
value => {message, NMsg}}, Md};
|
||||
_ ->
|
||||
{ok, #{type => 'IGNORE'}, Md}
|
||||
end.
|
||||
|
|
Loading…
Reference in New Issue