diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 81f91450e..ca8bd13c1 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -14,14 +14,18 @@ File format: ### Enhancements -* In order to fix the execution order of exhook, e.g. before/after other plugins/modules, - ExHook now supports user customizing emqx_hook execute priority. +* Add `RequestMeta` for exhook.proto in order to expose `cluster_name` of emqx in each gRPC request. [#7524] +* Support customize emqx_exhook execution priority. [#7408] * add api: PUT /rules/{id}/reset_metrics. This api reset the metrics of the rule engine of a rule, and reset the metrics of the action related to this rule. [#7474] * Enhanced rule engine error handling when json parsing error. * Add support for `RSA-PSK-AES256-GCM-SHA384`, `RSA-PSK-AES256-CBC-SHA384`, `RSA-PSK-AES128-GCM-SHA256`, `RSA-PSK-AES128-CBC-SHA256` PSK ciphers, and remove `PSK-3DES-EDE-CBC-SHA`, `PSK-RC4-SHA` from the default configuration. [#7427] +* Diagnostic logging for mnesia `wait_for_table` + - prints check points of mnesia internal stats + - prints check points of per table loading stats + Help to locate the problem of long table loading time. ### Bug fixes diff --git a/apps/emqx_auth_mongo/src/emqx_auth_mongo.app.src b/apps/emqx_auth_mongo/src/emqx_auth_mongo.app.src index d899f35c4..857c9d146 100644 --- a/apps/emqx_auth_mongo/src/emqx_auth_mongo.app.src +++ b/apps/emqx_auth_mongo/src/emqx_auth_mongo.app.src @@ -1,6 +1,6 @@ {application, emqx_auth_mongo, [{description, "EMQ X Authentication/ACL with MongoDB"}, - {vsn, "4.4.2"}, % strict semver, bump manually! + {vsn, "4.4.3"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_auth_mongo_sup]}, {applications, [kernel,stdlib,mongodb,ecpool]}, diff --git a/apps/emqx_auth_mongo/src/emqx_auth_mongo.appup.src b/apps/emqx_auth_mongo/src/emqx_auth_mongo.appup.src index 641ce3e93..79578d334 100644 --- a/apps/emqx_auth_mongo/src/emqx_auth_mongo.appup.src +++ b/apps/emqx_auth_mongo/src/emqx_auth_mongo.appup.src @@ -1,8 +1,12 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.4.1", - [{load_module,emqx_auth_mongo_sup,brutal_purge,soft_purge,[]}, + [{"4.4.2", + [{load_module,emqx_auth_mongo_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}]}, + {"4.4.1", + [{load_module,emqx_auth_mongo_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_mongo_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}]}, {"4.4.0", [{load_module,emqx_auth_mongo_sup,brutal_purge,soft_purge,[]}, @@ -10,8 +14,12 @@ {load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_mongo,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.4.1", - [{load_module,emqx_auth_mongo_sup,brutal_purge,soft_purge,[]}, + [{"4.4.2", + [{load_module,emqx_auth_mongo_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}]}, + {"4.4.1", + [{load_module,emqx_auth_mongo_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_mongo_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}]}, {"4.4.0", [{load_module,emqx_auth_mongo_sup,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_auth_mongo/src/emqx_auth_mongo.erl b/apps/emqx_auth_mongo/src/emqx_auth_mongo.erl index 307aa3f7f..df70b6dbe 100644 --- a/apps/emqx_auth_mongo/src/emqx_auth_mongo.erl +++ b/apps/emqx_auth_mongo/src/emqx_auth_mongo.erl @@ -35,6 +35,10 @@ , query_multi/3 ]). +-export([ available/2 + , available/3 + ]). + -spec(register_metrics() -> ok). register_metrics() -> lists:foreach(fun emqx_metrics:ensure/1, ?AUTH_METRICS). @@ -97,6 +101,56 @@ is_superuser(Pool, #superquery{collection = Coll, field = Field, selector = Sele end end. +%%-------------------------------------------------------------------- +%% Availability Test +%%-------------------------------------------------------------------- + +available(Pool, #superquery{collection = Collection, selector = Selector}) -> + available(Pool, Collection, maps:from_list(replvars(Selector, test_client_info()))); +available(Pool, #authquery{collection = Collection, selector = Selector}) -> + available(Pool, Collection, maps:from_list(replvars(Selector, test_client_info()))); +available(Pool, #aclquery{collection = Collection, selector = Selectors}) -> + Fun = + fun(Selector) -> + maps:from_list(emqx_auth_mongo:replvars(Selector, test_client_info())) + end, + available(Pool, Collection, lists:map(Fun, Selectors), fun query_multi/3). + +available(Pool, Collection, Query) -> + available(Pool, Collection, Query, fun query/3). + +available(Pool, Collection, Query, Fun) -> + try Fun(Pool, Collection, Query) of + {error, Reason} -> + ?LOG(error, "[MongoDB] ~p availability test error: ~0p", [Collection, Reason]), + {error, Reason}; + Error = #{<<"code">> := Code} -> + CodeName = maps:get(<<"codeName">>, Error, undefined), + ErrorMessage = maps:get(<<"errmsg">>, Error, undefined), + ?LOG(error, "[MongoDB] ~p availability test error, code: ~p Name: ~0p Message: ~0p", + [Collection, Code, CodeName, ErrorMessage]), + {error, {mongo_error, Code}}; + _Return -> + %% Any success result is fine. + ok + catch E:R:S -> + ?LOG(error, "[MongoDB] ~p availability test error, ~p: ~0p: ~0p", [Collection, E, R, S]), + {error, R} + end. + +%% Test client info +test_client_info() -> + #{ + clientid => <<"EMQX_availability_test_client">>, + username => <<"EMQX_availability_test_username">>, + cn => <<"EMQX_availability_test_cn">>, + dn => <<"EMQX_availability_test_dn">> + }. + +%%-------------------------------------------------------------------- +%% Internal func +%%-------------------------------------------------------------------- + replvars(VarList, ClientInfo) -> lists:map(fun(Var) -> replvar(Var, ClientInfo) end, VarList). diff --git a/apps/emqx_auth_mongo/src/emqx_auth_mongo_app.erl b/apps/emqx_auth_mongo/src/emqx_auth_mongo_app.erl index 3e05e3411..8c3d1f968 100644 --- a/apps/emqx_auth_mongo/src/emqx_auth_mongo_app.erl +++ b/apps/emqx_auth_mongo/src/emqx_auth_mongo_app.erl @@ -36,26 +36,65 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqx_auth_mongo_sup:start_link(), - with_env(auth_query, fun reg_authmod/1), - with_env(acl_query, fun reg_aclmod/1), + ok = safe_start(), {ok, Sup}. prep_stop(State) -> - ok = emqx:unhook('client.authenticate', fun emqx_auth_mongo:check/3), - ok = emqx:unhook('client.check_acl', fun emqx_acl_mongo:check_acl/5), + ok = unload_hook(), + _ = stop_pool(), State. stop(_State) -> ok. +unload_hook() -> + ok = emqx:unhook('client.authenticate', fun emqx_auth_mongo:check/3), + ok = emqx:unhook('client.check_acl', fun emqx_acl_mongo:check_acl/5). + +stop_pool() -> + ecpool:stop_sup_pool(?APP). + +safe_start() -> + try + ok = with_env(auth_query, fun reg_authmod/1), + ok = with_env(acl_query, fun reg_aclmod/1), + ok + catch _E:R:_S -> + unload_hook(), + _ = stop_pool(), + {error, R} + end. + reg_authmod(AuthQuery) -> - emqx_auth_mongo:register_metrics(), - SuperQuery = r(super_query, application:get_env(?APP, super_query, undefined)), - ok = emqx:hook('client.authenticate', fun emqx_auth_mongo:check/3, - [#{authquery => AuthQuery, superquery => SuperQuery, pool => ?APP}]). + case emqx_auth_mongo:available(?APP, AuthQuery) of + ok -> + emqx_auth_mongo:register_metrics(), + HookFun = fun emqx_auth_mongo:check/3, + HookOptions = #{authquery => AuthQuery, superquery => undefined, pool => ?APP}, + case r(super_query, application:get_env(?APP, super_query, undefined)) of + undefined -> + ok = emqx:hook('client.authenticate', HookFun, [HookOptions]); + SuperQuery -> + case emqx_auth_mongo:available(?APP, SuperQuery) of + ok -> + ok = emqx:hook('client.authenticate', HookFun, + [HookOptions#{superquery => SuperQuery}]); + {error, Reason} -> + {error, Reason} + end + end; + {error, Reason} -> + {error, Reason} + end. reg_aclmod(AclQuery) -> - ok = emqx:hook('client.check_acl', fun emqx_acl_mongo:check_acl/5, [#{aclquery => AclQuery, pool => ?APP}]). + case emqx_auth_mongo:available(?APP, AclQuery) of + ok -> + ok = emqx:hook('client.check_acl', fun emqx_acl_mongo:check_acl/5, + [#{aclquery => AclQuery, pool => ?APP}]); + {error, Reason} -> + {error, Reason} + end. %%-------------------------------------------------------------------- %% Internal functions diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_worker_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_worker_SUITE.erl index 5c6061819..bf104e60d 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_worker_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_worker_SUITE.erl @@ -112,8 +112,8 @@ t_rpc(Config) when is_list(Config) -> ClientId = <<"ClientId">>, try {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]), - {ok, _Props} = emqtt:connect(ConnPid), - {ok, _Props, [1]} = emqtt:subscribe(ConnPid, {<<"forwarded/t_rpc/one">>, ?QOS_1}), + {ok, _} = emqtt:connect(ConnPid), + {ok, _, [1]} = emqtt:subscribe(ConnPid, {<<"forwarded/t_rpc/one">>, ?QOS_1}), timer:sleep(100), {ok, _PacketId} = emqtt:publish(ConnPid, <<"t_rpc/one">>, <<"hello">>, ?QOS_1), timer:sleep(100), @@ -151,14 +151,14 @@ t_mqtt(Config) when is_list(Config) -> ssl => false, %% Consume back to forwarded message for verification %% NOTE: this is a indefenite loopback without mocking emqx_bridge_worker:import_batch/1 - subscriptions => [{SendToTopic2, _QoS = 1}], + subscriptions => [{SendToTopic2, ?QOS_1}], receive_mountpoint => <<"receive/aws/">>, start_type => auto}, {ok, Pid} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg), ClientId = <<"client-1">>, try ?assertEqual([{SendToTopic2, 1}], emqx_bridge_worker:get_subscriptions(Pid)), - ok = emqx_bridge_worker:ensure_subscription_present(Pid, SendToTopic3, _QoS = 1), + ok = emqx_bridge_worker:ensure_subscription_present(Pid, SendToTopic3, ?QOS_1), ?assertEqual([{SendToTopic3, 1},{SendToTopic2, 1}], emqx_bridge_worker:get_subscriptions(Pid)), {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]), diff --git a/apps/emqx_coap/test/emqx_coap_pubsub_SUITE.erl b/apps/emqx_coap/test/emqx_coap_pubsub_SUITE.erl index c018b9165..181d11301 100644 --- a/apps/emqx_coap/test/emqx_coap_pubsub_SUITE.erl +++ b/apps/emqx_coap/test/emqx_coap_pubsub_SUITE.erl @@ -54,7 +54,7 @@ t_update_max_age(_Config) -> ?LOGT("Reply =~p", [Reply]), {ok,created, #coap_content{location_path = LocPath}} = Reply, ?assertEqual([<<"/ps/topic1">>] ,LocPath), - TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload), + TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload1, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload), ?LOGT("lookup topic info=~p", [TopicInfo]), ?assertEqual(60, MaxAge1), ?assertEqual(<<"42">>, CT1), @@ -65,7 +65,7 @@ t_update_max_age(_Config) -> Reply1 = er_coap_client:request(post, URI, #coap_content{max_age = 70, format = <<"application/link-format">>, payload = Payload1}), {ok,created, #coap_content{location_path = LocPath}} = Reply1, ?assertEqual([<<"/ps/topic1">>] ,LocPath), - [{TopicInPayload, MaxAge2, CT2, _ResPayload, _TimeStamp1}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload), + [{TopicInPayload, MaxAge2, CT2, _ResPayload2, _TimeStamp1}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload), ?assertEqual(70, MaxAge2), ?assertEqual(<<"50">>, CT2), @@ -82,7 +82,7 @@ t_create_subtopic(_Config) -> ?LOGT("Reply =~p", [Reply]), {ok,created, #coap_content{location_path = LocPath}} = Reply, ?assertEqual([<<"/ps/topic1">>] ,LocPath), - TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload), + TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload1, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload), ?LOGT("lookup topic info=~p", [TopicInfo]), ?assertEqual(60, MaxAge1), ?assertEqual(<<"42">>, CT1), @@ -99,7 +99,7 @@ t_create_subtopic(_Config) -> ?LOGT("Reply =~p", [Reply1]), {ok,created, #coap_content{location_path = LocPath1}} = Reply1, ?assertEqual([<<"/ps/topic1/subtopic">>] ,LocPath1), - [{FullTopic, MaxAge2, CT2, _ResPayload, _}] = emqx_coap_pubsub_topics:lookup_topic_info(FullTopic), + [{FullTopic, MaxAge2, CT2, _ResPayload2, _}] = emqx_coap_pubsub_topics:lookup_topic_info(FullTopic), ?assertEqual(60, MaxAge2), ?assertEqual(<<"42">>, CT2), @@ -132,7 +132,7 @@ t_refreash_max_age(_Config) -> ?LOGT("Reply =~p", [Reply]), {ok,created, #coap_content{location_path = LocPath}} = Reply, ?assertEqual([<<"/ps/topic1">>] ,LocPath), - TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload, TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload), + TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload1, TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload), ?LOGT("lookup topic info=~p", [TopicInfo]), ?LOGT("TimeStamp=~p", [TimeStamp]), ?assertEqual(5, MaxAge1), @@ -144,7 +144,7 @@ t_refreash_max_age(_Config) -> Reply1 = er_coap_client:request(post, URI, #coap_content{max_age = 5, format = <<"application/link-format">>, payload = Payload1}), {ok,created, #coap_content{location_path = LocPath}} = Reply1, ?assertEqual([<<"/ps/topic1">>] ,LocPath), - [{TopicInPayload, MaxAge2, CT2, _ResPayload, TimeStamp1}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload), + [{TopicInPayload, MaxAge2, CT2, _ResPayload2, TimeStamp1}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload), ?LOGT("TimeStamp1=~p", [TimeStamp1]), ?assertEqual(5, MaxAge2), ?assertEqual(<<"50">>, CT2), diff --git a/apps/emqx_exhook/.gitignore b/apps/emqx_exhook/.gitignore index da1f0db23..b01539214 100644 --- a/apps/emqx_exhook/.gitignore +++ b/apps/emqx_exhook/.gitignore @@ -25,5 +25,5 @@ data/ *.class Mnesia.nonode@nohost/ src/emqx_exhook_pb.erl -src/emqx_exhook_v_1_hook_provider_client.erl -src/emqx_exhook_v_1_hook_provider_bhvr.erl +src/emqx_exhook_v_*_hook_provider_client.erl +src/emqx_exhook_v_*_hook_provider_bhvr.erl diff --git a/apps/emqx_exhook/priv/protos/exhook.proto b/apps/emqx_exhook/priv/protos/exhook.proto index 639066c6a..cca8f06be 100644 --- a/apps/emqx_exhook/priv/protos/exhook.proto +++ b/apps/emqx_exhook/priv/protos/exhook.proto @@ -22,6 +22,7 @@ option java_multiple_files = true; option java_package = "io.emqx.exhook"; option java_outer_classname = "EmqxExHookProto"; +// Proto package compatible, Don't need an updated version.. package emqx.exhook.v1; service HookProvider { @@ -76,6 +77,8 @@ service HookProvider { message ProviderLoadedRequest { BrokerInfo broker = 1; + + RequestMeta meta = 2; } message LoadedResponse { @@ -83,7 +86,11 @@ message LoadedResponse { repeated HookSpec hooks = 1; } -message ProviderUnloadedRequest { } +message ProviderUnloadedRequest { + + RequestMeta meta = 1; +} + message ClientConnectRequest { @@ -93,6 +100,8 @@ message ClientConnectRequest { // // It should be empty on MQTT v3.1.1/v3.1 or others protocol repeated Property props = 2; + + RequestMeta meta = 3; } message ClientConnackRequest { @@ -102,11 +111,15 @@ message ClientConnackRequest { string result_code = 2; repeated Property props = 3; + + RequestMeta meta = 4; } message ClientConnectedRequest { ClientInfo clientinfo = 1; + + RequestMeta meta = 2; } message ClientDisconnectedRequest { @@ -114,6 +127,8 @@ message ClientDisconnectedRequest { ClientInfo clientinfo = 1; string reason = 2; + + RequestMeta meta = 3; } message ClientAuthenticateRequest { @@ -121,6 +136,8 @@ message ClientAuthenticateRequest { ClientInfo clientinfo = 1; bool result = 2; + + RequestMeta meta = 3; } message ClientCheckAclRequest { @@ -139,6 +156,8 @@ message ClientCheckAclRequest { string topic = 3; bool result = 4; + + RequestMeta meta = 5; } message ClientSubscribeRequest { @@ -148,6 +167,8 @@ message ClientSubscribeRequest { repeated Property props = 2; repeated TopicFilter topic_filters = 3; + + RequestMeta meta = 4; } message ClientUnsubscribeRequest { @@ -157,11 +178,15 @@ message ClientUnsubscribeRequest { repeated Property props = 2; repeated TopicFilter topic_filters = 3; + + RequestMeta meta = 4; } message SessionCreatedRequest { ClientInfo clientinfo = 1; + + RequestMeta meta = 2; } message SessionSubscribedRequest { @@ -171,6 +196,8 @@ message SessionSubscribedRequest { string topic = 2; SubOpts subopts = 3; + + RequestMeta meta = 4; } message SessionUnsubscribedRequest { @@ -178,21 +205,29 @@ message SessionUnsubscribedRequest { ClientInfo clientinfo = 1; string topic = 2; + + RequestMeta meta = 3; } message SessionResumedRequest { ClientInfo clientinfo = 1; + + RequestMeta meta = 2; } message SessionDiscardedRequest { ClientInfo clientinfo = 1; + + RequestMeta meta = 2; } message SessionTakeoveredRequest { ClientInfo clientinfo = 1; + + RequestMeta meta = 2; } message SessionTerminatedRequest { @@ -200,11 +235,15 @@ message SessionTerminatedRequest { ClientInfo clientinfo = 1; string reason = 2; + + RequestMeta meta = 3; } message MessagePublishRequest { Message message = 1; + + RequestMeta meta = 2; } message MessageDeliveredRequest { @@ -212,6 +251,8 @@ message MessageDeliveredRequest { ClientInfo clientinfo = 1; Message message = 2; + + RequestMeta meta = 3; } message MessageDroppedRequest { @@ -219,6 +260,8 @@ message MessageDroppedRequest { Message message = 1; string reason = 2; + + RequestMeta meta = 3; } message MessageAckedRequest { @@ -226,6 +269,8 @@ message MessageAckedRequest { ClientInfo clientinfo = 1; Message message = 2; + + RequestMeta meta = 3; } //------------------------------------------------------------------------------ @@ -430,3 +475,14 @@ message SubOpts { // connection with a ClientID equal to the ClientID of the publishing uint32 nl = 5; } + +message RequestMeta { + + string node = 1; + + string version = 2; + + string sysdescr = 3; + + string cluster_name = 4; +} diff --git a/apps/emqx_exhook/src/emqx_exhook.app.src b/apps/emqx_exhook/src/emqx_exhook.app.src index 500f0ef05..2004230be 100644 --- a/apps/emqx_exhook/src/emqx_exhook.app.src +++ b/apps/emqx_exhook/src/emqx_exhook.app.src @@ -1,3 +1,4 @@ +%% -*- mode: erlang -*- {application, emqx_exhook, [{description, "EMQ X Extension for Hook"}, {vsn, "4.4.1"}, diff --git a/apps/emqx_exhook/src/emqx_exhook.appup.src b/apps/emqx_exhook/src/emqx_exhook.appup.src index 05590198e..ad45d7426 100644 --- a/apps/emqx_exhook/src/emqx_exhook.appup.src +++ b/apps/emqx_exhook/src/emqx_exhook.appup.src @@ -2,12 +2,16 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.4.0", - [{load_module,emqx_exhook_sup,brutal_purge,soft_purge,[]}, + [{load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_exhook_server,brutal_purge,soft_purge,[]}, {update, emqx_exhook_mngr, {advanced, ["4.4.0"]}}]}, {<<".*">>,[]}], [{"4.4.0", - [{load_module,emqx_exhook_sup,brutal_purge,soft_purge,[]}, + [{load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_exhook_server,brutal_purge,soft_purge,[]}, {update, emqx_exhook_mngr, {advanced, ["4.4.0"]}}]}, {<<".*">>,[]}]}. diff --git a/apps/emqx_exhook/src/emqx_exhook.erl b/apps/emqx_exhook/src/emqx_exhook.erl index b27710e4a..777cfe27d 100644 --- a/apps/emqx_exhook/src/emqx_exhook.erl +++ b/apps/emqx_exhook/src/emqx_exhook.erl @@ -30,6 +30,11 @@ , call_fold/3 ]). +-export([request_meta/0]). + +-import(emqx_exhook_handler, [stringfy/1]). +%% TODO: move util functions to an independent module + %%-------------------------------------------------------------------- %% Mgmt APIs %%-------------------------------------------------------------------- @@ -116,3 +121,10 @@ deny_action_result('message.publish', Msg) -> %% TODO: Not support to deny a message %% maybe we can put the 'allow_publish' into message header Msg. + +request_meta() -> + #{ node => stringfy(node()) + , version => emqx_sys:version() + , sysdescr => emqx_sys:sysdescr() + , cluster_name => emqx_sys:cluster_name() + }. diff --git a/apps/emqx_exhook/src/emqx_exhook_handler.erl b/apps/emqx_exhook/src/emqx_exhook_handler.erl index 63d41d8eb..fc78d54cc 100644 --- a/apps/emqx_exhook/src/emqx_exhook_handler.erl +++ b/apps/emqx_exhook/src/emqx_exhook_handler.erl @@ -332,6 +332,8 @@ stringfy(Term) when is_integer(Term) -> integer_to_binary(Term); stringfy(Term) when is_atom(Term) -> atom_to_binary(Term, utf8); +stringfy(Term) when is_list(Term) -> + list_to_binary(Term); stringfy(Term) -> unicode:characters_to_binary((io_lib:format("~0p", [Term]))). diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index 0f6d7b87d..cf2c95c8c 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -276,23 +276,24 @@ match_topic_filter(TopicName, TopicFilter) -> -spec do_call(string(), atom(), map(), map()) -> {ok, map()} | {error, term()}. do_call(ChannName, Fun, Req, ReqOpts) -> + NReq = Req#{meta => emqx_exhook:request_meta()}, Options = ReqOpts#{channel => ChannName}, - ?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, Req, Options]), - case catch apply(?PB_CLIENT_MOD, Fun, [Req, Options]) of + ?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, NReq, Options]), + case catch apply(?PB_CLIENT_MOD, Fun, [NReq, Options]) of {ok, Resp, Metadata} -> ?LOG(debug, "Response {ok, ~0p, ~0p}", [Resp, Metadata]), {ok, Resp}; {error, {Code, Msg}, _Metadata} -> ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) response errcode: ~0p, errmsg: ~0p", - [?PB_CLIENT_MOD, Fun, Req, Options, Code, Msg]), + [?PB_CLIENT_MOD, Fun, NReq, Options, Code, Msg]), {error, {Code, Msg}}; {error, Reason} -> ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) error: ~0p", - [?PB_CLIENT_MOD, Fun, Req, Options, Reason]), + [?PB_CLIENT_MOD, Fun, NReq, Options, Reason]), {error, Reason}; {'EXIT', {Reason, Stk}} -> ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) throw an exception: ~0p, stacktrace: ~0p", - [?PB_CLIENT_MOD, Fun, Req, Options, Reason, Stk]), + [?PB_CLIENT_MOD, Fun, NReq, Options, Reason, Stk]), {error, Reason} end. diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl index 5aed2f2b8..fc5753293 100644 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -19,9 +19,14 @@ -compile(export_all). -compile(nowarn_export_all). +-include("emqx_exhook.hrl"). + -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-define(OTHER_CLUSTER_NAME_ATOM, test_emqx_cluster). +-define(OTHER_CLUSTER_NAME_STRING, "test_emqx_cluster"). + %%-------------------------------------------------------------------- %% Setups %%-------------------------------------------------------------------- @@ -99,29 +104,63 @@ t_cli_stats(_) -> unmeck_print(). t_priority(_) -> - restart_exhook_with_envs([{emqx_exhook, hook_priority, 1}]), + restart_apps_with_envs([ {emqx, fun set_special_cfgs/1} + , {emqx_exhook, [{hook_priority, 1}]}]), emqx_exhook:disable(default), ok = emqx_exhook:enable(default), [Callback | _] = emqx_hooks:lookup('client.connected'), - 1 = emqx_hooks:callback_priority(Callback). + ?assertEqual(1, emqx_hooks:callback_priority(Callback)). + +t_cluster_name(_) -> + SetEnvFun = + fun(emqx) -> + set_special_cfgs(emqx), + application:set_env(ekka, cluster_name, ?OTHER_CLUSTER_NAME_ATOM); + (emqx_exhook) -> + application:set_env(emqx_exhook, hook_priority, 1) + end, + + emqx_ct_helpers:stop_apps([emqx, emqx_exhook]), + emqx_ct_helpers:start_apps([emqx, emqx_exhook], SetEnvFun), + + ?assertEqual(?OTHER_CLUSTER_NAME_STRING, emqx_sys:cluster_name()), + + emqx_exhook:disable(default), + ok = emqx_exhook:enable(default), + %% See emqx_exhook_demo_svr:on_provider_loaded/2 + ?assertEqual([], emqx_hooks:lookup('session.created')), + ?assertEqual([], emqx_hooks:lookup('message_publish')), + + [Callback | _] = emqx_hooks:lookup('client.connected'), + ?assertEqual(1, emqx_hooks:callback_priority(Callback)). %%-------------------------------------------------------------------- %% Utils %%-------------------------------------------------------------------- %% TODO: make it more general and move to `emqx_ct_helpers` -restart_exhook_with_envs(Envs) -> - emqx_ct_helpers:stop_apps([emqx_exhook]), - SetPriorityFun - = fun(emqx) -> - set_special_cfgs(emqx); - (emqx_exhook) -> - lists:foreach(fun({App, Key, Val}) -> - application:set_env(App, Key, Val) - end, Envs) - end, - emqx_ct_helpers:start_apps([emqx_exhook], SetPriorityFun). +restart_app_with_envs(App, Fun) + when is_function(Fun) -> + emqx_ct_helpers:stop_apps([App]), + emqx_ct_helpers:start_apps([App], Fun); + +restart_app_with_envs(App, Envs) + when is_list(Envs) -> + emqx_ct_helpers:stop_apps([App]), + HandlerFun = + fun(AppName) -> + lists:foreach(fun({Key, Val}) -> + application:set_env(AppName, Key, Val) + end, Envs) + end, + emqx_ct_helpers:start_apps([App], HandlerFun). + +restart_apps_with_envs([]) -> + ok; +restart_apps_with_envs([{App, Envs} | Rest]) -> + restart_app_with_envs(App, Envs), + restart_apps_with_envs(Rest). meck_print() -> meck:new(emqx_ctl, [passthrough, no_history, no_link]), diff --git a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl index b05748856..357722986 100644 --- a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl +++ b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl @@ -51,6 +51,8 @@ -define(PORT, 9000). -define(NAME, ?MODULE). +-define(DEFAULT_CLUSTER_NAME, <<"emqxcl">>). +-define(OTHER_CLUSTER_NAME_BIN, <<"test_emqx_cluster">>). %%-------------------------------------------------------------------- %% Server APIs @@ -112,30 +114,37 @@ reply(Q1, Q2) -> -spec on_provider_loaded(emqx_exhook_pb:provider_loaded_request(), grpc:metadata()) -> {ok, emqx_exhook_pb:loaded_response(), grpc:metadata()} | {error, grpc_cowboy_h:error_response()}. - -on_provider_loaded(Req, Md) -> +on_provider_loaded(#{meta := #{cluster_name := Name}} = Req, Md) -> ?MODULE:in({?FUNCTION_NAME, Req}), %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), - {ok, #{hooks => [ - #{name => <<"client.connect">>}, - #{name => <<"client.connack">>}, - #{name => <<"client.connected">>}, - #{name => <<"client.disconnected">>}, - #{name => <<"client.authenticate">>}, - #{name => <<"client.check_acl">>}, - #{name => <<"client.subscribe">>}, - #{name => <<"client.unsubscribe">>}, - #{name => <<"session.created">>}, - #{name => <<"session.subscribed">>}, - #{name => <<"session.unsubscribed">>}, - #{name => <<"session.resumed">>}, - #{name => <<"session.discarded">>}, - #{name => <<"session.takeovered">>}, - #{name => <<"session.terminated">>}, - #{name => <<"message.publish">>}, - #{name => <<"message.delivered">>}, - #{name => <<"message.acked">>}, - #{name => <<"message.dropped">>}]}, Md}. + HooksClient = + [#{name => <<"client.connect">>}, + #{name => <<"client.connack">>}, + #{name => <<"client.connected">>}, + #{name => <<"client.disconnected">>}, + #{name => <<"client.authenticate">>}, + #{name => <<"client.check_acl">>}, + #{name => <<"client.subscribe">>}, + #{name => <<"client.unsubscribe">>}], + HooksSession = + [#{name => <<"session.created">>}, + #{name => <<"session.subscribed">>}, + #{name => <<"session.unsubscribed">>}, + #{name => <<"session.resumed">>}, + #{name => <<"session.discarded">>}, + #{name => <<"session.takeovered">>}, + #{name => <<"session.terminated">>}], + HooksMessage = + [#{name => <<"message.publish">>}, + #{name => <<"message.delivered">>}, + #{name => <<"message.acked">>}, + #{name => <<"message.dropped">>}], + case Name of + ?DEFAULT_CLUSTER_NAME -> + {ok, #{hooks => HooksClient ++ HooksSession ++ HooksMessage}, Md}; + ?OTHER_CLUSTER_NAME_BIN -> + {ok, #{hooks => HooksClient}, Md} + end. -spec on_provider_unloaded(emqx_exhook_pb:provider_unloaded_request(), grpc:metadata()) -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} | {error, grpc_cowboy_h:error_response()}. diff --git a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl index 88eba8f11..f0a540cdd 100644 --- a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl +++ b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl @@ -36,42 +36,46 @@ fun() -> do_teardown(State) end end, ?FORALL(Vars, Types, Exprs))). +-define(DEFAULT_CLUSTER_NAME, <<"emqxcl">>). + %%-------------------------------------------------------------------- %% Properties %%-------------------------------------------------------------------- prop_client_connect() -> - ?ALL({ConnInfo, ConnProps}, - {conninfo(), conn_properties()}, - begin - ok = emqx_hooks:run('client.connect', [ConnInfo, ConnProps]), - {'on_client_connect', Resp} = emqx_exhook_demo_svr:take(), - Expected = - #{props => properties(ConnProps), - conninfo => from_conninfo(ConnInfo) - }, - ?assertEqual(Expected, Resp), - true - end). + ?ALL({ConnInfo, ConnProps, Meta}, + {conninfo(), conn_properties(), request_meta()}, + begin + ok = emqx_hooks:run('client.connect', [ConnInfo, ConnProps]), + {'on_client_connect', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{props => properties(ConnProps), + conninfo => from_conninfo(ConnInfo), + meta => Meta + }, + ?assertEqual(Expected, Resp), + true + end). prop_client_connack() -> - ?ALL({ConnInfo, Rc, AckProps}, - {conninfo(), connack_return_code(), ack_properties()}, + ?ALL({ConnInfo, Rc, AckProps, Meta}, + {conninfo(), connack_return_code(), ack_properties(), request_meta()}, begin ok = emqx_hooks:run('client.connack', [ConnInfo, Rc, AckProps]), {'on_client_connack', Resp} = emqx_exhook_demo_svr:take(), Expected = #{props => properties(AckProps), result_code => atom_to_binary(Rc, utf8), - conninfo => from_conninfo(ConnInfo) + conninfo => from_conninfo(ConnInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_client_authenticate() -> - ?ALL({ClientInfo0, AuthResult}, - {clientinfo(), authresult()}, + ?ALL({ClientInfo0, AuthResult, Meta}, + {clientinfo(), authresult(), request_meta()}, begin ClientInfo = inject_magic_into(username, ClientInfo0), OutAuthResult = emqx_hooks:run_fold('client.authenticate', [ClientInfo], AuthResult), @@ -103,16 +107,16 @@ prop_client_authenticate() -> {'on_client_authenticate', Resp} = emqx_exhook_demo_svr:take(), Expected = #{result => authresult_to_bool(AuthResult), - clientinfo => from_clientinfo(ClientInfo) + clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_client_check_acl() -> - ?ALL({ClientInfo0, PubSub, Topic, Result}, - {clientinfo(), oneof([publish, subscribe]), - topic(), oneof([allow, deny])}, + ?ALL({ClientInfo0, PubSub, Topic, Result, Meta}, + {clientinfo(), oneof([publish, subscribe]), topic(), oneof([allow, deny]), request_meta()}, begin ClientInfo = inject_magic_into(username, ClientInfo0), OutResult = emqx_hooks:run_fold( @@ -132,162 +136,179 @@ prop_client_check_acl() -> #{result => aclresult_to_bool(Result), type => pubsub_to_enum(PubSub), topic => Topic, - clientinfo => from_clientinfo(ClientInfo) + clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_client_connected() -> - ?ALL({ClientInfo, ConnInfo}, - {clientinfo(), conninfo()}, + ?ALL({ClientInfo, ConnInfo, Meta}, + {clientinfo(), conninfo(), request_meta()}, begin ok = emqx_hooks:run('client.connected', [ClientInfo, ConnInfo]), {'on_client_connected', Resp} = emqx_exhook_demo_svr:take(), Expected = - #{clientinfo => from_clientinfo(ClientInfo) + #{clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_client_disconnected() -> - ?ALL({ClientInfo, Reason, ConnInfo}, - {clientinfo(), shutdown_reason(), conninfo()}, + ?ALL({ClientInfo, Reason, ConnInfo, Meta}, + {clientinfo(), shutdown_reason(), conninfo(), request_meta()}, begin ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]), {'on_client_disconnected', Resp} = emqx_exhook_demo_svr:take(), Expected = #{reason => stringfy(Reason), - clientinfo => from_clientinfo(ClientInfo) + clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_client_subscribe() -> - ?ALL({ClientInfo, SubProps, TopicTab}, - {clientinfo(), sub_properties(), topictab()}, + ?ALL({ClientInfo, SubProps, TopicTab, Meta}, + {clientinfo(), sub_properties(), topictab(), request_meta()}, begin ok = emqx_hooks:run('client.subscribe', [ClientInfo, SubProps, TopicTab]), {'on_client_subscribe', Resp} = emqx_exhook_demo_svr:take(), Expected = #{props => properties(SubProps), topic_filters => topicfilters(TopicTab), - clientinfo => from_clientinfo(ClientInfo) + clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_client_unsubscribe() -> - ?ALL({ClientInfo, UnSubProps, TopicTab}, - {clientinfo(), unsub_properties(), topictab()}, + ?ALL({ClientInfo, UnSubProps, TopicTab, Meta}, + {clientinfo(), unsub_properties(), topictab(), request_meta()}, begin ok = emqx_hooks:run('client.unsubscribe', [ClientInfo, UnSubProps, TopicTab]), {'on_client_unsubscribe', Resp} = emqx_exhook_demo_svr:take(), Expected = #{props => properties(UnSubProps), topic_filters => topicfilters(TopicTab), - clientinfo => from_clientinfo(ClientInfo) + clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_session_created() -> - ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, + ?ALL({ClientInfo, SessInfo, Meta}, + {clientinfo(), sessioninfo(), request_meta()}, begin ok = emqx_hooks:run('session.created', [ClientInfo, SessInfo]), {'on_session_created', Resp} = emqx_exhook_demo_svr:take(), Expected = - #{clientinfo => from_clientinfo(ClientInfo) + #{clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_session_subscribed() -> - ?ALL({ClientInfo, Topic, SubOpts}, - {clientinfo(), topic(), subopts()}, + ?ALL({ClientInfo, Topic, SubOpts, Meta}, + {clientinfo(), topic(), subopts(), request_meta()}, begin ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]), {'on_session_subscribed', Resp} = emqx_exhook_demo_svr:take(), Expected = #{topic => Topic, subopts => subopts(SubOpts), - clientinfo => from_clientinfo(ClientInfo) + clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_session_unsubscribed() -> - ?ALL({ClientInfo, Topic, SubOpts}, - {clientinfo(), topic(), subopts()}, + ?ALL({ClientInfo, Topic, SubOpts, Meta}, + {clientinfo(), topic(), subopts(), request_meta()}, begin ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, SubOpts]), {'on_session_unsubscribed', Resp} = emqx_exhook_demo_svr:take(), Expected = #{topic => Topic, - clientinfo => from_clientinfo(ClientInfo) + clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_session_resumed() -> - ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, + ?ALL({ClientInfo, SessInfo, Meta}, + {clientinfo(), sessioninfo(), request_meta()}, begin ok = emqx_hooks:run('session.resumed', [ClientInfo, SessInfo]), {'on_session_resumed', Resp} = emqx_exhook_demo_svr:take(), Expected = - #{clientinfo => from_clientinfo(ClientInfo) + #{clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_session_discared() -> - ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, + ?ALL({ClientInfo, SessInfo, Meta}, + {clientinfo(), sessioninfo(), request_meta()}, begin ok = emqx_hooks:run('session.discarded', [ClientInfo, SessInfo]), {'on_session_discarded', Resp} = emqx_exhook_demo_svr:take(), Expected = - #{clientinfo => from_clientinfo(ClientInfo) + #{clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_session_takeovered() -> - ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, + ?ALL({ClientInfo, SessInfo, Meta}, + {clientinfo(), sessioninfo(), request_meta()}, begin ok = emqx_hooks:run('session.takeovered', [ClientInfo, SessInfo]), {'on_session_takeovered', Resp} = emqx_exhook_demo_svr:take(), Expected = - #{clientinfo => from_clientinfo(ClientInfo) + #{clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_session_terminated() -> - ?ALL({ClientInfo, Reason, SessInfo}, - {clientinfo(), shutdown_reason(), sessioninfo()}, + ?ALL({ClientInfo, Reason, SessInfo, Meta}, + {clientinfo(), shutdown_reason(), sessioninfo(), request_meta()}, begin ok = emqx_hooks:run('session.terminated', [ClientInfo, Reason, SessInfo]), {'on_session_terminated', Resp} = emqx_exhook_demo_svr:take(), Expected = #{reason => stringfy(Reason), - clientinfo => from_clientinfo(ClientInfo) + clientinfo => from_clientinfo(ClientInfo), + meta => Meta }, ?assertEqual(Expected, Resp), true end). prop_message_publish() -> - ?ALL(Msg0, message(), + ?ALL({Msg0, Meta}, + {message(), request_meta()}, begin Msg = emqx_message:from_map( inject_magic_into(from, emqx_message:to_map(Msg0))), @@ -322,7 +343,8 @@ prop_message_publish() -> {'on_message_publish', Resp} = emqx_exhook_demo_svr:take(), Expected = - #{message => from_message(Msg) + #{message => from_message(Msg), + meta => Meta }, ?assertEqual(Expected, Resp) end, @@ -330,7 +352,8 @@ prop_message_publish() -> end). prop_message_dropped() -> - ?ALL({Msg, By, Reason}, {message(), hardcoded, shutdown_reason()}, + ?ALL({Msg, By, Reason, Meta}, + {message(), hardcoded, shutdown_reason(), request_meta()}, begin ok = emqx_hooks:run('message.dropped', [Msg, By, Reason]), case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of @@ -339,7 +362,8 @@ prop_message_dropped() -> {'on_message_dropped', Resp} = emqx_exhook_demo_svr:take(), Expected = #{reason => stringfy(Reason), - message => from_message(Msg) + message => from_message(Msg), + meta => Meta }, ?assertEqual(Expected, Resp) end, @@ -347,7 +371,8 @@ prop_message_dropped() -> end). prop_message_delivered() -> - ?ALL({ClientInfo, Msg}, {clientinfo(), message()}, + ?ALL({ClientInfo, Msg, Meta}, + {clientinfo(), message(), request_meta()}, begin ok = emqx_hooks:run('message.delivered', [ClientInfo, Msg]), case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of @@ -356,7 +381,8 @@ prop_message_delivered() -> {'on_message_delivered', Resp} = emqx_exhook_demo_svr:take(), Expected = #{clientinfo => from_clientinfo(ClientInfo), - message => from_message(Msg) + message => from_message(Msg), + meta => Meta }, ?assertEqual(Expected, Resp) end, @@ -364,7 +390,8 @@ prop_message_delivered() -> end). prop_message_acked() -> - ?ALL({ClientInfo, Msg}, {clientinfo(), message()}, + ?ALL({ClientInfo, Msg, Meta}, + {clientinfo(), message(), request_meta()}, begin ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of @@ -373,7 +400,8 @@ prop_message_acked() -> {'on_message_acked', Resp} = emqx_exhook_demo_svr:take(), Expected = #{clientinfo => from_clientinfo(ClientInfo), - message => from_message(Msg) + message => from_message(Msg), + meta => Meta }, ?assertEqual(Expected, Resp) end, @@ -416,6 +444,8 @@ stringfy(Term) when is_integer(Term) -> integer_to_binary(Term); stringfy(Term) when is_atom(Term) -> atom_to_binary(Term, utf8); +stringfy(Term) when is_list(Term) -> + list_to_binary(Term); stringfy(Term) -> unicode:characters_to_binary((io_lib:format("~0p", [Term]))). @@ -520,6 +550,13 @@ sub_properties() -> unsub_properties() -> #{}. +request_meta() -> + #{ node => nodestr() + , version => stringfy(emqx_sys:version()) + , sysdescr => stringfy(emqx_sys:sysdescr()) + , cluster_name => ?DEFAULT_CLUSTER_NAME + }. + shutdown_reason() -> oneof([utf8(), {shutdown, emqx_ct_proper_types:limited_atom()}]). diff --git a/apps/emqx_retainer/test/emqx_retainer_cli_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_cli_SUITE.erl index 2e60e9ac4..3cce1bcbe 100644 --- a/apps/emqx_retainer/test/emqx_retainer_cli_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_cli_SUITE.erl @@ -30,10 +30,10 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([emqx_retainer]). -init_per_testcase(TestCase, Config) -> +init_per_testcase(_TestCase, Config) -> Config. -end_per_testcase(_TestCase, Config) -> +end_per_testcase(_TestCase, _Config) -> emqx_retainer:clean(<<"#">>). t_cmd(_) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 2c01ba70d..360f85c34 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -2,7 +2,8 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.4.2", - [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, @@ -11,7 +12,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.4.1", - [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, @@ -21,7 +23,8 @@ {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.4.0", - [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, @@ -32,7 +35,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.4.2", - [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, @@ -41,7 +45,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.4.1", - [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, @@ -51,7 +56,8 @@ {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.4.0", - [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 068edc571..292e10311 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -297,6 +297,7 @@ do_check_and_update_resource(#{id := Id, type := Type, description := NewDescrip Config = emqx_rule_validator:validate_params(NewConfig, ParamSpec), case test_resource(#{type => Type, config => NewConfig}) of ok -> + delete_resource(Id), _ = ?CLUSTER_CALL(init_resource, [Module, Create, Id, Config]), emqx_rule_registry:add_resource(#resource{ id = Id, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index a6220ee9f..6a50c553d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -323,13 +323,13 @@ show_resource(#{id := Id}, _Params) -> case emqx_rule_registry:find_resource(Id) of {ok, R} -> Status = - [begin - St = case rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]) of - {ok, St0} -> St0; - {error, _} -> #{is_alive => false} - end, - maps:put(node, Node, St) - end || Node <- ekka_mnesia:running_nodes()], + lists:concat( + [ case rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]) of + {badrpc, _} -> []; + {ok, St} -> [maps:put(node, Node, St)]; + {error, _} -> [maps:put(node, Node, #{is_alive => false})] + end + || Node <- ekka_mnesia:running_nodes()]), return({ok, maps:put(status, Status, record_to_map(R))}); not_found -> return({error, 404, <<"Not Found">>}) @@ -575,9 +575,17 @@ sort_by(Pos, TplList) -> end, TplList). get_rule_metrics(Id) -> - [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id])) - || Node <- ekka_mnesia:running_nodes()]. + lists:concat( + [ case rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id]) of + {badrpc, _} -> []; + Res -> [maps:put(node, Node, Res)] + end + || Node <- ekka_mnesia:running_nodes()]). get_action_metrics(Id) -> - [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_action_metrics, [Id])) - || Node <- ekka_mnesia:running_nodes()]. + lists:concat( + [ case rpc:call(Node, emqx_rule_metrics, get_action_metrics, [Id]) of + {badrpc, _} -> []; + Res -> [maps:put(node, Node, Res)] + end + || Node <- ekka_mnesia:running_nodes()]). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl index bcb869aec..1284184ca 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl @@ -273,10 +273,13 @@ format(#resource{id = Id, config = Config, description = Descr}) -> Status = - [begin - {ok, St} = rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]), - maps:put(node, Node, St) - end || Node <- [node()| nodes()]], + lists:concat( + [ case rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]) of + {badrpc, _} -> []; + {ok, St} -> [maps:put(node, Node, St)]; + {error, _} -> [maps:put(node, Node, #{is_alive => false})] + end + || Node <- ekka_mnesia:running_nodes()]), lists:flatten(io_lib:format("resource(id='~s', type='~s', config=~0p, status=~0p, description='~s')~n", [Id, Type, Config, Status, Descr])); format(#resource_type{name = Name, @@ -369,12 +372,20 @@ get_actions() -> emqx_rule_registry:get_actions(). get_rule_metrics(Id) -> - [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id])) - || Node <- [node()| nodes()]]. + lists:concat( + [ case rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id]) of + {badrpc, _} -> []; + Res -> [maps:put(node, Node, Res)] + end + || Node <- ekka_mnesia:running_nodes()]). get_action_metrics(Id) -> - [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_action_metrics, [Id])) - || Node <- [node()| nodes()]]. + lists:concat( + [ case rpc:call(Node, emqx_rule_metrics, get_action_metrics, [Id]) of + {badrpc, _} -> []; + Res -> [maps:put(node, Node, Res)] + end + || Node <- ekka_mnesia:running_nodes()]). on_failed(continue) -> continue; on_failed(stop) -> stop; diff --git a/etc/emqx.conf b/etc/emqx.conf index 2be03e193..5210b1603 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -2292,14 +2292,27 @@ broker.session_locking_strategy = quorum ## Dispatch strategy for shared subscription ## ## Value: Enum +## - hash_clientid +## - hash # same as hash_clientid +## - hash_topic +## - local ## - random ## - round_robin ## - sticky -## - hash # same as hash_clientid -## - hash_clientid -## - hash_topic broker.shared_subscription_strategy = random +## Per group dispatch strategy for shared subscription +## +## Value: Enum +## - hash_clientid +## - hash # same as hash_clientid +## - hash_topic +## - local +## - random +## - round_robin +## - sticky +#broker.sample_group.shared_subscription_strategy = local + ## Enable/disable shared dispatch acknowledgement for QoS1 and QoS2 messages ## This should allow messages to be dispatched to a different subscriber in ## the group in case the picked (based on shared_subscription_strategy) one # is offline diff --git a/lib-ce/emqx_dashboard/test/emqx_dashboard_SUITE.erl b/lib-ce/emqx_dashboard/test/emqx_dashboard_SUITE.erl index c4b47d4f9..514929687 100644 --- a/lib-ce/emqx_dashboard/test/emqx_dashboard_SUITE.erl +++ b/lib-ce/emqx_dashboard/test/emqx_dashboard_SUITE.erl @@ -310,8 +310,16 @@ is_lib(Path) -> string:prefix(Path, code:lib_dir()) =:= nomatch. setup_node(Node, Apps) -> + LoadedPlugins = emqx_ct_helpers:deps_path( + emqx, + filename:join(["test", "emqx_SUITE_data", "loaded_plugins"])), + LoadedModules = emqx_ct_helpers:deps_path( + emqx, + filename:join(["test", "emqx_SUITE_data", "loaded_modules"])), EnvHandler = fun(emqx) -> + application:set_env(emqx, plugins_loaded_file, LoadedPlugins), + application:set_env(emqx, modules_loaded_file, LoadedModules), application:set_env(emqx, listeners, []), application:set_env(gen_rpc, port_discovery, manual), ok; diff --git a/lib-ce/emqx_modules/test/emqx_mod_rewrite_SUITE.erl b/lib-ce/emqx_modules/test/emqx_mod_rewrite_SUITE.erl index e6a9f6c16..cb9d68af0 100644 --- a/lib-ce/emqx_modules/test/emqx_mod_rewrite_SUITE.erl +++ b/lib-ce/emqx_modules/test/emqx_mod_rewrite_SUITE.erl @@ -62,7 +62,7 @@ t_mod_rewrite(_Config) -> timer:sleep(100), ?assertEqual([], emqx_broker:subscriptions(<<"rewrite_client">>)), %% Pub Rules - {ok, _Props1, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- PubDestTopics]), + {ok, _, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- PubDestTopics]), RecvTopics2 = [begin ok = emqtt:publish(C, Topic, <<"payload">>), {ok, #{topic := RecvTopic}} = receive_publish(100), diff --git a/priv/emqx.schema b/priv/emqx.schema index 2359461ee..cc9a85799 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -2401,7 +2401,7 @@ end}. {datatype, {enum, [local,leader,quorum,all]}} ]}. -%% @doc Shared Subscription Dispatch Strategy. +%% @doc Default shared Subscription Dispatch Strategy. {mapping, "broker.shared_subscription_strategy", "emqx.shared_subscription_strategy", [ {default, round_robin}, {datatype, @@ -2410,11 +2410,37 @@ end}. round_robin, %% round robin alive subscribers one message after another sticky, %% pick a random subscriber and stick to it hash, %% hash client ID to a group member + local, %% send to some locally available subscriber hash_clientid, hash_topic ]}} ]}. +%% @doc Per group Shared Subscription Dispatch Strategy +{mapping, "broker.$name.shared_subscription_strategy", "emqx.shared_subscription_strategy_per_group", [ + {default, round_robin}, + {datatype, + {enum, + [random, %% randomly pick a subscriber + round_robin, %% round robin alive subscribers one message after another + sticky, %% pick a random subscriber and stick to it + hash, %% hash client ID to a group member + local, %% send to some locally available subscriber + hash_clientid, + hash_topic + ]}} +]}. + +{translation, "emqx.shared_subscription_strategy_per_group", fun(Conf) -> + Conf0 = cuttlefish_variable:filter_by_prefix("broker", Conf), + Groups = lists:filtermap(fun({["broker", Group, "shared_subscription_strategy"], Strategy}) -> + {true, {Group, Strategy}}; + (_) -> + false + end, Conf0), + maps:from_list(Groups) +end}. + %% @doc Enable or disable shared dispatch acknowledgement for QoS1 and QoS2 messages {mapping, "broker.shared_dispatch_ack_enabled", "emqx.shared_dispatch_ack_enabled", [ {default, false}, diff --git a/rebar.config b/rebar.config index 50a60fe89..844bca2ee 100644 --- a/rebar.config +++ b/rebar.config @@ -1,3 +1,4 @@ +%% -*- mode: erlang -*- %% This config file is the very basic config to compile emqx %% This allows emqx to be used as a dependency for other applications %% such as emqx module/plugin develpments and tests. @@ -39,12 +40,12 @@ {deps, [ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.15"}}} - , {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.5"}}} + , {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.1"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.5"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1.8"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1.9"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.7.0"}}} , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.6"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.7"}}} diff --git a/scripts/get-dashboard.sh b/scripts/get-dashboard.sh index 26e129cfd..16de56630 100755 --- a/scripts/get-dashboard.sh +++ b/scripts/get-dashboard.sh @@ -8,8 +8,8 @@ cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/.." PKG_VSN="${PKG_VSN:-$(./pkg-vsn.sh)}" case "${PKG_VSN}" in 4.3*) - EMQX_CE_DASHBOARD_VERSION='v4.3.6' - EMQX_EE_DASHBOARD_VERSION='v4.3.16' + EMQX_CE_DASHBOARD_VERSION='v4.3.7' + EMQX_EE_DASHBOARD_VERSION='v4.3.18' ;; 4.4*) # keep the above 4.3 untouched, otherwise conflicts! diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 6c25d9f5c..fb6cf7db7 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -4,6 +4,8 @@ [{"4.4.2", [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys,brutal_purge,soft_purge,[]}, + {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, @@ -21,6 +23,7 @@ {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_sys,brutal_purge,soft_purge,[]}, + {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, @@ -41,6 +44,7 @@ {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_sys,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, @@ -64,6 +68,8 @@ [{"4.4.2", [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys,brutal_purge,soft_purge,[]}, + {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, @@ -81,6 +87,7 @@ {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_sys,brutal_purge,soft_purge,[]}, + {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, @@ -101,6 +108,7 @@ {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_sys,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index aa71b16ae..5938dc6f2 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -265,7 +265,7 @@ parse_packet(#mqtt_packet_header{type = ?CONNACK}, < - {TopicName, Rest} = parse_topic_name(Bin, StrictMode), + {TopicName, Rest} = parse_utf8_string(Bin, StrictMode), {PacketId, Rest1} = case QoS of ?QOS_0 -> {undefined, Rest}; _ -> parse_packet_id(Rest) @@ -273,6 +273,7 @@ parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin, (PacketId =/= undefined) andalso StrictMode andalso validate_packet_id(PacketId), {Properties, Payload} = parse_properties(Rest1, Ver, StrictMode), + ok = ensure_topic_name_valid(StrictMode, TopicName, Properties), Publish = #mqtt_packet_publish{topic_name = TopicName, packet_id = PacketId, properties = Properties @@ -357,8 +358,9 @@ parse_will_message(Packet = #mqtt_packet_connect{will_flag = true, proto_ver = Ver}, Bin, StrictMode) -> {Props, Rest} = parse_properties(Bin, Ver, StrictMode), - {Topic, Rest1} = parse_topic_name(Rest, StrictMode), + {Topic, Rest1} = parse_utf8_string(Rest, StrictMode), {Payload, Rest2} = parse_binary_data(Rest1), + ok = ensure_topic_name_valid(StrictMode, Topic, Props), {Packet#mqtt_packet_connect{will_props = Props, will_topic = Topic, will_payload = Payload @@ -524,13 +526,14 @@ parse_binary_data(Bin) when 2 > byte_size(Bin) -> error(malformed_binary_data_length). -parse_topic_name(Bin, false) -> - parse_utf8_string(Bin, false); -parse_topic_name(Bin, true) -> - case parse_utf8_string(Bin, true) of - {<<>>, _Rest} -> error(empty_topic_name); - Result -> Result - end. +ensure_topic_name_valid(false, _TopicName, _Properties) -> + ok; +ensure_topic_name_valid(true, TopicName, _Properties) when TopicName =/= <<>> -> + ok; +ensure_topic_name_valid(true, <<>>, #{'Topic-Alias' := _}) -> + ok; +ensure_topic_name_valid(true, <<>>, _) -> + error(empty_topic_name). %%-------------------------------------------------------------------- %% Serialize MQTT Packet diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 2de15eb55..01aac200f 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -47,7 +47,12 @@ ]). %% for testing --export([subscribers/2, ack_enabled/0]). +-ifdef(TEST). +-export([ subscribers/2 + , ack_enabled/0 + , strategy/1 + ]). +-endif. %% gen_server callbacks -export([ init/1 @@ -63,6 +68,7 @@ -type strategy() :: random | round_robin | sticky + | local | hash %% same as hash_clientid, backward compatible | hash_clientid | hash_topic. @@ -121,7 +127,7 @@ dispatch(Group, Topic, Delivery) -> dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> #message{from = ClientId, topic = SourceTopic} = Msg, - case pick(strategy(), ClientId, SourceTopic, Group, Topic, FailedSubs) of + case pick(strategy(Group), ClientId, SourceTopic, Group, Topic, FailedSubs) of false -> {error, no_subscribers}; {Type, SubPid} -> @@ -133,9 +139,15 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> end end. --spec(strategy() -> strategy()). -strategy() -> - emqx:get_env(shared_subscription_strategy, random). +-spec(strategy(emqx_topic:group()) -> strategy()). +strategy(Group) -> + case emqx:get_env(shared_subscription_strategy_per_group, #{}) of + #{Group := Strategy} -> + Strategy; + _ -> + emqx:get_env(shared_subscription_strategy, random) + end. + -spec(ack_enabled() -> boolean()). ack_enabled() -> @@ -267,6 +279,13 @@ do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) -> end. pick_subscriber(_Group, _Topic, _Strategy, _ClientId, _SourceTopic, [Sub]) -> Sub; +pick_subscriber(Group, Topic, local, ClientId, SourceTopic, Subs) -> + case lists:filter(fun(Pid) -> erlang:node(Pid) =:= node() end, Subs) of + [_ | _] = LocalSubs -> + pick_subscriber(Group, Topic, random, ClientId, SourceTopic, LocalSubs); + [] -> + pick_subscriber(Group, Topic, random, ClientId, SourceTopic, Subs) + end; pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Subs) -> Nth = do_pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, length(Subs)), lists:nth(Nth, Subs). diff --git a/src/emqx_sys.erl b/src/emqx_sys.erl index c61272e64..e62d7043f 100644 --- a/src/emqx_sys.erl +++ b/src/emqx_sys.erl @@ -29,6 +29,7 @@ ]). -export([ version/0 + , cluster_name/0 , uptime/0 , datetime/0 , sysdescr/0 @@ -87,6 +88,10 @@ stop() -> -spec(version() -> string()). version() -> emqx_app:get_release(). +%% @doc Get cluster name +-spec(cluster_name() -> string()). +cluster_name() -> atom_to_list(ekka:cluster_name()). + %% @doc Get sys description -spec(sysdescr() -> string()). sysdescr() -> emqx_app:get_description(). diff --git a/test/emqx_cm_locker_SUITE.erl b/test/emqx_cm_locker_SUITE.erl index ec40a8985..90320a811 100644 --- a/test/emqx_cm_locker_SUITE.erl +++ b/test/emqx_cm_locker_SUITE.erl @@ -39,7 +39,7 @@ t_trans(_) -> ok = emqx_cm_locker:trans(<<"clientid">>, fun(_) -> ok end). t_lock_unlocak(_) -> - {true, _Nodes} = emqx_cm_locker:lock(<<"clientid">>), - {true, _Nodes} = emqx_cm_locker:lock(<<"clientid">>), - {true, _Nodes} = emqx_cm_locker:unlock(<<"clientid">>), - {true, _Nodes} = emqx_cm_locker:unlock(<<"clientid">>). + {true, _} = emqx_cm_locker:lock(<<"clientid">>), + {true, _} = emqx_cm_locker:lock(<<"clientid">>), + {true, _} = emqx_cm_locker:unlock(<<"clientid">>), + {true, _} = emqx_cm_locker:unlock(<<"clientid">>). diff --git a/test/emqx_frame_SUITE.erl b/test/emqx_frame_SUITE.erl index 81c861bdb..8074749a9 100644 --- a/test/emqx_frame_SUITE.erl +++ b/test/emqx_frame_SUITE.erl @@ -163,12 +163,15 @@ t_parse_malformed_utf8_string(_) -> ?catch_error(utf8_string_invalid, emqx_frame:parse(MalformedPacket, ParseState)). t_parse_empty_topic_name(_) -> - Packet = <<48, 4, 0, 0, 0, 1>>, - NormalState = emqx_frame:initial_parse_state(#{strict_mode => false}), - ?assertMatch({_, _}, emqx_frame:parse(Packet, NormalState)), + Packet = ?PUBLISH_PACKET(?QOS_1, <<>>, 1, #{}, <<>>), + ?assertEqual(Packet, parse_serialize(Packet, #{strict_mode => false})), + ?catch_error(empty_topic_name, parse_serialize(Packet, #{strict_mode => true})). - StrictState = emqx_frame:initial_parse_state(#{strict_mode => true}), - ?catch_error(empty_topic_name, emqx_frame:parse(Packet, StrictState)). +t_parse_empty_topic_name_with_alias(_) -> + Props = #{'Topic-Alias' => 16#AB}, + Packet = ?PUBLISH_PACKET(?QOS_1, <<>>, 1, Props, <<>>), + ?assertEqual(Packet, parse_serialize(Packet, #{strict_mode => false})), + ?assertEqual(Packet, parse_serialize(Packet, #{strict_mode => true})). t_parse_frame_proxy_protocol(_) -> BinList = [ <<"PROXY TCP4 ">>, <<"PROXY TCP6 ">>, <<"PROXY UNKNOWN">> diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 54a0de6d2..617b62eb0 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -35,6 +35,7 @@ all() -> emqx_ct:all(?SUITE). init_per_suite(Config) -> + net_kernel:start(['master@127.0.0.1', longnames]), emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:start_apps([]), Config. @@ -109,9 +110,9 @@ t_no_connection_nack(_) -> ExpProp = [{properties, #{'Session-Expiry-Interval' => timer:seconds(30)}}], {ok, SubConnPid1} = emqtt:start_link([{clientid, Subscriber1}] ++ ExpProp), - {ok, _Props} = emqtt:connect(SubConnPid1), + {ok, _} = emqtt:connect(SubConnPid1), {ok, SubConnPid2} = emqtt:start_link([{clientid, Subscriber2}] ++ ExpProp), - {ok, _Props} = emqtt:connect(SubConnPid2), + {ok, _} = emqtt:connect(SubConnPid2), emqtt:subscribe(SubConnPid1, ShareTopic, QoS), emqtt:subscribe(SubConnPid1, ShareTopic, QoS), @@ -125,7 +126,7 @@ t_no_connection_nack(_) -> emqx:publish(M#message{id = PacketId}) end, SendF(1), - timer:sleep(200), + ct:sleep(200), %% This is the connection which was picked by broker to dispatch (sticky) for 1st message ?assertMatch([#{packet_id := 1}], recv_msgs(1)), @@ -164,19 +165,24 @@ t_no_connection_nack(_) -> ok. t_random(_) -> + ok = ensure_config(random, true), test_two_messages(random). t_round_robin(_) -> + ok = ensure_config(round_robin, true), test_two_messages(round_robin). t_sticky(_) -> + ok = ensure_config(sticky, true), test_two_messages(sticky). t_hash(_) -> - test_two_messages(hash, false). + ok = ensure_config(hash, false), + test_two_messages(hash). t_hash_clinetid(_) -> - test_two_messages(hash_clientid, false). + ok = ensure_config(hash_clientid, false), + test_two_messages(hash_clientid). t_hash_topic(_) -> ok = ensure_config(hash_topic, false), @@ -242,55 +248,48 @@ t_not_so_sticky(_) -> ok. test_two_messages(Strategy) -> - test_two_messages(Strategy, _WithAck = true). + test_two_messages(Strategy, <<"group1">>). -test_two_messages(Strategy, WithAck) -> - ok = ensure_config(Strategy, WithAck), +test_two_messages(Strategy, Group) -> Topic = <<"foo/bar">>, ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]), - {ok, _} = emqtt:connect(ConnPid1), {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]), + {ok, _} = emqtt:connect(ConnPid1), {ok, _} = emqtt:connect(ConnPid2), + emqtt:subscribe(ConnPid1, {<<"$share/", Group/binary, "/foo/bar">>, 0}), + emqtt:subscribe(ConnPid2, {<<"$share/", Group/binary, "/foo/bar">>, 0}), + Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>), Message2 = emqx_message:make(ClientId1, 0, Topic, <<"hello2">>), - emqtt:subscribe(ConnPid1, {<<"$share/group1/foo/bar">>, 0}), - emqtt:subscribe(ConnPid2, {<<"$share/group1/foo/bar">>, 0}), ct:sleep(100), + emqx:publish(Message1), - Me = self(), - WaitF = fun(ExpectedPayload) -> - case last_message(ExpectedPayload, [ConnPid1, ConnPid2]) of - {true, Pid} -> - Me ! {subscriber, Pid}, - true; - Other -> - Other - end - end, - WaitF(<<"hello1">>), - UsedSubPid1 = receive {subscriber, P1} -> P1 end, - emqx_broker:publish(Message2), - WaitF(<<"hello2">>), - UsedSubPid2 = receive {subscriber, P2} -> P2 end, - case Strategy of - sticky -> ?assert(UsedSubPid1 =:= UsedSubPid2); - round_robin -> ?assert(UsedSubPid1 =/= UsedSubPid2); - hash -> ?assert(UsedSubPid1 =:= UsedSubPid2); - _ -> ok - end, + {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]), + + emqx:publish(Message2), + {true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1, ConnPid2]), + emqtt:stop(ConnPid1), emqtt:stop(ConnPid2), + + case Strategy of + sticky -> ?assertEqual(UsedSubPid1, UsedSubPid2); + round_robin -> ?assertNotEqual(UsedSubPid1, UsedSubPid2); + hash -> ?assertEqual(UsedSubPid1, UsedSubPid2); + _ -> ok + end, ok. last_message(ExpectedPayload, Pids) -> receive {publish, #{client_pid := Pid, payload := ExpectedPayload}} -> - ct:pal("~p ====== ~p", [Pids, Pid]), + ct:pal("last_message: ~p ====== ~p, payload=~p", [Pids, Pid, ExpectedPayload]), {true, Pid} after 100 -> + ct:pal("not yet"), <<"not yet?">> end. @@ -314,6 +313,103 @@ t_uncovered_func(_) -> ignored = emqx_shared_sub ! ignored, {mnesia_table_event, []} = emqx_shared_sub ! {mnesia_table_event, []}. +t_per_group_config(_) -> + ok = ensure_group_config(#{ + <<"local_group_fallback">> => local, + <<"local_group">> => local, + <<"round_robin_group">> => round_robin, + <<"sticky_group">> => sticky + }), + %% Each test is repeated 4 times because random strategy may technically pass the test + %% so we run 8 tests to make random pass in only 1/256 runs + + test_two_messages(sticky, <<"sticky_group">>), + test_two_messages(sticky, <<"sticky_group">>), + test_two_messages(round_robin, <<"round_robin_group">>), + test_two_messages(round_robin, <<"round_robin_group">>), + test_two_messages(sticky, <<"sticky_group">>), + test_two_messages(sticky, <<"sticky_group">>), + test_two_messages(round_robin, <<"round_robin_group">>), + test_two_messages(round_robin, <<"round_robin_group">>). + +t_local(_) -> + Node = start_slave('local_shared_sub_test', 21884), + GroupConfig = #{ + <<"local_group_fallback">> => local, + <<"local_group">> => local, + <<"round_robin_group">> => round_robin, + <<"sticky_group">> => sticky + }, + ok = ensure_group_config(Node, GroupConfig), + ok = ensure_group_config(GroupConfig), + + Topic = <<"local_foo/bar">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {port, 21884}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]), + + {ok, _} = emqtt:connect(ConnPid1), + {ok, _} = emqtt:connect(ConnPid2), + + emqtt:subscribe(ConnPid1, {<<"$share/local_group/local_foo/bar">>, 0}), + emqtt:subscribe(ConnPid2, {<<"$share/local_group/local_foo/bar">>, 0}), + + ct:sleep(100), + + Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>), + Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>), + + emqx:publish(Message1), + {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]), + + rpc:call(Node, emqx, publish, [Message2]), + {true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1, ConnPid2]), + RemoteLocalGroupStrategy = rpc:call(Node, emqx_shared_sub, strategy, [<<"local_group">>]), + + emqtt:stop(ConnPid1), + emqtt:stop(ConnPid2), + stop_slave(Node), + + ?assertEqual(local, emqx_shared_sub:strategy(<<"local_group">>)), + ?assertEqual(local, RemoteLocalGroupStrategy), + + ?assertNotEqual(UsedSubPid1, UsedSubPid2), + ok. + +t_local_fallback(_) -> + ok = ensure_group_config(#{ + <<"local_group_fallback">> => local, + <<"local_group">> => local, + <<"round_robin_group">> => round_robin, + <<"sticky_group">> => sticky + }), + + Topic = <<"local_foo/bar">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + Node = start_slave('local_fallback_shared_sub_test', 11885), + + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]), + {ok, _} = emqtt:connect(ConnPid1), + Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>), + Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>), + + emqtt:subscribe(ConnPid1, {<<"$share/local_group_fallback/local_foo/bar">>, 0}), + + emqx:publish(Message1), + {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1]), + + rpc:call(Node, emqx, publish, [Message2]), + {true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1]), + + emqtt:stop(ConnPid1), + stop_slave(Node), + + ?assertEqual(UsedSubPid1, UsedSubPid2), + ok. + %%-------------------------------------------------------------------- %% help functions %%-------------------------------------------------------------------- @@ -326,6 +422,12 @@ ensure_config(Strategy, AckEnabled) -> application:set_env(emqx, shared_dispatch_ack_enabled, AckEnabled), ok. +ensure_group_config(Node, Group2Strategy) -> + rpc:call(Node, application, set_env, [emqx, shared_subscription_strategy_per_group, Group2Strategy]). + +ensure_group_config(Group2Strategy) -> + application:set_env(emqx, shared_subscription_strategy_per_group, Group2Strategy). + subscribed(Group, Topic, Pid) -> lists:member(Pid, emqx_shared_sub:subscribers(Group, Topic)). @@ -343,3 +445,50 @@ recv_msgs(Count, Msgs) -> Msgs end. +start_slave(Name, Port) -> + {ok, Node} = ct_slave:start(list_to_atom(atom_to_list(Name) ++ "@" ++ host()), + [{kill_if_fail, true}, + {monitor_master, true}, + {init_timeout, 10000}, + {startup_timeout, 10000}, + {erl_flags, ebin_path()}]), + + pong = net_adm:ping(Node), + setup_node(Node, Port), + Node. + +stop_slave(Node) -> + rpc:call(Node, ekka, leave, []), + ct_slave:stop(Node). + +host() -> + [_, Host] = string:tokens(atom_to_list(node()), "@"), Host. + +ebin_path() -> + string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " "). + +is_lib(Path) -> + string:prefix(Path, code:lib_dir()) =:= nomatch. + +setup_node(Node, Port) -> + EnvHandler = + fun(emqx) -> + application:set_env( + emqx, + listeners, + [#{listen_on => {{127,0,0,1},Port}, + name => "internal", + opts => [{zone,internal}], + proto => tcp}]), + application:set_env(gen_rpc, port_discovery, manual), + ok; + (_) -> + ok + end, + + [ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx]], + ok = rpc:call(Node, emqx_ct_helpers, start_apps, [[emqx], EnvHandler]), + + rpc:call(Node, ekka, join, [node()]), + + ok.