chore(shared_sub): Add shared_sub shard
This commit is contained in:
parent
ce4800e6ae
commit
26b2216e25
|
@ -24,6 +24,7 @@
|
||||||
-define(Otherwise, true).
|
-define(Otherwise, true).
|
||||||
|
|
||||||
-define(COMMON_SHARD, emqx_common_shard).
|
-define(COMMON_SHARD, emqx_common_shard).
|
||||||
|
-define(SHARED_SUB_SHARD, emqx_shared_sub_shard).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Banner
|
%% Banner
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
|
|
||||||
-define(APP, emqx).
|
-define(APP, emqx).
|
||||||
|
|
||||||
-define(EMQX_SHARDS, [?ROUTE_SHARD, ?COMMON_SHARD]).
|
-define(EMQX_SHARDS, [?ROUTE_SHARD, ?COMMON_SHARD, ?SHARED_SUB_SHARD]).
|
||||||
|
|
||||||
-include("emqx_release.hrl").
|
-include("emqx_release.hrl").
|
||||||
|
|
||||||
|
|
|
@ -77,6 +77,8 @@
|
||||||
-define(NACK(Reason), {shared_sub_nack, Reason}).
|
-define(NACK(Reason), {shared_sub_nack, Reason}).
|
||||||
-define(NO_ACK, no_ack).
|
-define(NO_ACK, no_ack).
|
||||||
|
|
||||||
|
-rlog_shard({?SHARED_SUB_SHARD, ?TAB}).
|
||||||
|
|
||||||
-record(state, {pmon}).
|
-record(state, {pmon}).
|
||||||
|
|
||||||
-record(emqx_shared_subscription, {group, topic, subpid}).
|
-record(emqx_shared_subscription, {group, topic, subpid}).
|
||||||
|
@ -297,7 +299,7 @@ subscribers(Group, Topic) ->
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
{ok, _} = mnesia:subscribe({table, ?TAB, simple}),
|
{ok, _} = mnesia:subscribe({table, ?TAB, simple}),
|
||||||
{atomic, PMon} = mnesia:transaction(fun init_monitors/0),
|
{atomic, PMon} = ekka_mnesia:transaction(?SHARED_SUB_SHARD, fun init_monitors/0),
|
||||||
ok = emqx_tables:new(?SHARED_SUBS, [protected, bag]),
|
ok = emqx_tables:new(?SHARED_SUBS, [protected, bag]),
|
||||||
ok = emqx_tables:new(?ALIVE_SUBS, [protected, set, {read_concurrency, true}]),
|
ok = emqx_tables:new(?ALIVE_SUBS, [protected, set, {read_concurrency, true}]),
|
||||||
{ok, update_stats(#state{pmon = PMon})}.
|
{ok, update_stats(#state{pmon = PMon})}.
|
||||||
|
@ -309,7 +311,7 @@ init_monitors() ->
|
||||||
end, emqx_pmon:new(), ?TAB).
|
end, emqx_pmon:new(), ?TAB).
|
||||||
|
|
||||||
handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) ->
|
handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) ->
|
||||||
mnesia:dirty_write(?TAB, record(Group, Topic, SubPid)),
|
ekka_mnesia:dirty_write(?TAB, record(Group, Topic, SubPid)),
|
||||||
case ets:member(?SHARED_SUBS, {Group, Topic}) of
|
case ets:member(?SHARED_SUBS, {Group, Topic}) of
|
||||||
true -> ok;
|
true -> ok;
|
||||||
false -> ok = emqx_router:do_add_route(Topic, {Group, node()})
|
false -> ok = emqx_router:do_add_route(Topic, {Group, node()})
|
||||||
|
@ -319,7 +321,7 @@ handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon
|
||||||
{reply, ok, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
|
{reply, ok, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
|
||||||
|
|
||||||
handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) ->
|
handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) ->
|
||||||
mnesia:dirty_delete_object(?TAB, record(Group, Topic, SubPid)),
|
ekka_mnesia:dirty_delete_object(?TAB, record(Group, Topic, SubPid)),
|
||||||
true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
|
true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
|
||||||
delete_route_if_needed({Group, Topic}),
|
delete_route_if_needed({Group, Topic}),
|
||||||
{reply, ok, State};
|
{reply, ok, State};
|
||||||
|
@ -373,7 +375,7 @@ cleanup_down(SubPid) ->
|
||||||
?IS_LOCAL_PID(SubPid) orelse ets:delete(?ALIVE_SUBS, SubPid),
|
?IS_LOCAL_PID(SubPid) orelse ets:delete(?ALIVE_SUBS, SubPid),
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(Record = #emqx_shared_subscription{topic = Topic, group = Group}) ->
|
fun(Record = #emqx_shared_subscription{topic = Topic, group = Group}) ->
|
||||||
ok = mnesia:dirty_delete_object(?TAB, Record),
|
ok = ekka_mnesia:dirty_delete_object(?TAB, Record),
|
||||||
true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
|
true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
|
||||||
delete_route_if_needed({Group, Topic})
|
delete_route_if_needed({Group, Topic})
|
||||||
end, mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})).
|
end, mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})).
|
||||||
|
|
Loading…
Reference in New Issue