diff --git a/apps/emqx/include/emqx.hrl b/apps/emqx/include/emqx.hrl index b38614562..9fe69fd30 100644 --- a/apps/emqx/include/emqx.hrl +++ b/apps/emqx/include/emqx.hrl @@ -24,6 +24,7 @@ -define(Otherwise, true). -define(COMMON_SHARD, emqx_common_shard). +-define(SHARED_SUB_SHARD, emqx_shared_sub_shard). %%-------------------------------------------------------------------- %% Banner diff --git a/apps/emqx/src/emqx_app.erl b/apps/emqx/src/emqx_app.erl index 61a5a5633..dfdc9c0f8 100644 --- a/apps/emqx/src/emqx_app.erl +++ b/apps/emqx/src/emqx_app.erl @@ -28,7 +28,7 @@ -define(APP, emqx). --define(EMQX_SHARDS, [?ROUTE_SHARD, ?COMMON_SHARD]). +-define(EMQX_SHARDS, [?ROUTE_SHARD, ?COMMON_SHARD, ?SHARED_SUB_SHARD]). -include("emqx_release.hrl"). diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 97aa778f3..c002653ba 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -77,6 +77,8 @@ -define(NACK(Reason), {shared_sub_nack, Reason}). -define(NO_ACK, no_ack). +-rlog_shard({?SHARED_SUB_SHARD, ?TAB}). + -record(state, {pmon}). -record(emqx_shared_subscription, {group, topic, subpid}). @@ -297,7 +299,7 @@ subscribers(Group, Topic) -> init([]) -> {ok, _} = mnesia:subscribe({table, ?TAB, simple}), - {atomic, PMon} = mnesia:transaction(fun init_monitors/0), + {atomic, PMon} = ekka_mnesia:transaction(?SHARED_SUB_SHARD, fun init_monitors/0), ok = emqx_tables:new(?SHARED_SUBS, [protected, bag]), ok = emqx_tables:new(?ALIVE_SUBS, [protected, set, {read_concurrency, true}]), {ok, update_stats(#state{pmon = PMon})}. @@ -309,7 +311,7 @@ init_monitors() -> end, emqx_pmon:new(), ?TAB). handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) -> - mnesia:dirty_write(?TAB, record(Group, Topic, SubPid)), + ekka_mnesia:dirty_write(?TAB, record(Group, Topic, SubPid)), case ets:member(?SHARED_SUBS, {Group, Topic}) of true -> ok; false -> ok = emqx_router:do_add_route(Topic, {Group, node()}) @@ -319,7 +321,7 @@ handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon {reply, ok, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})}; handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) -> - mnesia:dirty_delete_object(?TAB, record(Group, Topic, SubPid)), + ekka_mnesia:dirty_delete_object(?TAB, record(Group, Topic, SubPid)), true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), delete_route_if_needed({Group, Topic}), {reply, ok, State}; @@ -373,7 +375,7 @@ cleanup_down(SubPid) -> ?IS_LOCAL_PID(SubPid) orelse ets:delete(?ALIVE_SUBS, SubPid), lists:foreach( fun(Record = #emqx_shared_subscription{topic = Topic, group = Group}) -> - ok = mnesia:dirty_delete_object(?TAB, Record), + ok = ekka_mnesia:dirty_delete_object(?TAB, Record), true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), delete_route_if_needed({Group, Topic}) end, mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})).