diff --git a/apps/emqx/include/emqx.hrl b/apps/emqx/include/emqx.hrl index ba72a47b5..9fe69fd30 100644 --- a/apps/emqx/include/emqx.hrl +++ b/apps/emqx/include/emqx.hrl @@ -23,6 +23,9 @@ -define(Otherwise, true). +-define(COMMON_SHARD, emqx_common_shard). +-define(SHARED_SUB_SHARD, emqx_shared_sub_shard). + %%-------------------------------------------------------------------- %% Banner %%-------------------------------------------------------------------- @@ -134,4 +137,3 @@ }). -endif. - diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index 62ce1af8b..a3a7420e3 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -90,6 +90,10 @@ -define(DEACTIVATED_ALARM, emqx_deactivated_alarm). +-rlog_shard({?COMMON_SHARD, ?ACTIVATED_ALARM}). +-rlog_shard({?COMMON_SHARD, ?DEACTIVATED_ALARM}). + + -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). @@ -182,7 +186,7 @@ handle_call({activate_alarm, Name, Details}, _From, State = #state{actions = Act details = Details, message = normalize_message(Name, Details), activate_at = erlang:system_time(microsecond)}, - mnesia:dirty_write(?ACTIVATED_ALARM, Alarm), + ekka_mnesia:dirty_write(?ACTIVATED_ALARM, Alarm), do_actions(activate, Alarm, Actions), {reply, ok, State} end; @@ -202,9 +206,14 @@ handle_call(delete_all_deactivated_alarms, _From, State) -> {reply, ok, State}; handle_call({get_alarms, all}, _From, State) -> - Alarms = [normalize(Alarm) || - Alarm <- ets:tab2list(?ACTIVATED_ALARM) - ++ ets:tab2list(?DEACTIVATED_ALARM)], + {atomic, Alarms} = + ekka_mnesia:ro_transaction( + ?COMMON_SHARD, + fun() -> + [normalize(Alarm) || + Alarm <- ets:tab2list(?ACTIVATED_ALARM) + ++ ets:tab2list(?DEACTIVATED_ALARM)] + end), {reply, Alarms, State}; handle_call({get_alarms, activated}, _From, State) -> @@ -252,7 +261,7 @@ deactivate_alarm(Details, SizeLimit, Actions, #activated_alarm{ case mnesia:dirty_first(?DEACTIVATED_ALARM) of '$end_of_table' -> ok; ActivateAt2 -> - mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2) + ekka_mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2) end; false -> ok end, @@ -261,8 +270,8 @@ deactivate_alarm(Details, SizeLimit, Actions, #activated_alarm{ DeActAlarm = make_deactivated_alarm(ActivateAt, Name, Details, normalize_message(Name, Details), erlang:system_time(microsecond)), - mnesia:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm), - mnesia:dirty_delete(?ACTIVATED_ALARM, Name), + ekka_mnesia:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm), + ekka_mnesia:dirty_delete(?ACTIVATED_ALARM, Name), do_actions(deactivate, DeActAlarm, Actions). make_deactivated_alarm(ActivateAt, Name, Details, Message, DeActivateAt) -> @@ -279,7 +288,7 @@ deactivate_all_alarms() -> details = Details, message = Message, activate_at = ActivateAt}) -> - mnesia:dirty_write(?DEACTIVATED_ALARM, + ekka_mnesia:dirty_write(?DEACTIVATED_ALARM, #deactivated_alarm{ activate_at = ActivateAt, name = Name, @@ -291,7 +300,7 @@ deactivate_all_alarms() -> %% Delete all records from the given table, ignore result. clear_table(TableName) -> - case mnesia:clear_table(TableName) of + case ekka_mnesia:clear_table(TableName) of {aborted, Reason} -> ?LOG(warning, "Faile to clear table ~p reason: ~p", [TableName, Reason]); @@ -311,7 +320,7 @@ delete_expired_deactivated_alarms('$end_of_table', _Checkpoint) -> delete_expired_deactivated_alarms(ActivatedAt, Checkpoint) -> case ActivatedAt =< Checkpoint of true -> - mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivatedAt), + ekka_mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivatedAt), NActivatedAt = mnesia:dirty_next(?DEACTIVATED_ALARM, ActivatedAt), delete_expired_deactivated_alarms(NActivatedAt, Checkpoint); false -> diff --git a/apps/emqx/src/emqx_app.erl b/apps/emqx/src/emqx_app.erl index 234f42645..dfdc9c0f8 100644 --- a/apps/emqx/src/emqx_app.erl +++ b/apps/emqx/src/emqx_app.erl @@ -28,7 +28,7 @@ -define(APP, emqx). --define(EMQX_SHARDS, [route_shard]). +-define(EMQX_SHARDS, [?ROUTE_SHARD, ?COMMON_SHARD, ?SHARED_SUB_SHARD]). -include("emqx_release.hrl"). diff --git a/apps/emqx/src/emqx_banned.erl b/apps/emqx/src/emqx_banned.erl index 762a2b61b..16804d329 100644 --- a/apps/emqx/src/emqx_banned.erl +++ b/apps/emqx/src/emqx_banned.erl @@ -51,6 +51,8 @@ -define(BANNED_TAB, ?MODULE). +-rlog_shard({?COMMON_SHARD, ?BANNED_TAB}). + %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- @@ -96,19 +98,19 @@ create(#{who := Who, reason := Reason, at := At, until := Until}) -> - mnesia:dirty_write(?BANNED_TAB, #banned{who = Who, - by = By, - reason = Reason, - at = At, - until = Until}); + ekka_mnesia:dirty_write(?BANNED_TAB, #banned{who = Who, + by = By, + reason = Reason, + at = At, + until = Until}); create(Banned) when is_record(Banned, banned) -> - mnesia:dirty_write(?BANNED_TAB, Banned). + ekka_mnesia:dirty_write(?BANNED_TAB, Banned). -spec(delete({clientid, emqx_types:clientid()} | {username, emqx_types:username()} | {peerhost, emqx_types:peerhost()}) -> ok). delete(Who) -> - mnesia:dirty_delete(?BANNED_TAB, Who). + ekka_mnesia:dirty_delete(?BANNED_TAB, Who). info(InfoKey) -> mnesia:table_info(?BANNED_TAB, InfoKey). @@ -129,7 +131,7 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> - mnesia:async_dirty(fun expire_banned_items/1, [erlang:system_time(second)]), + ekka_mnesia:transaction(?COMMON_SHARD, fun expire_banned_items/1, [erlang:system_time(second)]), {noreply, ensure_expiry_timer(State), hibernate}; handle_info(Info, State) -> @@ -160,4 +162,3 @@ expire_banned_items(Now) -> mnesia:delete_object(?BANNED_TAB, B, sticky_write); (_, _Acc) -> ok end, ok, ?BANNED_TAB). - diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 97aa778f3..c002653ba 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -77,6 +77,8 @@ -define(NACK(Reason), {shared_sub_nack, Reason}). -define(NO_ACK, no_ack). +-rlog_shard({?SHARED_SUB_SHARD, ?TAB}). + -record(state, {pmon}). -record(emqx_shared_subscription, {group, topic, subpid}). @@ -297,7 +299,7 @@ subscribers(Group, Topic) -> init([]) -> {ok, _} = mnesia:subscribe({table, ?TAB, simple}), - {atomic, PMon} = mnesia:transaction(fun init_monitors/0), + {atomic, PMon} = ekka_mnesia:transaction(?SHARED_SUB_SHARD, fun init_monitors/0), ok = emqx_tables:new(?SHARED_SUBS, [protected, bag]), ok = emqx_tables:new(?ALIVE_SUBS, [protected, set, {read_concurrency, true}]), {ok, update_stats(#state{pmon = PMon})}. @@ -309,7 +311,7 @@ init_monitors() -> end, emqx_pmon:new(), ?TAB). handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) -> - mnesia:dirty_write(?TAB, record(Group, Topic, SubPid)), + ekka_mnesia:dirty_write(?TAB, record(Group, Topic, SubPid)), case ets:member(?SHARED_SUBS, {Group, Topic}) of true -> ok; false -> ok = emqx_router:do_add_route(Topic, {Group, node()}) @@ -319,7 +321,7 @@ handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon {reply, ok, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})}; handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) -> - mnesia:dirty_delete_object(?TAB, record(Group, Topic, SubPid)), + ekka_mnesia:dirty_delete_object(?TAB, record(Group, Topic, SubPid)), true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), delete_route_if_needed({Group, Topic}), {reply, ok, State}; @@ -373,7 +375,7 @@ cleanup_down(SubPid) -> ?IS_LOCAL_PID(SubPid) orelse ets:delete(?ALIVE_SUBS, SubPid), lists:foreach( fun(Record = #emqx_shared_subscription{topic = Topic, group = Group}) -> - ok = mnesia:dirty_delete_object(?TAB, Record), + ok = ekka_mnesia:dirty_delete_object(?TAB, Record), true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), delete_route_if_needed({Group, Topic}) end, mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})). diff --git a/apps/emqx_sn/src/emqx_sn_registry.erl b/apps/emqx_sn/src/emqx_sn_registry.erl index 4a3b22585..903f61c70 100644 --- a/apps/emqx_sn/src/emqx_sn_registry.erl +++ b/apps/emqx_sn/src/emqx_sn_registry.erl @@ -44,6 +44,8 @@ , code_change/3 ]). +-define(SN_SHARD, emqx_sn_shard). + -define(TAB, ?MODULE). -record(state, {max_predef_topic_id = 0}). @@ -56,6 +58,7 @@ -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). +-rlog_shard({?SN_SHARD, ?TAB}). %% @doc Create or replicate tables. -spec(mnesia(boot | copy) -> ok). @@ -74,6 +77,7 @@ mnesia(copy) -> -spec(start_link(list()) -> {ok, pid()} | ignore | {error, Reason :: term()}). start_link(PredefTopics) -> + ekka_rlog:wait_for_shards([?SN_SHARD], infinity), gen_server:start_link({local, ?MODULE}, ?MODULE, [PredefTopics], []). -spec(stop() -> ok). @@ -129,10 +133,10 @@ init([PredefTopics]) -> %% {ClientId, TopicName} -> TopicId MaxPredefId = lists:foldl( fun({TopicId, TopicName}, AccId) -> - mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicId}, - value = TopicName}), - mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicName}, - value = TopicId}), + ekka_mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicId}, + value = TopicName}), + ekka_mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicName}, + value = TopicId}), if TopicId > AccId -> TopicId; true -> AccId end end, 0, PredefTopics), {ok, #state{max_predef_topic_id = MaxPredefId}}. @@ -157,7 +161,7 @@ handle_call({register, ClientId, TopicName}, _From, mnesia:write(#emqx_sn_registry{key = {ClientId, TopicId}, value = TopicName}) end, - case mnesia:transaction(Fun) of + case ekka_mnesia:transaction(?SN_SHARD, Fun) of {atomic, ok} -> {reply, TopicId, State}; {aborted, Error} -> @@ -168,7 +172,7 @@ handle_call({register, ClientId, TopicName}, _From, handle_call({unregister, ClientId}, _From, State) -> Registry = mnesia:dirty_match_object({?TAB, {ClientId, '_'}, '_'}), - lists:foreach(fun(R) -> mnesia:dirty_delete_object(R) end, Registry), + lists:foreach(fun(R) -> ekka_mnesia:dirty_delete_object(?TAB, R) end, Registry), {reply, ok, State}; handle_call(Req, _From, State) -> diff --git a/apps/emqx_sn/test/emqx_sn_registry_SUITE.erl b/apps/emqx_sn/test/emqx_sn_registry_SUITE.erl index 8d320d8ed..58a458ecc 100644 --- a/apps/emqx_sn/test/emqx_sn_registry_SUITE.erl +++ b/apps/emqx_sn/test/emqx_sn_registry_SUITE.erl @@ -41,9 +41,10 @@ end_per_suite(_Config) -> ok. init_per_testcase(_TestCase, Config) -> + application:set_env(ekka, strict_mode, true), ekka_mnesia:start(), emqx_sn_registry:mnesia(boot), - mnesia:clear_table(emqx_sn_registry), + ekka_mnesia:clear_table(emqx_sn_registry), PredefTopics = application:get_env(emqx_sn, predefined, []), {ok, _Pid} = ?REGISTRY:start_link(PredefTopics), Config. @@ -118,4 +119,3 @@ register_a_lot(N, Max) when N < Max -> Topic = iolist_to_binary(["Topic", integer_to_list(N)]), ?assertEqual(N, ?REGISTRY:register_topic(<<"ClientId">>, Topic)), register_a_lot(N+1, Max). - diff --git a/apps/emqx_telemetry/src/emqx_telemetry.erl b/apps/emqx_telemetry/src/emqx_telemetry.erl index ea4017dc9..dd6e7aa4c 100644 --- a/apps/emqx_telemetry/src/emqx_telemetry.erl +++ b/apps/emqx_telemetry/src/emqx_telemetry.erl @@ -90,6 +90,8 @@ -define(TELEMETRY, emqx_telemetry). +-rlog_shard({?COMMON_SHARD, ?TELEMETRY}). + %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- @@ -146,9 +148,9 @@ init([Opts]) -> [] -> Enabled = proplists:get_value(enabled, Opts, true), UUID = generate_uuid(), - mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID, - uuid = UUID, - enabled = Enabled}), + ekka_mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID, + uuid = UUID, + enabled = Enabled}), State#state{enabled = Enabled, uuid = UUID}; [#telemetry{uuid = UUID, enabled = Enabled} | _] -> State#state{enabled = Enabled, uuid = UUID} @@ -162,16 +164,16 @@ init([Opts]) -> end. handle_call(enable, _From, State = #state{uuid = UUID}) -> - mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID, - uuid = UUID, - enabled = true}), + ekka_mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID, + uuid = UUID, + enabled = true}), _ = erlang:send(self(), first_report), {reply, ok, State#state{enabled = true}}; handle_call(disable, _From, State = #state{uuid = UUID}) -> - mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID, - uuid = UUID, - enabled = false}), + ekka_mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID, + uuid = UUID, + enabled = false}), {reply, ok, State#state{enabled = false}}; handle_call(is_enabled, _From, State = #state{enabled = Enabled}) ->