From 78d582447c0e84863fc77c9b653988978d433651 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 22 Apr 2022 20:51:12 +0800 Subject: [PATCH 1/4] feat: emqx core provides api to get cluster name --- apps/emqx/src/emqx_sys.erl | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/apps/emqx/src/emqx_sys.erl b/apps/emqx/src/emqx_sys.erl index ad9b23daa..70ab7e3c5 100644 --- a/apps/emqx/src/emqx_sys.erl +++ b/apps/emqx/src/emqx_sys.erl @@ -29,6 +29,7 @@ -export([ version/0, + cluster_name/0, uptime/0, datetime/0, sysdescr/0 @@ -101,6 +102,10 @@ stop() -> -spec version() -> string(). version() -> emqx_app:get_release(). +%% @doc Get cluster name +-spec cluster_name() -> string(). +cluster_name() -> atom_to_list(ekka:cluster_name()). + %% @doc Get sys description -spec sysdescr() -> string(). sysdescr() -> emqx_app:get_description(). From 9a1e9c1465e8801595e6194ce0350ad231ef4c81 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 22 Apr 2022 18:05:29 +0800 Subject: [PATCH 2/4] chore(exhook): move auto generated code to specific dir --- apps/emqx_exhook/.gitignore | 4 +--- apps/emqx_exhook/rebar.config | 11 ++++++----- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/apps/emqx_exhook/.gitignore b/apps/emqx_exhook/.gitignore index da1f0db23..c2e3ce6ab 100644 --- a/apps/emqx_exhook/.gitignore +++ b/apps/emqx_exhook/.gitignore @@ -24,6 +24,4 @@ data/ .DS_Store *.class Mnesia.nonode@nohost/ -src/emqx_exhook_pb.erl -src/emqx_exhook_v_1_hook_provider_client.erl -src/emqx_exhook_v_1_hook_provider_bhvr.erl +src/pb/* diff --git a/apps/emqx_exhook/rebar.config b/apps/emqx_exhook/rebar.config index cc9a680c3..235d4be1b 100644 --- a/apps/emqx_exhook/rebar.config +++ b/apps/emqx_exhook/rebar.config @@ -13,10 +13,11 @@ {protos, ["priv/protos"]}, {gpb_opts, [ {module_name_prefix, "emqx_"}, - {module_name_suffix, "_pb"} - ]} + {module_name_suffix, "_pb"}, + {o, "src/pb"} + ]}, + {out_dir, "src/pb"} ]}. - {provider_hooks, [ {pre, [ {compile, {grpc, gen}}, @@ -50,8 +51,8 @@ {cover_export_enabled, true}. {cover_excl_mods, [ emqx_exhook_pb, - emqx_exhook_v_1_hook_provider_bhvr, - emqx_exhook_v_1_hook_provider_client + emqx_exhook_v_2_hook_provider_bhvr, + emqx_exhook_v_2_hook_provider_client ]}. {project_plugins, [erlfmt]}. From bbd843c68b5c03a945df4bdad38392ffa643926a Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 22 Apr 2022 14:56:33 +0800 Subject: [PATCH 3/4] feat(exhook): provide meta data in `RequestMeta` --- apps/emqx_exhook/docs/design-cn.md | 2 +- apps/emqx_exhook/priv/protos/exhook.proto | 85 +++++++++++++++++--- apps/emqx_exhook/src/emqx_exhook_handler.erl | 11 ++- apps/emqx_exhook/src/emqx_exhook_server.erl | 15 ++-- 4 files changed, 96 insertions(+), 17 deletions(-) diff --git a/apps/emqx_exhook/docs/design-cn.md b/apps/emqx_exhook/docs/design-cn.md index 7ab870cd3..a8c34ac0f 100644 --- a/apps/emqx_exhook/docs/design-cn.md +++ b/apps/emqx_exhook/docs/design-cn.md @@ -50,7 +50,7 @@ ```protobuff syntax = "proto3"; -package emqx.exhook.v1; +package emqx.exhook.v2; service HookProvider { diff --git a/apps/emqx_exhook/priv/protos/exhook.proto b/apps/emqx_exhook/priv/protos/exhook.proto index 33ceed869..9a6923b0c 100644 --- a/apps/emqx_exhook/priv/protos/exhook.proto +++ b/apps/emqx_exhook/priv/protos/exhook.proto @@ -16,13 +16,15 @@ syntax = "proto3"; -option csharp_namespace = "Emqx.Exhook.V1"; +option csharp_namespace = "Emqx.Exhook.V2"; option go_package = "emqx.io/grpc/exhook"; option java_multiple_files = true; option java_package = "io.emqx.exhook"; option java_outer_classname = "EmqxExHookProto"; -package emqx.exhook.v1; +// The exhook proto version should be fixed as `v2` in EMQX v5.x +// to make sure the exhook proto version is compatible +package emqx.exhook.v2; service HookProvider { @@ -70,21 +72,21 @@ service HookProvider { } //------------------------------------------------------------------------------ -// Request & Response +// Request //------------------------------------------------------------------------------ message ProviderLoadedRequest { BrokerInfo broker = 1; + + RequestMeta meta = 2; } -message LoadedResponse { +message ProviderUnloadedRequest { - repeated HookSpec hooks = 1; + RequestMeta meta = 1; } -message ProviderUnloadedRequest { } - message ClientConnectRequest { ConnInfo conninfo = 1; @@ -93,6 +95,8 @@ message ClientConnectRequest { // // It should be empty on MQTT v3.1.1/v3.1 or others protocol repeated Property props = 2; + + RequestMeta meta = 3; } message ClientConnackRequest { @@ -102,11 +106,15 @@ message ClientConnackRequest { string result_code = 2; repeated Property props = 3; + + RequestMeta meta = 4; } message ClientConnectedRequest { ClientInfo clientinfo = 1; + + RequestMeta meta = 2; } message ClientDisconnectedRequest { @@ -114,6 +122,8 @@ message ClientDisconnectedRequest { ClientInfo clientinfo = 1; string reason = 2; + + RequestMeta meta = 3; } message ClientAuthenticateRequest { @@ -121,6 +131,8 @@ message ClientAuthenticateRequest { ClientInfo clientinfo = 1; bool result = 2; + + RequestMeta meta = 3; } message ClientAuthorizeRequest { @@ -139,6 +151,8 @@ message ClientAuthorizeRequest { string topic = 3; bool result = 4; + + RequestMeta meta = 5; } message ClientSubscribeRequest { @@ -148,6 +162,8 @@ message ClientSubscribeRequest { repeated Property props = 2; repeated TopicFilter topic_filters = 3; + + RequestMeta meta = 4; } message ClientUnsubscribeRequest { @@ -157,11 +173,15 @@ message ClientUnsubscribeRequest { repeated Property props = 2; repeated TopicFilter topic_filters = 3; + + RequestMeta meta = 4; } message SessionCreatedRequest { ClientInfo clientinfo = 1; + + RequestMeta meta = 2; } message SessionSubscribedRequest { @@ -171,6 +191,8 @@ message SessionSubscribedRequest { string topic = 2; SubOpts subopts = 3; + + RequestMeta meta = 4; } message SessionUnsubscribedRequest { @@ -178,21 +200,29 @@ message SessionUnsubscribedRequest { ClientInfo clientinfo = 1; string topic = 2; + + RequestMeta meta = 3; } message SessionResumedRequest { ClientInfo clientinfo = 1; + + RequestMeta meta = 2; } message SessionDiscardedRequest { ClientInfo clientinfo = 1; + + RequestMeta meta = 2; } message SessionTakenoverRequest { ClientInfo clientinfo = 1; + + RequestMeta meta = 2; } message SessionTerminatedRequest { @@ -200,11 +230,15 @@ message SessionTerminatedRequest { ClientInfo clientinfo = 1; string reason = 2; + + RequestMeta meta = 3; } message MessagePublishRequest { Message message = 1; + + RequestMeta meta = 2; } message MessageDeliveredRequest { @@ -212,6 +246,8 @@ message MessageDeliveredRequest { ClientInfo clientinfo = 1; Message message = 2; + + RequestMeta meta = 3; } message MessageDroppedRequest { @@ -219,6 +255,8 @@ message MessageDroppedRequest { Message message = 1; string reason = 2; + + RequestMeta meta = 3; } message MessageAckedRequest { @@ -226,13 +264,22 @@ message MessageAckedRequest { ClientInfo clientinfo = 1; Message message = 2; + + RequestMeta meta = 3; } //------------------------------------------------------------------------------ -// Basic data types +// Response //------------------------------------------------------------------------------ -message EmptySuccess { } +// Responsed by `ProviderLoadedRequest` + +message LoadedResponse { + + repeated HookSpec hooks = 1; +} + +// Responsed by `ClientAuthenticateRequest` `ClientAuthorizeRequest` `MessagePublishRequest` message ValuedResponse { @@ -261,6 +308,14 @@ message ValuedResponse { } } +// no Response by other Requests + +message EmptySuccess { } + +//------------------------------------------------------------------------------ +// Basic data types +//------------------------------------------------------------------------------ + message BrokerInfo { string version = 1; @@ -272,6 +327,7 @@ message BrokerInfo { string datetime = 4; } + message HookSpec { // The registered hooks name @@ -430,3 +486,14 @@ message SubOpts { // connection with a ClientID equal to the ClientID of the publishing uint32 nl = 5; } + +message RequestMeta { + + string node = 1; + + string version = 2; + + string sysdescr = 3; + + string cluster_name = 4; +} diff --git a/apps/emqx_exhook/src/emqx_exhook_handler.erl b/apps/emqx_exhook/src/emqx_exhook_handler.erl index 0d6f2897d..eace7a52f 100644 --- a/apps/emqx_exhook/src/emqx_exhook_handler.erl +++ b/apps/emqx_exhook/src/emqx_exhook_handler.erl @@ -57,7 +57,8 @@ merge_responsed_bool/2, merge_responsed_message/2, assign_to_message/2, - clientinfo/1 + clientinfo/1, + request_meta/0 ]). -import( @@ -455,3 +456,11 @@ merge_responsed_message(_Req, Resp) -> ret('CONTINUE') -> ok; ret('STOP_AND_RETURN') -> stop. + +request_meta() -> + #{ + node => stringfy(node()), + version => emqx_sys:version(), + sysdescr => emqx_sys:sysdescr(), + cluster_name => emqx_sys:cluster_name() + }. diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index 36f5f403a..88aee8e7d 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -19,7 +19,9 @@ -include("emqx_exhook.hrl"). -include_lib("emqx/include/logger.hrl"). --define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client). +%% The exhook proto version should be fixed as `v2` in EMQX v5.x +%% to make sure the exhook proto version is compatible +-define(PB_CLIENT_MOD, emqx_exhook_v_2_hook_provider_client). %% Load/Unload -export([ @@ -362,14 +364,15 @@ match_topic_filter(TopicName, TopicFilter) -> -spec do_call(binary(), atom(), atom(), map(), map()) -> {ok, map()} | {error, term()}. do_call(ChannName, Hookpoint, Fun, Req, ReqOpts) -> Options = ReqOpts#{channel => ChannName}, + NReq = Req#{meta => emqx_exhook_handler:request_meta()}, ?SLOG(debug, #{ msg => "do_call", module => ?PB_CLIENT_MOD, function => Fun, - req => Req, + req => NReq, options => Options }), - case catch ?CALL_PB_CLIENT(ChanneName, Fun, Req, Options) of + case catch ?CALL_PB_CLIENT(ChanneName, Fun, NReq, Options) of {ok, Resp, Metadata} -> ?SLOG(debug, #{msg => "do_call_ok", resp => Resp, metadata => Metadata}), update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:succeed/2), @@ -379,7 +382,7 @@ do_call(ChannName, Hookpoint, Fun, Req, ReqOpts) -> msg => "exhook_call_error", module => ?PB_CLIENT_MOD, function => Fun, - req => Req, + req => NReq, options => Options, code => Code, packet => Msg @@ -391,7 +394,7 @@ do_call(ChannName, Hookpoint, Fun, Req, ReqOpts) -> msg => "exhook_call_error", module => ?PB_CLIENT_MOD, function => Fun, - req => Req, + req => NReq, options => Options, reason => Reason }), @@ -402,7 +405,7 @@ do_call(ChannName, Hookpoint, Fun, Req, ReqOpts) -> msg => "exhook_call_exception", module => ?PB_CLIENT_MOD, function => Fun, - req => Req, + req => NReq, options => Options, stacktrace => Stk }), From a7542e16730cd0d98cad9417dce513d6007879e9 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Sat, 23 Apr 2022 15:47:48 +0800 Subject: [PATCH 4/4] test(exhook): exhook request_meta SUITE and prop tests --- apps/emqx_exhook/test/emqx_exhook_SUITE.erl | 45 +++++- .../test/emqx_exhook_api_SUITE.erl | 13 +- .../emqx_exhook/test/emqx_exhook_demo_svr.erl | 69 ++++---- .../test/props/prop_exhook_hooks.erl | 153 +++++++++++------- 4 files changed, 190 insertions(+), 90 deletions(-) diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl index add366f76..99bfbf3cb 100644 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -19,8 +19,15 @@ -compile(export_all). -compile(nowarn_export_all). +-include("emqx_exhook.hrl"). + -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). + +-define(DEFAULT_CLUSTER_NAME_ATOM, emqxcl). + +-define(OTHER_CLUSTER_NAME_ATOM, test_emqx_cluster). +-define(OTHER_CLUSTER_NAME_STRING, "test_emqx_cluster"). -define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). -define(CONF_DEFAULT, << @@ -54,6 +61,7 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Cfg) -> application:load(emqx_conf), ok = ekka:start(), + application:set_env(ekka, cluster_name, ?DEFAULT_CLUSTER_NAME_ATOM), ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity), meck:new(emqx_alarm, [non_strict, passthrough, no_link]), meck:expect(emqx_alarm, activate, 3, ok), @@ -65,6 +73,7 @@ init_per_suite(Cfg) -> Cfg. end_per_suite(_Cfg) -> + application:set_env(ekka, cluster_name, ?DEFAULT_CLUSTER_NAME_ATOM), ekka:stop(), mria:stop(), mria_mnesia:delete_schema(), @@ -95,7 +104,7 @@ load_cfg(Cfg) -> %% Test cases %%-------------------------------------------------------------------- -t_access_failed_if_no_server_running(_) -> +t_access_failed_if_no_server_running(Config) -> emqx_exhook_mgr:disable(<<"default">>), ClientInfo = #{ clientid => <<"user-id-1">>, @@ -120,7 +129,8 @@ t_access_failed_if_no_server_running(_) -> {stop, Message}, emqx_exhook_handler:on_message_publish(Message) ), - emqx_exhook_mgr:enable(<<"default">>). + emqx_exhook_mgr:enable(<<"default">>), + assert_get_basic_usage_info(Config). t_lookup(_) -> Result = emqx_exhook_mgr:lookup(<<"default">>), @@ -250,7 +260,36 @@ t_misc_test(_) -> _ = emqx_exhook_server:format(#{name => <<"test">>, hookspec => #{}}), ok. -t_get_basic_usage_info(_Config) -> +t_cluster_name(_) -> + SetEnvFun = + fun + (emqx) -> + application:set_env(ekka, cluster_name, ?OTHER_CLUSTER_NAME_ATOM); + (emqx_exhook) -> + ok + end, + + emqx_common_test_helpers:stop_apps([emqx, emqx_exhook]), + emqx_common_test_helpers:start_apps([emqx, emqx_exhook], SetEnvFun), + + ?assertEqual(?OTHER_CLUSTER_NAME_STRING, emqx_sys:cluster_name()), + + emqx_exhook_mgr:disable(<<"default">>), + emqx_exhook_mgr:enable(<<"default">>), + %% See emqx_exhook_demo_svr:on_provider_loaded/2 + ?assertEqual([], emqx_hooks:lookup('session.created')), + ?assertEqual([], emqx_hooks:lookup('message_publish')), + ?assertEqual( + true, + erlang:length(emqx_hooks:lookup('client.connected')) > 1 + ), + emqx_exhook_mgr:disable(<<"default">>). + +%%-------------------------------------------------------------------- +%% Cases Helpers +%%-------------------------------------------------------------------- + +assert_get_basic_usage_info(_Config) -> #{ num_servers := NumServers, servers := Servers diff --git a/apps/emqx_exhook/test/emqx_exhook_api_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_api_SUITE.erl index 26ad6f8e3..0fe3a8ef9 100644 --- a/apps/emqx_exhook/test/emqx_exhook_api_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_api_SUITE.erl @@ -26,6 +26,8 @@ -define(BASE_PATH, "api"). -define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). +-define(DEFAULT_CLUSTER_NAME_ATOM, emqxcl). + -define(CONF_DEFAULT, << "\n" "exhook {\n" @@ -54,18 +56,20 @@ all() -> init_per_suite(Config) -> application:load(emqx_conf), ok = ekka:start(), + application:set_env(ekka, cluster_name, ?DEFAULT_CLUSTER_NAME_ATOM), ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity), meck:new(emqx_alarm, [non_strict, passthrough, no_link]), meck:expect(emqx_alarm, activate, 3, ok), meck:expect(emqx_alarm, deactivate, 3, ok), _ = emqx_exhook_demo_svr:start(), - ok = emqx_common_test_helpers:load_config(emqx_exhook_schema, ?CONF_DEFAULT), + load_cfg(?CONF_DEFAULT), emqx_mgmt_api_test_util:init_suite([emqx_exhook]), [Conf] = emqx:get_config([exhook, servers]), [{template, Conf} | Config]. end_per_suite(Config) -> + application:set_env(ekka, cluster_name, ?DEFAULT_CLUSTER_NAME_ATOM), ekka:stop(), mria:stop(), mria_mnesia:delete_schema(), @@ -96,6 +100,13 @@ end_per_testcase(_, Config) -> end, Config. +load_cfg(Cfg) -> + ok = emqx_common_test_helpers:load_config(emqx_exhook_schema, Cfg). + +%%-------------------------------------------------------------------- +%% Test cases +%%-------------------------------------------------------------------- + t_list(_) -> {ok, Data} = request_api( get, diff --git a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl index 75d75aaa1..9adf0384e 100644 --- a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl +++ b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl @@ -16,7 +16,7 @@ -module(emqx_exhook_demo_svr). --behaviour(emqx_exhook_v_1_hook_provider_bhvr). +-behaviour(emqx_exhook_v_2_hook_provider_bhvr). %% -export([ @@ -55,6 +55,8 @@ -define(PORT, 9000). -define(NAME, ?MODULE). +-define(DEFAULT_CLUSTER_NAME, <<"emqxcl">>). +-define(OTHER_CLUSTER_NAME_BIN, <<"test_emqx_cluster">>). %%-------------------------------------------------------------------- %% Server APIs @@ -89,7 +91,7 @@ mgr_main(Name, Port) -> application:ensure_all_started(grpc), Services = #{ protos => [emqx_exhook_pb], - services => #{'emqx.exhook.v1.HookProvider' => emqx_exhook_demo_svr} + services => #{'emqx.exhook.v2.HookProvider' => emqx_exhook_demo_svr} }, Options = [], Svr = grpc:start_server(Name, Port, Services, Options), @@ -134,34 +136,43 @@ to_atom_name(Name) -> {ok, emqx_exhook_pb:loaded_response(), grpc:metadata()} | {error, grpc_cowboy_h:error_response()}. -on_provider_loaded(Req, Md) -> +on_provider_loaded(#{meta := #{cluster_name := Name}} = Req, Md) -> ?MODULE:in({?FUNCTION_NAME, Req}), - %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), - {ok, - #{ - hooks => [ - #{name => <<"client.connect">>}, - #{name => <<"client.connack">>}, - #{name => <<"client.connected">>}, - #{name => <<"client.disconnected">>}, - #{name => <<"client.authenticate">>}, - #{name => <<"client.authorize">>}, - #{name => <<"client.subscribe">>}, - #{name => <<"client.unsubscribe">>}, - #{name => <<"session.created">>}, - #{name => <<"session.subscribed">>}, - #{name => <<"session.unsubscribed">>}, - #{name => <<"session.resumed">>}, - #{name => <<"session.discarded">>}, - #{name => <<"session.takenover">>}, - #{name => <<"session.terminated">>}, - #{name => <<"message.publish">>}, - #{name => <<"message.delivered">>}, - #{name => <<"message.acked">>}, - #{name => <<"message.dropped">>} - ] - }, - Md}. + %% io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + HooksClient = + [ + #{name => <<"client.connect">>}, + #{name => <<"client.connack">>}, + #{name => <<"client.connected">>}, + #{name => <<"client.disconnected">>}, + #{name => <<"client.authenticate">>}, + #{name => <<"client.authorize">>}, + #{name => <<"client.subscribe">>}, + #{name => <<"client.unsubscribe">>} + ], + HooksSession = + [ + #{name => <<"session.created">>}, + #{name => <<"session.subscribed">>}, + #{name => <<"session.unsubscribed">>}, + #{name => <<"session.resumed">>}, + #{name => <<"session.discarded">>}, + #{name => <<"session.takenover">>}, + #{name => <<"session.terminated">>} + ], + HooksMessage = + [ + #{name => <<"message.publish">>}, + #{name => <<"message.delivered">>}, + #{name => <<"message.acked">>}, + #{name => <<"message.dropped">>} + ], + case Name of + ?DEFAULT_CLUSTER_NAME -> + {ok, #{hooks => HooksClient ++ HooksSession ++ HooksMessage}, Md}; + ?OTHER_CLUSTER_NAME_BIN -> + {ok, #{hooks => HooksClient}, Md} + end. -spec on_provider_unloaded(emqx_exhook_pb:provider_unloaded_request(), grpc:metadata()) -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} | {error, grpc_cowboy_h:error_response()}. diff --git a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl index 9e0660681..5c169f72f 100644 --- a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl +++ b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl @@ -53,6 +53,8 @@ ?FORALL(Vars, Types, Exprs) ) ). +-define(DEFAULT_CLUSTER_NAME_ATOM, emqxcl). +-define(DEFAULT_CLUSTER_NAME_BIN, <<"emqxcl">>). %%-------------------------------------------------------------------- %% Properties @@ -60,15 +62,16 @@ prop_client_connect() -> ?ALL( - {ConnInfo, ConnProps}, - {conninfo(), conn_properties()}, + {ConnInfo, ConnProps, Meta}, + {conninfo(), conn_properties(), request_meta()}, begin ok = emqx_hooks:run('client.connect', [ConnInfo, ConnProps]), {'on_client_connect', Resp} = emqx_exhook_demo_svr:take(), Expected = #{ props => properties(ConnProps), - conninfo => from_conninfo(ConnInfo) + conninfo => from_conninfo(ConnInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true @@ -77,8 +80,8 @@ prop_client_connect() -> prop_client_connack() -> ?ALL( - {ConnInfo, Rc, AckProps}, - {conninfo(), connack_return_code(), ack_properties()}, + {ConnInfo, Rc, AckProps, Meta}, + {conninfo(), connack_return_code(), ack_properties(), request_meta()}, begin ok = emqx_hooks:run('client.connack', [ConnInfo, Rc, AckProps]), {'on_client_connack', Resp} = emqx_exhook_demo_svr:take(), @@ -86,7 +89,8 @@ prop_client_connack() -> #{ props => properties(AckProps), result_code => atom_to_binary(Rc, utf8), - conninfo => from_conninfo(ConnInfo) + conninfo => from_conninfo(ConnInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true @@ -95,8 +99,8 @@ prop_client_connack() -> prop_client_authenticate() -> ?ALL( - {ClientInfo0, AuthResult}, - {clientinfo(), authresult()}, + {ClientInfo0, AuthResult, Meta}, + {clientinfo(), authresult(), request_meta()}, begin ClientInfo = inject_magic_into(username, ClientInfo0), OutAuthResult = emqx_hooks:run_fold('client.authenticate', [ClientInfo], AuthResult), @@ -120,7 +124,8 @@ prop_client_authenticate() -> Expected = #{ result => authresult_to_bool(AuthResult), - clientinfo => from_clientinfo(ClientInfo) + clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true @@ -129,8 +134,8 @@ prop_client_authenticate() -> prop_client_authorize() -> ?ALL( - {ClientInfo0, PubSub, Topic, Result}, - {clientinfo(), oneof([publish, subscribe]), topic(), oneof([allow, deny])}, + {ClientInfo0, PubSub, Topic, Result, Meta}, + {clientinfo(), oneof([publish, subscribe]), topic(), oneof([allow, deny]), request_meta()}, begin ClientInfo = inject_magic_into(username, ClientInfo0), OutResult = emqx_hooks:run_fold( @@ -153,7 +158,8 @@ prop_client_authorize() -> result => aclresult_to_bool(Result), type => pubsub_to_enum(PubSub), topic => Topic, - clientinfo => from_clientinfo(ClientInfo) + clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true @@ -162,13 +168,16 @@ prop_client_authorize() -> prop_client_connected() -> ?ALL( - {ClientInfo, ConnInfo}, - {clientinfo(), conninfo()}, + {ClientInfo, ConnInfo, Meta}, + {clientinfo(), conninfo(), request_meta()}, begin ok = emqx_hooks:run('client.connected', [ClientInfo, ConnInfo]), {'on_client_connected', Resp} = emqx_exhook_demo_svr:take(), Expected = - #{clientinfo => from_clientinfo(ClientInfo)}, + #{ + clientinfo => from_clientinfo(ClientInfo), + meta => Meta + }, ?assertEqual(Expected, Resp), true end @@ -176,15 +185,16 @@ prop_client_connected() -> prop_client_disconnected() -> ?ALL( - {ClientInfo, Reason, ConnInfo}, - {clientinfo(), shutdown_reason(), conninfo()}, + {ClientInfo, Reason, ConnInfo, Meta}, + {clientinfo(), shutdown_reason(), conninfo(), request_meta()}, begin ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]), {'on_client_disconnected', Resp} = emqx_exhook_demo_svr:take(), Expected = #{ reason => stringfy(Reason), - clientinfo => from_clientinfo(ClientInfo) + clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true @@ -193,8 +203,8 @@ prop_client_disconnected() -> prop_client_subscribe() -> ?ALL( - {ClientInfo, SubProps, TopicTab}, - {clientinfo(), sub_properties(), topictab()}, + {ClientInfo, SubProps, TopicTab, Meta}, + {clientinfo(), sub_properties(), topictab(), request_meta()}, begin ok = emqx_hooks:run('client.subscribe', [ClientInfo, SubProps, TopicTab]), {'on_client_subscribe', Resp} = emqx_exhook_demo_svr:take(), @@ -202,7 +212,8 @@ prop_client_subscribe() -> #{ props => properties(SubProps), topic_filters => topicfilters(TopicTab), - clientinfo => from_clientinfo(ClientInfo) + clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true @@ -211,8 +222,8 @@ prop_client_subscribe() -> prop_client_unsubscribe() -> ?ALL( - {ClientInfo, UnSubProps, TopicTab}, - {clientinfo(), unsub_properties(), topictab()}, + {ClientInfo, UnSubProps, TopicTab, Meta}, + {clientinfo(), unsub_properties(), topictab(), request_meta()}, begin ok = emqx_hooks:run('client.unsubscribe', [ClientInfo, UnSubProps, TopicTab]), {'on_client_unsubscribe', Resp} = emqx_exhook_demo_svr:take(), @@ -220,7 +231,8 @@ prop_client_unsubscribe() -> #{ props => properties(UnSubProps), topic_filters => topicfilters(TopicTab), - clientinfo => from_clientinfo(ClientInfo) + clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true @@ -229,13 +241,16 @@ prop_client_unsubscribe() -> prop_session_created() -> ?ALL( - {ClientInfo, SessInfo}, - {clientinfo(), sessioninfo()}, + {ClientInfo, SessInfo, Meta}, + {clientinfo(), sessioninfo(), request_meta()}, begin ok = emqx_hooks:run('session.created', [ClientInfo, SessInfo]), {'on_session_created', Resp} = emqx_exhook_demo_svr:take(), Expected = - #{clientinfo => from_clientinfo(ClientInfo)}, + #{ + clientinfo => from_clientinfo(ClientInfo), + meta => Meta + }, ?assertEqual(Expected, Resp), true end @@ -243,8 +258,8 @@ prop_session_created() -> prop_session_subscribed() -> ?ALL( - {ClientInfo, Topic, SubOpts}, - {clientinfo(), topic(), subopts()}, + {ClientInfo, Topic, SubOpts, Meta}, + {clientinfo(), topic(), subopts(), request_meta()}, begin ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]), {'on_session_subscribed', Resp} = emqx_exhook_demo_svr:take(), @@ -252,7 +267,8 @@ prop_session_subscribed() -> #{ topic => Topic, subopts => subopts(SubOpts), - clientinfo => from_clientinfo(ClientInfo) + clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true @@ -261,15 +277,16 @@ prop_session_subscribed() -> prop_session_unsubscribed() -> ?ALL( - {ClientInfo, Topic, SubOpts}, - {clientinfo(), topic(), subopts()}, + {ClientInfo, Topic, SubOpts, Meta}, + {clientinfo(), topic(), subopts(), request_meta()}, begin ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, SubOpts]), {'on_session_unsubscribed', Resp} = emqx_exhook_demo_svr:take(), Expected = #{ topic => Topic, - clientinfo => from_clientinfo(ClientInfo) + clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true @@ -278,13 +295,16 @@ prop_session_unsubscribed() -> prop_session_resumed() -> ?ALL( - {ClientInfo, SessInfo}, - {clientinfo(), sessioninfo()}, + {ClientInfo, SessInfo, Meta}, + {clientinfo(), sessioninfo(), request_meta()}, begin ok = emqx_hooks:run('session.resumed', [ClientInfo, SessInfo]), {'on_session_resumed', Resp} = emqx_exhook_demo_svr:take(), Expected = - #{clientinfo => from_clientinfo(ClientInfo)}, + #{ + clientinfo => from_clientinfo(ClientInfo), + meta => Meta + }, ?assertEqual(Expected, Resp), true end @@ -292,13 +312,13 @@ prop_session_resumed() -> prop_session_discared() -> ?ALL( - {ClientInfo, SessInfo}, - {clientinfo(), sessioninfo()}, + {ClientInfo, SessInfo, Meta}, + {clientinfo(), sessioninfo(), request_meta()}, begin ok = emqx_hooks:run('session.discarded', [ClientInfo, SessInfo]), {'on_session_discarded', Resp} = emqx_exhook_demo_svr:take(), Expected = - #{clientinfo => from_clientinfo(ClientInfo)}, + #{clientinfo => from_clientinfo(ClientInfo), meta => Meta}, ?assertEqual(Expected, Resp), true end @@ -306,13 +326,13 @@ prop_session_discared() -> prop_session_takenover() -> ?ALL( - {ClientInfo, SessInfo}, - {clientinfo(), sessioninfo()}, + {ClientInfo, SessInfo, Meta}, + {clientinfo(), sessioninfo(), request_meta()}, begin ok = emqx_hooks:run('session.takenover', [ClientInfo, SessInfo]), {'on_session_takenover', Resp} = emqx_exhook_demo_svr:take(), Expected = - #{clientinfo => from_clientinfo(ClientInfo)}, + #{clientinfo => from_clientinfo(ClientInfo), meta => Meta}, ?assertEqual(Expected, Resp), true end @@ -320,15 +340,16 @@ prop_session_takenover() -> prop_session_terminated() -> ?ALL( - {ClientInfo, Reason, SessInfo}, - {clientinfo(), shutdown_reason(), sessioninfo()}, + {ClientInfo, Reason, SessInfo, Meta}, + {clientinfo(), shutdown_reason(), sessioninfo(), request_meta()}, begin ok = emqx_hooks:run('session.terminated', [ClientInfo, Reason, SessInfo]), {'on_session_terminated', Resp} = emqx_exhook_demo_svr:take(), Expected = #{ reason => stringfy(Reason), - clientinfo => from_clientinfo(ClientInfo) + clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true @@ -337,8 +358,8 @@ prop_session_terminated() -> prop_message_publish() -> ?ALL( - Msg0, - message(), + {Msg0, Meta}, + {message(), request_meta()}, begin Msg = emqx_message:from_map( inject_magic_into(from, emqx_message:to_map(Msg0)) @@ -381,7 +402,10 @@ prop_message_publish() -> {'on_message_publish', Resp} = emqx_exhook_demo_svr:take(), Expected = - #{message => from_message(Msg)}, + #{ + message => from_message(Msg), + meta => Meta + }, ?assertEqual(Expected, Resp) end, true @@ -390,8 +414,8 @@ prop_message_publish() -> prop_message_dropped() -> ?ALL( - {Msg, By, Reason}, - {message(), hardcoded, shutdown_reason()}, + {Msg, By, Reason, Meta}, + {message(), hardcoded, shutdown_reason(), request_meta()}, begin ok = emqx_hooks:run('message.dropped', [Msg, By, Reason]), case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of @@ -402,7 +426,8 @@ prop_message_dropped() -> Expected = #{ reason => stringfy(Reason), - message => from_message(Msg) + message => from_message(Msg), + meta => Meta }, ?assertEqual(Expected, Resp) end, @@ -412,8 +437,8 @@ prop_message_dropped() -> prop_message_delivered() -> ?ALL( - {ClientInfo, Msg}, - {clientinfo(), message()}, + {ClientInfo, Msg, Meta}, + {clientinfo(), message(), request_meta()}, begin ok = emqx_hooks:run('message.delivered', [ClientInfo, Msg]), case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of @@ -424,7 +449,8 @@ prop_message_delivered() -> Expected = #{ clientinfo => from_clientinfo(ClientInfo), - message => from_message(Msg) + message => from_message(Msg), + meta => Meta }, ?assertEqual(Expected, Resp) end, @@ -434,8 +460,8 @@ prop_message_delivered() -> prop_message_acked() -> ?ALL( - {ClientInfo, Msg}, - {clientinfo(), message()}, + {ClientInfo, Msg, Meta}, + {clientinfo(), message(), request_meta()}, begin ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of @@ -446,7 +472,8 @@ prop_message_acked() -> Expected = #{ clientinfo => from_clientinfo(ClientInfo), - message => from_message(Msg) + message => from_message(Msg), + meta => Meta }, ?assertEqual(Expected, Resp) end, @@ -500,6 +527,8 @@ stringfy(Term) when is_integer(Term) -> integer_to_binary(Term); stringfy(Term) when is_atom(Term) -> atom_to_binary(Term, utf8); +stringfy(Term) when is_list(Term) -> + list_to_binary(Term); stringfy(Term) -> unicode:characters_to_binary((io_lib:format("~0p", [Term]))). @@ -569,6 +598,8 @@ from_message(Msg) -> do_setup() -> logger:set_primary_config(#{level => warning}), + ok = ekka:start(), + application:set_env(ekka, cluster_name, ?DEFAULT_CLUSTER_NAME_ATOM), _ = emqx_exhook_demo_svr:start(), ok = emqx_config:init_load(emqx_exhook_schema, ?CONF_DEFAULT), emqx_common_test_helpers:start_apps([emqx_exhook]), @@ -623,3 +654,11 @@ inject_magic_into(Key, Object) -> castspell() -> L = [<<"baduser">>, <<"gooduser">>, <<"normaluser">>, muggles], lists:nth(rand:uniform(length(L)), L). + +request_meta() -> + #{ + node => nodestr(), + version => stringfy(emqx_sys:version()), + sysdescr => stringfy(emqx_sys:sysdescr()), + cluster_name => ?DEFAULT_CLUSTER_NAME_BIN + }.