diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index 6122ff596..209715a85 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -89,7 +89,7 @@ mnesia(boot) -> ok = mria:create_table( ?ACTIVATED_ALARM, [ - {type, set}, + {type, ordered_set}, {storage, disc_copies}, {local_content, true}, {record_name, activated_alarm}, diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 7b8e3dddd..1c31d86c2 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -106,8 +106,8 @@ start_link(Pool, Id) -> create_tabs() -> TabOpts = [public, {read_concurrency, true}, {write_concurrency, true}], - %% SubOption: {SubPid, Topic} -> SubOption - ok = emqx_tables:new(?SUBOPTION, [set | TabOpts]), + %% SubOption: {Topic, SubPid} -> SubOption + ok = emqx_tables:new(?SUBOPTION, [ordered_set | TabOpts]), %% Subscription: SubPid -> Topic1, Topic2, Topic3, ... %% duplicate_bag: o(1) insert @@ -136,7 +136,7 @@ subscribe(Topic, SubId, SubOpts0) when is_binary(Topic), ?IS_SUBID(SubId), is_ma SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0), _ = emqx_trace:subscribe(Topic, SubId, SubOpts), SubPid = self(), - case ets:member(?SUBOPTION, {SubPid, Topic}) of + case subscribed(SubPid, Topic) of %% New false -> ok = emqx_broker_helper:register_sub(SubPid, SubId), @@ -164,16 +164,16 @@ do_subscribe(undefined, Topic, SubPid, SubOpts) -> case emqx_broker_helper:get_sub_shard(SubPid, Topic) of 0 -> true = ets:insert(?SUBSCRIBER, {Topic, SubPid}), - true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}), + true = ets:insert(?SUBOPTION, {{Topic, SubPid}, SubOpts}), call(pick(Topic), {subscribe, Topic}); I -> true = ets:insert(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), - true = ets:insert(?SUBOPTION, {{SubPid, Topic}, maps:put(shard, I, SubOpts)}), + true = ets:insert(?SUBOPTION, {{Topic, SubPid}, maps:put(shard, I, SubOpts)}), call(pick({Topic, I}), {subscribe, Topic, I}) end; %% Shared subscription do_subscribe(Group, Topic, SubPid, SubOpts) -> - true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}), + true = ets:insert(?SUBOPTION, {{Topic, SubPid}, SubOpts}), emqx_shared_sub:subscribe(Group, Topic, SubPid). %%-------------------------------------------------------------------- @@ -183,7 +183,7 @@ do_subscribe(Group, Topic, SubPid, SubOpts) -> -spec unsubscribe(emqx_types:topic()) -> ok. unsubscribe(Topic) when is_binary(Topic) -> SubPid = self(), - case ets:lookup(?SUBOPTION, {SubPid, Topic}) of + case ets:lookup(?SUBOPTION, {Topic, SubPid}) of [{_, SubOpts}] -> _ = emqx_broker_helper:reclaim_seq(Topic), _ = emqx_trace:unsubscribe(Topic, SubOpts), @@ -193,7 +193,7 @@ unsubscribe(Topic) when is_binary(Topic) -> end. do_unsubscribe(Topic, SubPid, SubOpts) -> - true = ets:delete(?SUBOPTION, {SubPid, Topic}), + true = ets:delete(?SUBOPTION, {Topic, SubPid}), true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}), Group = maps:get(share, SubOpts, undefined), do_unsubscribe(Group, Topic, SubPid, SubOpts), @@ -362,10 +362,10 @@ subscribers(Shard = {shard, _Topic, _I}) -> subscriber_down(SubPid) -> lists:foreach( fun(Topic) -> - case lookup_value(?SUBOPTION, {SubPid, Topic}) of + case lookup_value(?SUBOPTION, {Topic, SubPid}) of SubOpts when is_map(SubOpts) -> _ = emqx_broker_helper:reclaim_seq(Topic), - true = ets:delete(?SUBOPTION, {SubPid, Topic}), + true = ets:delete(?SUBOPTION, {Topic, SubPid}), case maps:get(shard, SubOpts, 0) of 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), @@ -390,7 +390,7 @@ subscriber_down(SubPid) -> [{emqx_types:topic(), emqx_types:subopts()}]. subscriptions(SubPid) when is_pid(SubPid) -> [ - {Topic, lookup_value(?SUBOPTION, {SubPid, Topic}, #{})} + {Topic, lookup_value(?SUBOPTION, {Topic, SubPid}, #{})} || Topic <- lookup_value(?SUBSCRIPTION, SubPid, []) ]; subscriptions(SubId) -> @@ -403,19 +403,19 @@ subscriptions(SubId) -> -spec subscriptions_via_topic(emqx_types:topic()) -> [emqx_types:subopts()]. subscriptions_via_topic(Topic) -> - MatchSpec = [{{{'_', '$1'}, '_'}, [{'=:=', '$1', Topic}], ['$_']}], + MatchSpec = [{{{Topic, '_'}, '_'}, [], ['$_']}], ets:select(?SUBOPTION, MatchSpec). -spec subscribed(pid() | emqx_types:subid(), emqx_types:topic()) -> boolean(). subscribed(SubPid, Topic) when is_pid(SubPid) -> - ets:member(?SUBOPTION, {SubPid, Topic}); + ets:member(?SUBOPTION, {Topic, SubPid}); subscribed(SubId, Topic) when ?IS_SUBID(SubId) -> SubPid = emqx_broker_helper:lookup_subpid(SubId), - ets:member(?SUBOPTION, {SubPid, Topic}). + ets:member(?SUBOPTION, {Topic, SubPid}). -spec get_subopts(pid(), emqx_types:topic()) -> maybe(emqx_types:subopts()). get_subopts(SubPid, Topic) when is_pid(SubPid), is_binary(Topic) -> - lookup_value(?SUBOPTION, {SubPid, Topic}); + lookup_value(?SUBOPTION, {Topic, SubPid}); get_subopts(SubId, Topic) when ?IS_SUBID(SubId) -> case emqx_broker_helper:lookup_subpid(SubId) of SubPid when is_pid(SubPid) -> @@ -430,7 +430,7 @@ set_subopts(Topic, NewOpts) when is_binary(Topic), is_map(NewOpts) -> %% @private set_subopts(SubPid, Topic, NewOpts) -> - Sub = {SubPid, Topic}, + Sub = {Topic, SubPid}, case ets:lookup(?SUBOPTION, Sub) of [{_, OldOpts}] -> ets:insert(?SUBOPTION, {Sub, maps:merge(OldOpts, NewOpts)}); diff --git a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl index ac39e2cda..6ce59d4f9 100644 --- a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl +++ b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl @@ -94,6 +94,7 @@ mnesia(boot) -> ok = mria:create_table(?TAB, [ {rlog_shard, ?AUTH_SHARD}, + {type, ordered_set}, {storage, disc_copies}, {record_name, user_info}, {attributes, record_info(fields, user_info)}, diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl index 7c51644b7..25a3a5976 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl @@ -96,6 +96,7 @@ mnesia(boot) -> ok = mria:create_table(?TAB, [ {rlog_shard, ?AUTH_SHARD}, + {type, ordered_set}, {storage, disc_copies}, {record_name, user_info}, {attributes, record_info(fields, user_info)}, diff --git a/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl b/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl index 900f39ebb..5c5a3ee79 100644 --- a/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl +++ b/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl @@ -204,7 +204,7 @@ check_subs(Count) -> check_subs([], []) -> ok; -check_subs([{{_, Topic}, #{subid := ?CLIENT_ID}} | Subs], List) -> +check_subs([{{Topic, _}, #{subid := ?CLIENT_ID}} | Subs], List) -> check_subs(Subs, lists:delete(Topic, List)); check_subs([_ | Subs], List) -> check_subs(Subs, List). diff --git a/apps/emqx_gateway/src/emqx_gateway_cm.erl b/apps/emqx_gateway/src/emqx_gateway_cm.erl index 5cba1464a..4719b1da8 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm.erl @@ -768,7 +768,7 @@ init(Options) -> {ChanTab, ConnTab, InfoTab} = cmtabs(GwName), ok = emqx_tables:new(ChanTab, [bag, {read_concurrency, true} | TabOpts]), ok = emqx_tables:new(ConnTab, [bag | TabOpts]), - ok = emqx_tables:new(InfoTab, [set, compressed | TabOpts]), + ok = emqx_tables:new(InfoTab, [ordered_set, compressed | TabOpts]), %% Start link cm-registry process %% XXX: Should I hang it under a higher level supervisor? diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index b3380f4d1..bf84d03d5 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -173,7 +173,7 @@ subscriptions(get, #{query_string := QString}) -> {200, Result} end. -format(WhichNode, {{_Subscriber, Topic}, Options}) -> +format(WhichNode, {{Topic, _Subscriber}, Options}) -> maps:merge( #{ topic => get_topic(Topic, Options), @@ -205,14 +205,14 @@ gen_match_spec([], MtchHead) -> gen_match_spec([{Key, '=:=', Value} | More], MtchHead) -> gen_match_spec(More, update_ms(Key, Value, MtchHead)). -update_ms(clientid, X, {{Pid, Topic}, Opts}) -> - {{Pid, Topic}, Opts#{subid => X}}; -update_ms(topic, X, {{Pid, _Topic}, Opts}) -> - {{Pid, X}, Opts}; -update_ms(share_group, X, {{Pid, Topic}, Opts}) -> - {{Pid, Topic}, Opts#{share => X}}; -update_ms(qos, X, {{Pid, Topic}, Opts}) -> - {{Pid, Topic}, Opts#{qos => X}}. +update_ms(clientid, X, {{Topic, Pid}, Opts}) -> + {{Topic, Pid}, Opts#{subid => X}}; +update_ms(topic, X, {{_Topic, Pid}, Opts}) -> + {{X, Pid}, Opts}; +update_ms(share_group, X, {{Topic, Pid}, Opts}) -> + {{Topic, Pid}, Opts#{share => X}}; +update_ms(qos, X, {{Topic, Pid}, Opts}) -> + {{Topic, Pid}, Opts#{qos => X}}. fuzzy_filter_fun([]) -> undefined; @@ -221,5 +221,5 @@ fuzzy_filter_fun(Fuzzy) -> run_fuzzy_filter(_, []) -> true; -run_fuzzy_filter(E = {{_, Topic}, _}, [{topic, match, TopicFilter} | Fuzzy]) -> +run_fuzzy_filter(E = {{Topic, _}, _}, [{topic, match, TopicFilter} | Fuzzy]) -> emqx_topic:match(Topic, TopicFilter) andalso run_fuzzy_filter(E, Fuzzy). diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 8d19716ce..0e7506a0b 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -213,7 +213,7 @@ subscriptions(["show", ClientId]) -> [] -> emqx_ctl:print("Not Found.~n"); [{_, Pid}] -> - case ets:match_object(emqx_suboption, {{Pid, '_'}, '_'}) of + case ets:match_object(emqx_suboption, {{'_', Pid}, '_'}) of [] -> emqx_ctl:print("Not Found.~n"); Suboption -> [print({emqx_suboption, Sub}) || Sub <- Suboption] end @@ -829,7 +829,7 @@ print({emqx_topic, #route{topic = Topic, dest = {_, Node}}}) -> emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]); print({emqx_topic, #route{topic = Topic, dest = Node}}) -> emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]); -print({emqx_suboption, {{Pid, Topic}, Options}}) when is_pid(Pid) -> +print({emqx_suboption, {{Topic, Pid}, Options}}) when is_pid(Pid) -> SubId = maps:get(subid, Options), QoS = maps:get(qos, Options, 0), NL = maps:get(nl, Options, 0), diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl index d7ea6c6d1..14d2b1f95 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl @@ -25,7 +25,7 @@ -export([stop/1]). start(_Type, _Args) -> - _ = ets:new(?RULE_TAB, [named_table, public, set, {read_concurrency, true}]), + _ = ets:new(?RULE_TAB, [named_table, public, ordered_set, {read_concurrency, true}]), ok = emqx_rule_events:reload(), SupRet = emqx_rule_engine_sup:start_link(), ok = emqx_rule_engine:load_rules(),