Merge pull request #7723 from JimMoen/exhook-v2-request-meta

feat(exhook): provide meta data in `RequestMeta`
This commit is contained in:
JianBo He 2022-04-24 09:42:20 +08:00 committed by GitHub
commit 5bb4fcd569
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 298 additions and 115 deletions

View File

@ -29,6 +29,7 @@
-export([ -export([
version/0, version/0,
cluster_name/0,
uptime/0, uptime/0,
datetime/0, datetime/0,
sysdescr/0 sysdescr/0
@ -101,6 +102,10 @@ stop() ->
-spec version() -> string(). -spec version() -> string().
version() -> emqx_app:get_release(). version() -> emqx_app:get_release().
%% @doc Get cluster name
-spec cluster_name() -> string().
cluster_name() -> atom_to_list(ekka:cluster_name()).
%% @doc Get sys description %% @doc Get sys description
-spec sysdescr() -> string(). -spec sysdescr() -> string().
sysdescr() -> emqx_app:get_description(). sysdescr() -> emqx_app:get_description().

View File

@ -24,6 +24,4 @@ data/
.DS_Store .DS_Store
*.class *.class
Mnesia.nonode@nohost/ Mnesia.nonode@nohost/
src/emqx_exhook_pb.erl src/pb/*
src/emqx_exhook_v_1_hook_provider_client.erl
src/emqx_exhook_v_1_hook_provider_bhvr.erl

View File

@ -50,7 +50,7 @@
```protobuff ```protobuff
syntax = "proto3"; syntax = "proto3";
package emqx.exhook.v1; package emqx.exhook.v2;
service HookProvider { service HookProvider {

View File

@ -16,13 +16,15 @@
syntax = "proto3"; syntax = "proto3";
option csharp_namespace = "Emqx.Exhook.V1"; option csharp_namespace = "Emqx.Exhook.V2";
option go_package = "emqx.io/grpc/exhook"; option go_package = "emqx.io/grpc/exhook";
option java_multiple_files = true; option java_multiple_files = true;
option java_package = "io.emqx.exhook"; option java_package = "io.emqx.exhook";
option java_outer_classname = "EmqxExHookProto"; option java_outer_classname = "EmqxExHookProto";
package emqx.exhook.v1; // The exhook proto version should be fixed as `v2` in EMQX v5.x
// to make sure the exhook proto version is compatible
package emqx.exhook.v2;
service HookProvider { service HookProvider {
@ -70,21 +72,21 @@ service HookProvider {
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// Request & Response // Request
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
message ProviderLoadedRequest { message ProviderLoadedRequest {
BrokerInfo broker = 1; BrokerInfo broker = 1;
RequestMeta meta = 2;
} }
message LoadedResponse { message ProviderUnloadedRequest {
repeated HookSpec hooks = 1; RequestMeta meta = 1;
} }
message ProviderUnloadedRequest { }
message ClientConnectRequest { message ClientConnectRequest {
ConnInfo conninfo = 1; ConnInfo conninfo = 1;
@ -93,6 +95,8 @@ message ClientConnectRequest {
// //
// It should be empty on MQTT v3.1.1/v3.1 or others protocol // It should be empty on MQTT v3.1.1/v3.1 or others protocol
repeated Property props = 2; repeated Property props = 2;
RequestMeta meta = 3;
} }
message ClientConnackRequest { message ClientConnackRequest {
@ -102,11 +106,15 @@ message ClientConnackRequest {
string result_code = 2; string result_code = 2;
repeated Property props = 3; repeated Property props = 3;
RequestMeta meta = 4;
} }
message ClientConnectedRequest { message ClientConnectedRequest {
ClientInfo clientinfo = 1; ClientInfo clientinfo = 1;
RequestMeta meta = 2;
} }
message ClientDisconnectedRequest { message ClientDisconnectedRequest {
@ -114,6 +122,8 @@ message ClientDisconnectedRequest {
ClientInfo clientinfo = 1; ClientInfo clientinfo = 1;
string reason = 2; string reason = 2;
RequestMeta meta = 3;
} }
message ClientAuthenticateRequest { message ClientAuthenticateRequest {
@ -121,6 +131,8 @@ message ClientAuthenticateRequest {
ClientInfo clientinfo = 1; ClientInfo clientinfo = 1;
bool result = 2; bool result = 2;
RequestMeta meta = 3;
} }
message ClientAuthorizeRequest { message ClientAuthorizeRequest {
@ -139,6 +151,8 @@ message ClientAuthorizeRequest {
string topic = 3; string topic = 3;
bool result = 4; bool result = 4;
RequestMeta meta = 5;
} }
message ClientSubscribeRequest { message ClientSubscribeRequest {
@ -148,6 +162,8 @@ message ClientSubscribeRequest {
repeated Property props = 2; repeated Property props = 2;
repeated TopicFilter topic_filters = 3; repeated TopicFilter topic_filters = 3;
RequestMeta meta = 4;
} }
message ClientUnsubscribeRequest { message ClientUnsubscribeRequest {
@ -157,11 +173,15 @@ message ClientUnsubscribeRequest {
repeated Property props = 2; repeated Property props = 2;
repeated TopicFilter topic_filters = 3; repeated TopicFilter topic_filters = 3;
RequestMeta meta = 4;
} }
message SessionCreatedRequest { message SessionCreatedRequest {
ClientInfo clientinfo = 1; ClientInfo clientinfo = 1;
RequestMeta meta = 2;
} }
message SessionSubscribedRequest { message SessionSubscribedRequest {
@ -171,6 +191,8 @@ message SessionSubscribedRequest {
string topic = 2; string topic = 2;
SubOpts subopts = 3; SubOpts subopts = 3;
RequestMeta meta = 4;
} }
message SessionUnsubscribedRequest { message SessionUnsubscribedRequest {
@ -178,21 +200,29 @@ message SessionUnsubscribedRequest {
ClientInfo clientinfo = 1; ClientInfo clientinfo = 1;
string topic = 2; string topic = 2;
RequestMeta meta = 3;
} }
message SessionResumedRequest { message SessionResumedRequest {
ClientInfo clientinfo = 1; ClientInfo clientinfo = 1;
RequestMeta meta = 2;
} }
message SessionDiscardedRequest { message SessionDiscardedRequest {
ClientInfo clientinfo = 1; ClientInfo clientinfo = 1;
RequestMeta meta = 2;
} }
message SessionTakenoverRequest { message SessionTakenoverRequest {
ClientInfo clientinfo = 1; ClientInfo clientinfo = 1;
RequestMeta meta = 2;
} }
message SessionTerminatedRequest { message SessionTerminatedRequest {
@ -200,11 +230,15 @@ message SessionTerminatedRequest {
ClientInfo clientinfo = 1; ClientInfo clientinfo = 1;
string reason = 2; string reason = 2;
RequestMeta meta = 3;
} }
message MessagePublishRequest { message MessagePublishRequest {
Message message = 1; Message message = 1;
RequestMeta meta = 2;
} }
message MessageDeliveredRequest { message MessageDeliveredRequest {
@ -212,6 +246,8 @@ message MessageDeliveredRequest {
ClientInfo clientinfo = 1; ClientInfo clientinfo = 1;
Message message = 2; Message message = 2;
RequestMeta meta = 3;
} }
message MessageDroppedRequest { message MessageDroppedRequest {
@ -219,6 +255,8 @@ message MessageDroppedRequest {
Message message = 1; Message message = 1;
string reason = 2; string reason = 2;
RequestMeta meta = 3;
} }
message MessageAckedRequest { message MessageAckedRequest {
@ -226,13 +264,22 @@ message MessageAckedRequest {
ClientInfo clientinfo = 1; ClientInfo clientinfo = 1;
Message message = 2; Message message = 2;
RequestMeta meta = 3;
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// Basic data types // Response
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
message EmptySuccess { } // Responsed by `ProviderLoadedRequest`
message LoadedResponse {
repeated HookSpec hooks = 1;
}
// Responsed by `ClientAuthenticateRequest` `ClientAuthorizeRequest` `MessagePublishRequest`
message ValuedResponse { message ValuedResponse {
@ -261,6 +308,14 @@ message ValuedResponse {
} }
} }
// no Response by other Requests
message EmptySuccess { }
//------------------------------------------------------------------------------
// Basic data types
//------------------------------------------------------------------------------
message BrokerInfo { message BrokerInfo {
string version = 1; string version = 1;
@ -272,6 +327,7 @@ message BrokerInfo {
string datetime = 4; string datetime = 4;
} }
message HookSpec { message HookSpec {
// The registered hooks name // The registered hooks name
@ -430,3 +486,14 @@ message SubOpts {
// connection with a ClientID equal to the ClientID of the publishing // connection with a ClientID equal to the ClientID of the publishing
uint32 nl = 5; uint32 nl = 5;
} }
message RequestMeta {
string node = 1;
string version = 2;
string sysdescr = 3;
string cluster_name = 4;
}

View File

@ -13,10 +13,11 @@
{protos, ["priv/protos"]}, {protos, ["priv/protos"]},
{gpb_opts, [ {gpb_opts, [
{module_name_prefix, "emqx_"}, {module_name_prefix, "emqx_"},
{module_name_suffix, "_pb"} {module_name_suffix, "_pb"},
]} {o, "src/pb"}
]},
{out_dir, "src/pb"}
]}. ]}.
{provider_hooks, [ {provider_hooks, [
{pre, [ {pre, [
{compile, {grpc, gen}}, {compile, {grpc, gen}},
@ -50,8 +51,8 @@
{cover_export_enabled, true}. {cover_export_enabled, true}.
{cover_excl_mods, [ {cover_excl_mods, [
emqx_exhook_pb, emqx_exhook_pb,
emqx_exhook_v_1_hook_provider_bhvr, emqx_exhook_v_2_hook_provider_bhvr,
emqx_exhook_v_1_hook_provider_client emqx_exhook_v_2_hook_provider_client
]}. ]}.
{project_plugins, [erlfmt]}. {project_plugins, [erlfmt]}.

View File

@ -57,7 +57,8 @@
merge_responsed_bool/2, merge_responsed_bool/2,
merge_responsed_message/2, merge_responsed_message/2,
assign_to_message/2, assign_to_message/2,
clientinfo/1 clientinfo/1,
request_meta/0
]). ]).
-import( -import(
@ -455,3 +456,11 @@ merge_responsed_message(_Req, Resp) ->
ret('CONTINUE') -> ok; ret('CONTINUE') -> ok;
ret('STOP_AND_RETURN') -> stop. ret('STOP_AND_RETURN') -> stop.
request_meta() ->
#{
node => stringfy(node()),
version => emqx_sys:version(),
sysdescr => emqx_sys:sysdescr(),
cluster_name => emqx_sys:cluster_name()
}.

View File

@ -19,7 +19,9 @@
-include("emqx_exhook.hrl"). -include("emqx_exhook.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client). %% The exhook proto version should be fixed as `v2` in EMQX v5.x
%% to make sure the exhook proto version is compatible
-define(PB_CLIENT_MOD, emqx_exhook_v_2_hook_provider_client).
%% Load/Unload %% Load/Unload
-export([ -export([
@ -362,14 +364,15 @@ match_topic_filter(TopicName, TopicFilter) ->
-spec do_call(binary(), atom(), atom(), map(), map()) -> {ok, map()} | {error, term()}. -spec do_call(binary(), atom(), atom(), map(), map()) -> {ok, map()} | {error, term()}.
do_call(ChannName, Hookpoint, Fun, Req, ReqOpts) -> do_call(ChannName, Hookpoint, Fun, Req, ReqOpts) ->
Options = ReqOpts#{channel => ChannName}, Options = ReqOpts#{channel => ChannName},
NReq = Req#{meta => emqx_exhook_handler:request_meta()},
?SLOG(debug, #{ ?SLOG(debug, #{
msg => "do_call", msg => "do_call",
module => ?PB_CLIENT_MOD, module => ?PB_CLIENT_MOD,
function => Fun, function => Fun,
req => Req, req => NReq,
options => Options options => Options
}), }),
case catch ?CALL_PB_CLIENT(ChanneName, Fun, Req, Options) of case catch ?CALL_PB_CLIENT(ChanneName, Fun, NReq, Options) of
{ok, Resp, Metadata} -> {ok, Resp, Metadata} ->
?SLOG(debug, #{msg => "do_call_ok", resp => Resp, metadata => Metadata}), ?SLOG(debug, #{msg => "do_call_ok", resp => Resp, metadata => Metadata}),
update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:succeed/2), update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:succeed/2),
@ -379,7 +382,7 @@ do_call(ChannName, Hookpoint, Fun, Req, ReqOpts) ->
msg => "exhook_call_error", msg => "exhook_call_error",
module => ?PB_CLIENT_MOD, module => ?PB_CLIENT_MOD,
function => Fun, function => Fun,
req => Req, req => NReq,
options => Options, options => Options,
code => Code, code => Code,
packet => Msg packet => Msg
@ -391,7 +394,7 @@ do_call(ChannName, Hookpoint, Fun, Req, ReqOpts) ->
msg => "exhook_call_error", msg => "exhook_call_error",
module => ?PB_CLIENT_MOD, module => ?PB_CLIENT_MOD,
function => Fun, function => Fun,
req => Req, req => NReq,
options => Options, options => Options,
reason => Reason reason => Reason
}), }),
@ -402,7 +405,7 @@ do_call(ChannName, Hookpoint, Fun, Req, ReqOpts) ->
msg => "exhook_call_exception", msg => "exhook_call_exception",
module => ?PB_CLIENT_MOD, module => ?PB_CLIENT_MOD,
function => Fun, function => Fun,
req => Req, req => NReq,
options => Options, options => Options,
stacktrace => Stk stacktrace => Stk
}), }),

View File

@ -19,8 +19,15 @@
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include("emqx_exhook.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(DEFAULT_CLUSTER_NAME_ATOM, emqxcl).
-define(OTHER_CLUSTER_NAME_ATOM, test_emqx_cluster).
-define(OTHER_CLUSTER_NAME_STRING, "test_emqx_cluster").
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). -define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
-define(CONF_DEFAULT, << -define(CONF_DEFAULT, <<
@ -54,6 +61,7 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Cfg) -> init_per_suite(Cfg) ->
application:load(emqx_conf), application:load(emqx_conf),
ok = ekka:start(), ok = ekka:start(),
application:set_env(ekka, cluster_name, ?DEFAULT_CLUSTER_NAME_ATOM),
ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity), ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
meck:new(emqx_alarm, [non_strict, passthrough, no_link]), meck:new(emqx_alarm, [non_strict, passthrough, no_link]),
meck:expect(emqx_alarm, activate, 3, ok), meck:expect(emqx_alarm, activate, 3, ok),
@ -65,6 +73,7 @@ init_per_suite(Cfg) ->
Cfg. Cfg.
end_per_suite(_Cfg) -> end_per_suite(_Cfg) ->
application:set_env(ekka, cluster_name, ?DEFAULT_CLUSTER_NAME_ATOM),
ekka:stop(), ekka:stop(),
mria:stop(), mria:stop(),
mria_mnesia:delete_schema(), mria_mnesia:delete_schema(),
@ -95,7 +104,7 @@ load_cfg(Cfg) ->
%% Test cases %% Test cases
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_access_failed_if_no_server_running(_) -> t_access_failed_if_no_server_running(Config) ->
emqx_exhook_mgr:disable(<<"default">>), emqx_exhook_mgr:disable(<<"default">>),
ClientInfo = #{ ClientInfo = #{
clientid => <<"user-id-1">>, clientid => <<"user-id-1">>,
@ -120,7 +129,8 @@ t_access_failed_if_no_server_running(_) ->
{stop, Message}, {stop, Message},
emqx_exhook_handler:on_message_publish(Message) emqx_exhook_handler:on_message_publish(Message)
), ),
emqx_exhook_mgr:enable(<<"default">>). emqx_exhook_mgr:enable(<<"default">>),
assert_get_basic_usage_info(Config).
t_lookup(_) -> t_lookup(_) ->
Result = emqx_exhook_mgr:lookup(<<"default">>), Result = emqx_exhook_mgr:lookup(<<"default">>),
@ -250,7 +260,36 @@ t_misc_test(_) ->
_ = emqx_exhook_server:format(#{name => <<"test">>, hookspec => #{}}), _ = emqx_exhook_server:format(#{name => <<"test">>, hookspec => #{}}),
ok. ok.
t_get_basic_usage_info(_Config) -> t_cluster_name(_) ->
SetEnvFun =
fun
(emqx) ->
application:set_env(ekka, cluster_name, ?OTHER_CLUSTER_NAME_ATOM);
(emqx_exhook) ->
ok
end,
emqx_common_test_helpers:stop_apps([emqx, emqx_exhook]),
emqx_common_test_helpers:start_apps([emqx, emqx_exhook], SetEnvFun),
?assertEqual(?OTHER_CLUSTER_NAME_STRING, emqx_sys:cluster_name()),
emqx_exhook_mgr:disable(<<"default">>),
emqx_exhook_mgr:enable(<<"default">>),
%% See emqx_exhook_demo_svr:on_provider_loaded/2
?assertEqual([], emqx_hooks:lookup('session.created')),
?assertEqual([], emqx_hooks:lookup('message_publish')),
?assertEqual(
true,
erlang:length(emqx_hooks:lookup('client.connected')) > 1
),
emqx_exhook_mgr:disable(<<"default">>).
%%--------------------------------------------------------------------
%% Cases Helpers
%%--------------------------------------------------------------------
assert_get_basic_usage_info(_Config) ->
#{ #{
num_servers := NumServers, num_servers := NumServers,
servers := Servers servers := Servers

View File

@ -26,6 +26,8 @@
-define(BASE_PATH, "api"). -define(BASE_PATH, "api").
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). -define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
-define(DEFAULT_CLUSTER_NAME_ATOM, emqxcl).
-define(CONF_DEFAULT, << -define(CONF_DEFAULT, <<
"\n" "\n"
"exhook {\n" "exhook {\n"
@ -54,18 +56,20 @@ all() ->
init_per_suite(Config) -> init_per_suite(Config) ->
application:load(emqx_conf), application:load(emqx_conf),
ok = ekka:start(), ok = ekka:start(),
application:set_env(ekka, cluster_name, ?DEFAULT_CLUSTER_NAME_ATOM),
ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity), ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
meck:new(emqx_alarm, [non_strict, passthrough, no_link]), meck:new(emqx_alarm, [non_strict, passthrough, no_link]),
meck:expect(emqx_alarm, activate, 3, ok), meck:expect(emqx_alarm, activate, 3, ok),
meck:expect(emqx_alarm, deactivate, 3, ok), meck:expect(emqx_alarm, deactivate, 3, ok),
_ = emqx_exhook_demo_svr:start(), _ = emqx_exhook_demo_svr:start(),
ok = emqx_common_test_helpers:load_config(emqx_exhook_schema, ?CONF_DEFAULT), load_cfg(?CONF_DEFAULT),
emqx_mgmt_api_test_util:init_suite([emqx_exhook]), emqx_mgmt_api_test_util:init_suite([emqx_exhook]),
[Conf] = emqx:get_config([exhook, servers]), [Conf] = emqx:get_config([exhook, servers]),
[{template, Conf} | Config]. [{template, Conf} | Config].
end_per_suite(Config) -> end_per_suite(Config) ->
application:set_env(ekka, cluster_name, ?DEFAULT_CLUSTER_NAME_ATOM),
ekka:stop(), ekka:stop(),
mria:stop(), mria:stop(),
mria_mnesia:delete_schema(), mria_mnesia:delete_schema(),
@ -96,6 +100,13 @@ end_per_testcase(_, Config) ->
end, end,
Config. Config.
load_cfg(Cfg) ->
ok = emqx_common_test_helpers:load_config(emqx_exhook_schema, Cfg).
%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------
t_list(_) -> t_list(_) ->
{ok, Data} = request_api( {ok, Data} = request_api(
get, get,

View File

@ -16,7 +16,7 @@
-module(emqx_exhook_demo_svr). -module(emqx_exhook_demo_svr).
-behaviour(emqx_exhook_v_1_hook_provider_bhvr). -behaviour(emqx_exhook_v_2_hook_provider_bhvr).
%% %%
-export([ -export([
@ -55,6 +55,8 @@
-define(PORT, 9000). -define(PORT, 9000).
-define(NAME, ?MODULE). -define(NAME, ?MODULE).
-define(DEFAULT_CLUSTER_NAME, <<"emqxcl">>).
-define(OTHER_CLUSTER_NAME_BIN, <<"test_emqx_cluster">>).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Server APIs %% Server APIs
@ -89,7 +91,7 @@ mgr_main(Name, Port) ->
application:ensure_all_started(grpc), application:ensure_all_started(grpc),
Services = #{ Services = #{
protos => [emqx_exhook_pb], protos => [emqx_exhook_pb],
services => #{'emqx.exhook.v1.HookProvider' => emqx_exhook_demo_svr} services => #{'emqx.exhook.v2.HookProvider' => emqx_exhook_demo_svr}
}, },
Options = [], Options = [],
Svr = grpc:start_server(Name, Port, Services, Options), Svr = grpc:start_server(Name, Port, Services, Options),
@ -134,34 +136,43 @@ to_atom_name(Name) ->
{ok, emqx_exhook_pb:loaded_response(), grpc:metadata()} {ok, emqx_exhook_pb:loaded_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}. | {error, grpc_cowboy_h:error_response()}.
on_provider_loaded(Req, Md) -> on_provider_loaded(#{meta := #{cluster_name := Name}} = Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}), ?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), %% io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, HooksClient =
#{ [
hooks => [ #{name => <<"client.connect">>},
#{name => <<"client.connect">>}, #{name => <<"client.connack">>},
#{name => <<"client.connack">>}, #{name => <<"client.connected">>},
#{name => <<"client.connected">>}, #{name => <<"client.disconnected">>},
#{name => <<"client.disconnected">>}, #{name => <<"client.authenticate">>},
#{name => <<"client.authenticate">>}, #{name => <<"client.authorize">>},
#{name => <<"client.authorize">>}, #{name => <<"client.subscribe">>},
#{name => <<"client.subscribe">>}, #{name => <<"client.unsubscribe">>}
#{name => <<"client.unsubscribe">>}, ],
#{name => <<"session.created">>}, HooksSession =
#{name => <<"session.subscribed">>}, [
#{name => <<"session.unsubscribed">>}, #{name => <<"session.created">>},
#{name => <<"session.resumed">>}, #{name => <<"session.subscribed">>},
#{name => <<"session.discarded">>}, #{name => <<"session.unsubscribed">>},
#{name => <<"session.takenover">>}, #{name => <<"session.resumed">>},
#{name => <<"session.terminated">>}, #{name => <<"session.discarded">>},
#{name => <<"message.publish">>}, #{name => <<"session.takenover">>},
#{name => <<"message.delivered">>}, #{name => <<"session.terminated">>}
#{name => <<"message.acked">>}, ],
#{name => <<"message.dropped">>} HooksMessage =
] [
}, #{name => <<"message.publish">>},
Md}. #{name => <<"message.delivered">>},
#{name => <<"message.acked">>},
#{name => <<"message.dropped">>}
],
case Name of
?DEFAULT_CLUSTER_NAME ->
{ok, #{hooks => HooksClient ++ HooksSession ++ HooksMessage}, Md};
?OTHER_CLUSTER_NAME_BIN ->
{ok, #{hooks => HooksClient}, Md}
end.
-spec on_provider_unloaded(emqx_exhook_pb:provider_unloaded_request(), grpc:metadata()) -> -spec on_provider_unloaded(emqx_exhook_pb:provider_unloaded_request(), grpc:metadata()) ->
{ok, emqx_exhook_pb:empty_success(), grpc:metadata()} {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}. | {error, grpc_cowboy_h:error_response()}.

View File

@ -53,6 +53,8 @@
?FORALL(Vars, Types, Exprs) ?FORALL(Vars, Types, Exprs)
) )
). ).
-define(DEFAULT_CLUSTER_NAME_ATOM, emqxcl).
-define(DEFAULT_CLUSTER_NAME_BIN, <<"emqxcl">>).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Properties %% Properties
@ -60,15 +62,16 @@
prop_client_connect() -> prop_client_connect() ->
?ALL( ?ALL(
{ConnInfo, ConnProps}, {ConnInfo, ConnProps, Meta},
{conninfo(), conn_properties()}, {conninfo(), conn_properties(), request_meta()},
begin begin
ok = emqx_hooks:run('client.connect', [ConnInfo, ConnProps]), ok = emqx_hooks:run('client.connect', [ConnInfo, ConnProps]),
{'on_client_connect', Resp} = emqx_exhook_demo_svr:take(), {'on_client_connect', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{ #{
props => properties(ConnProps), props => properties(ConnProps),
conninfo => from_conninfo(ConnInfo) conninfo => from_conninfo(ConnInfo),
meta => Meta
}, },
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
@ -77,8 +80,8 @@ prop_client_connect() ->
prop_client_connack() -> prop_client_connack() ->
?ALL( ?ALL(
{ConnInfo, Rc, AckProps}, {ConnInfo, Rc, AckProps, Meta},
{conninfo(), connack_return_code(), ack_properties()}, {conninfo(), connack_return_code(), ack_properties(), request_meta()},
begin begin
ok = emqx_hooks:run('client.connack', [ConnInfo, Rc, AckProps]), ok = emqx_hooks:run('client.connack', [ConnInfo, Rc, AckProps]),
{'on_client_connack', Resp} = emqx_exhook_demo_svr:take(), {'on_client_connack', Resp} = emqx_exhook_demo_svr:take(),
@ -86,7 +89,8 @@ prop_client_connack() ->
#{ #{
props => properties(AckProps), props => properties(AckProps),
result_code => atom_to_binary(Rc, utf8), result_code => atom_to_binary(Rc, utf8),
conninfo => from_conninfo(ConnInfo) conninfo => from_conninfo(ConnInfo),
meta => Meta
}, },
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
@ -95,8 +99,8 @@ prop_client_connack() ->
prop_client_authenticate() -> prop_client_authenticate() ->
?ALL( ?ALL(
{ClientInfo0, AuthResult}, {ClientInfo0, AuthResult, Meta},
{clientinfo(), authresult()}, {clientinfo(), authresult(), request_meta()},
begin begin
ClientInfo = inject_magic_into(username, ClientInfo0), ClientInfo = inject_magic_into(username, ClientInfo0),
OutAuthResult = emqx_hooks:run_fold('client.authenticate', [ClientInfo], AuthResult), OutAuthResult = emqx_hooks:run_fold('client.authenticate', [ClientInfo], AuthResult),
@ -120,7 +124,8 @@ prop_client_authenticate() ->
Expected = Expected =
#{ #{
result => authresult_to_bool(AuthResult), result => authresult_to_bool(AuthResult),
clientinfo => from_clientinfo(ClientInfo) clientinfo => from_clientinfo(ClientInfo),
meta => Meta
}, },
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
@ -129,8 +134,8 @@ prop_client_authenticate() ->
prop_client_authorize() -> prop_client_authorize() ->
?ALL( ?ALL(
{ClientInfo0, PubSub, Topic, Result}, {ClientInfo0, PubSub, Topic, Result, Meta},
{clientinfo(), oneof([publish, subscribe]), topic(), oneof([allow, deny])}, {clientinfo(), oneof([publish, subscribe]), topic(), oneof([allow, deny]), request_meta()},
begin begin
ClientInfo = inject_magic_into(username, ClientInfo0), ClientInfo = inject_magic_into(username, ClientInfo0),
OutResult = emqx_hooks:run_fold( OutResult = emqx_hooks:run_fold(
@ -153,7 +158,8 @@ prop_client_authorize() ->
result => aclresult_to_bool(Result), result => aclresult_to_bool(Result),
type => pubsub_to_enum(PubSub), type => pubsub_to_enum(PubSub),
topic => Topic, topic => Topic,
clientinfo => from_clientinfo(ClientInfo) clientinfo => from_clientinfo(ClientInfo),
meta => Meta
}, },
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
@ -162,13 +168,16 @@ prop_client_authorize() ->
prop_client_connected() -> prop_client_connected() ->
?ALL( ?ALL(
{ClientInfo, ConnInfo}, {ClientInfo, ConnInfo, Meta},
{clientinfo(), conninfo()}, {clientinfo(), conninfo(), request_meta()},
begin begin
ok = emqx_hooks:run('client.connected', [ClientInfo, ConnInfo]), ok = emqx_hooks:run('client.connected', [ClientInfo, ConnInfo]),
{'on_client_connected', Resp} = emqx_exhook_demo_svr:take(), {'on_client_connected', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{clientinfo => from_clientinfo(ClientInfo)}, #{
clientinfo => from_clientinfo(ClientInfo),
meta => Meta
},
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
end end
@ -176,15 +185,16 @@ prop_client_connected() ->
prop_client_disconnected() -> prop_client_disconnected() ->
?ALL( ?ALL(
{ClientInfo, Reason, ConnInfo}, {ClientInfo, Reason, ConnInfo, Meta},
{clientinfo(), shutdown_reason(), conninfo()}, {clientinfo(), shutdown_reason(), conninfo(), request_meta()},
begin begin
ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]), ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]),
{'on_client_disconnected', Resp} = emqx_exhook_demo_svr:take(), {'on_client_disconnected', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{ #{
reason => stringfy(Reason), reason => stringfy(Reason),
clientinfo => from_clientinfo(ClientInfo) clientinfo => from_clientinfo(ClientInfo),
meta => Meta
}, },
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
@ -193,8 +203,8 @@ prop_client_disconnected() ->
prop_client_subscribe() -> prop_client_subscribe() ->
?ALL( ?ALL(
{ClientInfo, SubProps, TopicTab}, {ClientInfo, SubProps, TopicTab, Meta},
{clientinfo(), sub_properties(), topictab()}, {clientinfo(), sub_properties(), topictab(), request_meta()},
begin begin
ok = emqx_hooks:run('client.subscribe', [ClientInfo, SubProps, TopicTab]), ok = emqx_hooks:run('client.subscribe', [ClientInfo, SubProps, TopicTab]),
{'on_client_subscribe', Resp} = emqx_exhook_demo_svr:take(), {'on_client_subscribe', Resp} = emqx_exhook_demo_svr:take(),
@ -202,7 +212,8 @@ prop_client_subscribe() ->
#{ #{
props => properties(SubProps), props => properties(SubProps),
topic_filters => topicfilters(TopicTab), topic_filters => topicfilters(TopicTab),
clientinfo => from_clientinfo(ClientInfo) clientinfo => from_clientinfo(ClientInfo),
meta => Meta
}, },
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
@ -211,8 +222,8 @@ prop_client_subscribe() ->
prop_client_unsubscribe() -> prop_client_unsubscribe() ->
?ALL( ?ALL(
{ClientInfo, UnSubProps, TopicTab}, {ClientInfo, UnSubProps, TopicTab, Meta},
{clientinfo(), unsub_properties(), topictab()}, {clientinfo(), unsub_properties(), topictab(), request_meta()},
begin begin
ok = emqx_hooks:run('client.unsubscribe', [ClientInfo, UnSubProps, TopicTab]), ok = emqx_hooks:run('client.unsubscribe', [ClientInfo, UnSubProps, TopicTab]),
{'on_client_unsubscribe', Resp} = emqx_exhook_demo_svr:take(), {'on_client_unsubscribe', Resp} = emqx_exhook_demo_svr:take(),
@ -220,7 +231,8 @@ prop_client_unsubscribe() ->
#{ #{
props => properties(UnSubProps), props => properties(UnSubProps),
topic_filters => topicfilters(TopicTab), topic_filters => topicfilters(TopicTab),
clientinfo => from_clientinfo(ClientInfo) clientinfo => from_clientinfo(ClientInfo),
meta => Meta
}, },
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
@ -229,13 +241,16 @@ prop_client_unsubscribe() ->
prop_session_created() -> prop_session_created() ->
?ALL( ?ALL(
{ClientInfo, SessInfo}, {ClientInfo, SessInfo, Meta},
{clientinfo(), sessioninfo()}, {clientinfo(), sessioninfo(), request_meta()},
begin begin
ok = emqx_hooks:run('session.created', [ClientInfo, SessInfo]), ok = emqx_hooks:run('session.created', [ClientInfo, SessInfo]),
{'on_session_created', Resp} = emqx_exhook_demo_svr:take(), {'on_session_created', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{clientinfo => from_clientinfo(ClientInfo)}, #{
clientinfo => from_clientinfo(ClientInfo),
meta => Meta
},
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
end end
@ -243,8 +258,8 @@ prop_session_created() ->
prop_session_subscribed() -> prop_session_subscribed() ->
?ALL( ?ALL(
{ClientInfo, Topic, SubOpts}, {ClientInfo, Topic, SubOpts, Meta},
{clientinfo(), topic(), subopts()}, {clientinfo(), topic(), subopts(), request_meta()},
begin begin
ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]), ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]),
{'on_session_subscribed', Resp} = emqx_exhook_demo_svr:take(), {'on_session_subscribed', Resp} = emqx_exhook_demo_svr:take(),
@ -252,7 +267,8 @@ prop_session_subscribed() ->
#{ #{
topic => Topic, topic => Topic,
subopts => subopts(SubOpts), subopts => subopts(SubOpts),
clientinfo => from_clientinfo(ClientInfo) clientinfo => from_clientinfo(ClientInfo),
meta => Meta
}, },
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
@ -261,15 +277,16 @@ prop_session_subscribed() ->
prop_session_unsubscribed() -> prop_session_unsubscribed() ->
?ALL( ?ALL(
{ClientInfo, Topic, SubOpts}, {ClientInfo, Topic, SubOpts, Meta},
{clientinfo(), topic(), subopts()}, {clientinfo(), topic(), subopts(), request_meta()},
begin begin
ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, SubOpts]), ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, SubOpts]),
{'on_session_unsubscribed', Resp} = emqx_exhook_demo_svr:take(), {'on_session_unsubscribed', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{ #{
topic => Topic, topic => Topic,
clientinfo => from_clientinfo(ClientInfo) clientinfo => from_clientinfo(ClientInfo),
meta => Meta
}, },
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
@ -278,13 +295,16 @@ prop_session_unsubscribed() ->
prop_session_resumed() -> prop_session_resumed() ->
?ALL( ?ALL(
{ClientInfo, SessInfo}, {ClientInfo, SessInfo, Meta},
{clientinfo(), sessioninfo()}, {clientinfo(), sessioninfo(), request_meta()},
begin begin
ok = emqx_hooks:run('session.resumed', [ClientInfo, SessInfo]), ok = emqx_hooks:run('session.resumed', [ClientInfo, SessInfo]),
{'on_session_resumed', Resp} = emqx_exhook_demo_svr:take(), {'on_session_resumed', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{clientinfo => from_clientinfo(ClientInfo)}, #{
clientinfo => from_clientinfo(ClientInfo),
meta => Meta
},
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
end end
@ -292,13 +312,13 @@ prop_session_resumed() ->
prop_session_discared() -> prop_session_discared() ->
?ALL( ?ALL(
{ClientInfo, SessInfo}, {ClientInfo, SessInfo, Meta},
{clientinfo(), sessioninfo()}, {clientinfo(), sessioninfo(), request_meta()},
begin begin
ok = emqx_hooks:run('session.discarded', [ClientInfo, SessInfo]), ok = emqx_hooks:run('session.discarded', [ClientInfo, SessInfo]),
{'on_session_discarded', Resp} = emqx_exhook_demo_svr:take(), {'on_session_discarded', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{clientinfo => from_clientinfo(ClientInfo)}, #{clientinfo => from_clientinfo(ClientInfo), meta => Meta},
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
end end
@ -306,13 +326,13 @@ prop_session_discared() ->
prop_session_takenover() -> prop_session_takenover() ->
?ALL( ?ALL(
{ClientInfo, SessInfo}, {ClientInfo, SessInfo, Meta},
{clientinfo(), sessioninfo()}, {clientinfo(), sessioninfo(), request_meta()},
begin begin
ok = emqx_hooks:run('session.takenover', [ClientInfo, SessInfo]), ok = emqx_hooks:run('session.takenover', [ClientInfo, SessInfo]),
{'on_session_takenover', Resp} = emqx_exhook_demo_svr:take(), {'on_session_takenover', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{clientinfo => from_clientinfo(ClientInfo)}, #{clientinfo => from_clientinfo(ClientInfo), meta => Meta},
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
end end
@ -320,15 +340,16 @@ prop_session_takenover() ->
prop_session_terminated() -> prop_session_terminated() ->
?ALL( ?ALL(
{ClientInfo, Reason, SessInfo}, {ClientInfo, Reason, SessInfo, Meta},
{clientinfo(), shutdown_reason(), sessioninfo()}, {clientinfo(), shutdown_reason(), sessioninfo(), request_meta()},
begin begin
ok = emqx_hooks:run('session.terminated', [ClientInfo, Reason, SessInfo]), ok = emqx_hooks:run('session.terminated', [ClientInfo, Reason, SessInfo]),
{'on_session_terminated', Resp} = emqx_exhook_demo_svr:take(), {'on_session_terminated', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{ #{
reason => stringfy(Reason), reason => stringfy(Reason),
clientinfo => from_clientinfo(ClientInfo) clientinfo => from_clientinfo(ClientInfo),
meta => Meta
}, },
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
@ -337,8 +358,8 @@ prop_session_terminated() ->
prop_message_publish() -> prop_message_publish() ->
?ALL( ?ALL(
Msg0, {Msg0, Meta},
message(), {message(), request_meta()},
begin begin
Msg = emqx_message:from_map( Msg = emqx_message:from_map(
inject_magic_into(from, emqx_message:to_map(Msg0)) inject_magic_into(from, emqx_message:to_map(Msg0))
@ -381,7 +402,10 @@ prop_message_publish() ->
{'on_message_publish', Resp} = emqx_exhook_demo_svr:take(), {'on_message_publish', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{message => from_message(Msg)}, #{
message => from_message(Msg),
meta => Meta
},
?assertEqual(Expected, Resp) ?assertEqual(Expected, Resp)
end, end,
true true
@ -390,8 +414,8 @@ prop_message_publish() ->
prop_message_dropped() -> prop_message_dropped() ->
?ALL( ?ALL(
{Msg, By, Reason}, {Msg, By, Reason, Meta},
{message(), hardcoded, shutdown_reason()}, {message(), hardcoded, shutdown_reason(), request_meta()},
begin begin
ok = emqx_hooks:run('message.dropped', [Msg, By, Reason]), ok = emqx_hooks:run('message.dropped', [Msg, By, Reason]),
case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
@ -402,7 +426,8 @@ prop_message_dropped() ->
Expected = Expected =
#{ #{
reason => stringfy(Reason), reason => stringfy(Reason),
message => from_message(Msg) message => from_message(Msg),
meta => Meta
}, },
?assertEqual(Expected, Resp) ?assertEqual(Expected, Resp)
end, end,
@ -412,8 +437,8 @@ prop_message_dropped() ->
prop_message_delivered() -> prop_message_delivered() ->
?ALL( ?ALL(
{ClientInfo, Msg}, {ClientInfo, Msg, Meta},
{clientinfo(), message()}, {clientinfo(), message(), request_meta()},
begin begin
ok = emqx_hooks:run('message.delivered', [ClientInfo, Msg]), ok = emqx_hooks:run('message.delivered', [ClientInfo, Msg]),
case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
@ -424,7 +449,8 @@ prop_message_delivered() ->
Expected = Expected =
#{ #{
clientinfo => from_clientinfo(ClientInfo), clientinfo => from_clientinfo(ClientInfo),
message => from_message(Msg) message => from_message(Msg),
meta => Meta
}, },
?assertEqual(Expected, Resp) ?assertEqual(Expected, Resp)
end, end,
@ -434,8 +460,8 @@ prop_message_delivered() ->
prop_message_acked() -> prop_message_acked() ->
?ALL( ?ALL(
{ClientInfo, Msg}, {ClientInfo, Msg, Meta},
{clientinfo(), message()}, {clientinfo(), message(), request_meta()},
begin begin
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
@ -446,7 +472,8 @@ prop_message_acked() ->
Expected = Expected =
#{ #{
clientinfo => from_clientinfo(ClientInfo), clientinfo => from_clientinfo(ClientInfo),
message => from_message(Msg) message => from_message(Msg),
meta => Meta
}, },
?assertEqual(Expected, Resp) ?assertEqual(Expected, Resp)
end, end,
@ -500,6 +527,8 @@ stringfy(Term) when is_integer(Term) ->
integer_to_binary(Term); integer_to_binary(Term);
stringfy(Term) when is_atom(Term) -> stringfy(Term) when is_atom(Term) ->
atom_to_binary(Term, utf8); atom_to_binary(Term, utf8);
stringfy(Term) when is_list(Term) ->
list_to_binary(Term);
stringfy(Term) -> stringfy(Term) ->
unicode:characters_to_binary((io_lib:format("~0p", [Term]))). unicode:characters_to_binary((io_lib:format("~0p", [Term]))).
@ -569,6 +598,8 @@ from_message(Msg) ->
do_setup() -> do_setup() ->
logger:set_primary_config(#{level => warning}), logger:set_primary_config(#{level => warning}),
ok = ekka:start(),
application:set_env(ekka, cluster_name, ?DEFAULT_CLUSTER_NAME_ATOM),
_ = emqx_exhook_demo_svr:start(), _ = emqx_exhook_demo_svr:start(),
ok = emqx_config:init_load(emqx_exhook_schema, ?CONF_DEFAULT), ok = emqx_config:init_load(emqx_exhook_schema, ?CONF_DEFAULT),
emqx_common_test_helpers:start_apps([emqx_exhook]), emqx_common_test_helpers:start_apps([emqx_exhook]),
@ -623,3 +654,11 @@ inject_magic_into(Key, Object) ->
castspell() -> castspell() ->
L = [<<"baduser">>, <<"gooduser">>, <<"normaluser">>, muggles], L = [<<"baduser">>, <<"gooduser">>, <<"normaluser">>, muggles],
lists:nth(rand:uniform(length(L)), L). lists:nth(rand:uniform(length(L)), L).
request_meta() ->
#{
node => nodestr(),
version => stringfy(emqx_sys:version()),
sysdescr => stringfy(emqx_sys:sysdescr()),
cluster_name => ?DEFAULT_CLUSTER_NAME_BIN
}.