Merge pull request #7563 from HJianBo/merge-main-v4.3-into-v4.4
Merge main v4.3 into v4.4
This commit is contained in:
commit
f5732472df
|
@ -14,14 +14,18 @@ File format:
|
|||
|
||||
### Enhancements
|
||||
|
||||
* In order to fix the execution order of exhook, e.g. before/after other plugins/modules,
|
||||
ExHook now supports user customizing emqx_hook execute priority.
|
||||
* Add `RequestMeta` for exhook.proto in order to expose `cluster_name` of emqx in each gRPC request. [#7524]
|
||||
* Support customize emqx_exhook execution priority. [#7408]
|
||||
* add api: PUT /rules/{id}/reset_metrics.
|
||||
This api reset the metrics of the rule engine of a rule, and reset the metrics of the action related to this rule. [#7474]
|
||||
* Enhanced rule engine error handling when json parsing error.
|
||||
* Add support for `RSA-PSK-AES256-GCM-SHA384`, `RSA-PSK-AES256-CBC-SHA384`,
|
||||
`RSA-PSK-AES128-GCM-SHA256`, `RSA-PSK-AES128-CBC-SHA256` PSK ciphers, and remove `PSK-3DES-EDE-CBC-SHA`,
|
||||
`PSK-RC4-SHA` from the default configuration. [#7427]
|
||||
* Diagnostic logging for mnesia `wait_for_table`
|
||||
- prints check points of mnesia internal stats
|
||||
- prints check points of per table loading stats
|
||||
Help to locate the problem of long table loading time.
|
||||
|
||||
### Bug fixes
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_auth_mongo,
|
||||
[{description, "EMQ X Authentication/ACL with MongoDB"},
|
||||
{vsn, "4.4.2"}, % strict semver, bump manually!
|
||||
{vsn, "4.4.3"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, [emqx_auth_mongo_sup]},
|
||||
{applications, [kernel,stdlib,mongodb,ecpool]},
|
||||
|
|
|
@ -1,8 +1,12 @@
|
|||
%% -*- mode: erlang -*-
|
||||
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||
{VSN,
|
||||
[{"4.4.1",
|
||||
[{load_module,emqx_auth_mongo_sup,brutal_purge,soft_purge,[]},
|
||||
[{"4.4.2",
|
||||
[{load_module,emqx_auth_mongo_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.1",
|
||||
[{load_module,emqx_auth_mongo_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_auth_mongo_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.0",
|
||||
[{load_module,emqx_auth_mongo_sup,brutal_purge,soft_purge,[]},
|
||||
|
@ -10,8 +14,12 @@
|
|||
{load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_acl_mongo,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}],
|
||||
[{"4.4.1",
|
||||
[{load_module,emqx_auth_mongo_sup,brutal_purge,soft_purge,[]},
|
||||
[{"4.4.2",
|
||||
[{load_module,emqx_auth_mongo_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.1",
|
||||
[{load_module,emqx_auth_mongo_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_auth_mongo_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.0",
|
||||
[{load_module,emqx_auth_mongo_sup,brutal_purge,soft_purge,[]},
|
||||
|
|
|
@ -35,6 +35,10 @@
|
|||
, query_multi/3
|
||||
]).
|
||||
|
||||
-export([ available/2
|
||||
, available/3
|
||||
]).
|
||||
|
||||
-spec(register_metrics() -> ok).
|
||||
register_metrics() ->
|
||||
lists:foreach(fun emqx_metrics:ensure/1, ?AUTH_METRICS).
|
||||
|
@ -97,6 +101,56 @@ is_superuser(Pool, #superquery{collection = Coll, field = Field, selector = Sele
|
|||
end
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Availability Test
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
available(Pool, #superquery{collection = Collection, selector = Selector}) ->
|
||||
available(Pool, Collection, maps:from_list(replvars(Selector, test_client_info())));
|
||||
available(Pool, #authquery{collection = Collection, selector = Selector}) ->
|
||||
available(Pool, Collection, maps:from_list(replvars(Selector, test_client_info())));
|
||||
available(Pool, #aclquery{collection = Collection, selector = Selectors}) ->
|
||||
Fun =
|
||||
fun(Selector) ->
|
||||
maps:from_list(emqx_auth_mongo:replvars(Selector, test_client_info()))
|
||||
end,
|
||||
available(Pool, Collection, lists:map(Fun, Selectors), fun query_multi/3).
|
||||
|
||||
available(Pool, Collection, Query) ->
|
||||
available(Pool, Collection, Query, fun query/3).
|
||||
|
||||
available(Pool, Collection, Query, Fun) ->
|
||||
try Fun(Pool, Collection, Query) of
|
||||
{error, Reason} ->
|
||||
?LOG(error, "[MongoDB] ~p availability test error: ~0p", [Collection, Reason]),
|
||||
{error, Reason};
|
||||
Error = #{<<"code">> := Code} ->
|
||||
CodeName = maps:get(<<"codeName">>, Error, undefined),
|
||||
ErrorMessage = maps:get(<<"errmsg">>, Error, undefined),
|
||||
?LOG(error, "[MongoDB] ~p availability test error, code: ~p Name: ~0p Message: ~0p",
|
||||
[Collection, Code, CodeName, ErrorMessage]),
|
||||
{error, {mongo_error, Code}};
|
||||
_Return ->
|
||||
%% Any success result is fine.
|
||||
ok
|
||||
catch E:R:S ->
|
||||
?LOG(error, "[MongoDB] ~p availability test error, ~p: ~0p: ~0p", [Collection, E, R, S]),
|
||||
{error, R}
|
||||
end.
|
||||
|
||||
%% Test client info
|
||||
test_client_info() ->
|
||||
#{
|
||||
clientid => <<"EMQX_availability_test_client">>,
|
||||
username => <<"EMQX_availability_test_username">>,
|
||||
cn => <<"EMQX_availability_test_cn">>,
|
||||
dn => <<"EMQX_availability_test_dn">>
|
||||
}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal func
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
replvars(VarList, ClientInfo) ->
|
||||
lists:map(fun(Var) -> replvar(Var, ClientInfo) end, VarList).
|
||||
|
||||
|
|
|
@ -36,26 +36,65 @@
|
|||
|
||||
start(_StartType, _StartArgs) ->
|
||||
{ok, Sup} = emqx_auth_mongo_sup:start_link(),
|
||||
with_env(auth_query, fun reg_authmod/1),
|
||||
with_env(acl_query, fun reg_aclmod/1),
|
||||
ok = safe_start(),
|
||||
{ok, Sup}.
|
||||
|
||||
prep_stop(State) ->
|
||||
ok = emqx:unhook('client.authenticate', fun emqx_auth_mongo:check/3),
|
||||
ok = emqx:unhook('client.check_acl', fun emqx_acl_mongo:check_acl/5),
|
||||
ok = unload_hook(),
|
||||
_ = stop_pool(),
|
||||
State.
|
||||
|
||||
stop(_State) ->
|
||||
ok.
|
||||
|
||||
unload_hook() ->
|
||||
ok = emqx:unhook('client.authenticate', fun emqx_auth_mongo:check/3),
|
||||
ok = emqx:unhook('client.check_acl', fun emqx_acl_mongo:check_acl/5).
|
||||
|
||||
stop_pool() ->
|
||||
ecpool:stop_sup_pool(?APP).
|
||||
|
||||
safe_start() ->
|
||||
try
|
||||
ok = with_env(auth_query, fun reg_authmod/1),
|
||||
ok = with_env(acl_query, fun reg_aclmod/1),
|
||||
ok
|
||||
catch _E:R:_S ->
|
||||
unload_hook(),
|
||||
_ = stop_pool(),
|
||||
{error, R}
|
||||
end.
|
||||
|
||||
reg_authmod(AuthQuery) ->
|
||||
emqx_auth_mongo:register_metrics(),
|
||||
SuperQuery = r(super_query, application:get_env(?APP, super_query, undefined)),
|
||||
ok = emqx:hook('client.authenticate', fun emqx_auth_mongo:check/3,
|
||||
[#{authquery => AuthQuery, superquery => SuperQuery, pool => ?APP}]).
|
||||
case emqx_auth_mongo:available(?APP, AuthQuery) of
|
||||
ok ->
|
||||
emqx_auth_mongo:register_metrics(),
|
||||
HookFun = fun emqx_auth_mongo:check/3,
|
||||
HookOptions = #{authquery => AuthQuery, superquery => undefined, pool => ?APP},
|
||||
case r(super_query, application:get_env(?APP, super_query, undefined)) of
|
||||
undefined ->
|
||||
ok = emqx:hook('client.authenticate', HookFun, [HookOptions]);
|
||||
SuperQuery ->
|
||||
case emqx_auth_mongo:available(?APP, SuperQuery) of
|
||||
ok ->
|
||||
ok = emqx:hook('client.authenticate', HookFun,
|
||||
[HookOptions#{superquery => SuperQuery}]);
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end
|
||||
end;
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
reg_aclmod(AclQuery) ->
|
||||
ok = emqx:hook('client.check_acl', fun emqx_acl_mongo:check_acl/5, [#{aclquery => AclQuery, pool => ?APP}]).
|
||||
case emqx_auth_mongo:available(?APP, AclQuery) of
|
||||
ok ->
|
||||
ok = emqx:hook('client.check_acl', fun emqx_acl_mongo:check_acl/5,
|
||||
[#{aclquery => AclQuery, pool => ?APP}]);
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
|
|
|
@ -112,8 +112,8 @@ t_rpc(Config) when is_list(Config) ->
|
|||
ClientId = <<"ClientId">>,
|
||||
try
|
||||
{ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]),
|
||||
{ok, _Props} = emqtt:connect(ConnPid),
|
||||
{ok, _Props, [1]} = emqtt:subscribe(ConnPid, {<<"forwarded/t_rpc/one">>, ?QOS_1}),
|
||||
{ok, _} = emqtt:connect(ConnPid),
|
||||
{ok, _, [1]} = emqtt:subscribe(ConnPid, {<<"forwarded/t_rpc/one">>, ?QOS_1}),
|
||||
timer:sleep(100),
|
||||
{ok, _PacketId} = emqtt:publish(ConnPid, <<"t_rpc/one">>, <<"hello">>, ?QOS_1),
|
||||
timer:sleep(100),
|
||||
|
@ -151,14 +151,14 @@ t_mqtt(Config) when is_list(Config) ->
|
|||
ssl => false,
|
||||
%% Consume back to forwarded message for verification
|
||||
%% NOTE: this is a indefenite loopback without mocking emqx_bridge_worker:import_batch/1
|
||||
subscriptions => [{SendToTopic2, _QoS = 1}],
|
||||
subscriptions => [{SendToTopic2, ?QOS_1}],
|
||||
receive_mountpoint => <<"receive/aws/">>,
|
||||
start_type => auto},
|
||||
{ok, Pid} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg),
|
||||
ClientId = <<"client-1">>,
|
||||
try
|
||||
?assertEqual([{SendToTopic2, 1}], emqx_bridge_worker:get_subscriptions(Pid)),
|
||||
ok = emqx_bridge_worker:ensure_subscription_present(Pid, SendToTopic3, _QoS = 1),
|
||||
ok = emqx_bridge_worker:ensure_subscription_present(Pid, SendToTopic3, ?QOS_1),
|
||||
?assertEqual([{SendToTopic3, 1},{SendToTopic2, 1}],
|
||||
emqx_bridge_worker:get_subscriptions(Pid)),
|
||||
{ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]),
|
||||
|
|
|
@ -54,7 +54,7 @@ t_update_max_age(_Config) ->
|
|||
?LOGT("Reply =~p", [Reply]),
|
||||
{ok,created, #coap_content{location_path = LocPath}} = Reply,
|
||||
?assertEqual([<<"/ps/topic1">>] ,LocPath),
|
||||
TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload),
|
||||
TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload1, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload),
|
||||
?LOGT("lookup topic info=~p", [TopicInfo]),
|
||||
?assertEqual(60, MaxAge1),
|
||||
?assertEqual(<<"42">>, CT1),
|
||||
|
@ -65,7 +65,7 @@ t_update_max_age(_Config) ->
|
|||
Reply1 = er_coap_client:request(post, URI, #coap_content{max_age = 70, format = <<"application/link-format">>, payload = Payload1}),
|
||||
{ok,created, #coap_content{location_path = LocPath}} = Reply1,
|
||||
?assertEqual([<<"/ps/topic1">>] ,LocPath),
|
||||
[{TopicInPayload, MaxAge2, CT2, _ResPayload, _TimeStamp1}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload),
|
||||
[{TopicInPayload, MaxAge2, CT2, _ResPayload2, _TimeStamp1}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload),
|
||||
?assertEqual(70, MaxAge2),
|
||||
?assertEqual(<<"50">>, CT2),
|
||||
|
||||
|
@ -82,7 +82,7 @@ t_create_subtopic(_Config) ->
|
|||
?LOGT("Reply =~p", [Reply]),
|
||||
{ok,created, #coap_content{location_path = LocPath}} = Reply,
|
||||
?assertEqual([<<"/ps/topic1">>] ,LocPath),
|
||||
TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload),
|
||||
TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload1, _TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload),
|
||||
?LOGT("lookup topic info=~p", [TopicInfo]),
|
||||
?assertEqual(60, MaxAge1),
|
||||
?assertEqual(<<"42">>, CT1),
|
||||
|
@ -99,7 +99,7 @@ t_create_subtopic(_Config) ->
|
|||
?LOGT("Reply =~p", [Reply1]),
|
||||
{ok,created, #coap_content{location_path = LocPath1}} = Reply1,
|
||||
?assertEqual([<<"/ps/topic1/subtopic">>] ,LocPath1),
|
||||
[{FullTopic, MaxAge2, CT2, _ResPayload, _}] = emqx_coap_pubsub_topics:lookup_topic_info(FullTopic),
|
||||
[{FullTopic, MaxAge2, CT2, _ResPayload2, _}] = emqx_coap_pubsub_topics:lookup_topic_info(FullTopic),
|
||||
?assertEqual(60, MaxAge2),
|
||||
?assertEqual(<<"42">>, CT2),
|
||||
|
||||
|
@ -132,7 +132,7 @@ t_refreash_max_age(_Config) ->
|
|||
?LOGT("Reply =~p", [Reply]),
|
||||
{ok,created, #coap_content{location_path = LocPath}} = Reply,
|
||||
?assertEqual([<<"/ps/topic1">>] ,LocPath),
|
||||
TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload, TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload),
|
||||
TopicInfo = [{TopicInPayload, MaxAge1, CT1, _ResPayload1, TimeStamp}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload),
|
||||
?LOGT("lookup topic info=~p", [TopicInfo]),
|
||||
?LOGT("TimeStamp=~p", [TimeStamp]),
|
||||
?assertEqual(5, MaxAge1),
|
||||
|
@ -144,7 +144,7 @@ t_refreash_max_age(_Config) ->
|
|||
Reply1 = er_coap_client:request(post, URI, #coap_content{max_age = 5, format = <<"application/link-format">>, payload = Payload1}),
|
||||
{ok,created, #coap_content{location_path = LocPath}} = Reply1,
|
||||
?assertEqual([<<"/ps/topic1">>] ,LocPath),
|
||||
[{TopicInPayload, MaxAge2, CT2, _ResPayload, TimeStamp1}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload),
|
||||
[{TopicInPayload, MaxAge2, CT2, _ResPayload2, TimeStamp1}] = emqx_coap_pubsub_topics:lookup_topic_info(TopicInPayload),
|
||||
?LOGT("TimeStamp1=~p", [TimeStamp1]),
|
||||
?assertEqual(5, MaxAge2),
|
||||
?assertEqual(<<"50">>, CT2),
|
||||
|
|
|
@ -25,5 +25,5 @@ data/
|
|||
*.class
|
||||
Mnesia.nonode@nohost/
|
||||
src/emqx_exhook_pb.erl
|
||||
src/emqx_exhook_v_1_hook_provider_client.erl
|
||||
src/emqx_exhook_v_1_hook_provider_bhvr.erl
|
||||
src/emqx_exhook_v_*_hook_provider_client.erl
|
||||
src/emqx_exhook_v_*_hook_provider_bhvr.erl
|
||||
|
|
|
@ -22,6 +22,7 @@ option java_multiple_files = true;
|
|||
option java_package = "io.emqx.exhook";
|
||||
option java_outer_classname = "EmqxExHookProto";
|
||||
|
||||
// Proto package compatible, Don't need an updated version..
|
||||
package emqx.exhook.v1;
|
||||
|
||||
service HookProvider {
|
||||
|
@ -76,6 +77,8 @@ service HookProvider {
|
|||
message ProviderLoadedRequest {
|
||||
|
||||
BrokerInfo broker = 1;
|
||||
|
||||
RequestMeta meta = 2;
|
||||
}
|
||||
|
||||
message LoadedResponse {
|
||||
|
@ -83,7 +86,11 @@ message LoadedResponse {
|
|||
repeated HookSpec hooks = 1;
|
||||
}
|
||||
|
||||
message ProviderUnloadedRequest { }
|
||||
message ProviderUnloadedRequest {
|
||||
|
||||
RequestMeta meta = 1;
|
||||
}
|
||||
|
||||
|
||||
message ClientConnectRequest {
|
||||
|
||||
|
@ -93,6 +100,8 @@ message ClientConnectRequest {
|
|||
//
|
||||
// It should be empty on MQTT v3.1.1/v3.1 or others protocol
|
||||
repeated Property props = 2;
|
||||
|
||||
RequestMeta meta = 3;
|
||||
}
|
||||
|
||||
message ClientConnackRequest {
|
||||
|
@ -102,11 +111,15 @@ message ClientConnackRequest {
|
|||
string result_code = 2;
|
||||
|
||||
repeated Property props = 3;
|
||||
|
||||
RequestMeta meta = 4;
|
||||
}
|
||||
|
||||
message ClientConnectedRequest {
|
||||
|
||||
ClientInfo clientinfo = 1;
|
||||
|
||||
RequestMeta meta = 2;
|
||||
}
|
||||
|
||||
message ClientDisconnectedRequest {
|
||||
|
@ -114,6 +127,8 @@ message ClientDisconnectedRequest {
|
|||
ClientInfo clientinfo = 1;
|
||||
|
||||
string reason = 2;
|
||||
|
||||
RequestMeta meta = 3;
|
||||
}
|
||||
|
||||
message ClientAuthenticateRequest {
|
||||
|
@ -121,6 +136,8 @@ message ClientAuthenticateRequest {
|
|||
ClientInfo clientinfo = 1;
|
||||
|
||||
bool result = 2;
|
||||
|
||||
RequestMeta meta = 3;
|
||||
}
|
||||
|
||||
message ClientCheckAclRequest {
|
||||
|
@ -139,6 +156,8 @@ message ClientCheckAclRequest {
|
|||
string topic = 3;
|
||||
|
||||
bool result = 4;
|
||||
|
||||
RequestMeta meta = 5;
|
||||
}
|
||||
|
||||
message ClientSubscribeRequest {
|
||||
|
@ -148,6 +167,8 @@ message ClientSubscribeRequest {
|
|||
repeated Property props = 2;
|
||||
|
||||
repeated TopicFilter topic_filters = 3;
|
||||
|
||||
RequestMeta meta = 4;
|
||||
}
|
||||
|
||||
message ClientUnsubscribeRequest {
|
||||
|
@ -157,11 +178,15 @@ message ClientUnsubscribeRequest {
|
|||
repeated Property props = 2;
|
||||
|
||||
repeated TopicFilter topic_filters = 3;
|
||||
|
||||
RequestMeta meta = 4;
|
||||
}
|
||||
|
||||
message SessionCreatedRequest {
|
||||
|
||||
ClientInfo clientinfo = 1;
|
||||
|
||||
RequestMeta meta = 2;
|
||||
}
|
||||
|
||||
message SessionSubscribedRequest {
|
||||
|
@ -171,6 +196,8 @@ message SessionSubscribedRequest {
|
|||
string topic = 2;
|
||||
|
||||
SubOpts subopts = 3;
|
||||
|
||||
RequestMeta meta = 4;
|
||||
}
|
||||
|
||||
message SessionUnsubscribedRequest {
|
||||
|
@ -178,21 +205,29 @@ message SessionUnsubscribedRequest {
|
|||
ClientInfo clientinfo = 1;
|
||||
|
||||
string topic = 2;
|
||||
|
||||
RequestMeta meta = 3;
|
||||
}
|
||||
|
||||
message SessionResumedRequest {
|
||||
|
||||
ClientInfo clientinfo = 1;
|
||||
|
||||
RequestMeta meta = 2;
|
||||
}
|
||||
|
||||
message SessionDiscardedRequest {
|
||||
|
||||
ClientInfo clientinfo = 1;
|
||||
|
||||
RequestMeta meta = 2;
|
||||
}
|
||||
|
||||
message SessionTakeoveredRequest {
|
||||
|
||||
ClientInfo clientinfo = 1;
|
||||
|
||||
RequestMeta meta = 2;
|
||||
}
|
||||
|
||||
message SessionTerminatedRequest {
|
||||
|
@ -200,11 +235,15 @@ message SessionTerminatedRequest {
|
|||
ClientInfo clientinfo = 1;
|
||||
|
||||
string reason = 2;
|
||||
|
||||
RequestMeta meta = 3;
|
||||
}
|
||||
|
||||
message MessagePublishRequest {
|
||||
|
||||
Message message = 1;
|
||||
|
||||
RequestMeta meta = 2;
|
||||
}
|
||||
|
||||
message MessageDeliveredRequest {
|
||||
|
@ -212,6 +251,8 @@ message MessageDeliveredRequest {
|
|||
ClientInfo clientinfo = 1;
|
||||
|
||||
Message message = 2;
|
||||
|
||||
RequestMeta meta = 3;
|
||||
}
|
||||
|
||||
message MessageDroppedRequest {
|
||||
|
@ -219,6 +260,8 @@ message MessageDroppedRequest {
|
|||
Message message = 1;
|
||||
|
||||
string reason = 2;
|
||||
|
||||
RequestMeta meta = 3;
|
||||
}
|
||||
|
||||
message MessageAckedRequest {
|
||||
|
@ -226,6 +269,8 @@ message MessageAckedRequest {
|
|||
ClientInfo clientinfo = 1;
|
||||
|
||||
Message message = 2;
|
||||
|
||||
RequestMeta meta = 3;
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
@ -430,3 +475,14 @@ message SubOpts {
|
|||
// connection with a ClientID equal to the ClientID of the publishing
|
||||
uint32 nl = 5;
|
||||
}
|
||||
|
||||
message RequestMeta {
|
||||
|
||||
string node = 1;
|
||||
|
||||
string version = 2;
|
||||
|
||||
string sysdescr = 3;
|
||||
|
||||
string cluster_name = 4;
|
||||
}
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{application, emqx_exhook,
|
||||
[{description, "EMQ X Extension for Hook"},
|
||||
{vsn, "4.4.1"},
|
||||
|
|
|
@ -2,12 +2,18 @@
|
|||
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||
{VSN,
|
||||
[{"4.4.0",
|
||||
[{load_module,emqx_exhook_sup,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook_server,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]},
|
||||
{update, emqx_exhook_mngr, {advanced, ["4.4.0"]}}]},
|
||||
{<<".*">>,[]}],
|
||||
[{"4.4.0",
|
||||
[{load_module,emqx_exhook_sup,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook_sup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook_server,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]},
|
||||
{update, emqx_exhook_mngr, {advanced, ["4.4.0"]}}]},
|
||||
{<<".*">>,[]}]}.
|
||||
|
|
|
@ -30,6 +30,11 @@
|
|||
, call_fold/3
|
||||
]).
|
||||
|
||||
-export([request_meta/0]).
|
||||
|
||||
-import(emqx_exhook_handler, [stringfy/1]).
|
||||
%% TODO: move util functions to an independent module
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Mgmt APIs
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -116,3 +121,10 @@ deny_action_result('message.publish', Msg) ->
|
|||
%% TODO: Not support to deny a message
|
||||
%% maybe we can put the 'allow_publish' into message header
|
||||
Msg.
|
||||
|
||||
request_meta() ->
|
||||
#{ node => stringfy(node())
|
||||
, version => emqx_sys:version()
|
||||
, sysdescr => emqx_sys:sysdescr()
|
||||
, cluster_name => emqx_sys:cluster_name()
|
||||
}.
|
||||
|
|
|
@ -332,6 +332,8 @@ stringfy(Term) when is_integer(Term) ->
|
|||
integer_to_binary(Term);
|
||||
stringfy(Term) when is_atom(Term) ->
|
||||
atom_to_binary(Term, utf8);
|
||||
stringfy(Term) when is_list(Term) ->
|
||||
list_to_binary(Term);
|
||||
stringfy(Term) ->
|
||||
unicode:characters_to_binary((io_lib:format("~0p", [Term]))).
|
||||
|
||||
|
|
|
@ -276,23 +276,24 @@ match_topic_filter(TopicName, TopicFilter) ->
|
|||
|
||||
-spec do_call(string(), atom(), map(), map()) -> {ok, map()} | {error, term()}.
|
||||
do_call(ChannName, Fun, Req, ReqOpts) ->
|
||||
NReq = Req#{meta => emqx_exhook:request_meta()},
|
||||
Options = ReqOpts#{channel => ChannName},
|
||||
?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, Req, Options]),
|
||||
case catch apply(?PB_CLIENT_MOD, Fun, [Req, Options]) of
|
||||
?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, NReq, Options]),
|
||||
case catch apply(?PB_CLIENT_MOD, Fun, [NReq, Options]) of
|
||||
{ok, Resp, Metadata} ->
|
||||
?LOG(debug, "Response {ok, ~0p, ~0p}", [Resp, Metadata]),
|
||||
{ok, Resp};
|
||||
{error, {Code, Msg}, _Metadata} ->
|
||||
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) response errcode: ~0p, errmsg: ~0p",
|
||||
[?PB_CLIENT_MOD, Fun, Req, Options, Code, Msg]),
|
||||
[?PB_CLIENT_MOD, Fun, NReq, Options, Code, Msg]),
|
||||
{error, {Code, Msg}};
|
||||
{error, Reason} ->
|
||||
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) error: ~0p",
|
||||
[?PB_CLIENT_MOD, Fun, Req, Options, Reason]),
|
||||
[?PB_CLIENT_MOD, Fun, NReq, Options, Reason]),
|
||||
{error, Reason};
|
||||
{'EXIT', {Reason, Stk}} ->
|
||||
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) throw an exception: ~0p, stacktrace: ~0p",
|
||||
[?PB_CLIENT_MOD, Fun, Req, Options, Reason, Stk]),
|
||||
[?PB_CLIENT_MOD, Fun, NReq, Options, Reason, Stk]),
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
|
|
|
@ -19,9 +19,14 @@
|
|||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include("emqx_exhook.hrl").
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
-define(OTHER_CLUSTER_NAME_ATOM, test_emqx_cluster).
|
||||
-define(OTHER_CLUSTER_NAME_STRING, "test_emqx_cluster").
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Setups
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -99,29 +104,63 @@ t_cli_stats(_) ->
|
|||
unmeck_print().
|
||||
|
||||
t_priority(_) ->
|
||||
restart_exhook_with_envs([{emqx_exhook, hook_priority, 1}]),
|
||||
restart_apps_with_envs([ {emqx, fun set_special_cfgs/1}
|
||||
, {emqx_exhook, [{hook_priority, 1}]}]),
|
||||
|
||||
emqx_exhook:disable(default),
|
||||
ok = emqx_exhook:enable(default),
|
||||
[Callback | _] = emqx_hooks:lookup('client.connected'),
|
||||
1 = emqx_hooks:callback_priority(Callback).
|
||||
?assertEqual(1, emqx_hooks:callback_priority(Callback)).
|
||||
|
||||
t_cluster_name(_) ->
|
||||
SetEnvFun =
|
||||
fun(emqx) ->
|
||||
set_special_cfgs(emqx),
|
||||
application:set_env(ekka, cluster_name, ?OTHER_CLUSTER_NAME_ATOM);
|
||||
(emqx_exhook) ->
|
||||
application:set_env(emqx_exhook, hook_priority, 1)
|
||||
end,
|
||||
|
||||
emqx_ct_helpers:stop_apps([emqx, emqx_exhook]),
|
||||
emqx_ct_helpers:start_apps([emqx, emqx_exhook], SetEnvFun),
|
||||
|
||||
?assertEqual(?OTHER_CLUSTER_NAME_STRING, emqx_sys:cluster_name()),
|
||||
|
||||
emqx_exhook:disable(default),
|
||||
ok = emqx_exhook:enable(default),
|
||||
%% See emqx_exhook_demo_svr:on_provider_loaded/2
|
||||
?assertEqual([], emqx_hooks:lookup('session.created')),
|
||||
?assertEqual([], emqx_hooks:lookup('message_publish')),
|
||||
|
||||
[Callback | _] = emqx_hooks:lookup('client.connected'),
|
||||
?assertEqual(1, emqx_hooks:callback_priority(Callback)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Utils
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% TODO: make it more general and move to `emqx_ct_helpers`
|
||||
restart_exhook_with_envs(Envs) ->
|
||||
emqx_ct_helpers:stop_apps([emqx_exhook]),
|
||||
SetPriorityFun
|
||||
= fun(emqx) ->
|
||||
set_special_cfgs(emqx);
|
||||
(emqx_exhook) ->
|
||||
lists:foreach(fun({App, Key, Val}) ->
|
||||
application:set_env(App, Key, Val)
|
||||
end, Envs)
|
||||
end,
|
||||
emqx_ct_helpers:start_apps([emqx_exhook], SetPriorityFun).
|
||||
restart_app_with_envs(App, Fun)
|
||||
when is_function(Fun) ->
|
||||
emqx_ct_helpers:stop_apps([App]),
|
||||
emqx_ct_helpers:start_apps([App], Fun);
|
||||
|
||||
restart_app_with_envs(App, Envs)
|
||||
when is_list(Envs) ->
|
||||
emqx_ct_helpers:stop_apps([App]),
|
||||
HandlerFun =
|
||||
fun(AppName) ->
|
||||
lists:foreach(fun({Key, Val}) ->
|
||||
application:set_env(AppName, Key, Val)
|
||||
end, Envs)
|
||||
end,
|
||||
emqx_ct_helpers:start_apps([App], HandlerFun).
|
||||
|
||||
restart_apps_with_envs([]) ->
|
||||
ok;
|
||||
restart_apps_with_envs([{App, Envs} | Rest]) ->
|
||||
restart_app_with_envs(App, Envs),
|
||||
restart_apps_with_envs(Rest).
|
||||
|
||||
meck_print() ->
|
||||
meck:new(emqx_ctl, [passthrough, no_history, no_link]),
|
||||
|
|
|
@ -51,6 +51,8 @@
|
|||
|
||||
-define(PORT, 9000).
|
||||
-define(NAME, ?MODULE).
|
||||
-define(DEFAULT_CLUSTER_NAME, <<"emqxcl">>).
|
||||
-define(OTHER_CLUSTER_NAME_BIN, <<"test_emqx_cluster">>).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Server APIs
|
||||
|
@ -112,30 +114,37 @@ reply(Q1, Q2) ->
|
|||
-spec on_provider_loaded(emqx_exhook_pb:provider_loaded_request(), grpc:metadata())
|
||||
-> {ok, emqx_exhook_pb:loaded_response(), grpc:metadata()}
|
||||
| {error, grpc_cowboy_h:error_response()}.
|
||||
|
||||
on_provider_loaded(Req, Md) ->
|
||||
on_provider_loaded(#{meta := #{cluster_name := Name}} = Req, Md) ->
|
||||
?MODULE:in({?FUNCTION_NAME, Req}),
|
||||
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
|
||||
{ok, #{hooks => [
|
||||
#{name => <<"client.connect">>},
|
||||
#{name => <<"client.connack">>},
|
||||
#{name => <<"client.connected">>},
|
||||
#{name => <<"client.disconnected">>},
|
||||
#{name => <<"client.authenticate">>},
|
||||
#{name => <<"client.check_acl">>},
|
||||
#{name => <<"client.subscribe">>},
|
||||
#{name => <<"client.unsubscribe">>},
|
||||
#{name => <<"session.created">>},
|
||||
#{name => <<"session.subscribed">>},
|
||||
#{name => <<"session.unsubscribed">>},
|
||||
#{name => <<"session.resumed">>},
|
||||
#{name => <<"session.discarded">>},
|
||||
#{name => <<"session.takeovered">>},
|
||||
#{name => <<"session.terminated">>},
|
||||
#{name => <<"message.publish">>},
|
||||
#{name => <<"message.delivered">>},
|
||||
#{name => <<"message.acked">>},
|
||||
#{name => <<"message.dropped">>}]}, Md}.
|
||||
HooksClient =
|
||||
[#{name => <<"client.connect">>},
|
||||
#{name => <<"client.connack">>},
|
||||
#{name => <<"client.connected">>},
|
||||
#{name => <<"client.disconnected">>},
|
||||
#{name => <<"client.authenticate">>},
|
||||
#{name => <<"client.check_acl">>},
|
||||
#{name => <<"client.subscribe">>},
|
||||
#{name => <<"client.unsubscribe">>}],
|
||||
HooksSession =
|
||||
[#{name => <<"session.created">>},
|
||||
#{name => <<"session.subscribed">>},
|
||||
#{name => <<"session.unsubscribed">>},
|
||||
#{name => <<"session.resumed">>},
|
||||
#{name => <<"session.discarded">>},
|
||||
#{name => <<"session.takeovered">>},
|
||||
#{name => <<"session.terminated">>}],
|
||||
HooksMessage =
|
||||
[#{name => <<"message.publish">>},
|
||||
#{name => <<"message.delivered">>},
|
||||
#{name => <<"message.acked">>},
|
||||
#{name => <<"message.dropped">>}],
|
||||
case Name of
|
||||
?DEFAULT_CLUSTER_NAME ->
|
||||
{ok, #{hooks => HooksClient ++ HooksSession ++ HooksMessage}, Md};
|
||||
?OTHER_CLUSTER_NAME_BIN ->
|
||||
{ok, #{hooks => HooksClient}, Md}
|
||||
end.
|
||||
-spec on_provider_unloaded(emqx_exhook_pb:provider_unloaded_request(), grpc:metadata())
|
||||
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
|
||||
| {error, grpc_cowboy_h:error_response()}.
|
||||
|
|
|
@ -36,42 +36,46 @@
|
|||
fun() -> do_teardown(State) end
|
||||
end, ?FORALL(Vars, Types, Exprs))).
|
||||
|
||||
-define(DEFAULT_CLUSTER_NAME, <<"emqxcl">>).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Properties
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
prop_client_connect() ->
|
||||
?ALL({ConnInfo, ConnProps},
|
||||
{conninfo(), conn_properties()},
|
||||
begin
|
||||
ok = emqx_hooks:run('client.connect', [ConnInfo, ConnProps]),
|
||||
{'on_client_connect', Resp} = emqx_exhook_demo_svr:take(),
|
||||
Expected =
|
||||
#{props => properties(ConnProps),
|
||||
conninfo => from_conninfo(ConnInfo)
|
||||
},
|
||||
?assertEqual(Expected, Resp),
|
||||
true
|
||||
end).
|
||||
?ALL({ConnInfo, ConnProps, Meta},
|
||||
{conninfo(), conn_properties(), request_meta()},
|
||||
begin
|
||||
ok = emqx_hooks:run('client.connect', [ConnInfo, ConnProps]),
|
||||
{'on_client_connect', Resp} = emqx_exhook_demo_svr:take(),
|
||||
Expected =
|
||||
#{props => properties(ConnProps),
|
||||
conninfo => from_conninfo(ConnInfo),
|
||||
meta => Meta
|
||||
},
|
||||
?assertEqual(Expected, Resp),
|
||||
true
|
||||
end).
|
||||
|
||||
prop_client_connack() ->
|
||||
?ALL({ConnInfo, Rc, AckProps},
|
||||
{conninfo(), connack_return_code(), ack_properties()},
|
||||
?ALL({ConnInfo, Rc, AckProps, Meta},
|
||||
{conninfo(), connack_return_code(), ack_properties(), request_meta()},
|
||||
begin
|
||||
ok = emqx_hooks:run('client.connack', [ConnInfo, Rc, AckProps]),
|
||||
{'on_client_connack', Resp} = emqx_exhook_demo_svr:take(),
|
||||
Expected =
|
||||
#{props => properties(AckProps),
|
||||
result_code => atom_to_binary(Rc, utf8),
|
||||
conninfo => from_conninfo(ConnInfo)
|
||||
conninfo => from_conninfo(ConnInfo),
|
||||
meta => Meta
|
||||
},
|
||||
?assertEqual(Expected, Resp),
|
||||
true
|
||||
end).
|
||||
|
||||
prop_client_authenticate() ->
|
||||
?ALL({ClientInfo0, AuthResult},
|
||||
{clientinfo(), authresult()},
|
||||
?ALL({ClientInfo0, AuthResult, Meta},
|
||||
{clientinfo(), authresult(), request_meta()},
|
||||
begin
|
||||
ClientInfo = inject_magic_into(username, ClientInfo0),
|
||||
OutAuthResult = emqx_hooks:run_fold('client.authenticate', [ClientInfo], AuthResult),
|
||||
|
@ -103,16 +107,16 @@ prop_client_authenticate() ->
|
|||
{'on_client_authenticate', Resp} = emqx_exhook_demo_svr:take(),
|
||||
Expected =
|
||||
#{result => authresult_to_bool(AuthResult),
|
||||
clientinfo => from_clientinfo(ClientInfo)
|
||||
clientinfo => from_clientinfo(ClientInfo),
|
||||
meta => Meta
|
||||
},
|
||||
?assertEqual(Expected, Resp),
|
||||
true
|
||||
end).
|
||||
|
||||
prop_client_check_acl() ->
|
||||
?ALL({ClientInfo0, PubSub, Topic, Result},
|
||||
{clientinfo(), oneof([publish, subscribe]),
|
||||
topic(), oneof([allow, deny])},
|
||||
?ALL({ClientInfo0, PubSub, Topic, Result, Meta},
|
||||
{clientinfo(), oneof([publish, subscribe]), topic(), oneof([allow, deny]), request_meta()},
|
||||
begin
|
||||
ClientInfo = inject_magic_into(username, ClientInfo0),
|
||||
OutResult = emqx_hooks:run_fold(
|
||||
|
@ -132,162 +136,179 @@ prop_client_check_acl() ->
|
|||
#{result => aclresult_to_bool(Result),
|
||||
type => pubsub_to_enum(PubSub),
|
||||
topic => Topic,
|
||||
clientinfo => from_clientinfo(ClientInfo)
|
||||
clientinfo => from_clientinfo(ClientInfo),
|
||||
meta => Meta
|
||||
},
|
||||
?assertEqual(Expected, Resp),
|
||||
true
|
||||
end).
|
||||
|
||||
prop_client_connected() ->
|
||||
?ALL({ClientInfo, ConnInfo},
|
||||
{clientinfo(), conninfo()},
|
||||
?ALL({ClientInfo, ConnInfo, Meta},
|
||||
{clientinfo(), conninfo(), request_meta()},
|
||||
begin
|
||||
ok = emqx_hooks:run('client.connected', [ClientInfo, ConnInfo]),
|
||||
{'on_client_connected', Resp} = emqx_exhook_demo_svr:take(),
|
||||
Expected =
|
||||
#{clientinfo => from_clientinfo(ClientInfo)
|
||||
#{clientinfo => from_clientinfo(ClientInfo),
|
||||
meta => Meta
|
||||
},
|
||||
?assertEqual(Expected, Resp),
|
||||
true
|
||||
end).
|
||||
|
||||
prop_client_disconnected() ->
|
||||
?ALL({ClientInfo, Reason, ConnInfo},
|
||||
{clientinfo(), shutdown_reason(), conninfo()},
|
||||
?ALL({ClientInfo, Reason, ConnInfo, Meta},
|
||||
{clientinfo(), shutdown_reason(), conninfo(), request_meta()},
|
||||
begin
|
||||
ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]),
|
||||
{'on_client_disconnected', Resp} = emqx_exhook_demo_svr:take(),
|
||||
Expected =
|
||||
#{reason => stringfy(Reason),
|
||||
clientinfo => from_clientinfo(ClientInfo)
|
||||
clientinfo => from_clientinfo(ClientInfo),
|
||||
meta => Meta
|
||||
},
|
||||
?assertEqual(Expected, Resp),
|
||||
true
|
||||
end).
|
||||
|
||||
prop_client_subscribe() ->
|
||||
?ALL({ClientInfo, SubProps, TopicTab},
|
||||
{clientinfo(), sub_properties(), topictab()},
|
||||
?ALL({ClientInfo, SubProps, TopicTab, Meta},
|
||||
{clientinfo(), sub_properties(), topictab(), request_meta()},
|
||||
begin
|
||||
ok = emqx_hooks:run('client.subscribe', [ClientInfo, SubProps, TopicTab]),
|
||||
{'on_client_subscribe', Resp} = emqx_exhook_demo_svr:take(),
|
||||
Expected =
|
||||
#{props => properties(SubProps),
|
||||
topic_filters => topicfilters(TopicTab),
|
||||
clientinfo => from_clientinfo(ClientInfo)
|
||||
clientinfo => from_clientinfo(ClientInfo),
|
||||
meta => Meta
|
||||
},
|
||||
?assertEqual(Expected, Resp),
|
||||
true
|
||||
end).
|
||||
|
||||
prop_client_unsubscribe() ->
|
||||
?ALL({ClientInfo, UnSubProps, TopicTab},
|
||||
{clientinfo(), unsub_properties(), topictab()},
|
||||
?ALL({ClientInfo, UnSubProps, TopicTab, Meta},
|
||||
{clientinfo(), unsub_properties(), topictab(), request_meta()},
|
||||
begin
|
||||
ok = emqx_hooks:run('client.unsubscribe', [ClientInfo, UnSubProps, TopicTab]),
|
||||
{'on_client_unsubscribe', Resp} = emqx_exhook_demo_svr:take(),
|
||||
Expected =
|
||||
#{props => properties(UnSubProps),
|
||||
topic_filters => topicfilters(TopicTab),
|
||||
clientinfo => from_clientinfo(ClientInfo)
|
||||
clientinfo => from_clientinfo(ClientInfo),
|
||||
meta => Meta
|
||||
},
|
||||
?assertEqual(Expected, Resp),
|
||||
true
|
||||
end).
|
||||
|
||||
prop_session_created() ->
|
||||
?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()},
|
||||
?ALL({ClientInfo, SessInfo, Meta},
|
||||
{clientinfo(), sessioninfo(), request_meta()},
|
||||
begin
|
||||
ok = emqx_hooks:run('session.created', [ClientInfo, SessInfo]),
|
||||
{'on_session_created', Resp} = emqx_exhook_demo_svr:take(),
|
||||
Expected =
|
||||
#{clientinfo => from_clientinfo(ClientInfo)
|
||||
#{clientinfo => from_clientinfo(ClientInfo),
|
||||
meta => Meta
|
||||
},
|
||||
?assertEqual(Expected, Resp),
|
||||
true
|
||||
end).
|
||||
|
||||
prop_session_subscribed() ->
|
||||
?ALL({ClientInfo, Topic, SubOpts},
|
||||
{clientinfo(), topic(), subopts()},
|
||||
?ALL({ClientInfo, Topic, SubOpts, Meta},
|
||||
{clientinfo(), topic(), subopts(), request_meta()},
|
||||
begin
|
||||
ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]),
|
||||
{'on_session_subscribed', Resp} = emqx_exhook_demo_svr:take(),
|
||||
Expected =
|
||||
#{topic => Topic,
|
||||
subopts => subopts(SubOpts),
|
||||
clientinfo => from_clientinfo(ClientInfo)
|
||||
clientinfo => from_clientinfo(ClientInfo),
|
||||
meta => Meta
|
||||
},
|
||||
?assertEqual(Expected, Resp),
|
||||
true
|
||||
end).
|
||||
|
||||
prop_session_unsubscribed() ->
|
||||
?ALL({ClientInfo, Topic, SubOpts},
|
||||
{clientinfo(), topic(), subopts()},
|
||||
?ALL({ClientInfo, Topic, SubOpts, Meta},
|
||||
{clientinfo(), topic(), subopts(), request_meta()},
|
||||
begin
|
||||
ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, SubOpts]),
|
||||
{'on_session_unsubscribed', Resp} = emqx_exhook_demo_svr:take(),
|
||||
Expected =
|
||||
#{topic => Topic,
|
||||
clientinfo => from_clientinfo(ClientInfo)
|
||||
clientinfo => from_clientinfo(ClientInfo),
|
||||
meta => Meta
|
||||
},
|
||||
?assertEqual(Expected, Resp),
|
||||
true
|
||||
end).
|
||||
|
||||
prop_session_resumed() ->
|
||||
?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()},
|
||||
?ALL({ClientInfo, SessInfo, Meta},
|
||||
{clientinfo(), sessioninfo(), request_meta()},
|
||||
begin
|
||||
ok = emqx_hooks:run('session.resumed', [ClientInfo, SessInfo]),
|
||||
{'on_session_resumed', Resp} = emqx_exhook_demo_svr:take(),
|
||||
Expected =
|
||||
#{clientinfo => from_clientinfo(ClientInfo)
|
||||
#{clientinfo => from_clientinfo(ClientInfo),
|
||||
meta => Meta
|
||||
},
|
||||
?assertEqual(Expected, Resp),
|
||||
true
|
||||
end).
|
||||
|
||||
prop_session_discared() ->
|
||||
?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()},
|
||||
?ALL({ClientInfo, SessInfo, Meta},
|
||||
{clientinfo(), sessioninfo(), request_meta()},
|
||||
begin
|
||||
ok = emqx_hooks:run('session.discarded', [ClientInfo, SessInfo]),
|
||||
{'on_session_discarded', Resp} = emqx_exhook_demo_svr:take(),
|
||||
Expected =
|
||||
#{clientinfo => from_clientinfo(ClientInfo)
|
||||
#{clientinfo => from_clientinfo(ClientInfo),
|
||||
meta => Meta
|
||||
},
|
||||
?assertEqual(Expected, Resp),
|
||||
true
|
||||
end).
|
||||
|
||||
prop_session_takeovered() ->
|
||||
?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()},
|
||||
?ALL({ClientInfo, SessInfo, Meta},
|
||||
{clientinfo(), sessioninfo(), request_meta()},
|
||||
begin
|
||||
ok = emqx_hooks:run('session.takeovered', [ClientInfo, SessInfo]),
|
||||
{'on_session_takeovered', Resp} = emqx_exhook_demo_svr:take(),
|
||||
Expected =
|
||||
#{clientinfo => from_clientinfo(ClientInfo)
|
||||
#{clientinfo => from_clientinfo(ClientInfo),
|
||||
meta => Meta
|
||||
},
|
||||
?assertEqual(Expected, Resp),
|
||||
true
|
||||
end).
|
||||
|
||||
prop_session_terminated() ->
|
||||
?ALL({ClientInfo, Reason, SessInfo},
|
||||
{clientinfo(), shutdown_reason(), sessioninfo()},
|
||||
?ALL({ClientInfo, Reason, SessInfo, Meta},
|
||||
{clientinfo(), shutdown_reason(), sessioninfo(), request_meta()},
|
||||
begin
|
||||
ok = emqx_hooks:run('session.terminated', [ClientInfo, Reason, SessInfo]),
|
||||
{'on_session_terminated', Resp} = emqx_exhook_demo_svr:take(),
|
||||
Expected =
|
||||
#{reason => stringfy(Reason),
|
||||
clientinfo => from_clientinfo(ClientInfo)
|
||||
clientinfo => from_clientinfo(ClientInfo),
|
||||
meta => Meta
|
||||
},
|
||||
?assertEqual(Expected, Resp),
|
||||
true
|
||||
end).
|
||||
|
||||
prop_message_publish() ->
|
||||
?ALL(Msg0, message(),
|
||||
?ALL({Msg0, Meta},
|
||||
{message(), request_meta()},
|
||||
begin
|
||||
Msg = emqx_message:from_map(
|
||||
inject_magic_into(from, emqx_message:to_map(Msg0))),
|
||||
|
@ -322,7 +343,8 @@ prop_message_publish() ->
|
|||
|
||||
{'on_message_publish', Resp} = emqx_exhook_demo_svr:take(),
|
||||
Expected =
|
||||
#{message => from_message(Msg)
|
||||
#{message => from_message(Msg),
|
||||
meta => Meta
|
||||
},
|
||||
?assertEqual(Expected, Resp)
|
||||
end,
|
||||
|
@ -330,7 +352,8 @@ prop_message_publish() ->
|
|||
end).
|
||||
|
||||
prop_message_dropped() ->
|
||||
?ALL({Msg, By, Reason}, {message(), hardcoded, shutdown_reason()},
|
||||
?ALL({Msg, By, Reason, Meta},
|
||||
{message(), hardcoded, shutdown_reason(), request_meta()},
|
||||
begin
|
||||
ok = emqx_hooks:run('message.dropped', [Msg, By, Reason]),
|
||||
case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
|
||||
|
@ -339,7 +362,8 @@ prop_message_dropped() ->
|
|||
{'on_message_dropped', Resp} = emqx_exhook_demo_svr:take(),
|
||||
Expected =
|
||||
#{reason => stringfy(Reason),
|
||||
message => from_message(Msg)
|
||||
message => from_message(Msg),
|
||||
meta => Meta
|
||||
},
|
||||
?assertEqual(Expected, Resp)
|
||||
end,
|
||||
|
@ -347,7 +371,8 @@ prop_message_dropped() ->
|
|||
end).
|
||||
|
||||
prop_message_delivered() ->
|
||||
?ALL({ClientInfo, Msg}, {clientinfo(), message()},
|
||||
?ALL({ClientInfo, Msg, Meta},
|
||||
{clientinfo(), message(), request_meta()},
|
||||
begin
|
||||
ok = emqx_hooks:run('message.delivered', [ClientInfo, Msg]),
|
||||
case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
|
||||
|
@ -356,7 +381,8 @@ prop_message_delivered() ->
|
|||
{'on_message_delivered', Resp} = emqx_exhook_demo_svr:take(),
|
||||
Expected =
|
||||
#{clientinfo => from_clientinfo(ClientInfo),
|
||||
message => from_message(Msg)
|
||||
message => from_message(Msg),
|
||||
meta => Meta
|
||||
},
|
||||
?assertEqual(Expected, Resp)
|
||||
end,
|
||||
|
@ -364,7 +390,8 @@ prop_message_delivered() ->
|
|||
end).
|
||||
|
||||
prop_message_acked() ->
|
||||
?ALL({ClientInfo, Msg}, {clientinfo(), message()},
|
||||
?ALL({ClientInfo, Msg, Meta},
|
||||
{clientinfo(), message(), request_meta()},
|
||||
begin
|
||||
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
|
||||
case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
|
||||
|
@ -373,7 +400,8 @@ prop_message_acked() ->
|
|||
{'on_message_acked', Resp} = emqx_exhook_demo_svr:take(),
|
||||
Expected =
|
||||
#{clientinfo => from_clientinfo(ClientInfo),
|
||||
message => from_message(Msg)
|
||||
message => from_message(Msg),
|
||||
meta => Meta
|
||||
},
|
||||
?assertEqual(Expected, Resp)
|
||||
end,
|
||||
|
@ -416,6 +444,8 @@ stringfy(Term) when is_integer(Term) ->
|
|||
integer_to_binary(Term);
|
||||
stringfy(Term) when is_atom(Term) ->
|
||||
atom_to_binary(Term, utf8);
|
||||
stringfy(Term) when is_list(Term) ->
|
||||
list_to_binary(Term);
|
||||
stringfy(Term) ->
|
||||
unicode:characters_to_binary((io_lib:format("~0p", [Term]))).
|
||||
|
||||
|
@ -520,6 +550,13 @@ sub_properties() ->
|
|||
unsub_properties() ->
|
||||
#{}.
|
||||
|
||||
request_meta() ->
|
||||
#{ node => nodestr()
|
||||
, version => stringfy(emqx_sys:version())
|
||||
, sysdescr => stringfy(emqx_sys:sysdescr())
|
||||
, cluster_name => ?DEFAULT_CLUSTER_NAME
|
||||
}.
|
||||
|
||||
shutdown_reason() ->
|
||||
oneof([utf8(), {shutdown, emqx_ct_proper_types:limited_atom()}]).
|
||||
|
||||
|
|
|
@ -30,10 +30,10 @@ init_per_suite(Config) ->
|
|||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([emqx_retainer]).
|
||||
|
||||
init_per_testcase(TestCase, Config) ->
|
||||
init_per_testcase(_TestCase, Config) ->
|
||||
Config.
|
||||
|
||||
end_per_testcase(_TestCase, Config) ->
|
||||
end_per_testcase(_TestCase, _Config) ->
|
||||
emqx_retainer:clean(<<"#">>).
|
||||
|
||||
t_cmd(_) ->
|
||||
|
|
|
@ -2,7 +2,8 @@
|
|||
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||
{VSN,
|
||||
[{"4.4.2",
|
||||
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
|
@ -11,7 +12,8 @@
|
|||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.1",
|
||||
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
|
@ -21,7 +23,8 @@
|
|||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.0",
|
||||
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
|
@ -32,7 +35,8 @@
|
|||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}],
|
||||
[{"4.4.2",
|
||||
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
|
@ -41,7 +45,8 @@
|
|||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.1",
|
||||
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||
|
@ -51,7 +56,8 @@
|
|||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.0",
|
||||
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
|
|
|
@ -297,6 +297,7 @@ do_check_and_update_resource(#{id := Id, type := Type, description := NewDescrip
|
|||
Config = emqx_rule_validator:validate_params(NewConfig, ParamSpec),
|
||||
case test_resource(#{type => Type, config => NewConfig}) of
|
||||
ok ->
|
||||
delete_resource(Id),
|
||||
_ = ?CLUSTER_CALL(init_resource, [Module, Create, Id, Config]),
|
||||
emqx_rule_registry:add_resource(#resource{
|
||||
id = Id,
|
||||
|
|
|
@ -323,13 +323,13 @@ show_resource(#{id := Id}, _Params) ->
|
|||
case emqx_rule_registry:find_resource(Id) of
|
||||
{ok, R} ->
|
||||
Status =
|
||||
[begin
|
||||
St = case rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]) of
|
||||
{ok, St0} -> St0;
|
||||
{error, _} -> #{is_alive => false}
|
||||
end,
|
||||
maps:put(node, Node, St)
|
||||
end || Node <- ekka_mnesia:running_nodes()],
|
||||
lists:concat(
|
||||
[ case rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]) of
|
||||
{badrpc, _} -> [];
|
||||
{ok, St} -> [maps:put(node, Node, St)];
|
||||
{error, _} -> [maps:put(node, Node, #{is_alive => false})]
|
||||
end
|
||||
|| Node <- ekka_mnesia:running_nodes()]),
|
||||
return({ok, maps:put(status, Status, record_to_map(R))});
|
||||
not_found ->
|
||||
return({error, 404, <<"Not Found">>})
|
||||
|
@ -575,9 +575,17 @@ sort_by(Pos, TplList) ->
|
|||
end, TplList).
|
||||
|
||||
get_rule_metrics(Id) ->
|
||||
[maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id]))
|
||||
|| Node <- ekka_mnesia:running_nodes()].
|
||||
lists:concat(
|
||||
[ case rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id]) of
|
||||
{badrpc, _} -> [];
|
||||
Res -> [maps:put(node, Node, Res)]
|
||||
end
|
||||
|| Node <- ekka_mnesia:running_nodes()]).
|
||||
|
||||
get_action_metrics(Id) ->
|
||||
[maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_action_metrics, [Id]))
|
||||
|| Node <- ekka_mnesia:running_nodes()].
|
||||
lists:concat(
|
||||
[ case rpc:call(Node, emqx_rule_metrics, get_action_metrics, [Id]) of
|
||||
{badrpc, _} -> [];
|
||||
Res -> [maps:put(node, Node, Res)]
|
||||
end
|
||||
|| Node <- ekka_mnesia:running_nodes()]).
|
||||
|
|
|
@ -273,10 +273,13 @@ format(#resource{id = Id,
|
|||
config = Config,
|
||||
description = Descr}) ->
|
||||
Status =
|
||||
[begin
|
||||
{ok, St} = rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]),
|
||||
maps:put(node, Node, St)
|
||||
end || Node <- [node()| nodes()]],
|
||||
lists:concat(
|
||||
[ case rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]) of
|
||||
{badrpc, _} -> [];
|
||||
{ok, St} -> [maps:put(node, Node, St)];
|
||||
{error, _} -> [maps:put(node, Node, #{is_alive => false})]
|
||||
end
|
||||
|| Node <- ekka_mnesia:running_nodes()]),
|
||||
lists:flatten(io_lib:format("resource(id='~s', type='~s', config=~0p, status=~0p, description='~s')~n", [Id, Type, Config, Status, Descr]));
|
||||
|
||||
format(#resource_type{name = Name,
|
||||
|
@ -369,12 +372,20 @@ get_actions() ->
|
|||
emqx_rule_registry:get_actions().
|
||||
|
||||
get_rule_metrics(Id) ->
|
||||
[maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id]))
|
||||
|| Node <- [node()| nodes()]].
|
||||
lists:concat(
|
||||
[ case rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id]) of
|
||||
{badrpc, _} -> [];
|
||||
Res -> [maps:put(node, Node, Res)]
|
||||
end
|
||||
|| Node <- ekka_mnesia:running_nodes()]).
|
||||
|
||||
get_action_metrics(Id) ->
|
||||
[maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_action_metrics, [Id]))
|
||||
|| Node <- [node()| nodes()]].
|
||||
lists:concat(
|
||||
[ case rpc:call(Node, emqx_rule_metrics, get_action_metrics, [Id]) of
|
||||
{badrpc, _} -> [];
|
||||
Res -> [maps:put(node, Node, Res)]
|
||||
end
|
||||
|| Node <- ekka_mnesia:running_nodes()]).
|
||||
|
||||
on_failed(continue) -> continue;
|
||||
on_failed(stop) -> stop;
|
||||
|
|
|
@ -2292,14 +2292,27 @@ broker.session_locking_strategy = quorum
|
|||
## Dispatch strategy for shared subscription
|
||||
##
|
||||
## Value: Enum
|
||||
## - hash_clientid
|
||||
## - hash # same as hash_clientid
|
||||
## - hash_topic
|
||||
## - local
|
||||
## - random
|
||||
## - round_robin
|
||||
## - sticky
|
||||
## - hash # same as hash_clientid
|
||||
## - hash_clientid
|
||||
## - hash_topic
|
||||
broker.shared_subscription_strategy = random
|
||||
|
||||
## Per group dispatch strategy for shared subscription
|
||||
##
|
||||
## Value: Enum
|
||||
## - hash_clientid
|
||||
## - hash # same as hash_clientid
|
||||
## - hash_topic
|
||||
## - local
|
||||
## - random
|
||||
## - round_robin
|
||||
## - sticky
|
||||
#broker.sample_group.shared_subscription_strategy = local
|
||||
|
||||
## Enable/disable shared dispatch acknowledgement for QoS1 and QoS2 messages
|
||||
## This should allow messages to be dispatched to a different subscriber in
|
||||
## the group in case the picked (based on shared_subscription_strategy) one # is offline
|
||||
|
|
|
@ -310,8 +310,16 @@ is_lib(Path) ->
|
|||
string:prefix(Path, code:lib_dir()) =:= nomatch.
|
||||
|
||||
setup_node(Node, Apps) ->
|
||||
LoadedPlugins = emqx_ct_helpers:deps_path(
|
||||
emqx,
|
||||
filename:join(["test", "emqx_SUITE_data", "loaded_plugins"])),
|
||||
LoadedModules = emqx_ct_helpers:deps_path(
|
||||
emqx,
|
||||
filename:join(["test", "emqx_SUITE_data", "loaded_modules"])),
|
||||
EnvHandler =
|
||||
fun(emqx) ->
|
||||
application:set_env(emqx, plugins_loaded_file, LoadedPlugins),
|
||||
application:set_env(emqx, modules_loaded_file, LoadedModules),
|
||||
application:set_env(emqx, listeners, []),
|
||||
application:set_env(gen_rpc, port_discovery, manual),
|
||||
ok;
|
||||
|
|
|
@ -62,7 +62,7 @@ t_mod_rewrite(_Config) ->
|
|||
timer:sleep(100),
|
||||
?assertEqual([], emqx_broker:subscriptions(<<"rewrite_client">>)),
|
||||
%% Pub Rules
|
||||
{ok, _Props1, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- PubDestTopics]),
|
||||
{ok, _, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- PubDestTopics]),
|
||||
RecvTopics2 = [begin
|
||||
ok = emqtt:publish(C, Topic, <<"payload">>),
|
||||
{ok, #{topic := RecvTopic}} = receive_publish(100),
|
||||
|
|
|
@ -2401,7 +2401,7 @@ end}.
|
|||
{datatype, {enum, [local,leader,quorum,all]}}
|
||||
]}.
|
||||
|
||||
%% @doc Shared Subscription Dispatch Strategy.
|
||||
%% @doc Default shared Subscription Dispatch Strategy.
|
||||
{mapping, "broker.shared_subscription_strategy", "emqx.shared_subscription_strategy", [
|
||||
{default, round_robin},
|
||||
{datatype,
|
||||
|
@ -2410,11 +2410,37 @@ end}.
|
|||
round_robin, %% round robin alive subscribers one message after another
|
||||
sticky, %% pick a random subscriber and stick to it
|
||||
hash, %% hash client ID to a group member
|
||||
local, %% send to some locally available subscriber
|
||||
hash_clientid,
|
||||
hash_topic
|
||||
]}}
|
||||
]}.
|
||||
|
||||
%% @doc Per group Shared Subscription Dispatch Strategy
|
||||
{mapping, "broker.$name.shared_subscription_strategy", "emqx.shared_subscription_strategy_per_group", [
|
||||
{default, round_robin},
|
||||
{datatype,
|
||||
{enum,
|
||||
[random, %% randomly pick a subscriber
|
||||
round_robin, %% round robin alive subscribers one message after another
|
||||
sticky, %% pick a random subscriber and stick to it
|
||||
hash, %% hash client ID to a group member
|
||||
local, %% send to some locally available subscriber
|
||||
hash_clientid,
|
||||
hash_topic
|
||||
]}}
|
||||
]}.
|
||||
|
||||
{translation, "emqx.shared_subscription_strategy_per_group", fun(Conf) ->
|
||||
Conf0 = cuttlefish_variable:filter_by_prefix("broker", Conf),
|
||||
Groups = lists:filtermap(fun({["broker", Group, "shared_subscription_strategy"], Strategy}) ->
|
||||
{true, {Group, Strategy}};
|
||||
(_) ->
|
||||
false
|
||||
end, Conf0),
|
||||
maps:from_list(Groups)
|
||||
end}.
|
||||
|
||||
%% @doc Enable or disable shared dispatch acknowledgement for QoS1 and QoS2 messages
|
||||
{mapping, "broker.shared_dispatch_ack_enabled", "emqx.shared_dispatch_ack_enabled",
|
||||
[ {default, false},
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
%% -*- mode: erlang -*-
|
||||
%% This config file is the very basic config to compile emqx
|
||||
%% This allows emqx to be used as a dependency for other applications
|
||||
%% such as emqx module/plugin develpments and tests.
|
||||
|
@ -39,12 +40,12 @@
|
|||
{deps,
|
||||
[ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
|
||||
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.15"}}}
|
||||
, {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.5"}}}
|
||||
, {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.1"}}}
|
||||
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
|
||||
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
||||
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
|
||||
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.5"}}}
|
||||
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1.8"}}}
|
||||
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1.9"}}}
|
||||
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.7.0"}}}
|
||||
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.6"}}}
|
||||
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.7"}}}
|
||||
|
|
|
@ -8,8 +8,8 @@ cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/.."
|
|||
PKG_VSN="${PKG_VSN:-$(./pkg-vsn.sh)}"
|
||||
case "${PKG_VSN}" in
|
||||
4.3*)
|
||||
EMQX_CE_DASHBOARD_VERSION='v4.3.6'
|
||||
EMQX_EE_DASHBOARD_VERSION='v4.3.16'
|
||||
EMQX_CE_DASHBOARD_VERSION='v4.3.7'
|
||||
EMQX_EE_DASHBOARD_VERSION='v4.3.18'
|
||||
;;
|
||||
4.4*)
|
||||
# keep the above 4.3 untouched, otherwise conflicts!
|
||||
|
|
|
@ -4,6 +4,8 @@
|
|||
[{"4.4.2",
|
||||
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_sys,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||
|
@ -21,8 +23,8 @@
|
|||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_sys,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_limiter,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_limiter,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
|
||||
|
@ -64,6 +66,8 @@
|
|||
[{"4.4.2",
|
||||
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_sys,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||
|
@ -81,8 +85,8 @@
|
|||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_sys,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_limiter,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_limiter,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
|
||||
|
|
|
@ -265,7 +265,7 @@ parse_packet(#mqtt_packet_header{type = ?CONNACK}, <<AckFlags:8, ReasonCode:8, R
|
|||
|
||||
parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin,
|
||||
#{strict_mode := StrictMode, version := Ver}) ->
|
||||
{TopicName, Rest} = parse_topic_name(Bin, StrictMode),
|
||||
{TopicName, Rest} = parse_utf8_string(Bin, StrictMode),
|
||||
{PacketId, Rest1} = case QoS of
|
||||
?QOS_0 -> {undefined, Rest};
|
||||
_ -> parse_packet_id(Rest)
|
||||
|
@ -273,6 +273,7 @@ parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin,
|
|||
(PacketId =/= undefined) andalso
|
||||
StrictMode andalso validate_packet_id(PacketId),
|
||||
{Properties, Payload} = parse_properties(Rest1, Ver, StrictMode),
|
||||
ok = ensure_topic_name_valid(StrictMode, TopicName, Properties),
|
||||
Publish = #mqtt_packet_publish{topic_name = TopicName,
|
||||
packet_id = PacketId,
|
||||
properties = Properties
|
||||
|
@ -357,8 +358,9 @@ parse_will_message(Packet = #mqtt_packet_connect{will_flag = true,
|
|||
proto_ver = Ver},
|
||||
Bin, StrictMode) ->
|
||||
{Props, Rest} = parse_properties(Bin, Ver, StrictMode),
|
||||
{Topic, Rest1} = parse_topic_name(Rest, StrictMode),
|
||||
{Topic, Rest1} = parse_utf8_string(Rest, StrictMode),
|
||||
{Payload, Rest2} = parse_binary_data(Rest1),
|
||||
ok = ensure_topic_name_valid(StrictMode, Topic, Props),
|
||||
{Packet#mqtt_packet_connect{will_props = Props,
|
||||
will_topic = Topic,
|
||||
will_payload = Payload
|
||||
|
@ -524,13 +526,14 @@ parse_binary_data(Bin)
|
|||
when 2 > byte_size(Bin) ->
|
||||
error(malformed_binary_data_length).
|
||||
|
||||
parse_topic_name(Bin, false) ->
|
||||
parse_utf8_string(Bin, false);
|
||||
parse_topic_name(Bin, true) ->
|
||||
case parse_utf8_string(Bin, true) of
|
||||
{<<>>, _Rest} -> error(empty_topic_name);
|
||||
Result -> Result
|
||||
end.
|
||||
ensure_topic_name_valid(false, _TopicName, _Properties) ->
|
||||
ok;
|
||||
ensure_topic_name_valid(true, TopicName, _Properties) when TopicName =/= <<>> ->
|
||||
ok;
|
||||
ensure_topic_name_valid(true, <<>>, #{'Topic-Alias' := _}) ->
|
||||
ok;
|
||||
ensure_topic_name_valid(true, <<>>, _) ->
|
||||
error(empty_topic_name).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Serialize MQTT Packet
|
||||
|
|
|
@ -47,7 +47,12 @@
|
|||
]).
|
||||
|
||||
%% for testing
|
||||
-export([subscribers/2, ack_enabled/0]).
|
||||
-ifdef(TEST).
|
||||
-export([ subscribers/2
|
||||
, ack_enabled/0
|
||||
, strategy/1
|
||||
]).
|
||||
-endif.
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([ init/1
|
||||
|
@ -63,6 +68,7 @@
|
|||
-type strategy() :: random
|
||||
| round_robin
|
||||
| sticky
|
||||
| local
|
||||
| hash %% same as hash_clientid, backward compatible
|
||||
| hash_clientid
|
||||
| hash_topic.
|
||||
|
@ -121,7 +127,7 @@ dispatch(Group, Topic, Delivery) ->
|
|||
|
||||
dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
|
||||
#message{from = ClientId, topic = SourceTopic} = Msg,
|
||||
case pick(strategy(), ClientId, SourceTopic, Group, Topic, FailedSubs) of
|
||||
case pick(strategy(Group), ClientId, SourceTopic, Group, Topic, FailedSubs) of
|
||||
false ->
|
||||
{error, no_subscribers};
|
||||
{Type, SubPid} ->
|
||||
|
@ -133,9 +139,15 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
|
|||
end
|
||||
end.
|
||||
|
||||
-spec(strategy() -> strategy()).
|
||||
strategy() ->
|
||||
emqx:get_env(shared_subscription_strategy, random).
|
||||
-spec(strategy(emqx_topic:group()) -> strategy()).
|
||||
strategy(Group) ->
|
||||
case emqx:get_env(shared_subscription_strategy_per_group, #{}) of
|
||||
#{Group := Strategy} ->
|
||||
Strategy;
|
||||
_ ->
|
||||
emqx:get_env(shared_subscription_strategy, random)
|
||||
end.
|
||||
|
||||
|
||||
-spec(ack_enabled() -> boolean()).
|
||||
ack_enabled() ->
|
||||
|
@ -267,6 +279,13 @@ do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
|
|||
end.
|
||||
|
||||
pick_subscriber(_Group, _Topic, _Strategy, _ClientId, _SourceTopic, [Sub]) -> Sub;
|
||||
pick_subscriber(Group, Topic, local, ClientId, SourceTopic, Subs) ->
|
||||
case lists:filter(fun(Pid) -> erlang:node(Pid) =:= node() end, Subs) of
|
||||
[_ | _] = LocalSubs ->
|
||||
pick_subscriber(Group, Topic, random, ClientId, SourceTopic, LocalSubs);
|
||||
[] ->
|
||||
pick_subscriber(Group, Topic, random, ClientId, SourceTopic, Subs)
|
||||
end;
|
||||
pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Subs) ->
|
||||
Nth = do_pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, length(Subs)),
|
||||
lists:nth(Nth, Subs).
|
||||
|
|
|
@ -29,6 +29,7 @@
|
|||
]).
|
||||
|
||||
-export([ version/0
|
||||
, cluster_name/0
|
||||
, uptime/0
|
||||
, datetime/0
|
||||
, sysdescr/0
|
||||
|
@ -87,6 +88,10 @@ stop() ->
|
|||
-spec(version() -> string()).
|
||||
version() -> emqx_app:get_release().
|
||||
|
||||
%% @doc Get cluster name
|
||||
-spec(cluster_name() -> string()).
|
||||
cluster_name() -> atom_to_list(ekka:cluster_name()).
|
||||
|
||||
%% @doc Get sys description
|
||||
-spec(sysdescr() -> string()).
|
||||
sysdescr() -> emqx_app:get_description().
|
||||
|
|
|
@ -39,7 +39,7 @@ t_trans(_) ->
|
|||
ok = emqx_cm_locker:trans(<<"clientid">>, fun(_) -> ok end).
|
||||
|
||||
t_lock_unlocak(_) ->
|
||||
{true, _Nodes} = emqx_cm_locker:lock(<<"clientid">>),
|
||||
{true, _Nodes} = emqx_cm_locker:lock(<<"clientid">>),
|
||||
{true, _Nodes} = emqx_cm_locker:unlock(<<"clientid">>),
|
||||
{true, _Nodes} = emqx_cm_locker:unlock(<<"clientid">>).
|
||||
{true, _} = emqx_cm_locker:lock(<<"clientid">>),
|
||||
{true, _} = emqx_cm_locker:lock(<<"clientid">>),
|
||||
{true, _} = emqx_cm_locker:unlock(<<"clientid">>),
|
||||
{true, _} = emqx_cm_locker:unlock(<<"clientid">>).
|
||||
|
|
|
@ -163,12 +163,15 @@ t_parse_malformed_utf8_string(_) ->
|
|||
?catch_error(utf8_string_invalid, emqx_frame:parse(MalformedPacket, ParseState)).
|
||||
|
||||
t_parse_empty_topic_name(_) ->
|
||||
Packet = <<48, 4, 0, 0, 0, 1>>,
|
||||
NormalState = emqx_frame:initial_parse_state(#{strict_mode => false}),
|
||||
?assertMatch({_, _}, emqx_frame:parse(Packet, NormalState)),
|
||||
Packet = ?PUBLISH_PACKET(?QOS_1, <<>>, 1, #{}, <<>>),
|
||||
?assertEqual(Packet, parse_serialize(Packet, #{strict_mode => false})),
|
||||
?catch_error(empty_topic_name, parse_serialize(Packet, #{strict_mode => true})).
|
||||
|
||||
StrictState = emqx_frame:initial_parse_state(#{strict_mode => true}),
|
||||
?catch_error(empty_topic_name, emqx_frame:parse(Packet, StrictState)).
|
||||
t_parse_empty_topic_name_with_alias(_) ->
|
||||
Props = #{'Topic-Alias' => 16#AB},
|
||||
Packet = ?PUBLISH_PACKET(?QOS_1, <<>>, 1, Props, <<>>),
|
||||
?assertEqual(Packet, parse_serialize(Packet, #{strict_mode => false})),
|
||||
?assertEqual(Packet, parse_serialize(Packet, #{strict_mode => true})).
|
||||
|
||||
t_parse_frame_proxy_protocol(_) ->
|
||||
BinList = [ <<"PROXY TCP4 ">>, <<"PROXY TCP6 ">>, <<"PROXY UNKNOWN">>
|
||||
|
|
|
@ -35,6 +35,7 @@
|
|||
all() -> emqx_ct:all(?SUITE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
net_kernel:start(['master@127.0.0.1', longnames]),
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
Config.
|
||||
|
@ -109,9 +110,9 @@ t_no_connection_nack(_) ->
|
|||
|
||||
ExpProp = [{properties, #{'Session-Expiry-Interval' => timer:seconds(30)}}],
|
||||
{ok, SubConnPid1} = emqtt:start_link([{clientid, Subscriber1}] ++ ExpProp),
|
||||
{ok, _Props} = emqtt:connect(SubConnPid1),
|
||||
{ok, _} = emqtt:connect(SubConnPid1),
|
||||
{ok, SubConnPid2} = emqtt:start_link([{clientid, Subscriber2}] ++ ExpProp),
|
||||
{ok, _Props} = emqtt:connect(SubConnPid2),
|
||||
{ok, _} = emqtt:connect(SubConnPid2),
|
||||
emqtt:subscribe(SubConnPid1, ShareTopic, QoS),
|
||||
emqtt:subscribe(SubConnPid1, ShareTopic, QoS),
|
||||
|
||||
|
@ -125,7 +126,7 @@ t_no_connection_nack(_) ->
|
|||
emqx:publish(M#message{id = PacketId})
|
||||
end,
|
||||
SendF(1),
|
||||
timer:sleep(200),
|
||||
ct:sleep(200),
|
||||
%% This is the connection which was picked by broker to dispatch (sticky) for 1st message
|
||||
|
||||
?assertMatch([#{packet_id := 1}], recv_msgs(1)),
|
||||
|
@ -164,19 +165,24 @@ t_no_connection_nack(_) ->
|
|||
ok.
|
||||
|
||||
t_random(_) ->
|
||||
ok = ensure_config(random, true),
|
||||
test_two_messages(random).
|
||||
|
||||
t_round_robin(_) ->
|
||||
ok = ensure_config(round_robin, true),
|
||||
test_two_messages(round_robin).
|
||||
|
||||
t_sticky(_) ->
|
||||
ok = ensure_config(sticky, true),
|
||||
test_two_messages(sticky).
|
||||
|
||||
t_hash(_) ->
|
||||
test_two_messages(hash, false).
|
||||
ok = ensure_config(hash, false),
|
||||
test_two_messages(hash).
|
||||
|
||||
t_hash_clinetid(_) ->
|
||||
test_two_messages(hash_clientid, false).
|
||||
ok = ensure_config(hash_clientid, false),
|
||||
test_two_messages(hash_clientid).
|
||||
|
||||
t_hash_topic(_) ->
|
||||
ok = ensure_config(hash_topic, false),
|
||||
|
@ -242,55 +248,48 @@ t_not_so_sticky(_) ->
|
|||
ok.
|
||||
|
||||
test_two_messages(Strategy) ->
|
||||
test_two_messages(Strategy, _WithAck = true).
|
||||
test_two_messages(Strategy, <<"group1">>).
|
||||
|
||||
test_two_messages(Strategy, WithAck) ->
|
||||
ok = ensure_config(Strategy, WithAck),
|
||||
test_two_messages(Strategy, Group) ->
|
||||
Topic = <<"foo/bar">>,
|
||||
ClientId1 = <<"ClientId1">>,
|
||||
ClientId2 = <<"ClientId2">>,
|
||||
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
|
||||
{ok, _} = emqtt:connect(ConnPid1),
|
||||
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]),
|
||||
{ok, _} = emqtt:connect(ConnPid1),
|
||||
{ok, _} = emqtt:connect(ConnPid2),
|
||||
|
||||
emqtt:subscribe(ConnPid1, {<<"$share/", Group/binary, "/foo/bar">>, 0}),
|
||||
emqtt:subscribe(ConnPid2, {<<"$share/", Group/binary, "/foo/bar">>, 0}),
|
||||
|
||||
Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
|
||||
Message2 = emqx_message:make(ClientId1, 0, Topic, <<"hello2">>),
|
||||
emqtt:subscribe(ConnPid1, {<<"$share/group1/foo/bar">>, 0}),
|
||||
emqtt:subscribe(ConnPid2, {<<"$share/group1/foo/bar">>, 0}),
|
||||
ct:sleep(100),
|
||||
|
||||
emqx:publish(Message1),
|
||||
Me = self(),
|
||||
WaitF = fun(ExpectedPayload) ->
|
||||
case last_message(ExpectedPayload, [ConnPid1, ConnPid2]) of
|
||||
{true, Pid} ->
|
||||
Me ! {subscriber, Pid},
|
||||
true;
|
||||
Other ->
|
||||
Other
|
||||
end
|
||||
end,
|
||||
WaitF(<<"hello1">>),
|
||||
UsedSubPid1 = receive {subscriber, P1} -> P1 end,
|
||||
emqx_broker:publish(Message2),
|
||||
WaitF(<<"hello2">>),
|
||||
UsedSubPid2 = receive {subscriber, P2} -> P2 end,
|
||||
case Strategy of
|
||||
sticky -> ?assert(UsedSubPid1 =:= UsedSubPid2);
|
||||
round_robin -> ?assert(UsedSubPid1 =/= UsedSubPid2);
|
||||
hash -> ?assert(UsedSubPid1 =:= UsedSubPid2);
|
||||
_ -> ok
|
||||
end,
|
||||
{true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]),
|
||||
|
||||
emqx:publish(Message2),
|
||||
{true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1, ConnPid2]),
|
||||
|
||||
emqtt:stop(ConnPid1),
|
||||
emqtt:stop(ConnPid2),
|
||||
|
||||
case Strategy of
|
||||
sticky -> ?assertEqual(UsedSubPid1, UsedSubPid2);
|
||||
round_robin -> ?assertNotEqual(UsedSubPid1, UsedSubPid2);
|
||||
hash -> ?assertEqual(UsedSubPid1, UsedSubPid2);
|
||||
_ -> ok
|
||||
end,
|
||||
ok.
|
||||
|
||||
last_message(ExpectedPayload, Pids) ->
|
||||
receive
|
||||
{publish, #{client_pid := Pid, payload := ExpectedPayload}} ->
|
||||
ct:pal("~p ====== ~p", [Pids, Pid]),
|
||||
ct:pal("last_message: ~p ====== ~p, payload=~p", [Pids, Pid, ExpectedPayload]),
|
||||
{true, Pid}
|
||||
after 100 ->
|
||||
ct:pal("not yet"),
|
||||
<<"not yet?">>
|
||||
end.
|
||||
|
||||
|
@ -314,6 +313,103 @@ t_uncovered_func(_) ->
|
|||
ignored = emqx_shared_sub ! ignored,
|
||||
{mnesia_table_event, []} = emqx_shared_sub ! {mnesia_table_event, []}.
|
||||
|
||||
t_per_group_config(_) ->
|
||||
ok = ensure_group_config(#{
|
||||
<<"local_group_fallback">> => local,
|
||||
<<"local_group">> => local,
|
||||
<<"round_robin_group">> => round_robin,
|
||||
<<"sticky_group">> => sticky
|
||||
}),
|
||||
%% Each test is repeated 4 times because random strategy may technically pass the test
|
||||
%% so we run 8 tests to make random pass in only 1/256 runs
|
||||
|
||||
test_two_messages(sticky, <<"sticky_group">>),
|
||||
test_two_messages(sticky, <<"sticky_group">>),
|
||||
test_two_messages(round_robin, <<"round_robin_group">>),
|
||||
test_two_messages(round_robin, <<"round_robin_group">>),
|
||||
test_two_messages(sticky, <<"sticky_group">>),
|
||||
test_two_messages(sticky, <<"sticky_group">>),
|
||||
test_two_messages(round_robin, <<"round_robin_group">>),
|
||||
test_two_messages(round_robin, <<"round_robin_group">>).
|
||||
|
||||
t_local(_) ->
|
||||
Node = start_slave('local_shared_sub_test', 21884),
|
||||
GroupConfig = #{
|
||||
<<"local_group_fallback">> => local,
|
||||
<<"local_group">> => local,
|
||||
<<"round_robin_group">> => round_robin,
|
||||
<<"sticky_group">> => sticky
|
||||
},
|
||||
ok = ensure_group_config(Node, GroupConfig),
|
||||
ok = ensure_group_config(GroupConfig),
|
||||
|
||||
Topic = <<"local_foo/bar">>,
|
||||
ClientId1 = <<"ClientId1">>,
|
||||
ClientId2 = <<"ClientId2">>,
|
||||
|
||||
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {port, 21884}]),
|
||||
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]),
|
||||
|
||||
{ok, _} = emqtt:connect(ConnPid1),
|
||||
{ok, _} = emqtt:connect(ConnPid2),
|
||||
|
||||
emqtt:subscribe(ConnPid1, {<<"$share/local_group/local_foo/bar">>, 0}),
|
||||
emqtt:subscribe(ConnPid2, {<<"$share/local_group/local_foo/bar">>, 0}),
|
||||
|
||||
ct:sleep(100),
|
||||
|
||||
Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
|
||||
Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>),
|
||||
|
||||
emqx:publish(Message1),
|
||||
{true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]),
|
||||
|
||||
rpc:call(Node, emqx, publish, [Message2]),
|
||||
{true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1, ConnPid2]),
|
||||
RemoteLocalGroupStrategy = rpc:call(Node, emqx_shared_sub, strategy, [<<"local_group">>]),
|
||||
|
||||
emqtt:stop(ConnPid1),
|
||||
emqtt:stop(ConnPid2),
|
||||
stop_slave(Node),
|
||||
|
||||
?assertEqual(local, emqx_shared_sub:strategy(<<"local_group">>)),
|
||||
?assertEqual(local, RemoteLocalGroupStrategy),
|
||||
|
||||
?assertNotEqual(UsedSubPid1, UsedSubPid2),
|
||||
ok.
|
||||
|
||||
t_local_fallback(_) ->
|
||||
ok = ensure_group_config(#{
|
||||
<<"local_group_fallback">> => local,
|
||||
<<"local_group">> => local,
|
||||
<<"round_robin_group">> => round_robin,
|
||||
<<"sticky_group">> => sticky
|
||||
}),
|
||||
|
||||
Topic = <<"local_foo/bar">>,
|
||||
ClientId1 = <<"ClientId1">>,
|
||||
ClientId2 = <<"ClientId2">>,
|
||||
Node = start_slave('local_fallback_shared_sub_test', 11885),
|
||||
|
||||
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
|
||||
{ok, _} = emqtt:connect(ConnPid1),
|
||||
Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
|
||||
Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>),
|
||||
|
||||
emqtt:subscribe(ConnPid1, {<<"$share/local_group_fallback/local_foo/bar">>, 0}),
|
||||
|
||||
emqx:publish(Message1),
|
||||
{true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1]),
|
||||
|
||||
rpc:call(Node, emqx, publish, [Message2]),
|
||||
{true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1]),
|
||||
|
||||
emqtt:stop(ConnPid1),
|
||||
stop_slave(Node),
|
||||
|
||||
?assertEqual(UsedSubPid1, UsedSubPid2),
|
||||
ok.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% help functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -326,6 +422,12 @@ ensure_config(Strategy, AckEnabled) ->
|
|||
application:set_env(emqx, shared_dispatch_ack_enabled, AckEnabled),
|
||||
ok.
|
||||
|
||||
ensure_group_config(Node, Group2Strategy) ->
|
||||
rpc:call(Node, application, set_env, [emqx, shared_subscription_strategy_per_group, Group2Strategy]).
|
||||
|
||||
ensure_group_config(Group2Strategy) ->
|
||||
application:set_env(emqx, shared_subscription_strategy_per_group, Group2Strategy).
|
||||
|
||||
subscribed(Group, Topic, Pid) ->
|
||||
lists:member(Pid, emqx_shared_sub:subscribers(Group, Topic)).
|
||||
|
||||
|
@ -343,3 +445,50 @@ recv_msgs(Count, Msgs) ->
|
|||
Msgs
|
||||
end.
|
||||
|
||||
start_slave(Name, Port) ->
|
||||
{ok, Node} = ct_slave:start(list_to_atom(atom_to_list(Name) ++ "@" ++ host()),
|
||||
[{kill_if_fail, true},
|
||||
{monitor_master, true},
|
||||
{init_timeout, 10000},
|
||||
{startup_timeout, 10000},
|
||||
{erl_flags, ebin_path()}]),
|
||||
|
||||
pong = net_adm:ping(Node),
|
||||
setup_node(Node, Port),
|
||||
Node.
|
||||
|
||||
stop_slave(Node) ->
|
||||
rpc:call(Node, ekka, leave, []),
|
||||
ct_slave:stop(Node).
|
||||
|
||||
host() ->
|
||||
[_, Host] = string:tokens(atom_to_list(node()), "@"), Host.
|
||||
|
||||
ebin_path() ->
|
||||
string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " ").
|
||||
|
||||
is_lib(Path) ->
|
||||
string:prefix(Path, code:lib_dir()) =:= nomatch.
|
||||
|
||||
setup_node(Node, Port) ->
|
||||
EnvHandler =
|
||||
fun(emqx) ->
|
||||
application:set_env(
|
||||
emqx,
|
||||
listeners,
|
||||
[#{listen_on => {{127,0,0,1},Port},
|
||||
name => "internal",
|
||||
opts => [{zone,internal}],
|
||||
proto => tcp}]),
|
||||
application:set_env(gen_rpc, port_discovery, manual),
|
||||
ok;
|
||||
(_) ->
|
||||
ok
|
||||
end,
|
||||
|
||||
[ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx]],
|
||||
ok = rpc:call(Node, emqx_ct_helpers, start_apps, [[emqx], EnvHandler]),
|
||||
|
||||
rpc:call(Node, ekka, join, [node()]),
|
||||
|
||||
ok.
|
||||
|
|
Loading…
Reference in New Issue