diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 178f269c9..c84011bf0 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -14,8 +14,10 @@ File format: ### Enhancements +* ExHook add a new field named `meta` to provide emqx `cluster_name`. + The grpc server can handle calls separately for different clusters. [#7524] * In order to fix the execution order of exhook, e.g. before/after other plugins/modules, - ExHook now supports user customizing emqx_hook execute priority. + ExHook now supports user customizing emqx_hook execute priority. [#7408] * add api: PUT /rules/{id}/reset_metrics. This api reset the metrics of the rule engine of a rule, and reset the metrics of the action related to this rule. [#7474] * Enhanced rule engine error handling when json parsing error. diff --git a/apps/emqx_exhook/.gitignore b/apps/emqx_exhook/.gitignore index da1f0db23..b01539214 100644 --- a/apps/emqx_exhook/.gitignore +++ b/apps/emqx_exhook/.gitignore @@ -25,5 +25,5 @@ data/ *.class Mnesia.nonode@nohost/ src/emqx_exhook_pb.erl -src/emqx_exhook_v_1_hook_provider_client.erl -src/emqx_exhook_v_1_hook_provider_bhvr.erl +src/emqx_exhook_v_*_hook_provider_client.erl +src/emqx_exhook_v_*_hook_provider_bhvr.erl diff --git a/apps/emqx_exhook/priv/protos/exhook.proto b/apps/emqx_exhook/priv/protos/exhook.proto index 72ba26581..43666a6ad 100644 --- a/apps/emqx_exhook/priv/protos/exhook.proto +++ b/apps/emqx_exhook/priv/protos/exhook.proto @@ -22,6 +22,7 @@ option java_multiple_files = true; option java_package = "io.emqx.exhook"; option java_outer_classname = "EmqxExHookProto"; +// Proto package compatible, Don't need an updated version.. package emqx.exhook.v1; service HookProvider { @@ -76,6 +77,8 @@ service HookProvider { message ProviderLoadedRequest { BrokerInfo broker = 1; + + RequestMeta meta = 2; } message LoadedResponse { @@ -83,7 +86,11 @@ message LoadedResponse { repeated HookSpec hooks = 1; } -message ProviderUnloadedRequest { } +message ProviderUnloadedRequest { + + RequestMeta meta = 1; +} + message ClientConnectRequest { @@ -93,6 +100,8 @@ message ClientConnectRequest { // // It should be empty on MQTT v3.1.1/v3.1 or others protocol repeated Property props = 2; + + RequestMeta meta = 3; } message ClientConnackRequest { @@ -102,11 +111,15 @@ message ClientConnackRequest { string result_code = 2; repeated Property props = 3; + + RequestMeta meta = 4; } message ClientConnectedRequest { ClientInfo clientinfo = 1; + + RequestMeta meta = 2; } message ClientDisconnectedRequest { @@ -114,6 +127,8 @@ message ClientDisconnectedRequest { ClientInfo clientinfo = 1; string reason = 2; + + RequestMeta meta = 3; } message ClientAuthenticateRequest { @@ -121,6 +136,8 @@ message ClientAuthenticateRequest { ClientInfo clientinfo = 1; bool result = 2; + + RequestMeta meta = 3; } message ClientCheckAclRequest { @@ -139,6 +156,8 @@ message ClientCheckAclRequest { string topic = 3; bool result = 4; + + RequestMeta meta = 5; } message ClientSubscribeRequest { @@ -148,6 +167,8 @@ message ClientSubscribeRequest { repeated Property props = 2; repeated TopicFilter topic_filters = 3; + + RequestMeta meta = 4; } message ClientUnsubscribeRequest { @@ -157,11 +178,15 @@ message ClientUnsubscribeRequest { repeated Property props = 2; repeated TopicFilter topic_filters = 3; + + RequestMeta meta = 4; } message SessionCreatedRequest { ClientInfo clientinfo = 1; + + RequestMeta meta = 2; } message SessionSubscribedRequest { @@ -171,6 +196,8 @@ message SessionSubscribedRequest { string topic = 2; SubOpts subopts = 3; + + RequestMeta meta = 4; } message SessionUnsubscribedRequest { @@ -178,21 +205,29 @@ message SessionUnsubscribedRequest { ClientInfo clientinfo = 1; string topic = 2; + + RequestMeta meta = 3; } message SessionResumedRequest { ClientInfo clientinfo = 1; + + RequestMeta meta = 2; } message SessionDiscardedRequest { ClientInfo clientinfo = 1; + + RequestMeta meta = 2; } message SessionTakeoveredRequest { ClientInfo clientinfo = 1; + + RequestMeta meta = 2; } message SessionTerminatedRequest { @@ -200,11 +235,15 @@ message SessionTerminatedRequest { ClientInfo clientinfo = 1; string reason = 2; + + RequestMeta meta = 3; } message MessagePublishRequest { Message message = 1; + + RequestMeta meta = 2; } message MessageDeliveredRequest { @@ -212,6 +251,8 @@ message MessageDeliveredRequest { ClientInfo clientinfo = 1; Message message = 2; + + RequestMeta meta = 3; } message MessageDroppedRequest { @@ -219,6 +260,8 @@ message MessageDroppedRequest { Message message = 1; string reason = 2; + + RequestMeta meta = 3; } message MessageAckedRequest { @@ -226,6 +269,8 @@ message MessageAckedRequest { ClientInfo clientinfo = 1; Message message = 2; + + RequestMeta meta = 3; } //------------------------------------------------------------------------------ @@ -405,3 +450,14 @@ message SubOpts { // connection with a ClientID equal to the ClientID of the publishing uint32 nl = 5; } + +message RequestMeta { + + string node = 1; + + string version = 2; + + string sysdescr = 3; + + string cluster_name = 4; +} diff --git a/apps/emqx_exhook/src/emqx_exhook.app.src b/apps/emqx_exhook/src/emqx_exhook.app.src index b386bcaca..6075d50db 100644 --- a/apps/emqx_exhook/src/emqx_exhook.app.src +++ b/apps/emqx_exhook/src/emqx_exhook.app.src @@ -1,3 +1,4 @@ +%% -*- mode: erlang -*- {application, emqx_exhook, [{description, "EMQ X Extension for Hook"}, {vsn, "4.3.5"}, diff --git a/apps/emqx_exhook/src/emqx_exhook.appup.src b/apps/emqx_exhook/src/emqx_exhook.appup.src index dee9aed5f..8245576ec 100644 --- a/apps/emqx_exhook/src/emqx_exhook.appup.src +++ b/apps/emqx_exhook/src/emqx_exhook.appup.src @@ -2,8 +2,11 @@ {VSN, [ {"4.3.4", [ - {load_module, emqx_exhook_sup, brutal_purge,soft_purge,[]}, - {load_module, emqx_exhook_server, brutal_purge,soft_purge,[]}, + {load_module, emqx_exhook_sup, brutal_purge, soft_purge, []}, + {load_module, emqx_exhook_server, brutal_purge, soft_purge, []}, + {load_module, emqx_exhook_handler, brutal_purge, soft_purge, []}, + {load_module, emqx_exhook_pb, brutal_purge, soft_purge, []}, + {load_module, emqx_exhook, brutal_purge, soft_purge, []}, {update, emqx_exhook_mngr, {advanced, ["4.3.4"]}} ]}, {<<"4\\.3\\.[0-3]">>, [ @@ -13,8 +16,11 @@ ], [ {"4.3.4", [ - {load_module, emqx_exhook_sup, brutal_purge,soft_purge,[]}, - {load_module, emqx_exhook_server, brutal_purge,soft_purge,[]}, + {load_module, emqx_exhook_sup, brutal_purge, soft_purge, []}, + {load_module, emqx_exhook_server, brutal_purge, soft_purge, []}, + {load_module, emqx_exhook_handler, brutal_purge, soft_purge, []}, + {load_module, emqx_exhook_pb, brutal_purge, soft_purge, []}, + {load_module, emqx_exhook, brutal_purge, soft_purge, []}, {update, emqx_exhook_mngr, {advanced, ["4.3.4"]}} ]}, {<<"4\\.3\\.[0-3]">>, [ diff --git a/apps/emqx_exhook/src/emqx_exhook.erl b/apps/emqx_exhook/src/emqx_exhook.erl index b27710e4a..777cfe27d 100644 --- a/apps/emqx_exhook/src/emqx_exhook.erl +++ b/apps/emqx_exhook/src/emqx_exhook.erl @@ -30,6 +30,11 @@ , call_fold/3 ]). +-export([request_meta/0]). + +-import(emqx_exhook_handler, [stringfy/1]). +%% TODO: move util functions to an independent module + %%-------------------------------------------------------------------- %% Mgmt APIs %%-------------------------------------------------------------------- @@ -116,3 +121,10 @@ deny_action_result('message.publish', Msg) -> %% TODO: Not support to deny a message %% maybe we can put the 'allow_publish' into message header Msg. + +request_meta() -> + #{ node => stringfy(node()) + , version => emqx_sys:version() + , sysdescr => emqx_sys:sysdescr() + , cluster_name => emqx_sys:cluster_name() + }. diff --git a/apps/emqx_exhook/src/emqx_exhook_handler.erl b/apps/emqx_exhook/src/emqx_exhook_handler.erl index f3964dc42..ec52d5aa9 100644 --- a/apps/emqx_exhook/src/emqx_exhook_handler.erl +++ b/apps/emqx_exhook/src/emqx_exhook_handler.erl @@ -288,6 +288,8 @@ stringfy(Term) when is_integer(Term) -> integer_to_binary(Term); stringfy(Term) when is_atom(Term) -> atom_to_binary(Term, utf8); +stringfy(Term) when is_list(Term) -> + list_to_binary(Term); stringfy(Term) -> unicode:characters_to_binary((io_lib:format("~0p", [Term]))). diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index d3953ade7..7a6fa9862 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -269,23 +269,24 @@ match_topic_filter(TopicName, TopicFilter) -> -spec do_call(string(), atom(), map(), map()) -> {ok, map()} | {error, term()}. do_call(ChannName, Fun, Req, ReqOpts) -> + NReq = Req#{meta => emqx_exhook:request_meta()}, Options = ReqOpts#{channel => ChannName}, - ?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, Req, Options]), - case catch apply(?PB_CLIENT_MOD, Fun, [Req, Options]) of + ?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, NReq, Options]), + case catch apply(?PB_CLIENT_MOD, Fun, [NReq, Options]) of {ok, Resp, _Metadata} -> ?LOG(debug, "Response {ok, ~0p, ~0p}", [Resp, _Metadata]), {ok, Resp}; {error, {Code, Msg}, _Metadata} -> ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) response errcode: ~0p, errmsg: ~0p", - [?PB_CLIENT_MOD, Fun, Req, Options, Code, Msg]), + [?PB_CLIENT_MOD, Fun, NReq, Options, Code, Msg]), {error, {Code, Msg}}; {error, Reason} -> ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) error: ~0p", - [?PB_CLIENT_MOD, Fun, Req, Options, Reason]), + [?PB_CLIENT_MOD, Fun, NReq, Options, Reason]), {error, Reason}; {'EXIT', {Reason, Stk}} -> ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) throw an exception: ~0p, stacktrace: ~0p", - [?PB_CLIENT_MOD, Fun, Req, Options, Reason, Stk]), + [?PB_CLIENT_MOD, Fun, NReq, Options, Reason, Stk]), {error, Reason} end. diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl index 5aed2f2b8..101c55749 100644 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -19,9 +19,14 @@ -compile(export_all). -compile(nowarn_export_all). +-include("emqx_exhook.hrl"). + -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-define(OTHER_CLUSTER_NAME_ATOM, test_emqx_cluster). +-define(OTHER_CLUSTER_NAME_STRING, "test_emqx_cluster"). + %%-------------------------------------------------------------------- %% Setups %%-------------------------------------------------------------------- @@ -99,29 +104,63 @@ t_cli_stats(_) -> unmeck_print(). t_priority(_) -> - restart_exhook_with_envs([{emqx_exhook, hook_priority, 1}]), + restart_apps_with_envs([ {emqx, fun set_special_cfgs/1} + , {emqx_exhook, [{hook_priority, 1}]}]), emqx_exhook:disable(default), ok = emqx_exhook:enable(default), [Callback | _] = emqx_hooks:lookup('client.connected'), - 1 = emqx_hooks:callback_priority(Callback). + ?assertEqual(1, emqx_hooks:callback_priority(Callback)). + +t_cluster_name(_) -> + SetEnvFun = + fun(emqx) -> + set_special_cfgs(emqx), + application:set_env(ekka, cluster_name, ?OTHER_CLUSTER_NAME_ATOM); + (emqx_exhook) -> + application:set_env(emqx_exhook, hook_priority, 1) + end, + + emqx_ct_helpers:stop_apps([emqx, emqx_exhook]), + emqx_ct_helpers:start_apps([emqx, emqx_exhook], SetEnvFun), + + ?assertEqual(?OTHER_CLUSTER_NAME_STRING, emqx_sys:cluster_name()), + + emqx_exhook:disable(default), + ok = emqx_exhook:enable(default), + %% See emqx_exhook_demo_svr:on_provider_loaded/2 + ?assertEqual([], emqx_hooks:lookup('session.created')), + ?assertEqual([], emqx_hooks:lookup('message_publish')), + + [Callback | _] = emqx_hooks:lookup('client.connected'), + ?assertEqual(1, emqx_hooks:callback_priority(Callback)). %%-------------------------------------------------------------------- %% Utils %%-------------------------------------------------------------------- %% TODO: make it more general and move to `emqx_ct_helpers` -restart_exhook_with_envs(Envs) -> - emqx_ct_helpers:stop_apps([emqx_exhook]), - SetPriorityFun - = fun(emqx) -> - set_special_cfgs(emqx); - (emqx_exhook) -> - lists:foreach(fun({App, Key, Val}) -> - application:set_env(App, Key, Val) - end, Envs) - end, - emqx_ct_helpers:start_apps([emqx_exhook], SetPriorityFun). +restart_app_with_envs(App, Fun) + when is_function(Fun) -> + emqx_ct_helpers:stop_apps([App]), + emqx_ct_helpers:start_apps([App], Fun); + +restart_app_with_envs(App, Envs) + when is_list(Envs) -> + emqx_ct_helpers:stop_apps([App]), + HandlerFun = + fun(App) -> + lists:foreach(fun({Key, Val}) -> + application:set_env(App, Key, Val) + end, Envs) + end, + emqx_ct_helpers:start_apps([App], HandlerFun). + +restart_apps_with_envs([]) -> + ok; +restart_apps_with_envs([{App, Envs} | Rest]) -> + restart_app_with_envs(App, Envs), + restart_apps_with_envs(Rest). meck_print() -> meck:new(emqx_ctl, [passthrough, no_history, no_link]), diff --git a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl index c2db04dd4..0e041d689 100644 --- a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl +++ b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl @@ -51,6 +51,8 @@ -define(PORT, 9000). -define(NAME, ?MODULE). +-define(DEFAULT_CLUSTER_NAME, <<"emqxcl">>). +-define(OTHER_CLUSTER_NAME_BIN, <<"test_emqx_cluster">>). %%-------------------------------------------------------------------- %% Server APIs @@ -112,30 +114,37 @@ reply(Q1, Q2) -> -spec on_provider_loaded(emqx_exhook_pb:provider_loaded_request(), grpc:metadata()) -> {ok, emqx_exhook_pb:loaded_response(), grpc:metadata()} | {error, grpc_cowboy_h:error_response()}. - -on_provider_loaded(Req, Md) -> +on_provider_loaded(#{meta := #{cluster_name := Name}} = Req, Md) -> ?MODULE:in({?FUNCTION_NAME, Req}), %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), - {ok, #{hooks => [ - #{name => <<"client.connect">>}, - #{name => <<"client.connack">>}, - #{name => <<"client.connected">>}, - #{name => <<"client.disconnected">>}, - #{name => <<"client.authenticate">>}, - #{name => <<"client.check_acl">>}, - #{name => <<"client.subscribe">>}, - #{name => <<"client.unsubscribe">>}, - #{name => <<"session.created">>}, - #{name => <<"session.subscribed">>}, - #{name => <<"session.unsubscribed">>}, - #{name => <<"session.resumed">>}, - #{name => <<"session.discarded">>}, - #{name => <<"session.takeovered">>}, - #{name => <<"session.terminated">>}, - #{name => <<"message.publish">>}, - #{name => <<"message.delivered">>}, - #{name => <<"message.acked">>}, - #{name => <<"message.dropped">>}]}, Md}. + HooksClient = + [#{name => <<"client.connect">>}, + #{name => <<"client.connack">>}, + #{name => <<"client.connected">>}, + #{name => <<"client.disconnected">>}, + #{name => <<"client.authenticate">>}, + #{name => <<"client.check_acl">>}, + #{name => <<"client.subscribe">>}, + #{name => <<"client.unsubscribe">>}], + HooksSession = + [#{name => <<"session.created">>}, + #{name => <<"session.subscribed">>}, + #{name => <<"session.unsubscribed">>}, + #{name => <<"session.resumed">>}, + #{name => <<"session.discarded">>}, + #{name => <<"session.takeovered">>}, + #{name => <<"session.terminated">>}], + HooksMessage = + [#{name => <<"message.publish">>}, + #{name => <<"message.delivered">>}, + #{name => <<"message.acked">>}, + #{name => <<"message.dropped">>}], + case Name of + ?DEFAULT_CLUSTER_NAME -> + {ok, #{hooks => HooksClient ++ HooksSession ++ HooksMessage}, Md}; + ?OTHER_CLUSTER_NAME_BIN -> + {ok, #{hooks => HooksClient}, Md} + end. -spec on_provider_unloaded(emqx_exhook_pb:provider_unloaded_request(), grpc:metadata()) -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} | {error, grpc_cowboy_h:error_response()}. diff --git a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl index 24f45c8b0..2aec580ff 100644 --- a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl +++ b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl @@ -36,42 +36,46 @@ fun() -> do_teardown(State) end end, ?FORALL(Vars, Types, Exprs))). +-define(DEFAULT_CLUSTER_NAME, <<"emqxcl">>). + %%-------------------------------------------------------------------- %% Properties %%-------------------------------------------------------------------- prop_client_connect() -> - ?ALL({ConnInfo, ConnProps}, - {conninfo(), conn_properties()}, - begin - ok = emqx_hooks:run('client.connect', [ConnInfo, ConnProps]), - {'on_client_connect', Resp} = emqx_exhook_demo_svr:take(), - Expected = - #{props => properties(ConnProps), - conninfo => from_conninfo(ConnInfo) - }, - ?assertEqual(Expected, Resp), - true - end). + ?ALL({ConnInfo, ConnProps, Meta}, + {conninfo(), conn_properties(), request_meta()}, + begin + ok = emqx_hooks:run('client.connect', [ConnInfo, ConnProps]), + {'on_client_connect', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{props => properties(ConnProps), + conninfo => from_conninfo(ConnInfo), + meta => Meta + }, + ?assertEqual(Expected, Resp), + true + end). prop_client_connack() -> - ?ALL({ConnInfo, Rc, AckProps}, - {conninfo(), connack_return_code(), ack_properties()}, + ?ALL({ConnInfo, Rc, AckProps, Meta}, + {conninfo(), connack_return_code(), ack_properties(), request_meta()}, begin ok = emqx_hooks:run('client.connack', [ConnInfo, Rc, AckProps]), {'on_client_connack', Resp} = emqx_exhook_demo_svr:take(), Expected = #{props => properties(AckProps), result_code => atom_to_binary(Rc, utf8), - conninfo => from_conninfo(ConnInfo) + conninfo => from_conninfo(ConnInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_client_authenticate() -> - ?ALL({ClientInfo0, AuthResult}, - {clientinfo(), authresult()}, + ?ALL({ClientInfo0, AuthResult, Meta}, + {clientinfo(), authresult(), request_meta()}, begin ClientInfo = inject_magic_into(username, ClientInfo0), OutAuthResult = emqx_hooks:run_fold('client.authenticate', [ClientInfo], AuthResult), @@ -103,16 +107,16 @@ prop_client_authenticate() -> {'on_client_authenticate', Resp} = emqx_exhook_demo_svr:take(), Expected = #{result => authresult_to_bool(AuthResult), - clientinfo => from_clientinfo(ClientInfo) + clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_client_check_acl() -> - ?ALL({ClientInfo0, PubSub, Topic, Result}, - {clientinfo(), oneof([publish, subscribe]), - topic(), oneof([allow, deny])}, + ?ALL({ClientInfo0, PubSub, Topic, Result, Meta}, + {clientinfo(), oneof([publish, subscribe]), topic(), oneof([allow, deny]), request_meta()}, begin ClientInfo = inject_magic_into(username, ClientInfo0), OutResult = emqx_hooks:run_fold( @@ -132,162 +136,179 @@ prop_client_check_acl() -> #{result => aclresult_to_bool(Result), type => pubsub_to_enum(PubSub), topic => Topic, - clientinfo => from_clientinfo(ClientInfo) + clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_client_connected() -> - ?ALL({ClientInfo, ConnInfo}, - {clientinfo(), conninfo()}, + ?ALL({ClientInfo, ConnInfo, Meta}, + {clientinfo(), conninfo(), request_meta()}, begin ok = emqx_hooks:run('client.connected', [ClientInfo, ConnInfo]), {'on_client_connected', Resp} = emqx_exhook_demo_svr:take(), Expected = - #{clientinfo => from_clientinfo(ClientInfo) + #{clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_client_disconnected() -> - ?ALL({ClientInfo, Reason, ConnInfo}, - {clientinfo(), shutdown_reason(), conninfo()}, + ?ALL({ClientInfo, Reason, ConnInfo, Meta}, + {clientinfo(), shutdown_reason(), conninfo(), request_meta()}, begin ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]), {'on_client_disconnected', Resp} = emqx_exhook_demo_svr:take(), Expected = #{reason => stringfy(Reason), - clientinfo => from_clientinfo(ClientInfo) + clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_client_subscribe() -> - ?ALL({ClientInfo, SubProps, TopicTab}, - {clientinfo(), sub_properties(), topictab()}, + ?ALL({ClientInfo, SubProps, TopicTab, Meta}, + {clientinfo(), sub_properties(), topictab(), request_meta()}, begin ok = emqx_hooks:run('client.subscribe', [ClientInfo, SubProps, TopicTab]), {'on_client_subscribe', Resp} = emqx_exhook_demo_svr:take(), Expected = #{props => properties(SubProps), topic_filters => topicfilters(TopicTab), - clientinfo => from_clientinfo(ClientInfo) + clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_client_unsubscribe() -> - ?ALL({ClientInfo, UnSubProps, TopicTab}, - {clientinfo(), unsub_properties(), topictab()}, + ?ALL({ClientInfo, UnSubProps, TopicTab, Meta}, + {clientinfo(), unsub_properties(), topictab(), request_meta()}, begin ok = emqx_hooks:run('client.unsubscribe', [ClientInfo, UnSubProps, TopicTab]), {'on_client_unsubscribe', Resp} = emqx_exhook_demo_svr:take(), Expected = #{props => properties(UnSubProps), topic_filters => topicfilters(TopicTab), - clientinfo => from_clientinfo(ClientInfo) + clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_session_created() -> - ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, + ?ALL({ClientInfo, SessInfo, Meta}, + {clientinfo(), sessioninfo(), request_meta()}, begin ok = emqx_hooks:run('session.created', [ClientInfo, SessInfo]), {'on_session_created', Resp} = emqx_exhook_demo_svr:take(), Expected = - #{clientinfo => from_clientinfo(ClientInfo) + #{clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_session_subscribed() -> - ?ALL({ClientInfo, Topic, SubOpts}, - {clientinfo(), topic(), subopts()}, + ?ALL({ClientInfo, Topic, SubOpts, Meta}, + {clientinfo(), topic(), subopts(), request_meta()}, begin ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]), {'on_session_subscribed', Resp} = emqx_exhook_demo_svr:take(), Expected = #{topic => Topic, subopts => subopts(SubOpts), - clientinfo => from_clientinfo(ClientInfo) + clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_session_unsubscribed() -> - ?ALL({ClientInfo, Topic, SubOpts}, - {clientinfo(), topic(), subopts()}, + ?ALL({ClientInfo, Topic, SubOpts, Meta}, + {clientinfo(), topic(), subopts(), request_meta()}, begin ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, SubOpts]), {'on_session_unsubscribed', Resp} = emqx_exhook_demo_svr:take(), Expected = #{topic => Topic, - clientinfo => from_clientinfo(ClientInfo) + clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_session_resumed() -> - ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, + ?ALL({ClientInfo, SessInfo, Meta}, + {clientinfo(), sessioninfo(), request_meta()}, begin ok = emqx_hooks:run('session.resumed', [ClientInfo, SessInfo]), {'on_session_resumed', Resp} = emqx_exhook_demo_svr:take(), Expected = - #{clientinfo => from_clientinfo(ClientInfo) + #{clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_session_discared() -> - ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, + ?ALL({ClientInfo, SessInfo, Meta}, + {clientinfo(), sessioninfo(), request_meta()}, begin ok = emqx_hooks:run('session.discarded', [ClientInfo, SessInfo]), {'on_session_discarded', Resp} = emqx_exhook_demo_svr:take(), Expected = - #{clientinfo => from_clientinfo(ClientInfo) + #{clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_session_takeovered() -> - ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, + ?ALL({ClientInfo, SessInfo, Meta}, + {clientinfo(), sessioninfo(), request_meta()}, begin ok = emqx_hooks:run('session.takeovered', [ClientInfo, SessInfo]), {'on_session_takeovered', Resp} = emqx_exhook_demo_svr:take(), Expected = - #{clientinfo => from_clientinfo(ClientInfo) + #{clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_session_terminated() -> - ?ALL({ClientInfo, Reason, SessInfo}, - {clientinfo(), shutdown_reason(), sessioninfo()}, + ?ALL({ClientInfo, Reason, SessInfo, Meta}, + {clientinfo(), shutdown_reason(), sessioninfo(), request_meta()}, begin ok = emqx_hooks:run('session.terminated', [ClientInfo, Reason, SessInfo]), {'on_session_terminated', Resp} = emqx_exhook_demo_svr:take(), Expected = #{reason => stringfy(Reason), - clientinfo => from_clientinfo(ClientInfo) + clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_message_publish() -> - ?ALL(Msg0, message(), + ?ALL({Msg0, Meta}, + {message(), request_meta()}, begin Msg = emqx_message:from_map( inject_magic_into(from, emqx_message:to_map(Msg0))), @@ -317,7 +338,8 @@ prop_message_publish() -> {'on_message_publish', Resp} = emqx_exhook_demo_svr:take(), Expected = - #{message => from_message(Msg) + #{message => from_message(Msg), + meta => Meta }, ?assertEqual(Expected, Resp) end, @@ -325,7 +347,8 @@ prop_message_publish() -> end). prop_message_dropped() -> - ?ALL({Msg, By, Reason}, {message(), hardcoded, shutdown_reason()}, + ?ALL({Msg, By, Reason, Meta}, + {message(), hardcoded, shutdown_reason(), request_meta()}, begin ok = emqx_hooks:run('message.dropped', [Msg, By, Reason]), case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of @@ -334,7 +357,8 @@ prop_message_dropped() -> {'on_message_dropped', Resp} = emqx_exhook_demo_svr:take(), Expected = #{reason => stringfy(Reason), - message => from_message(Msg) + message => from_message(Msg), + meta => Meta }, ?assertEqual(Expected, Resp) end, @@ -342,7 +366,8 @@ prop_message_dropped() -> end). prop_message_delivered() -> - ?ALL({ClientInfo, Msg}, {clientinfo(), message()}, + ?ALL({ClientInfo, Msg, Meta}, + {clientinfo(), message(), request_meta()}, begin ok = emqx_hooks:run('message.delivered', [ClientInfo, Msg]), case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of @@ -351,7 +376,8 @@ prop_message_delivered() -> {'on_message_delivered', Resp} = emqx_exhook_demo_svr:take(), Expected = #{clientinfo => from_clientinfo(ClientInfo), - message => from_message(Msg) + message => from_message(Msg), + meta => Meta }, ?assertEqual(Expected, Resp) end, @@ -359,7 +385,8 @@ prop_message_delivered() -> end). prop_message_acked() -> - ?ALL({ClientInfo, Msg}, {clientinfo(), message()}, + ?ALL({ClientInfo, Msg, Meta}, + {clientinfo(), message(), request_meta()}, begin ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of @@ -368,7 +395,8 @@ prop_message_acked() -> {'on_message_acked', Resp} = emqx_exhook_demo_svr:take(), Expected = #{clientinfo => from_clientinfo(ClientInfo), - message => from_message(Msg) + message => from_message(Msg), + meta => Meta }, ?assertEqual(Expected, Resp) end, @@ -411,6 +439,8 @@ stringfy(Term) when is_integer(Term) -> integer_to_binary(Term); stringfy(Term) when is_atom(Term) -> atom_to_binary(Term, utf8); +stringfy(Term) when is_list(Term) -> + list_to_binary(Term); stringfy(Term) -> unicode:characters_to_binary((io_lib:format("~0p", [Term]))). @@ -513,6 +543,13 @@ sub_properties() -> unsub_properties() -> #{}. +request_meta() -> + #{ node => nodestr() + , version => stringfy(emqx_sys:version()) + , sysdescr => stringfy(emqx_sys:sysdescr()) + , cluster_name => ?DEFAULT_CLUSTER_NAME + }. + shutdown_reason() -> oneof([utf8(), {shutdown, emqx_ct_proper_types:limited_atom()}]). diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 3951c417c..eed1ca911 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -3,6 +3,7 @@ {VSN, [{"4.3.14", [{load_module,emqx,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, @@ -433,6 +434,7 @@ {<<".*">>,[]}], [{"4.3.14", [{load_module,emqx,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, diff --git a/src/emqx_sys.erl b/src/emqx_sys.erl index c61272e64..e62d7043f 100644 --- a/src/emqx_sys.erl +++ b/src/emqx_sys.erl @@ -29,6 +29,7 @@ ]). -export([ version/0 + , cluster_name/0 , uptime/0 , datetime/0 , sysdescr/0 @@ -87,6 +88,10 @@ stop() -> -spec(version() -> string()). version() -> emqx_app:get_release(). +%% @doc Get cluster name +-spec(cluster_name() -> string()). +cluster_name() -> atom_to_list(ekka:cluster_name()). + %% @doc Get sys description -spec(sysdescr() -> string()). sysdescr() -> emqx_app:get_description().