From 34571c779d7743ab5f89d5c6694be402c06260a4 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 13 Jan 2023 18:13:59 +0300 Subject: [PATCH] feat: make `suboption` table ordering more natural --- apps/emqx/src/emqx_broker.erl | 30 +++++++++---------- .../test/emqx_auto_subscribe_SUITE.erl | 2 +- .../src/emqx_mgmt_api_subscriptions.erl | 20 ++++++------- apps/emqx_management/src/emqx_mgmt_cli.erl | 4 +-- 4 files changed, 28 insertions(+), 28 deletions(-) diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index ba16895ae..1c31d86c2 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -106,7 +106,7 @@ start_link(Pool, Id) -> create_tabs() -> TabOpts = [public, {read_concurrency, true}, {write_concurrency, true}], - %% SubOption: {SubPid, Topic} -> SubOption + %% SubOption: {Topic, SubPid} -> SubOption ok = emqx_tables:new(?SUBOPTION, [ordered_set | TabOpts]), %% Subscription: SubPid -> Topic1, Topic2, Topic3, ... @@ -136,7 +136,7 @@ subscribe(Topic, SubId, SubOpts0) when is_binary(Topic), ?IS_SUBID(SubId), is_ma SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0), _ = emqx_trace:subscribe(Topic, SubId, SubOpts), SubPid = self(), - case ets:member(?SUBOPTION, {SubPid, Topic}) of + case subscribed(SubPid, Topic) of %% New false -> ok = emqx_broker_helper:register_sub(SubPid, SubId), @@ -164,16 +164,16 @@ do_subscribe(undefined, Topic, SubPid, SubOpts) -> case emqx_broker_helper:get_sub_shard(SubPid, Topic) of 0 -> true = ets:insert(?SUBSCRIBER, {Topic, SubPid}), - true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}), + true = ets:insert(?SUBOPTION, {{Topic, SubPid}, SubOpts}), call(pick(Topic), {subscribe, Topic}); I -> true = ets:insert(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), - true = ets:insert(?SUBOPTION, {{SubPid, Topic}, maps:put(shard, I, SubOpts)}), + true = ets:insert(?SUBOPTION, {{Topic, SubPid}, maps:put(shard, I, SubOpts)}), call(pick({Topic, I}), {subscribe, Topic, I}) end; %% Shared subscription do_subscribe(Group, Topic, SubPid, SubOpts) -> - true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}), + true = ets:insert(?SUBOPTION, {{Topic, SubPid}, SubOpts}), emqx_shared_sub:subscribe(Group, Topic, SubPid). %%-------------------------------------------------------------------- @@ -183,7 +183,7 @@ do_subscribe(Group, Topic, SubPid, SubOpts) -> -spec unsubscribe(emqx_types:topic()) -> ok. unsubscribe(Topic) when is_binary(Topic) -> SubPid = self(), - case ets:lookup(?SUBOPTION, {SubPid, Topic}) of + case ets:lookup(?SUBOPTION, {Topic, SubPid}) of [{_, SubOpts}] -> _ = emqx_broker_helper:reclaim_seq(Topic), _ = emqx_trace:unsubscribe(Topic, SubOpts), @@ -193,7 +193,7 @@ unsubscribe(Topic) when is_binary(Topic) -> end. do_unsubscribe(Topic, SubPid, SubOpts) -> - true = ets:delete(?SUBOPTION, {SubPid, Topic}), + true = ets:delete(?SUBOPTION, {Topic, SubPid}), true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}), Group = maps:get(share, SubOpts, undefined), do_unsubscribe(Group, Topic, SubPid, SubOpts), @@ -362,10 +362,10 @@ subscribers(Shard = {shard, _Topic, _I}) -> subscriber_down(SubPid) -> lists:foreach( fun(Topic) -> - case lookup_value(?SUBOPTION, {SubPid, Topic}) of + case lookup_value(?SUBOPTION, {Topic, SubPid}) of SubOpts when is_map(SubOpts) -> _ = emqx_broker_helper:reclaim_seq(Topic), - true = ets:delete(?SUBOPTION, {SubPid, Topic}), + true = ets:delete(?SUBOPTION, {Topic, SubPid}), case maps:get(shard, SubOpts, 0) of 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), @@ -390,7 +390,7 @@ subscriber_down(SubPid) -> [{emqx_types:topic(), emqx_types:subopts()}]. subscriptions(SubPid) when is_pid(SubPid) -> [ - {Topic, lookup_value(?SUBOPTION, {SubPid, Topic}, #{})} + {Topic, lookup_value(?SUBOPTION, {Topic, SubPid}, #{})} || Topic <- lookup_value(?SUBSCRIPTION, SubPid, []) ]; subscriptions(SubId) -> @@ -403,19 +403,19 @@ subscriptions(SubId) -> -spec subscriptions_via_topic(emqx_types:topic()) -> [emqx_types:subopts()]. subscriptions_via_topic(Topic) -> - MatchSpec = [{{{'_', '$1'}, '_'}, [{'=:=', '$1', Topic}], ['$_']}], + MatchSpec = [{{{Topic, '_'}, '_'}, [], ['$_']}], ets:select(?SUBOPTION, MatchSpec). -spec subscribed(pid() | emqx_types:subid(), emqx_types:topic()) -> boolean(). subscribed(SubPid, Topic) when is_pid(SubPid) -> - ets:member(?SUBOPTION, {SubPid, Topic}); + ets:member(?SUBOPTION, {Topic, SubPid}); subscribed(SubId, Topic) when ?IS_SUBID(SubId) -> SubPid = emqx_broker_helper:lookup_subpid(SubId), - ets:member(?SUBOPTION, {SubPid, Topic}). + ets:member(?SUBOPTION, {Topic, SubPid}). -spec get_subopts(pid(), emqx_types:topic()) -> maybe(emqx_types:subopts()). get_subopts(SubPid, Topic) when is_pid(SubPid), is_binary(Topic) -> - lookup_value(?SUBOPTION, {SubPid, Topic}); + lookup_value(?SUBOPTION, {Topic, SubPid}); get_subopts(SubId, Topic) when ?IS_SUBID(SubId) -> case emqx_broker_helper:lookup_subpid(SubId) of SubPid when is_pid(SubPid) -> @@ -430,7 +430,7 @@ set_subopts(Topic, NewOpts) when is_binary(Topic), is_map(NewOpts) -> %% @private set_subopts(SubPid, Topic, NewOpts) -> - Sub = {SubPid, Topic}, + Sub = {Topic, SubPid}, case ets:lookup(?SUBOPTION, Sub) of [{_, OldOpts}] -> ets:insert(?SUBOPTION, {Sub, maps:merge(OldOpts, NewOpts)}); diff --git a/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl b/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl index 900f39ebb..5c5a3ee79 100644 --- a/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl +++ b/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl @@ -204,7 +204,7 @@ check_subs(Count) -> check_subs([], []) -> ok; -check_subs([{{_, Topic}, #{subid := ?CLIENT_ID}} | Subs], List) -> +check_subs([{{Topic, _}, #{subid := ?CLIENT_ID}} | Subs], List) -> check_subs(Subs, lists:delete(Topic, List)); check_subs([_ | Subs], List) -> check_subs(Subs, List). diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index b3380f4d1..bf84d03d5 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -173,7 +173,7 @@ subscriptions(get, #{query_string := QString}) -> {200, Result} end. -format(WhichNode, {{_Subscriber, Topic}, Options}) -> +format(WhichNode, {{Topic, _Subscriber}, Options}) -> maps:merge( #{ topic => get_topic(Topic, Options), @@ -205,14 +205,14 @@ gen_match_spec([], MtchHead) -> gen_match_spec([{Key, '=:=', Value} | More], MtchHead) -> gen_match_spec(More, update_ms(Key, Value, MtchHead)). -update_ms(clientid, X, {{Pid, Topic}, Opts}) -> - {{Pid, Topic}, Opts#{subid => X}}; -update_ms(topic, X, {{Pid, _Topic}, Opts}) -> - {{Pid, X}, Opts}; -update_ms(share_group, X, {{Pid, Topic}, Opts}) -> - {{Pid, Topic}, Opts#{share => X}}; -update_ms(qos, X, {{Pid, Topic}, Opts}) -> - {{Pid, Topic}, Opts#{qos => X}}. +update_ms(clientid, X, {{Topic, Pid}, Opts}) -> + {{Topic, Pid}, Opts#{subid => X}}; +update_ms(topic, X, {{_Topic, Pid}, Opts}) -> + {{X, Pid}, Opts}; +update_ms(share_group, X, {{Topic, Pid}, Opts}) -> + {{Topic, Pid}, Opts#{share => X}}; +update_ms(qos, X, {{Topic, Pid}, Opts}) -> + {{Topic, Pid}, Opts#{qos => X}}. fuzzy_filter_fun([]) -> undefined; @@ -221,5 +221,5 @@ fuzzy_filter_fun(Fuzzy) -> run_fuzzy_filter(_, []) -> true; -run_fuzzy_filter(E = {{_, Topic}, _}, [{topic, match, TopicFilter} | Fuzzy]) -> +run_fuzzy_filter(E = {{Topic, _}, _}, [{topic, match, TopicFilter} | Fuzzy]) -> emqx_topic:match(Topic, TopicFilter) andalso run_fuzzy_filter(E, Fuzzy). diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 8d19716ce..0e7506a0b 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -213,7 +213,7 @@ subscriptions(["show", ClientId]) -> [] -> emqx_ctl:print("Not Found.~n"); [{_, Pid}] -> - case ets:match_object(emqx_suboption, {{Pid, '_'}, '_'}) of + case ets:match_object(emqx_suboption, {{'_', Pid}, '_'}) of [] -> emqx_ctl:print("Not Found.~n"); Suboption -> [print({emqx_suboption, Sub}) || Sub <- Suboption] end @@ -829,7 +829,7 @@ print({emqx_topic, #route{topic = Topic, dest = {_, Node}}}) -> emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]); print({emqx_topic, #route{topic = Topic, dest = Node}}) -> emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]); -print({emqx_suboption, {{Pid, Topic}, Options}}) when is_pid(Pid) -> +print({emqx_suboption, {{Topic, Pid}, Options}}) when is_pid(Pid) -> SubId = maps:get(subid, Options), QoS = maps:get(qos, Options, 0), NL = maps:get(nl, Options, 0),