Merge pull request #7524 from JimMoen/exhook-cluster-name

Exhook provide `request_meta` to distinguish `cluster_name`
This commit is contained in:
JianBo He 2022-04-07 19:21:51 +08:00 committed by GitHub
commit 2b28b5f8c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 279 additions and 107 deletions

View File

@ -14,8 +14,10 @@ File format:
### Enhancements ### Enhancements
* ExHook add a new field named `meta` to provide emqx `cluster_name`.
The grpc server can handle calls separately for different clusters. [#7524]
* In order to fix the execution order of exhook, e.g. before/after other plugins/modules, * In order to fix the execution order of exhook, e.g. before/after other plugins/modules,
ExHook now supports user customizing emqx_hook execute priority. ExHook now supports user customizing emqx_hook execute priority. [#7408]
* add api: PUT /rules/{id}/reset_metrics. * add api: PUT /rules/{id}/reset_metrics.
This api reset the metrics of the rule engine of a rule, and reset the metrics of the action related to this rule. [#7474] This api reset the metrics of the rule engine of a rule, and reset the metrics of the action related to this rule. [#7474]
* Enhanced rule engine error handling when json parsing error. * Enhanced rule engine error handling when json parsing error.

View File

@ -25,5 +25,5 @@ data/
*.class *.class
Mnesia.nonode@nohost/ Mnesia.nonode@nohost/
src/emqx_exhook_pb.erl src/emqx_exhook_pb.erl
src/emqx_exhook_v_1_hook_provider_client.erl src/emqx_exhook_v_*_hook_provider_client.erl
src/emqx_exhook_v_1_hook_provider_bhvr.erl src/emqx_exhook_v_*_hook_provider_bhvr.erl

View File

@ -22,6 +22,7 @@ option java_multiple_files = true;
option java_package = "io.emqx.exhook"; option java_package = "io.emqx.exhook";
option java_outer_classname = "EmqxExHookProto"; option java_outer_classname = "EmqxExHookProto";
// Proto package compatible, Don't need an updated version..
package emqx.exhook.v1; package emqx.exhook.v1;
service HookProvider { service HookProvider {
@ -76,6 +77,8 @@ service HookProvider {
message ProviderLoadedRequest { message ProviderLoadedRequest {
BrokerInfo broker = 1; BrokerInfo broker = 1;
RequestMeta meta = 2;
} }
message LoadedResponse { message LoadedResponse {
@ -83,7 +86,11 @@ message LoadedResponse {
repeated HookSpec hooks = 1; repeated HookSpec hooks = 1;
} }
message ProviderUnloadedRequest { } message ProviderUnloadedRequest {
RequestMeta meta = 1;
}
message ClientConnectRequest { message ClientConnectRequest {
@ -93,6 +100,8 @@ message ClientConnectRequest {
// //
// It should be empty on MQTT v3.1.1/v3.1 or others protocol // It should be empty on MQTT v3.1.1/v3.1 or others protocol
repeated Property props = 2; repeated Property props = 2;
RequestMeta meta = 3;
} }
message ClientConnackRequest { message ClientConnackRequest {
@ -102,11 +111,15 @@ message ClientConnackRequest {
string result_code = 2; string result_code = 2;
repeated Property props = 3; repeated Property props = 3;
RequestMeta meta = 4;
} }
message ClientConnectedRequest { message ClientConnectedRequest {
ClientInfo clientinfo = 1; ClientInfo clientinfo = 1;
RequestMeta meta = 2;
} }
message ClientDisconnectedRequest { message ClientDisconnectedRequest {
@ -114,6 +127,8 @@ message ClientDisconnectedRequest {
ClientInfo clientinfo = 1; ClientInfo clientinfo = 1;
string reason = 2; string reason = 2;
RequestMeta meta = 3;
} }
message ClientAuthenticateRequest { message ClientAuthenticateRequest {
@ -121,6 +136,8 @@ message ClientAuthenticateRequest {
ClientInfo clientinfo = 1; ClientInfo clientinfo = 1;
bool result = 2; bool result = 2;
RequestMeta meta = 3;
} }
message ClientCheckAclRequest { message ClientCheckAclRequest {
@ -139,6 +156,8 @@ message ClientCheckAclRequest {
string topic = 3; string topic = 3;
bool result = 4; bool result = 4;
RequestMeta meta = 5;
} }
message ClientSubscribeRequest { message ClientSubscribeRequest {
@ -148,6 +167,8 @@ message ClientSubscribeRequest {
repeated Property props = 2; repeated Property props = 2;
repeated TopicFilter topic_filters = 3; repeated TopicFilter topic_filters = 3;
RequestMeta meta = 4;
} }
message ClientUnsubscribeRequest { message ClientUnsubscribeRequest {
@ -157,11 +178,15 @@ message ClientUnsubscribeRequest {
repeated Property props = 2; repeated Property props = 2;
repeated TopicFilter topic_filters = 3; repeated TopicFilter topic_filters = 3;
RequestMeta meta = 4;
} }
message SessionCreatedRequest { message SessionCreatedRequest {
ClientInfo clientinfo = 1; ClientInfo clientinfo = 1;
RequestMeta meta = 2;
} }
message SessionSubscribedRequest { message SessionSubscribedRequest {
@ -171,6 +196,8 @@ message SessionSubscribedRequest {
string topic = 2; string topic = 2;
SubOpts subopts = 3; SubOpts subopts = 3;
RequestMeta meta = 4;
} }
message SessionUnsubscribedRequest { message SessionUnsubscribedRequest {
@ -178,21 +205,29 @@ message SessionUnsubscribedRequest {
ClientInfo clientinfo = 1; ClientInfo clientinfo = 1;
string topic = 2; string topic = 2;
RequestMeta meta = 3;
} }
message SessionResumedRequest { message SessionResumedRequest {
ClientInfo clientinfo = 1; ClientInfo clientinfo = 1;
RequestMeta meta = 2;
} }
message SessionDiscardedRequest { message SessionDiscardedRequest {
ClientInfo clientinfo = 1; ClientInfo clientinfo = 1;
RequestMeta meta = 2;
} }
message SessionTakeoveredRequest { message SessionTakeoveredRequest {
ClientInfo clientinfo = 1; ClientInfo clientinfo = 1;
RequestMeta meta = 2;
} }
message SessionTerminatedRequest { message SessionTerminatedRequest {
@ -200,11 +235,15 @@ message SessionTerminatedRequest {
ClientInfo clientinfo = 1; ClientInfo clientinfo = 1;
string reason = 2; string reason = 2;
RequestMeta meta = 3;
} }
message MessagePublishRequest { message MessagePublishRequest {
Message message = 1; Message message = 1;
RequestMeta meta = 2;
} }
message MessageDeliveredRequest { message MessageDeliveredRequest {
@ -212,6 +251,8 @@ message MessageDeliveredRequest {
ClientInfo clientinfo = 1; ClientInfo clientinfo = 1;
Message message = 2; Message message = 2;
RequestMeta meta = 3;
} }
message MessageDroppedRequest { message MessageDroppedRequest {
@ -219,6 +260,8 @@ message MessageDroppedRequest {
Message message = 1; Message message = 1;
string reason = 2; string reason = 2;
RequestMeta meta = 3;
} }
message MessageAckedRequest { message MessageAckedRequest {
@ -226,6 +269,8 @@ message MessageAckedRequest {
ClientInfo clientinfo = 1; ClientInfo clientinfo = 1;
Message message = 2; Message message = 2;
RequestMeta meta = 3;
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -405,3 +450,14 @@ message SubOpts {
// connection with a ClientID equal to the ClientID of the publishing // connection with a ClientID equal to the ClientID of the publishing
uint32 nl = 5; uint32 nl = 5;
} }
message RequestMeta {
string node = 1;
string version = 2;
string sysdescr = 3;
string cluster_name = 4;
}

View File

@ -1,3 +1,4 @@
%% -*- mode: erlang -*-
{application, emqx_exhook, {application, emqx_exhook,
[{description, "EMQ X Extension for Hook"}, [{description, "EMQ X Extension for Hook"},
{vsn, "4.3.5"}, {vsn, "4.3.5"},

View File

@ -4,6 +4,9 @@
{"4.3.4", [ {"4.3.4", [
{load_module, emqx_exhook_sup, brutal_purge, soft_purge, []}, {load_module, emqx_exhook_sup, brutal_purge, soft_purge, []},
{load_module, emqx_exhook_server, brutal_purge, soft_purge, []}, {load_module, emqx_exhook_server, brutal_purge, soft_purge, []},
{load_module, emqx_exhook_handler, brutal_purge, soft_purge, []},
{load_module, emqx_exhook_pb, brutal_purge, soft_purge, []},
{load_module, emqx_exhook, brutal_purge, soft_purge, []},
{update, emqx_exhook_mngr, {advanced, ["4.3.4"]}} {update, emqx_exhook_mngr, {advanced, ["4.3.4"]}}
]}, ]},
{<<"4\\.3\\.[0-3]">>, [ {<<"4\\.3\\.[0-3]">>, [
@ -15,6 +18,9 @@
{"4.3.4", [ {"4.3.4", [
{load_module, emqx_exhook_sup, brutal_purge, soft_purge, []}, {load_module, emqx_exhook_sup, brutal_purge, soft_purge, []},
{load_module, emqx_exhook_server, brutal_purge, soft_purge, []}, {load_module, emqx_exhook_server, brutal_purge, soft_purge, []},
{load_module, emqx_exhook_handler, brutal_purge, soft_purge, []},
{load_module, emqx_exhook_pb, brutal_purge, soft_purge, []},
{load_module, emqx_exhook, brutal_purge, soft_purge, []},
{update, emqx_exhook_mngr, {advanced, ["4.3.4"]}} {update, emqx_exhook_mngr, {advanced, ["4.3.4"]}}
]}, ]},
{<<"4\\.3\\.[0-3]">>, [ {<<"4\\.3\\.[0-3]">>, [

View File

@ -30,6 +30,11 @@
, call_fold/3 , call_fold/3
]). ]).
-export([request_meta/0]).
-import(emqx_exhook_handler, [stringfy/1]).
%% TODO: move util functions to an independent module
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Mgmt APIs %% Mgmt APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -116,3 +121,10 @@ deny_action_result('message.publish', Msg) ->
%% TODO: Not support to deny a message %% TODO: Not support to deny a message
%% maybe we can put the 'allow_publish' into message header %% maybe we can put the 'allow_publish' into message header
Msg. Msg.
request_meta() ->
#{ node => stringfy(node())
, version => emqx_sys:version()
, sysdescr => emqx_sys:sysdescr()
, cluster_name => emqx_sys:cluster_name()
}.

View File

@ -288,6 +288,8 @@ stringfy(Term) when is_integer(Term) ->
integer_to_binary(Term); integer_to_binary(Term);
stringfy(Term) when is_atom(Term) -> stringfy(Term) when is_atom(Term) ->
atom_to_binary(Term, utf8); atom_to_binary(Term, utf8);
stringfy(Term) when is_list(Term) ->
list_to_binary(Term);
stringfy(Term) -> stringfy(Term) ->
unicode:characters_to_binary((io_lib:format("~0p", [Term]))). unicode:characters_to_binary((io_lib:format("~0p", [Term]))).

View File

@ -269,23 +269,24 @@ match_topic_filter(TopicName, TopicFilter) ->
-spec do_call(string(), atom(), map(), map()) -> {ok, map()} | {error, term()}. -spec do_call(string(), atom(), map(), map()) -> {ok, map()} | {error, term()}.
do_call(ChannName, Fun, Req, ReqOpts) -> do_call(ChannName, Fun, Req, ReqOpts) ->
NReq = Req#{meta => emqx_exhook:request_meta()},
Options = ReqOpts#{channel => ChannName}, Options = ReqOpts#{channel => ChannName},
?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, Req, Options]), ?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, NReq, Options]),
case catch apply(?PB_CLIENT_MOD, Fun, [Req, Options]) of case catch apply(?PB_CLIENT_MOD, Fun, [NReq, Options]) of
{ok, Resp, _Metadata} -> {ok, Resp, _Metadata} ->
?LOG(debug, "Response {ok, ~0p, ~0p}", [Resp, _Metadata]), ?LOG(debug, "Response {ok, ~0p, ~0p}", [Resp, _Metadata]),
{ok, Resp}; {ok, Resp};
{error, {Code, Msg}, _Metadata} -> {error, {Code, Msg}, _Metadata} ->
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) response errcode: ~0p, errmsg: ~0p", ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) response errcode: ~0p, errmsg: ~0p",
[?PB_CLIENT_MOD, Fun, Req, Options, Code, Msg]), [?PB_CLIENT_MOD, Fun, NReq, Options, Code, Msg]),
{error, {Code, Msg}}; {error, {Code, Msg}};
{error, Reason} -> {error, Reason} ->
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) error: ~0p", ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) error: ~0p",
[?PB_CLIENT_MOD, Fun, Req, Options, Reason]), [?PB_CLIENT_MOD, Fun, NReq, Options, Reason]),
{error, Reason}; {error, Reason};
{'EXIT', {Reason, Stk}} -> {'EXIT', {Reason, Stk}} ->
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) throw an exception: ~0p, stacktrace: ~0p", ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) throw an exception: ~0p, stacktrace: ~0p",
[?PB_CLIENT_MOD, Fun, Req, Options, Reason, Stk]), [?PB_CLIENT_MOD, Fun, NReq, Options, Reason, Stk]),
{error, Reason} {error, Reason}
end. end.

View File

@ -19,9 +19,14 @@
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include("emqx_exhook.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(OTHER_CLUSTER_NAME_ATOM, test_emqx_cluster).
-define(OTHER_CLUSTER_NAME_STRING, "test_emqx_cluster").
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Setups %% Setups
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -99,29 +104,63 @@ t_cli_stats(_) ->
unmeck_print(). unmeck_print().
t_priority(_) -> t_priority(_) ->
restart_exhook_with_envs([{emqx_exhook, hook_priority, 1}]), restart_apps_with_envs([ {emqx, fun set_special_cfgs/1}
, {emqx_exhook, [{hook_priority, 1}]}]),
emqx_exhook:disable(default), emqx_exhook:disable(default),
ok = emqx_exhook:enable(default), ok = emqx_exhook:enable(default),
[Callback | _] = emqx_hooks:lookup('client.connected'), [Callback | _] = emqx_hooks:lookup('client.connected'),
1 = emqx_hooks:callback_priority(Callback). ?assertEqual(1, emqx_hooks:callback_priority(Callback)).
t_cluster_name(_) ->
SetEnvFun =
fun(emqx) ->
set_special_cfgs(emqx),
application:set_env(ekka, cluster_name, ?OTHER_CLUSTER_NAME_ATOM);
(emqx_exhook) ->
application:set_env(emqx_exhook, hook_priority, 1)
end,
emqx_ct_helpers:stop_apps([emqx, emqx_exhook]),
emqx_ct_helpers:start_apps([emqx, emqx_exhook], SetEnvFun),
?assertEqual(?OTHER_CLUSTER_NAME_STRING, emqx_sys:cluster_name()),
emqx_exhook:disable(default),
ok = emqx_exhook:enable(default),
%% See emqx_exhook_demo_svr:on_provider_loaded/2
?assertEqual([], emqx_hooks:lookup('session.created')),
?assertEqual([], emqx_hooks:lookup('message_publish')),
[Callback | _] = emqx_hooks:lookup('client.connected'),
?assertEqual(1, emqx_hooks:callback_priority(Callback)).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Utils %% Utils
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% TODO: make it more general and move to `emqx_ct_helpers` %% TODO: make it more general and move to `emqx_ct_helpers`
restart_exhook_with_envs(Envs) -> restart_app_with_envs(App, Fun)
emqx_ct_helpers:stop_apps([emqx_exhook]), when is_function(Fun) ->
SetPriorityFun emqx_ct_helpers:stop_apps([App]),
= fun(emqx) -> emqx_ct_helpers:start_apps([App], Fun);
set_special_cfgs(emqx);
(emqx_exhook) -> restart_app_with_envs(App, Envs)
lists:foreach(fun({App, Key, Val}) -> when is_list(Envs) ->
emqx_ct_helpers:stop_apps([App]),
HandlerFun =
fun(App) ->
lists:foreach(fun({Key, Val}) ->
application:set_env(App, Key, Val) application:set_env(App, Key, Val)
end, Envs) end, Envs)
end, end,
emqx_ct_helpers:start_apps([emqx_exhook], SetPriorityFun). emqx_ct_helpers:start_apps([App], HandlerFun).
restart_apps_with_envs([]) ->
ok;
restart_apps_with_envs([{App, Envs} | Rest]) ->
restart_app_with_envs(App, Envs),
restart_apps_with_envs(Rest).
meck_print() -> meck_print() ->
meck:new(emqx_ctl, [passthrough, no_history, no_link]), meck:new(emqx_ctl, [passthrough, no_history, no_link]),

View File

@ -51,6 +51,8 @@
-define(PORT, 9000). -define(PORT, 9000).
-define(NAME, ?MODULE). -define(NAME, ?MODULE).
-define(DEFAULT_CLUSTER_NAME, <<"emqxcl">>).
-define(OTHER_CLUSTER_NAME_BIN, <<"test_emqx_cluster">>).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Server APIs %% Server APIs
@ -112,30 +114,37 @@ reply(Q1, Q2) ->
-spec on_provider_loaded(emqx_exhook_pb:provider_loaded_request(), grpc:metadata()) -spec on_provider_loaded(emqx_exhook_pb:provider_loaded_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:loaded_response(), grpc:metadata()} -> {ok, emqx_exhook_pb:loaded_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}. | {error, grpc_cowboy_h:error_response()}.
on_provider_loaded(#{meta := #{cluster_name := Name}} = Req, Md) ->
on_provider_loaded(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}), ?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{hooks => [ HooksClient =
#{name => <<"client.connect">>}, [#{name => <<"client.connect">>},
#{name => <<"client.connack">>}, #{name => <<"client.connack">>},
#{name => <<"client.connected">>}, #{name => <<"client.connected">>},
#{name => <<"client.disconnected">>}, #{name => <<"client.disconnected">>},
#{name => <<"client.authenticate">>}, #{name => <<"client.authenticate">>},
#{name => <<"client.check_acl">>}, #{name => <<"client.check_acl">>},
#{name => <<"client.subscribe">>}, #{name => <<"client.subscribe">>},
#{name => <<"client.unsubscribe">>}, #{name => <<"client.unsubscribe">>}],
#{name => <<"session.created">>}, HooksSession =
[#{name => <<"session.created">>},
#{name => <<"session.subscribed">>}, #{name => <<"session.subscribed">>},
#{name => <<"session.unsubscribed">>}, #{name => <<"session.unsubscribed">>},
#{name => <<"session.resumed">>}, #{name => <<"session.resumed">>},
#{name => <<"session.discarded">>}, #{name => <<"session.discarded">>},
#{name => <<"session.takeovered">>}, #{name => <<"session.takeovered">>},
#{name => <<"session.terminated">>}, #{name => <<"session.terminated">>}],
#{name => <<"message.publish">>}, HooksMessage =
[#{name => <<"message.publish">>},
#{name => <<"message.delivered">>}, #{name => <<"message.delivered">>},
#{name => <<"message.acked">>}, #{name => <<"message.acked">>},
#{name => <<"message.dropped">>}]}, Md}. #{name => <<"message.dropped">>}],
case Name of
?DEFAULT_CLUSTER_NAME ->
{ok, #{hooks => HooksClient ++ HooksSession ++ HooksMessage}, Md};
?OTHER_CLUSTER_NAME_BIN ->
{ok, #{hooks => HooksClient}, Md}
end.
-spec on_provider_unloaded(emqx_exhook_pb:provider_unloaded_request(), grpc:metadata()) -spec on_provider_unloaded(emqx_exhook_pb:provider_unloaded_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}. | {error, grpc_cowboy_h:error_response()}.

View File

@ -36,42 +36,46 @@
fun() -> do_teardown(State) end fun() -> do_teardown(State) end
end, ?FORALL(Vars, Types, Exprs))). end, ?FORALL(Vars, Types, Exprs))).
-define(DEFAULT_CLUSTER_NAME, <<"emqxcl">>).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Properties %% Properties
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
prop_client_connect() -> prop_client_connect() ->
?ALL({ConnInfo, ConnProps}, ?ALL({ConnInfo, ConnProps, Meta},
{conninfo(), conn_properties()}, {conninfo(), conn_properties(), request_meta()},
begin begin
ok = emqx_hooks:run('client.connect', [ConnInfo, ConnProps]), ok = emqx_hooks:run('client.connect', [ConnInfo, ConnProps]),
{'on_client_connect', Resp} = emqx_exhook_demo_svr:take(), {'on_client_connect', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{props => properties(ConnProps), #{props => properties(ConnProps),
conninfo => from_conninfo(ConnInfo) conninfo => from_conninfo(ConnInfo),
meta => Meta
}, },
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
end). end).
prop_client_connack() -> prop_client_connack() ->
?ALL({ConnInfo, Rc, AckProps}, ?ALL({ConnInfo, Rc, AckProps, Meta},
{conninfo(), connack_return_code(), ack_properties()}, {conninfo(), connack_return_code(), ack_properties(), request_meta()},
begin begin
ok = emqx_hooks:run('client.connack', [ConnInfo, Rc, AckProps]), ok = emqx_hooks:run('client.connack', [ConnInfo, Rc, AckProps]),
{'on_client_connack', Resp} = emqx_exhook_demo_svr:take(), {'on_client_connack', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{props => properties(AckProps), #{props => properties(AckProps),
result_code => atom_to_binary(Rc, utf8), result_code => atom_to_binary(Rc, utf8),
conninfo => from_conninfo(ConnInfo) conninfo => from_conninfo(ConnInfo),
meta => Meta
}, },
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
end). end).
prop_client_authenticate() -> prop_client_authenticate() ->
?ALL({ClientInfo0, AuthResult}, ?ALL({ClientInfo0, AuthResult, Meta},
{clientinfo(), authresult()}, {clientinfo(), authresult(), request_meta()},
begin begin
ClientInfo = inject_magic_into(username, ClientInfo0), ClientInfo = inject_magic_into(username, ClientInfo0),
OutAuthResult = emqx_hooks:run_fold('client.authenticate', [ClientInfo], AuthResult), OutAuthResult = emqx_hooks:run_fold('client.authenticate', [ClientInfo], AuthResult),
@ -103,16 +107,16 @@ prop_client_authenticate() ->
{'on_client_authenticate', Resp} = emqx_exhook_demo_svr:take(), {'on_client_authenticate', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{result => authresult_to_bool(AuthResult), #{result => authresult_to_bool(AuthResult),
clientinfo => from_clientinfo(ClientInfo) clientinfo => from_clientinfo(ClientInfo),
meta => Meta
}, },
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
end). end).
prop_client_check_acl() -> prop_client_check_acl() ->
?ALL({ClientInfo0, PubSub, Topic, Result}, ?ALL({ClientInfo0, PubSub, Topic, Result, Meta},
{clientinfo(), oneof([publish, subscribe]), {clientinfo(), oneof([publish, subscribe]), topic(), oneof([allow, deny]), request_meta()},
topic(), oneof([allow, deny])},
begin begin
ClientInfo = inject_magic_into(username, ClientInfo0), ClientInfo = inject_magic_into(username, ClientInfo0),
OutResult = emqx_hooks:run_fold( OutResult = emqx_hooks:run_fold(
@ -132,162 +136,179 @@ prop_client_check_acl() ->
#{result => aclresult_to_bool(Result), #{result => aclresult_to_bool(Result),
type => pubsub_to_enum(PubSub), type => pubsub_to_enum(PubSub),
topic => Topic, topic => Topic,
clientinfo => from_clientinfo(ClientInfo) clientinfo => from_clientinfo(ClientInfo),
meta => Meta
}, },
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
end). end).
prop_client_connected() -> prop_client_connected() ->
?ALL({ClientInfo, ConnInfo}, ?ALL({ClientInfo, ConnInfo, Meta},
{clientinfo(), conninfo()}, {clientinfo(), conninfo(), request_meta()},
begin begin
ok = emqx_hooks:run('client.connected', [ClientInfo, ConnInfo]), ok = emqx_hooks:run('client.connected', [ClientInfo, ConnInfo]),
{'on_client_connected', Resp} = emqx_exhook_demo_svr:take(), {'on_client_connected', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{clientinfo => from_clientinfo(ClientInfo) #{clientinfo => from_clientinfo(ClientInfo),
meta => Meta
}, },
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
end). end).
prop_client_disconnected() -> prop_client_disconnected() ->
?ALL({ClientInfo, Reason, ConnInfo}, ?ALL({ClientInfo, Reason, ConnInfo, Meta},
{clientinfo(), shutdown_reason(), conninfo()}, {clientinfo(), shutdown_reason(), conninfo(), request_meta()},
begin begin
ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]), ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]),
{'on_client_disconnected', Resp} = emqx_exhook_demo_svr:take(), {'on_client_disconnected', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{reason => stringfy(Reason), #{reason => stringfy(Reason),
clientinfo => from_clientinfo(ClientInfo) clientinfo => from_clientinfo(ClientInfo),
meta => Meta
}, },
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
end). end).
prop_client_subscribe() -> prop_client_subscribe() ->
?ALL({ClientInfo, SubProps, TopicTab}, ?ALL({ClientInfo, SubProps, TopicTab, Meta},
{clientinfo(), sub_properties(), topictab()}, {clientinfo(), sub_properties(), topictab(), request_meta()},
begin begin
ok = emqx_hooks:run('client.subscribe', [ClientInfo, SubProps, TopicTab]), ok = emqx_hooks:run('client.subscribe', [ClientInfo, SubProps, TopicTab]),
{'on_client_subscribe', Resp} = emqx_exhook_demo_svr:take(), {'on_client_subscribe', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{props => properties(SubProps), #{props => properties(SubProps),
topic_filters => topicfilters(TopicTab), topic_filters => topicfilters(TopicTab),
clientinfo => from_clientinfo(ClientInfo) clientinfo => from_clientinfo(ClientInfo),
meta => Meta
}, },
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
end). end).
prop_client_unsubscribe() -> prop_client_unsubscribe() ->
?ALL({ClientInfo, UnSubProps, TopicTab}, ?ALL({ClientInfo, UnSubProps, TopicTab, Meta},
{clientinfo(), unsub_properties(), topictab()}, {clientinfo(), unsub_properties(), topictab(), request_meta()},
begin begin
ok = emqx_hooks:run('client.unsubscribe', [ClientInfo, UnSubProps, TopicTab]), ok = emqx_hooks:run('client.unsubscribe', [ClientInfo, UnSubProps, TopicTab]),
{'on_client_unsubscribe', Resp} = emqx_exhook_demo_svr:take(), {'on_client_unsubscribe', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{props => properties(UnSubProps), #{props => properties(UnSubProps),
topic_filters => topicfilters(TopicTab), topic_filters => topicfilters(TopicTab),
clientinfo => from_clientinfo(ClientInfo) clientinfo => from_clientinfo(ClientInfo),
meta => Meta
}, },
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
end). end).
prop_session_created() -> prop_session_created() ->
?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, ?ALL({ClientInfo, SessInfo, Meta},
{clientinfo(), sessioninfo(), request_meta()},
begin begin
ok = emqx_hooks:run('session.created', [ClientInfo, SessInfo]), ok = emqx_hooks:run('session.created', [ClientInfo, SessInfo]),
{'on_session_created', Resp} = emqx_exhook_demo_svr:take(), {'on_session_created', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{clientinfo => from_clientinfo(ClientInfo) #{clientinfo => from_clientinfo(ClientInfo),
meta => Meta
}, },
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
end). end).
prop_session_subscribed() -> prop_session_subscribed() ->
?ALL({ClientInfo, Topic, SubOpts}, ?ALL({ClientInfo, Topic, SubOpts, Meta},
{clientinfo(), topic(), subopts()}, {clientinfo(), topic(), subopts(), request_meta()},
begin begin
ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]), ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]),
{'on_session_subscribed', Resp} = emqx_exhook_demo_svr:take(), {'on_session_subscribed', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{topic => Topic, #{topic => Topic,
subopts => subopts(SubOpts), subopts => subopts(SubOpts),
clientinfo => from_clientinfo(ClientInfo) clientinfo => from_clientinfo(ClientInfo),
meta => Meta
}, },
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
end). end).
prop_session_unsubscribed() -> prop_session_unsubscribed() ->
?ALL({ClientInfo, Topic, SubOpts}, ?ALL({ClientInfo, Topic, SubOpts, Meta},
{clientinfo(), topic(), subopts()}, {clientinfo(), topic(), subopts(), request_meta()},
begin begin
ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, SubOpts]), ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, SubOpts]),
{'on_session_unsubscribed', Resp} = emqx_exhook_demo_svr:take(), {'on_session_unsubscribed', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{topic => Topic, #{topic => Topic,
clientinfo => from_clientinfo(ClientInfo) clientinfo => from_clientinfo(ClientInfo),
meta => Meta
}, },
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
end). end).
prop_session_resumed() -> prop_session_resumed() ->
?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, ?ALL({ClientInfo, SessInfo, Meta},
{clientinfo(), sessioninfo(), request_meta()},
begin begin
ok = emqx_hooks:run('session.resumed', [ClientInfo, SessInfo]), ok = emqx_hooks:run('session.resumed', [ClientInfo, SessInfo]),
{'on_session_resumed', Resp} = emqx_exhook_demo_svr:take(), {'on_session_resumed', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{clientinfo => from_clientinfo(ClientInfo) #{clientinfo => from_clientinfo(ClientInfo),
meta => Meta
}, },
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
end). end).
prop_session_discared() -> prop_session_discared() ->
?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, ?ALL({ClientInfo, SessInfo, Meta},
{clientinfo(), sessioninfo(), request_meta()},
begin begin
ok = emqx_hooks:run('session.discarded', [ClientInfo, SessInfo]), ok = emqx_hooks:run('session.discarded', [ClientInfo, SessInfo]),
{'on_session_discarded', Resp} = emqx_exhook_demo_svr:take(), {'on_session_discarded', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{clientinfo => from_clientinfo(ClientInfo) #{clientinfo => from_clientinfo(ClientInfo),
meta => Meta
}, },
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
end). end).
prop_session_takeovered() -> prop_session_takeovered() ->
?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, ?ALL({ClientInfo, SessInfo, Meta},
{clientinfo(), sessioninfo(), request_meta()},
begin begin
ok = emqx_hooks:run('session.takeovered', [ClientInfo, SessInfo]), ok = emqx_hooks:run('session.takeovered', [ClientInfo, SessInfo]),
{'on_session_takeovered', Resp} = emqx_exhook_demo_svr:take(), {'on_session_takeovered', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{clientinfo => from_clientinfo(ClientInfo) #{clientinfo => from_clientinfo(ClientInfo),
meta => Meta
}, },
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
end). end).
prop_session_terminated() -> prop_session_terminated() ->
?ALL({ClientInfo, Reason, SessInfo}, ?ALL({ClientInfo, Reason, SessInfo, Meta},
{clientinfo(), shutdown_reason(), sessioninfo()}, {clientinfo(), shutdown_reason(), sessioninfo(), request_meta()},
begin begin
ok = emqx_hooks:run('session.terminated', [ClientInfo, Reason, SessInfo]), ok = emqx_hooks:run('session.terminated', [ClientInfo, Reason, SessInfo]),
{'on_session_terminated', Resp} = emqx_exhook_demo_svr:take(), {'on_session_terminated', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{reason => stringfy(Reason), #{reason => stringfy(Reason),
clientinfo => from_clientinfo(ClientInfo) clientinfo => from_clientinfo(ClientInfo),
meta => Meta
}, },
?assertEqual(Expected, Resp), ?assertEqual(Expected, Resp),
true true
end). end).
prop_message_publish() -> prop_message_publish() ->
?ALL(Msg0, message(), ?ALL({Msg0, Meta},
{message(), request_meta()},
begin begin
Msg = emqx_message:from_map( Msg = emqx_message:from_map(
inject_magic_into(from, emqx_message:to_map(Msg0))), inject_magic_into(from, emqx_message:to_map(Msg0))),
@ -317,7 +338,8 @@ prop_message_publish() ->
{'on_message_publish', Resp} = emqx_exhook_demo_svr:take(), {'on_message_publish', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{message => from_message(Msg) #{message => from_message(Msg),
meta => Meta
}, },
?assertEqual(Expected, Resp) ?assertEqual(Expected, Resp)
end, end,
@ -325,7 +347,8 @@ prop_message_publish() ->
end). end).
prop_message_dropped() -> prop_message_dropped() ->
?ALL({Msg, By, Reason}, {message(), hardcoded, shutdown_reason()}, ?ALL({Msg, By, Reason, Meta},
{message(), hardcoded, shutdown_reason(), request_meta()},
begin begin
ok = emqx_hooks:run('message.dropped', [Msg, By, Reason]), ok = emqx_hooks:run('message.dropped', [Msg, By, Reason]),
case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
@ -334,7 +357,8 @@ prop_message_dropped() ->
{'on_message_dropped', Resp} = emqx_exhook_demo_svr:take(), {'on_message_dropped', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{reason => stringfy(Reason), #{reason => stringfy(Reason),
message => from_message(Msg) message => from_message(Msg),
meta => Meta
}, },
?assertEqual(Expected, Resp) ?assertEqual(Expected, Resp)
end, end,
@ -342,7 +366,8 @@ prop_message_dropped() ->
end). end).
prop_message_delivered() -> prop_message_delivered() ->
?ALL({ClientInfo, Msg}, {clientinfo(), message()}, ?ALL({ClientInfo, Msg, Meta},
{clientinfo(), message(), request_meta()},
begin begin
ok = emqx_hooks:run('message.delivered', [ClientInfo, Msg]), ok = emqx_hooks:run('message.delivered', [ClientInfo, Msg]),
case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
@ -351,7 +376,8 @@ prop_message_delivered() ->
{'on_message_delivered', Resp} = emqx_exhook_demo_svr:take(), {'on_message_delivered', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{clientinfo => from_clientinfo(ClientInfo), #{clientinfo => from_clientinfo(ClientInfo),
message => from_message(Msg) message => from_message(Msg),
meta => Meta
}, },
?assertEqual(Expected, Resp) ?assertEqual(Expected, Resp)
end, end,
@ -359,7 +385,8 @@ prop_message_delivered() ->
end). end).
prop_message_acked() -> prop_message_acked() ->
?ALL({ClientInfo, Msg}, {clientinfo(), message()}, ?ALL({ClientInfo, Msg, Meta},
{clientinfo(), message(), request_meta()},
begin begin
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
@ -368,7 +395,8 @@ prop_message_acked() ->
{'on_message_acked', Resp} = emqx_exhook_demo_svr:take(), {'on_message_acked', Resp} = emqx_exhook_demo_svr:take(),
Expected = Expected =
#{clientinfo => from_clientinfo(ClientInfo), #{clientinfo => from_clientinfo(ClientInfo),
message => from_message(Msg) message => from_message(Msg),
meta => Meta
}, },
?assertEqual(Expected, Resp) ?assertEqual(Expected, Resp)
end, end,
@ -411,6 +439,8 @@ stringfy(Term) when is_integer(Term) ->
integer_to_binary(Term); integer_to_binary(Term);
stringfy(Term) when is_atom(Term) -> stringfy(Term) when is_atom(Term) ->
atom_to_binary(Term, utf8); atom_to_binary(Term, utf8);
stringfy(Term) when is_list(Term) ->
list_to_binary(Term);
stringfy(Term) -> stringfy(Term) ->
unicode:characters_to_binary((io_lib:format("~0p", [Term]))). unicode:characters_to_binary((io_lib:format("~0p", [Term]))).
@ -513,6 +543,13 @@ sub_properties() ->
unsub_properties() -> unsub_properties() ->
#{}. #{}.
request_meta() ->
#{ node => nodestr()
, version => stringfy(emqx_sys:version())
, sysdescr => stringfy(emqx_sys:sysdescr())
, cluster_name => ?DEFAULT_CLUSTER_NAME
}.
shutdown_reason() -> shutdown_reason() ->
oneof([utf8(), {shutdown, emqx_ct_proper_types:limited_atom()}]). oneof([utf8(), {shutdown, emqx_ct_proper_types:limited_atom()}]).

View File

@ -3,6 +3,7 @@
{VSN, {VSN,
[{"4.3.14", [{"4.3.14",
[{load_module,emqx,brutal_purge,soft_purge,[]}, [{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_sys,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
@ -433,6 +434,7 @@
{<<".*">>,[]}], {<<".*">>,[]}],
[{"4.3.14", [{"4.3.14",
[{load_module,emqx,brutal_purge,soft_purge,[]}, [{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_sys,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},

View File

@ -29,6 +29,7 @@
]). ]).
-export([ version/0 -export([ version/0
, cluster_name/0
, uptime/0 , uptime/0
, datetime/0 , datetime/0
, sysdescr/0 , sysdescr/0
@ -87,6 +88,10 @@ stop() ->
-spec(version() -> string()). -spec(version() -> string()).
version() -> emqx_app:get_release(). version() -> emqx_app:get_release().
%% @doc Get cluster name
-spec(cluster_name() -> string()).
cluster_name() -> atom_to_list(ekka:cluster_name()).
%% @doc Get sys description %% @doc Get sys description
-spec(sysdescr() -> string()). -spec(sysdescr() -> string()).
sysdescr() -> emqx_app:get_description(). sysdescr() -> emqx_app:get_description().