refactor: uniform shared_sub table macros

This commit is contained in:
JimMoen 2024-03-22 18:03:13 +08:00
parent 9b085b5797
commit 42faffc320
No known key found for this signature in database
2 changed files with 60 additions and 34 deletions

View File

@ -0,0 +1,28 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2018-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-ifndef(EMQX_SHARED_SUB_HRL).
-define(EMQX_SHARED_SUB_HRL, true).
%% Mnesia table for shared sub message routing
-define(SHARED_SUBSCRIPTION, emqx_shared_subscription).
%% ETS tables for Shared PubSub
-define(SHARED_SUBSCRIBER, emqx_shared_subscriber).
-define(ALIVE_SHARED_SUBSCRIBERS, emqx_alive_shared_subscribers).
-define(SHARED_SUBS_ROUND_ROBIN_COUNTER, emqx_shared_subscriber_round_robin_counter).
-endif.

View File

@ -21,6 +21,7 @@
-include("emqx_schema.hrl"). -include("emqx_schema.hrl").
-include("emqx.hrl"). -include("emqx.hrl").
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-include("emqx_shared_sub.hrl").
-include("logger.hrl"). -include("logger.hrl").
-include("types.hrl"). -include("types.hrl").
@ -84,10 +85,7 @@
| hash_topic. | hash_topic.
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
-define(TAB, emqx_shared_subscription).
-define(SHARED_SUBS_ROUND_ROBIN_COUNTER, emqx_shared_subscriber_round_robin_counter).
-define(SHARED_SUBS, emqx_shared_subscriber).
-define(ALIVE_SUBS, emqx_alive_shared_subscribers).
-define(SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS, 5). -define(SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS, 5).
-define(IS_LOCAL_PID(Pid), (is_pid(Pid) andalso node(Pid) =:= node())). -define(IS_LOCAL_PID(Pid), (is_pid(Pid) andalso node(Pid) =:= node())).
-define(ACK, shared_sub_ack). -define(ACK, shared_sub_ack).
@ -99,21 +97,21 @@
-record(state, {pmon}). -record(state, {pmon}).
-record(emqx_shared_subscription, {group, topic, subpid}). -record(?SHARED_SUBSCRIPTION, {group, topic, subpid}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Mnesia bootstrap %% Mnesia bootstrap
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
create_tables() -> create_tables() ->
ok = mria:create_table(?TAB, [ ok = mria:create_table(?SHARED_SUBSCRIPTION, [
{type, bag}, {type, bag},
{rlog_shard, ?SHARED_SUB_SHARD}, {rlog_shard, ?SHARED_SUB_SHARD},
{storage, ram_copies}, {storage, ram_copies},
{record_name, emqx_shared_subscription}, {record_name, ?SHARED_SUBSCRIPTION},
{attributes, record_info(fields, emqx_shared_subscription)} {attributes, record_info(fields, ?SHARED_SUBSCRIPTION)}
]), ]),
[?TAB]. [?SHARED_SUBSCRIPTION].
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API
@ -132,7 +130,7 @@ unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
gen_server:call(?SERVER, {unsubscribe, Group, Topic, SubPid}). gen_server:call(?SERVER, {unsubscribe, Group, Topic, SubPid}).
record(Group, Topic, SubPid) -> record(Group, Topic, SubPid) ->
#emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. #?SHARED_SUBSCRIPTION{group = Group, topic = Topic, subpid = SubPid}.
-spec dispatch(emqx_types:group(), emqx_types:topic(), emqx_types:delivery()) -> -spec dispatch(emqx_types:group(), emqx_types:topic(), emqx_types:delivery()) ->
emqx_types:deliver_result(). emqx_types:deliver_result().
@ -394,18 +392,18 @@ subscribers(Group, Topic, FailedSubs) ->
%% Select ETS table to get all subscriber pids. %% Select ETS table to get all subscriber pids.
subscribers(Group, Topic) -> subscribers(Group, Topic) ->
ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]). ets:select(?SHARED_SUBSCRIPTION, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
ok = mria:wait_for_tables([?TAB]), ok = mria:wait_for_tables([?SHARED_SUBSCRIPTION]),
{ok, _} = mnesia:subscribe({table, ?TAB, simple}), {ok, _} = mnesia:subscribe({table, ?SHARED_SUBSCRIPTION, simple}),
{atomic, PMon} = mria:transaction(?SHARED_SUB_SHARD, fun ?MODULE:init_monitors/0), {atomic, PMon} = mria:transaction(?SHARED_SUB_SHARD, fun ?MODULE:init_monitors/0),
ok = emqx_utils_ets:new(?SHARED_SUBS, [protected, bag]), ok = emqx_utils_ets:new(?SHARED_SUBSCRIBER, [protected, bag]),
ok = emqx_utils_ets:new(?ALIVE_SUBS, [protected, set, {read_concurrency, true}]), ok = emqx_utils_ets:new(?ALIVE_SHARED_SUBSCRIBERS, [protected, set, {read_concurrency, true}]),
ok = emqx_utils_ets:new(?SHARED_SUBS_ROUND_ROBIN_COUNTER, [ ok = emqx_utils_ets:new(?SHARED_SUBS_ROUND_ROBIN_COUNTER, [
public, set, {write_concurrency, true} public, set, {write_concurrency, true}
]), ]),
@ -413,26 +411,26 @@ init([]) ->
init_monitors() -> init_monitors() ->
mnesia:foldl( mnesia:foldl(
fun(#emqx_shared_subscription{subpid = SubPid}, Mon) -> fun(#?SHARED_SUBSCRIPTION{subpid = SubPid}, Mon) ->
emqx_pmon:monitor(SubPid, Mon) emqx_pmon:monitor(SubPid, Mon)
end, end,
emqx_pmon:new(), emqx_pmon:new(),
?TAB ?SHARED_SUBSCRIPTION
). ).
handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) -> handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) ->
mria:dirty_write(?TAB, record(Group, Topic, SubPid)), mria:dirty_write(?SHARED_SUBSCRIPTION, record(Group, Topic, SubPid)),
case ets:member(?SHARED_SUBS, {Group, Topic}) of case ets:member(?SHARED_SUBSCRIBER, {Group, Topic}) of
true -> ok; true -> ok;
false -> ok = emqx_router:do_add_route(Topic, {Group, node()}) false -> ok = emqx_router:do_add_route(Topic, {Group, node()})
end, end,
ok = maybe_insert_alive_tab(SubPid), ok = maybe_insert_alive_tab(SubPid),
ok = maybe_insert_round_robin_count({Group, Topic}), ok = maybe_insert_round_robin_count({Group, Topic}),
true = ets:insert(?SHARED_SUBS, {{Group, Topic}, SubPid}), true = ets:insert(?SHARED_SUBSCRIBER, {{Group, Topic}, SubPid}),
{reply, ok, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})}; {reply, ok, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) -> handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) ->
mria:dirty_delete_object(?TAB, record(Group, Topic, SubPid)), mria:dirty_delete_object(?SHARED_SUBSCRIPTION, record(Group, Topic, SubPid)),
true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), true = ets:delete_object(?SHARED_SUBSCRIBER, {{Group, Topic}, SubPid}),
delete_route_if_needed({Group, Topic}), delete_route_if_needed({Group, Topic}),
maybe_delete_round_robin_count({Group, Topic}), maybe_delete_round_robin_count({Group, Topic}),
{reply, ok, update_stats(State)}; {reply, ok, update_stats(State)};
@ -445,7 +443,7 @@ handle_cast(Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info( handle_info(
{mnesia_table_event, {write, #emqx_shared_subscription{subpid = SubPid}, _}}, {mnesia_table_event, {write, #?SHARED_SUBSCRIPTION{subpid = SubPid}, _}},
State = #state{pmon = PMon} State = #state{pmon = PMon}
) -> ) ->
ok = maybe_insert_alive_tab(SubPid), ok = maybe_insert_alive_tab(SubPid),
@ -455,7 +453,7 @@ handle_info(
%% The trick is we don't demonitor the subscriber here, and (after a long time) it will eventually %% The trick is we don't demonitor the subscriber here, and (after a long time) it will eventually
%% be disconnected. %% be disconnected.
% handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) -> % handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) ->
% #emqx_shared_subscription{subpid = SubPid} = OldRecord, % #?SHARED_SUBSCRIPTION{subpid = SubPid} = OldRecord,
% {noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})}; % {noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})};
handle_info({mnesia_table_event, _Event}, State) -> handle_info({mnesia_table_event, _Event}, State) ->
@ -468,7 +466,7 @@ handle_info(_Info, State) ->
{noreply, State}. {noreply, State}.
terminate(_Reason, _State) -> terminate(_Reason, _State) ->
mnesia:unsubscribe({table, ?TAB, simple}). mnesia:unsubscribe({table, ?SHARED_SUBSCRIPTION, simple}).
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
@ -501,7 +499,7 @@ maybe_delete_round_robin_count({Group, _Topic} = GroupTopic) ->
ok. ok.
if_no_more_subscribers(GroupTopic, Fn) -> if_no_more_subscribers(GroupTopic, Fn) ->
case ets:member(?SHARED_SUBS, GroupTopic) of case ets:member(?SHARED_SUBSCRIBER, GroupTopic) of
true -> ok; true -> ok;
false -> Fn() false -> Fn()
end, end,
@ -510,26 +508,26 @@ if_no_more_subscribers(GroupTopic, Fn) ->
%% keep track of alive remote pids %% keep track of alive remote pids
maybe_insert_alive_tab(Pid) when ?IS_LOCAL_PID(Pid) -> ok; maybe_insert_alive_tab(Pid) when ?IS_LOCAL_PID(Pid) -> ok;
maybe_insert_alive_tab(Pid) when is_pid(Pid) -> maybe_insert_alive_tab(Pid) when is_pid(Pid) ->
ets:insert(?ALIVE_SUBS, {Pid}), ets:insert(?ALIVE_SHARED_SUBSCRIBERS, {Pid}),
ok. ok.
cleanup_down(SubPid) -> cleanup_down(SubPid) ->
?IS_LOCAL_PID(SubPid) orelse ets:delete(?ALIVE_SUBS, SubPid), ?IS_LOCAL_PID(SubPid) orelse ets:delete(?ALIVE_SHARED_SUBSCRIBERS, SubPid),
lists:foreach( lists:foreach(
fun(Record = #emqx_shared_subscription{topic = Topic, group = Group}) -> fun(Record = #?SHARED_SUBSCRIPTION{topic = Topic, group = Group}) ->
ok = mria:dirty_delete_object(?TAB, Record), ok = mria:dirty_delete_object(?SHARED_SUBSCRIPTION, Record),
true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), true = ets:delete_object(?SHARED_SUBSCRIBER, {{Group, Topic}, SubPid}),
maybe_delete_round_robin_count({Group, Topic}), maybe_delete_round_robin_count({Group, Topic}),
delete_route_if_needed({Group, Topic}) delete_route_if_needed({Group, Topic})
end, end,
mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid}) mnesia:dirty_match_object(#?SHARED_SUBSCRIPTION{_ = '_', subpid = SubPid})
). ).
update_stats(State) -> update_stats(State) ->
emqx_stats:setstat( emqx_stats:setstat(
'subscriptions.shared.count', 'subscriptions.shared.count',
'subscriptions.shared.max', 'subscriptions.shared.max',
ets:info(?TAB, size) ets:info(?SHARED_SUBSCRIPTION, size)
), ),
State. State.
@ -543,7 +541,7 @@ is_active_sub(Pid, FailedSubs, All) ->
is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) -> is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) ->
erlang:is_process_alive(Pid); erlang:is_process_alive(Pid);
is_alive_sub(Pid) -> is_alive_sub(Pid) ->
[] =/= ets:lookup(?ALIVE_SUBS, Pid). [] =/= ets:lookup(?ALIVE_SHARED_SUBSCRIBERS, Pid).
delete_route_if_needed({Group, Topic} = GroupTopic) -> delete_route_if_needed({Group, Topic} = GroupTopic) ->
if_no_more_subscribers(GroupTopic, fun() -> if_no_more_subscribers(GroupTopic, fun() ->