feat: make `suboption` table ordering more natural

This commit is contained in:
Andrew Mayorov 2023-01-13 18:13:59 +03:00
parent ce2dba15b4
commit 34571c779d
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
4 changed files with 28 additions and 28 deletions

View File

@ -106,7 +106,7 @@ start_link(Pool, Id) ->
create_tabs() -> create_tabs() ->
TabOpts = [public, {read_concurrency, true}, {write_concurrency, true}], TabOpts = [public, {read_concurrency, true}, {write_concurrency, true}],
%% SubOption: {SubPid, Topic} -> SubOption %% SubOption: {Topic, SubPid} -> SubOption
ok = emqx_tables:new(?SUBOPTION, [ordered_set | TabOpts]), ok = emqx_tables:new(?SUBOPTION, [ordered_set | TabOpts]),
%% Subscription: SubPid -> Topic1, Topic2, Topic3, ... %% Subscription: SubPid -> Topic1, Topic2, Topic3, ...
@ -136,7 +136,7 @@ subscribe(Topic, SubId, SubOpts0) when is_binary(Topic), ?IS_SUBID(SubId), is_ma
SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0), SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0),
_ = emqx_trace:subscribe(Topic, SubId, SubOpts), _ = emqx_trace:subscribe(Topic, SubId, SubOpts),
SubPid = self(), SubPid = self(),
case ets:member(?SUBOPTION, {SubPid, Topic}) of case subscribed(SubPid, Topic) of
%% New %% New
false -> false ->
ok = emqx_broker_helper:register_sub(SubPid, SubId), ok = emqx_broker_helper:register_sub(SubPid, SubId),
@ -164,16 +164,16 @@ do_subscribe(undefined, Topic, SubPid, SubOpts) ->
case emqx_broker_helper:get_sub_shard(SubPid, Topic) of case emqx_broker_helper:get_sub_shard(SubPid, Topic) of
0 -> 0 ->
true = ets:insert(?SUBSCRIBER, {Topic, SubPid}), true = ets:insert(?SUBSCRIBER, {Topic, SubPid}),
true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}), true = ets:insert(?SUBOPTION, {{Topic, SubPid}, SubOpts}),
call(pick(Topic), {subscribe, Topic}); call(pick(Topic), {subscribe, Topic});
I -> I ->
true = ets:insert(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), true = ets:insert(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
true = ets:insert(?SUBOPTION, {{SubPid, Topic}, maps:put(shard, I, SubOpts)}), true = ets:insert(?SUBOPTION, {{Topic, SubPid}, maps:put(shard, I, SubOpts)}),
call(pick({Topic, I}), {subscribe, Topic, I}) call(pick({Topic, I}), {subscribe, Topic, I})
end; end;
%% Shared subscription %% Shared subscription
do_subscribe(Group, Topic, SubPid, SubOpts) -> do_subscribe(Group, Topic, SubPid, SubOpts) ->
true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}), true = ets:insert(?SUBOPTION, {{Topic, SubPid}, SubOpts}),
emqx_shared_sub:subscribe(Group, Topic, SubPid). emqx_shared_sub:subscribe(Group, Topic, SubPid).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -183,7 +183,7 @@ do_subscribe(Group, Topic, SubPid, SubOpts) ->
-spec unsubscribe(emqx_types:topic()) -> ok. -spec unsubscribe(emqx_types:topic()) -> ok.
unsubscribe(Topic) when is_binary(Topic) -> unsubscribe(Topic) when is_binary(Topic) ->
SubPid = self(), SubPid = self(),
case ets:lookup(?SUBOPTION, {SubPid, Topic}) of case ets:lookup(?SUBOPTION, {Topic, SubPid}) of
[{_, SubOpts}] -> [{_, SubOpts}] ->
_ = emqx_broker_helper:reclaim_seq(Topic), _ = emqx_broker_helper:reclaim_seq(Topic),
_ = emqx_trace:unsubscribe(Topic, SubOpts), _ = emqx_trace:unsubscribe(Topic, SubOpts),
@ -193,7 +193,7 @@ unsubscribe(Topic) when is_binary(Topic) ->
end. end.
do_unsubscribe(Topic, SubPid, SubOpts) -> do_unsubscribe(Topic, SubPid, SubOpts) ->
true = ets:delete(?SUBOPTION, {SubPid, Topic}), true = ets:delete(?SUBOPTION, {Topic, SubPid}),
true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}), true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}),
Group = maps:get(share, SubOpts, undefined), Group = maps:get(share, SubOpts, undefined),
do_unsubscribe(Group, Topic, SubPid, SubOpts), do_unsubscribe(Group, Topic, SubPid, SubOpts),
@ -362,10 +362,10 @@ subscribers(Shard = {shard, _Topic, _I}) ->
subscriber_down(SubPid) -> subscriber_down(SubPid) ->
lists:foreach( lists:foreach(
fun(Topic) -> fun(Topic) ->
case lookup_value(?SUBOPTION, {SubPid, Topic}) of case lookup_value(?SUBOPTION, {Topic, SubPid}) of
SubOpts when is_map(SubOpts) -> SubOpts when is_map(SubOpts) ->
_ = emqx_broker_helper:reclaim_seq(Topic), _ = emqx_broker_helper:reclaim_seq(Topic),
true = ets:delete(?SUBOPTION, {SubPid, Topic}), true = ets:delete(?SUBOPTION, {Topic, SubPid}),
case maps:get(shard, SubOpts, 0) of case maps:get(shard, SubOpts, 0) of
0 -> 0 ->
true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
@ -390,7 +390,7 @@ subscriber_down(SubPid) ->
[{emqx_types:topic(), emqx_types:subopts()}]. [{emqx_types:topic(), emqx_types:subopts()}].
subscriptions(SubPid) when is_pid(SubPid) -> subscriptions(SubPid) when is_pid(SubPid) ->
[ [
{Topic, lookup_value(?SUBOPTION, {SubPid, Topic}, #{})} {Topic, lookup_value(?SUBOPTION, {Topic, SubPid}, #{})}
|| Topic <- lookup_value(?SUBSCRIPTION, SubPid, []) || Topic <- lookup_value(?SUBSCRIPTION, SubPid, [])
]; ];
subscriptions(SubId) -> subscriptions(SubId) ->
@ -403,19 +403,19 @@ subscriptions(SubId) ->
-spec subscriptions_via_topic(emqx_types:topic()) -> [emqx_types:subopts()]. -spec subscriptions_via_topic(emqx_types:topic()) -> [emqx_types:subopts()].
subscriptions_via_topic(Topic) -> subscriptions_via_topic(Topic) ->
MatchSpec = [{{{'_', '$1'}, '_'}, [{'=:=', '$1', Topic}], ['$_']}], MatchSpec = [{{{Topic, '_'}, '_'}, [], ['$_']}],
ets:select(?SUBOPTION, MatchSpec). ets:select(?SUBOPTION, MatchSpec).
-spec subscribed(pid() | emqx_types:subid(), emqx_types:topic()) -> boolean(). -spec subscribed(pid() | emqx_types:subid(), emqx_types:topic()) -> boolean().
subscribed(SubPid, Topic) when is_pid(SubPid) -> subscribed(SubPid, Topic) when is_pid(SubPid) ->
ets:member(?SUBOPTION, {SubPid, Topic}); ets:member(?SUBOPTION, {Topic, SubPid});
subscribed(SubId, Topic) when ?IS_SUBID(SubId) -> subscribed(SubId, Topic) when ?IS_SUBID(SubId) ->
SubPid = emqx_broker_helper:lookup_subpid(SubId), SubPid = emqx_broker_helper:lookup_subpid(SubId),
ets:member(?SUBOPTION, {SubPid, Topic}). ets:member(?SUBOPTION, {Topic, SubPid}).
-spec get_subopts(pid(), emqx_types:topic()) -> maybe(emqx_types:subopts()). -spec get_subopts(pid(), emqx_types:topic()) -> maybe(emqx_types:subopts()).
get_subopts(SubPid, Topic) when is_pid(SubPid), is_binary(Topic) -> get_subopts(SubPid, Topic) when is_pid(SubPid), is_binary(Topic) ->
lookup_value(?SUBOPTION, {SubPid, Topic}); lookup_value(?SUBOPTION, {Topic, SubPid});
get_subopts(SubId, Topic) when ?IS_SUBID(SubId) -> get_subopts(SubId, Topic) when ?IS_SUBID(SubId) ->
case emqx_broker_helper:lookup_subpid(SubId) of case emqx_broker_helper:lookup_subpid(SubId) of
SubPid when is_pid(SubPid) -> SubPid when is_pid(SubPid) ->
@ -430,7 +430,7 @@ set_subopts(Topic, NewOpts) when is_binary(Topic), is_map(NewOpts) ->
%% @private %% @private
set_subopts(SubPid, Topic, NewOpts) -> set_subopts(SubPid, Topic, NewOpts) ->
Sub = {SubPid, Topic}, Sub = {Topic, SubPid},
case ets:lookup(?SUBOPTION, Sub) of case ets:lookup(?SUBOPTION, Sub) of
[{_, OldOpts}] -> [{_, OldOpts}] ->
ets:insert(?SUBOPTION, {Sub, maps:merge(OldOpts, NewOpts)}); ets:insert(?SUBOPTION, {Sub, maps:merge(OldOpts, NewOpts)});

View File

@ -204,7 +204,7 @@ check_subs(Count) ->
check_subs([], []) -> check_subs([], []) ->
ok; ok;
check_subs([{{_, Topic}, #{subid := ?CLIENT_ID}} | Subs], List) -> check_subs([{{Topic, _}, #{subid := ?CLIENT_ID}} | Subs], List) ->
check_subs(Subs, lists:delete(Topic, List)); check_subs(Subs, lists:delete(Topic, List));
check_subs([_ | Subs], List) -> check_subs([_ | Subs], List) ->
check_subs(Subs, List). check_subs(Subs, List).

View File

@ -173,7 +173,7 @@ subscriptions(get, #{query_string := QString}) ->
{200, Result} {200, Result}
end. end.
format(WhichNode, {{_Subscriber, Topic}, Options}) -> format(WhichNode, {{Topic, _Subscriber}, Options}) ->
maps:merge( maps:merge(
#{ #{
topic => get_topic(Topic, Options), topic => get_topic(Topic, Options),
@ -205,14 +205,14 @@ gen_match_spec([], MtchHead) ->
gen_match_spec([{Key, '=:=', Value} | More], MtchHead) -> gen_match_spec([{Key, '=:=', Value} | More], MtchHead) ->
gen_match_spec(More, update_ms(Key, Value, MtchHead)). gen_match_spec(More, update_ms(Key, Value, MtchHead)).
update_ms(clientid, X, {{Pid, Topic}, Opts}) -> update_ms(clientid, X, {{Topic, Pid}, Opts}) ->
{{Pid, Topic}, Opts#{subid => X}}; {{Topic, Pid}, Opts#{subid => X}};
update_ms(topic, X, {{Pid, _Topic}, Opts}) -> update_ms(topic, X, {{_Topic, Pid}, Opts}) ->
{{Pid, X}, Opts}; {{X, Pid}, Opts};
update_ms(share_group, X, {{Pid, Topic}, Opts}) -> update_ms(share_group, X, {{Topic, Pid}, Opts}) ->
{{Pid, Topic}, Opts#{share => X}}; {{Topic, Pid}, Opts#{share => X}};
update_ms(qos, X, {{Pid, Topic}, Opts}) -> update_ms(qos, X, {{Topic, Pid}, Opts}) ->
{{Pid, Topic}, Opts#{qos => X}}. {{Topic, Pid}, Opts#{qos => X}}.
fuzzy_filter_fun([]) -> fuzzy_filter_fun([]) ->
undefined; undefined;
@ -221,5 +221,5 @@ fuzzy_filter_fun(Fuzzy) ->
run_fuzzy_filter(_, []) -> run_fuzzy_filter(_, []) ->
true; true;
run_fuzzy_filter(E = {{_, Topic}, _}, [{topic, match, TopicFilter} | Fuzzy]) -> run_fuzzy_filter(E = {{Topic, _}, _}, [{topic, match, TopicFilter} | Fuzzy]) ->
emqx_topic:match(Topic, TopicFilter) andalso run_fuzzy_filter(E, Fuzzy). emqx_topic:match(Topic, TopicFilter) andalso run_fuzzy_filter(E, Fuzzy).

View File

@ -213,7 +213,7 @@ subscriptions(["show", ClientId]) ->
[] -> [] ->
emqx_ctl:print("Not Found.~n"); emqx_ctl:print("Not Found.~n");
[{_, Pid}] -> [{_, Pid}] ->
case ets:match_object(emqx_suboption, {{Pid, '_'}, '_'}) of case ets:match_object(emqx_suboption, {{'_', Pid}, '_'}) of
[] -> emqx_ctl:print("Not Found.~n"); [] -> emqx_ctl:print("Not Found.~n");
Suboption -> [print({emqx_suboption, Sub}) || Sub <- Suboption] Suboption -> [print({emqx_suboption, Sub}) || Sub <- Suboption]
end end
@ -829,7 +829,7 @@ print({emqx_topic, #route{topic = Topic, dest = {_, Node}}}) ->
emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]); emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]);
print({emqx_topic, #route{topic = Topic, dest = Node}}) -> print({emqx_topic, #route{topic = Topic, dest = Node}}) ->
emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]); emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]);
print({emqx_suboption, {{Pid, Topic}, Options}}) when is_pid(Pid) -> print({emqx_suboption, {{Topic, Pid}, Options}}) when is_pid(Pid) ->
SubId = maps:get(subid, Options), SubId = maps:get(subid, Options),
QoS = maps:get(qos, Options, 0), QoS = maps:get(qos, Options, 0),
NL = maps:get(nl, Options, 0), NL = maps:get(nl, Options, 0),