From 0522361604cdcb85d20871bdef99bcd937ad9bcb Mon Sep 17 00:00:00 2001 From: Georgy Sychev Date: Tue, 12 Apr 2022 22:59:04 +0400 Subject: [PATCH] feat(shared_sub): Per-group shared subscription and local strategy --- apps/emqx/etc/emqx.conf | 22 +- apps/emqx/i18n/emqx_schema_i18n.conf | 37 ++++ apps/emqx/src/emqx_config.erl | 92 ++++++--- apps/emqx/src/emqx_map_lib.erl | 21 ++ apps/emqx/src/emqx_schema.erl | 19 +- apps/emqx/src/emqx_shared_sub.erl | 26 ++- apps/emqx/test/emqx_shared_sub_SUITE.erl | 247 +++++++++++++++++++---- apps/emqx_machine/src/emqx_machine.erl | 6 +- 8 files changed, 394 insertions(+), 76 deletions(-) diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index 8f54ae41f..013150cce 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -1023,15 +1023,35 @@ broker { ## Dispatch strategy for shared subscription ## ## @doc broker.shared_subscription_strategy - ## ValueType: random | round_robin | sticky | hash + ## ValueType: random | round_robin | sticky | hash | local ## - random: dispatch the message to a random selected subscriber ## - round_robin: select the subscribers in a round-robin manner + ## - local: select random local subscriber otherwise select random cluster-wide ## - sticky: always use the last selected subscriber to dispatch, ## until the subscriber disconnects. ## - hash: select the subscribers by the hash of clientIds ## Default: round_robin shared_subscription_strategy = round_robin + ## Per-group dispatch strategy for shared subscription + ## + ## @doc broker.shared_subscription_group.$group_name.strategy + ## ValueType: random | round_robin | sticky | hash | local + ## - random: dispatch the message to a random selected subscriber + ## - round_robin: select the subscribers in a round-robin manner + ## - local: select the local subscriber otherwise random cluster-wide + ## - sticky: always use the last selected subscriber to dispatch, + ## until the subscriber disconnects. + ## - hash: select the subscribers by the hash of clientIds + ## Default: round_robin + shared_subscription_group { + + ## example_group { + ## strategy = random + ## } + + } + ## Enable/disable shared dispatch acknowledgement for QoS1 and QoS2 messages ## This should allow messages to be dispatched to a different subscriber in ## the group in case the picked (based on shared_subscription_strategy) one # is offline diff --git a/apps/emqx/i18n/emqx_schema_i18n.conf b/apps/emqx/i18n/emqx_schema_i18n.conf index 6e4b9fda7..6cf13eae6 100644 --- a/apps/emqx/i18n/emqx_schema_i18n.conf +++ b/apps/emqx/i18n/emqx_schema_i18n.conf @@ -1085,6 +1085,43 @@ This should allow messages to be dispatched to a different subscriber in the gro } } + shared_subscription_group_strategy { + desc { + en: """Per group dispatch strategy for shared subscription. +This config is a map from shared subscription group name to the strategy +name. The group name should be of format `[A-Za-z0-9]`. i.e. no +special characters are allowed. +""" + cn: """设置共享订阅组为单位的分发策略。该配置是一个从组名到 +策略名的一个map,组名不得包含 `[A-Za-z0-9]` 之外的特殊字符。 +""" + } + + } + + shared_subscription_strategy_enum { + desc { + en: """Dispatch strategy for shared subscription. +- `random`: dispatch the message to a random selected subscriber +- `round_robin`: select the subscribers in a round-robin manner +- `sticky`: always use the last selected subscriber to dispatch, +until the subscriber disconnects. +- `hash`: select the subscribers by the hash of `clientIds` +- `local`: send to a random local subscriber. If local +subscriber was not found, send to a random subscriber cluster-wide +""" + cn: """共享订阅的分发策略名称。 +- `random`: 随机选择一个组内成员; +- `round_robin`: 循环选择下一个成员; +- `sticky`: 使用上一次选中的成员; +- `hash`: 根据 ClientID 哈希映射到一个成员; +- `local`: 随机分发到节点本地成成员,如果本地成员不存在,则随机分发 +到任意一个成员。 +""" + + } + } + broker_perf_route_lock_type { desc { en: """Performance tuning for subscribing/unsubscribing a wildcard topic. diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index 22b9eb05e..8cab6a99f 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -47,6 +47,8 @@ find_raw/1, put/1, put/2, + force_put/2, + force_put/3, erase/1 ]). @@ -91,14 +93,6 @@ -define(ZONE_CONF_PATH(ZONE, PATH), [zones, ZONE | PATH]). -define(LISTENER_CONF_PATH(TYPE, LISTENER, PATH), [listeners, TYPE, LISTENER | PATH]). --define(ATOM_CONF_PATH(PATH, EXP, EXP_ON_FAIL), - try [atom(Key) || Key <- PATH] of - AtomKeyPath -> EXP - catch - error:badarg -> EXP_ON_FAIL - end -). - -export_type([ update_request/0, raw_config/0, @@ -166,10 +160,10 @@ find([]) -> Res -> {ok, Res} end; find(KeyPath) -> - ?ATOM_CONF_PATH( + atom_conf_path( KeyPath, - emqx_map_lib:deep_find(AtomKeyPath, get_root(KeyPath)), - {not_found, KeyPath} + fun(AtomKeyPath) -> emqx_map_lib:deep_find(AtomKeyPath, get_root(KeyPath)) end, + {return, {not_found, KeyPath}} ). -spec find_raw(emqx_map_lib:config_key_path()) -> @@ -239,7 +233,29 @@ erase(RootName) -> persistent_term:erase(?PERSIS_KEY(?RAW_CONF, bin(RootName))). -spec put(emqx_map_lib:config_key_path(), term()) -> ok. -put(KeyPath, Config) -> do_put(?CONF, KeyPath, Config). +put(KeyPath, Config) -> + Putter = fun(Path, Map, Value) -> + emqx_map_lib:deep_put(Path, Map, Value) + end, + do_put(?CONF, Putter, KeyPath, Config). + +%% Puts value into configuration even if path doesn't exist +%% For paths of non-existing atoms use force_put(KeyPath, Config, unsafe) +-spec force_put(emqx_map_lib:config_key_path(), term()) -> ok. +force_put(KeyPath, Config) -> + force_put(KeyPath, Config, safe). + +-spec force_put(emqx_map_lib:config_key_path(), term(), safe | unsafe) -> ok. +force_put(KeyPath0, Config, Safety) -> + KeyPath = + case Safety of + safe -> KeyPath0; + unsafe -> [unsafe_atom(Key) || Key <- KeyPath0] + end, + Putter = fun(Path, Map, Value) -> + emqx_map_lib:deep_force_put(Path, Map, Value) + end, + do_put(?CONF, Putter, KeyPath, Config). -spec get_default_value(emqx_map_lib:config_key_path()) -> {ok, term()} | {error, term()}. get_default_value([RootName | _] = KeyPath) -> @@ -277,7 +293,11 @@ put_raw(Config) -> ). -spec put_raw(emqx_map_lib:config_key_path(), term()) -> ok. -put_raw(KeyPath, Config) -> do_put(?RAW_CONF, KeyPath, Config). +put_raw(KeyPath, Config) -> + Putter = fun(Path, Map, Value) -> + emqx_map_lib:deep_force_put(Path, Map, Value) + end, + do_put(?RAW_CONF, Putter, KeyPath, Config). %%============================================================================ %% Load/Update configs From/To files @@ -541,41 +561,48 @@ do_get(Type, [RootName | KeyPath], Default) -> RootV = persistent_term:get(?PERSIS_KEY(Type, bin(RootName)), #{}), do_deep_get(Type, KeyPath, RootV, Default). -do_put(Type, [], DeepValue) -> +do_put(Type, Putter, [], DeepValue) -> maps:fold( fun(RootName, Value, _Res) -> - do_put(Type, [RootName], Value) + do_put(Type, Putter, [RootName], Value) end, ok, DeepValue ); -do_put(Type, [RootName | KeyPath], DeepValue) -> +do_put(Type, Putter, [RootName | KeyPath], DeepValue) -> OldValue = do_get(Type, [RootName], #{}), - NewValue = do_deep_put(Type, KeyPath, OldValue, DeepValue), + NewValue = do_deep_put(Type, Putter, KeyPath, OldValue, DeepValue), persistent_term:put(?PERSIS_KEY(Type, bin(RootName)), NewValue). do_deep_get(?CONF, KeyPath, Map, Default) -> - ?ATOM_CONF_PATH( + atom_conf_path( KeyPath, - emqx_map_lib:deep_get(AtomKeyPath, Map, Default), - Default + fun(AtomKeyPath) -> emqx_map_lib:deep_get(AtomKeyPath, Map, Default) end, + {return, Default} ); do_deep_get(?RAW_CONF, KeyPath, Map, Default) -> emqx_map_lib:deep_get([bin(Key) || Key <- KeyPath], Map, Default). -do_deep_put(?CONF, KeyPath, Map, Value) -> - ?ATOM_CONF_PATH( +do_deep_put(?CONF, Putter, KeyPath, Map, Value) -> + atom_conf_path( KeyPath, - emqx_map_lib:deep_put(AtomKeyPath, Map, Value), - error({not_found, KeyPath}) + fun(AtomKeyPath) -> Putter(AtomKeyPath, Map, Value) end, + {raise_error, {not_found, KeyPath}} ); -do_deep_put(?RAW_CONF, KeyPath, Map, Value) -> - emqx_map_lib:deep_put([bin(Key) || Key <- KeyPath], Map, Value). +do_deep_put(?RAW_CONF, Putter, KeyPath, Map, Value) -> + Putter([bin(Key) || Key <- KeyPath], Map, Value). root_names_from_conf(RawConf) -> Keys = maps:keys(RawConf), [Name || Name <- get_root_names(), lists:member(Name, Keys)]. +unsafe_atom(Bin) when is_binary(Bin) -> + binary_to_atom(Bin, utf8); +unsafe_atom(Str) when is_list(Str) -> + list_to_atom(Str); +unsafe_atom(Atom) when is_atom(Atom) -> + Atom. + atom(Bin) when is_binary(Bin) -> binary_to_existing_atom(Bin, utf8); atom(Str) when is_list(Str) -> @@ -591,3 +618,16 @@ conf_key(?CONF, RootName) -> atom(RootName); conf_key(?RAW_CONF, RootName) -> bin(RootName). + +atom_conf_path(Path, ExpFun, OnFail) -> + try [atom(Key) || Key <- Path] of + AtomKeyPath -> ExpFun(AtomKeyPath) + catch + error:badarg -> + case OnFail of + {return, Val} -> + Val; + {raise_error, Err} -> + error(Err) + end + end. diff --git a/apps/emqx/src/emqx_map_lib.erl b/apps/emqx/src/emqx_map_lib.erl index 650ef0401..52a96fea7 100644 --- a/apps/emqx/src/emqx_map_lib.erl +++ b/apps/emqx/src/emqx_map_lib.erl @@ -20,6 +20,7 @@ deep_get/3, deep_find/2, deep_put/3, + deep_force_put/3, deep_remove/2, deep_merge/2, safe_atom_key_map/1, @@ -73,6 +74,26 @@ deep_put([Key | KeyPath], Map, Data) -> SubMap = maps:get(Key, Map, #{}), Map#{Key => deep_put(KeyPath, SubMap, Data)}. +%% Like deep_put, but ensures that the key path is present. +%% If key path is not present in map, creates the keys, until it's present +%% deep_force_put([x, y, z], #{a => 1}, 0) -> #{a => 1, x => #{y => #{z => 0}}} +-spec deep_force_put(config_key_path(), map(), term()) -> map(). +deep_force_put([], _Map, Data) -> + Data; +deep_force_put([Key | KeyPath] = FullPath, Map, Data) -> + case Map of + #{Key := InnerValue} -> + Map#{Key => deep_force_put(KeyPath, InnerValue, Data)}; + #{} -> + maps:put(Key, path_to_map(KeyPath, Data), Map); + _ -> + path_to_map(FullPath, Data) + end. + +-spec path_to_map(config_key_path(), term()) -> map(). +path_to_map([], Data) -> Data; +path_to_map([Key | Tail], Data) -> #{Key => path_to_map(Tail, Data)}. + -spec deep_remove(config_key_path(), map()) -> map(). deep_remove([], Map) -> Map; diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 039808c4b..ee59b039d 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1137,7 +1137,7 @@ fields("broker") -> )}, {"shared_subscription_strategy", sc( - hoconsc:enum([random, round_robin, sticky, hash_topic, hash_clientid]), + hoconsc:enum([random, round_robin, sticky, local, hash_topic, hash_clientid]), #{ default => round_robin, desc => ?DESC(broker_shared_subscription_strategy) @@ -1163,6 +1163,21 @@ fields("broker") -> sc( ref("broker_perf"), #{} + )}, + {"shared_subscription_group", + sc( + map(name, ref("shared_subscription_group")), + #{desc => ?DESC(shared_subscription_group_strategy)} + )} + ]; +fields("shared_subscription_group") -> + [ + {"strategy", + sc( + hoconsc:enum([random, round_robin, sticky, local, hash_topic, hash_clientid]), + #{ + desc => ?DESC(shared_subscription_strategy_enum) + } )} ]; fields("broker_perf") -> @@ -1712,6 +1727,8 @@ desc("alarm") -> "Settings for the alarms."; desc("trace") -> "Real-time filtering logs for the ClientID or Topic or IP for debugging."; +desc("shared_subscription_group") -> + "Per group dispatch strategy for shared subscription"; desc(_) -> undefined. diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index e8ae58f79..ee45b991d 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -46,7 +46,12 @@ ]). %% for testing --export([subscribers/2]). +-ifdef(TEST). +-export([ + subscribers/2, + strategy/1 +]). +-endif. %% gen_server callbacks -export([ @@ -64,6 +69,7 @@ random | round_robin | sticky + | local %% same as hash_clientid, backward compatible | hash | hash_clientid @@ -122,7 +128,7 @@ dispatch(Group, Topic, Delivery) -> dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> #message{from = ClientId, topic = SourceTopic} = Msg, - case pick(strategy(), ClientId, SourceTopic, Group, Topic, FailedSubs) of + case pick(strategy(Group), ClientId, SourceTopic, Group, Topic, FailedSubs) of false -> {error, no_subscribers}; {Type, SubPid} -> @@ -135,9 +141,12 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> end end. --spec strategy() -> strategy(). -strategy() -> - emqx:get_config([broker, shared_subscription_strategy]). +-spec strategy(emqx_topic:group()) -> strategy(). +strategy(Group) -> + case emqx:get_config([broker, shared_subscription_group, Group, strategy], undefined) of + undefined -> emqx:get_config([broker, shared_subscription_strategy]); + Strategy -> Strategy + end. -spec ack_enabled() -> boolean(). ack_enabled() -> @@ -270,6 +279,13 @@ do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) -> pick_subscriber(_Group, _Topic, _Strategy, _ClientId, _SourceTopic, [Sub]) -> Sub; +pick_subscriber(Group, Topic, local, ClientId, SourceTopic, Subs) -> + case lists:filter(fun(Pid) -> erlang:node(Pid) =:= node() end, Subs) of + [_ | _] = LocalSubs -> + pick_subscriber(Group, Topic, random, ClientId, SourceTopic, LocalSubs); + [] -> + pick_subscriber(Group, Topic, random, ClientId, SourceTopic, Subs) + end; pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Subs) -> Nth = do_pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, length(Subs)), lists:nth(Nth, Subs). diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index 6b171206a..19aa96942 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -37,6 +37,7 @@ all() -> emqx_common_test_helpers:all(?SUITE). init_per_suite(Config) -> + net_kernel:start(['master@127.0.0.1', longnames]), emqx_common_test_helpers:boot_modules(all), emqx_common_test_helpers:start_apps([]), Config. @@ -187,19 +188,24 @@ t_no_connection_nack(_) -> ok. t_random(_) -> + ok = ensure_config(random, true), test_two_messages(random). t_round_robin(_) -> + ok = ensure_config(round_robin, true), test_two_messages(round_robin). t_sticky(_) -> + ok = ensure_config(sticky, true), test_two_messages(sticky). t_hash(_) -> - test_two_messages(hash, false). + ok = ensure_config(hash, false), + test_two_messages(hash). t_hash_clinetid(_) -> - test_two_messages(hash_clientid, false). + ok = ensure_config(hash_clientid, false), + test_two_messages(hash_clientid). t_hash_topic(_) -> ok = ensure_config(hash_topic, false), @@ -271,53 +277,39 @@ t_not_so_sticky(_) -> ok. test_two_messages(Strategy) -> - test_two_messages(Strategy, _WithAck = true). + test_two_messages(Strategy, <<"group1">>). -test_two_messages(Strategy, WithAck) -> - ok = ensure_config(Strategy, WithAck), +test_two_messages(Strategy, Group) -> Topic = <<"foo/bar">>, ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]), - {ok, _} = emqtt:connect(ConnPid1), {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]), + {ok, _} = emqtt:connect(ConnPid1), {ok, _} = emqtt:connect(ConnPid2), + emqtt:subscribe(ConnPid1, {<<"$share/", Group/binary, "/", Topic/binary>>, 0}), + emqtt:subscribe(ConnPid2, {<<"$share/", Group/binary, "/", Topic/binary>>, 0}), + Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>), Message2 = emqx_message:make(ClientId1, 0, Topic, <<"hello2">>), - emqtt:subscribe(ConnPid1, {<<"$share/group1/foo/bar">>, 0}), - emqtt:subscribe(ConnPid2, {<<"$share/group1/foo/bar">>, 0}), ct:sleep(100), + emqx:publish(Message1), - Me = self(), - WaitF = fun(ExpectedPayload) -> - case last_message(ExpectedPayload, [ConnPid1, ConnPid2]) of - {true, Pid} -> - Me ! {subscriber, Pid}, - true; - Other -> - Other - end - end, - WaitF(<<"hello1">>), - UsedSubPid1 = - receive - {subscriber, P1} -> P1 - end, - emqx_broker:publish(Message2), - WaitF(<<"hello2">>), - UsedSubPid2 = - receive - {subscriber, P2} -> P2 - end, - case Strategy of - sticky -> ?assert(UsedSubPid1 =:= UsedSubPid2); - round_robin -> ?assert(UsedSubPid1 =/= UsedSubPid2); - hash -> ?assert(UsedSubPid1 =:= UsedSubPid2); - _ -> ok - end, + {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]), + + emqx:publish(Message2), + {true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1, ConnPid2]), + emqtt:stop(ConnPid1), emqtt:stop(ConnPid2), + + case Strategy of + sticky -> ?assertEqual(UsedSubPid1, UsedSubPid2); + round_robin -> ?assertNotEqual(UsedSubPid1, UsedSubPid2); + hash -> ?assertEqual(UsedSubPid1, UsedSubPid2); + _ -> ok + end, ok. last_message(ExpectedPayload, Pids) -> @@ -325,7 +317,7 @@ last_message(ExpectedPayload, Pids) -> {publish, #{client_pid := Pid, payload := ExpectedPayload}} -> ct:pal("~p ====== ~p", [Pids, Pid]), {true, Pid} - after 100 -> + after 500 -> <<"not yet?">> end. @@ -353,6 +345,101 @@ t_uncovered_func(_) -> ignored = emqx_shared_sub ! ignored, {mnesia_table_event, []} = emqx_shared_sub ! {mnesia_table_event, []}. +t_per_group_config(_) -> + ok = ensure_group_config(#{ + <<"local_group">> => local, + <<"round_robin_group">> => round_robin, + <<"sticky_group">> => sticky + }), + %% Each test is repeated 4 times because random strategy may technically pass the test + %% so we run 8 tests to make random pass in only 1/256 runs + + test_two_messages(sticky, <<"sticky_group">>), + test_two_messages(sticky, <<"sticky_group">>), + test_two_messages(round_robin, <<"round_robin_group">>), + test_two_messages(round_robin, <<"round_robin_group">>), + test_two_messages(sticky, <<"sticky_group">>), + test_two_messages(sticky, <<"sticky_group">>), + test_two_messages(round_robin, <<"round_robin_group">>), + test_two_messages(round_robin, <<"round_robin_group">>). + +t_local(_) -> + GroupConfig = #{ + <<"local_group">> => local, + <<"round_robin_group">> => round_robin, + <<"sticky_group">> => sticky + }, + + Node = start_slave('local_shared_sub_testtesttest', 21999), + ok = ensure_group_config(GroupConfig), + ok = ensure_group_config(Node, GroupConfig), + + Topic = <<"local_foo/bar">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {port, 21999}]), + + {ok, _} = emqtt:connect(ConnPid1), + {ok, _} = emqtt:connect(ConnPid2), + + emqtt:subscribe(ConnPid1, {<<"$share/local_group/", Topic/binary>>, 0}), + emqtt:subscribe(ConnPid2, {<<"$share/local_group/", Topic/binary>>, 0}), + + ct:sleep(100), + + Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>), + Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>), + + emqx:publish(Message1), + {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]), + + rpc:call(Node, emqx, publish, [Message2]), + {true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1, ConnPid2]), + RemoteLocalGroupStrategy = rpc:call(Node, emqx_shared_sub, strategy, [<<"local_group">>]), + + emqtt:stop(ConnPid1), + emqtt:stop(ConnPid2), + stop_slave(Node), + + ?assertEqual(local, emqx_shared_sub:strategy(<<"local_group">>)), + ?assertEqual(local, RemoteLocalGroupStrategy), + + ?assertNotEqual(UsedSubPid1, UsedSubPid2), + ok. + +t_local_fallback(_) -> + ok = ensure_group_config(#{ + <<"local_group">> => local, + <<"round_robin_group">> => round_robin, + <<"sticky_group">> => sticky + }), + + Topic = <<"local_foo/bar">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + Node = start_slave('local_fallback_shared_sub_test', 11888), + + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]), + {ok, _} = emqtt:connect(ConnPid1), + Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>), + Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>), + + emqtt:subscribe(ConnPid1, {<<"$share/local_group/", Topic/binary>>, 0}), + + emqx:publish(Message1), + {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1]), + + rpc:call(Node, emqx, publish, [Message2]), + {true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1]), + + emqtt:stop(ConnPid1), + stop_slave(Node), + + ?assertEqual(UsedSubPid1, UsedSubPid2), + ok. + %%-------------------------------------------------------------------- %% help functions %%-------------------------------------------------------------------- @@ -365,6 +452,29 @@ ensure_config(Strategy, AckEnabled) -> emqx_config:put([broker, shared_dispatch_ack_enabled], AckEnabled), ok. +ensure_group_config(Group2Strategy) -> + lists:foreach( + fun({Group, Strategy}) -> + emqx_config:force_put( + [broker, shared_subscription_group, Group, strategy], Strategy, unsafe + ) + end, + maps:to_list(Group2Strategy) + ). + +ensure_group_config(Node, Group2Strategy) -> + lists:foreach( + fun({Group, Strategy}) -> + rpc:call( + Node, + emqx_config, + force_put, + [[broker, shared_subscription_group, Group, strategy], Strategy, unsafe] + ) + end, + maps:to_list(Group2Strategy) + ). + subscribed(Group, Topic, Pid) -> lists:member(Pid, emqx_shared_sub:subscribers(Group, Topic)). @@ -376,10 +486,67 @@ recv_msgs(0, Msgs) -> recv_msgs(Count, Msgs) -> receive {publish, Msg} -> - recv_msgs(Count - 1, [Msg | Msgs]); - %%TODO:: remove the branch? - _Other -> - recv_msgs(Count, Msgs) + recv_msgs(Count - 1, [Msg | Msgs]) after 100 -> Msgs end. + +start_slave(Name, Port) -> + {ok, Node} = ct_slave:start( + list_to_atom(atom_to_list(Name) ++ "@" ++ host()), + [ + {kill_if_fail, true}, + {monitor_master, true}, + {init_timeout, 10000}, + {startup_timeout, 10000}, + {erl_flags, ebin_path()} + ] + ), + + pong = net_adm:ping(Node), + setup_node(Node, Port), + Node. + +stop_slave(Node) -> + rpc:call(Node, mria, leave, []), + ct_slave:stop(Node). + +host() -> + [_, Host] = string:tokens(atom_to_list(node()), "@"), + Host. + +ebin_path() -> + string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " "). + +is_lib(Path) -> + string:prefix(Path, code:lib_dir()) =:= nomatch. + +setup_node(Node, Port) -> + EnvHandler = + fun(_) -> + %% We load configuration, and than set the special enviroment variable + %% which says that emqx shouldn't load configuration at startup + emqx_config:init_load(emqx_schema), + application:set_env(emqx, init_config_load_done, true), + + ok = emqx_config:put([listeners, tcp, default, bind], {{127, 0, 0, 1}, Port}), + ok = emqx_config:put([listeners, ssl, default, bind], {{127, 0, 0, 1}, Port + 1}), + ok = emqx_config:put([listeners, quic, default, bind], {{127, 0, 0, 1}, Port + 2}), + ok = emqx_config:put([listeners, ws, default, bind], {{127, 0, 0, 1}, Port + 3}), + ok = emqx_config:put([listeners, wss, default, bind], {{127, 0, 0, 1}, Port + 4}), + ok + end, + + %% Load env before doing anything + [ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx, ekka, mria]], + + %% Needs to be set explicitly because ekka:start() (which calls `gen` is called without Handler + %% in emqx_common_test_helpers:start_apps(...) + ok = rpc:call(Node, application, set_env, [gen_rpc, tcp_server_port, Port - 1]), + ok = rpc:call(Node, application, set_env, [gen_rpc, port_discovery, manual]), + + %% Here we start the node and make it join the cluster + ok = rpc:call(Node, emqx_common_test_helpers, start_apps, [[], EnvHandler]), + rpc:call(Node, mria, join, [node()]), + + ok. diff --git a/apps/emqx_machine/src/emqx_machine.erl b/apps/emqx_machine/src/emqx_machine.erl index 2a4595bf9..c2adf7bf5 100644 --- a/apps/emqx_machine/src/emqx_machine.erl +++ b/apps/emqx_machine/src/emqx_machine.erl @@ -92,10 +92,10 @@ update_vips() -> configure_shard_transports() -> ShardTransports = application:get_env(emqx_machine, custom_shard_transports, #{}), - maps:foreach( - fun(ShardBin, Transport) -> + lists:foreach( + fun({ShardBin, Transport}) -> ShardName = binary_to_existing_atom(ShardBin), mria_config:set_shard_transport(ShardName, Transport) end, - ShardTransports + maps:to_list(ShardTransports) ).