diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index 62ce1af8b..a3a7420e3 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -90,6 +90,10 @@ -define(DEACTIVATED_ALARM, emqx_deactivated_alarm). +-rlog_shard({?COMMON_SHARD, ?ACTIVATED_ALARM}). +-rlog_shard({?COMMON_SHARD, ?DEACTIVATED_ALARM}). + + -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). @@ -182,7 +186,7 @@ handle_call({activate_alarm, Name, Details}, _From, State = #state{actions = Act details = Details, message = normalize_message(Name, Details), activate_at = erlang:system_time(microsecond)}, - mnesia:dirty_write(?ACTIVATED_ALARM, Alarm), + ekka_mnesia:dirty_write(?ACTIVATED_ALARM, Alarm), do_actions(activate, Alarm, Actions), {reply, ok, State} end; @@ -202,9 +206,14 @@ handle_call(delete_all_deactivated_alarms, _From, State) -> {reply, ok, State}; handle_call({get_alarms, all}, _From, State) -> - Alarms = [normalize(Alarm) || - Alarm <- ets:tab2list(?ACTIVATED_ALARM) - ++ ets:tab2list(?DEACTIVATED_ALARM)], + {atomic, Alarms} = + ekka_mnesia:ro_transaction( + ?COMMON_SHARD, + fun() -> + [normalize(Alarm) || + Alarm <- ets:tab2list(?ACTIVATED_ALARM) + ++ ets:tab2list(?DEACTIVATED_ALARM)] + end), {reply, Alarms, State}; handle_call({get_alarms, activated}, _From, State) -> @@ -252,7 +261,7 @@ deactivate_alarm(Details, SizeLimit, Actions, #activated_alarm{ case mnesia:dirty_first(?DEACTIVATED_ALARM) of '$end_of_table' -> ok; ActivateAt2 -> - mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2) + ekka_mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2) end; false -> ok end, @@ -261,8 +270,8 @@ deactivate_alarm(Details, SizeLimit, Actions, #activated_alarm{ DeActAlarm = make_deactivated_alarm(ActivateAt, Name, Details, normalize_message(Name, Details), erlang:system_time(microsecond)), - mnesia:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm), - mnesia:dirty_delete(?ACTIVATED_ALARM, Name), + ekka_mnesia:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm), + ekka_mnesia:dirty_delete(?ACTIVATED_ALARM, Name), do_actions(deactivate, DeActAlarm, Actions). make_deactivated_alarm(ActivateAt, Name, Details, Message, DeActivateAt) -> @@ -279,7 +288,7 @@ deactivate_all_alarms() -> details = Details, message = Message, activate_at = ActivateAt}) -> - mnesia:dirty_write(?DEACTIVATED_ALARM, + ekka_mnesia:dirty_write(?DEACTIVATED_ALARM, #deactivated_alarm{ activate_at = ActivateAt, name = Name, @@ -291,7 +300,7 @@ deactivate_all_alarms() -> %% Delete all records from the given table, ignore result. clear_table(TableName) -> - case mnesia:clear_table(TableName) of + case ekka_mnesia:clear_table(TableName) of {aborted, Reason} -> ?LOG(warning, "Faile to clear table ~p reason: ~p", [TableName, Reason]); @@ -311,7 +320,7 @@ delete_expired_deactivated_alarms('$end_of_table', _Checkpoint) -> delete_expired_deactivated_alarms(ActivatedAt, Checkpoint) -> case ActivatedAt =< Checkpoint of true -> - mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivatedAt), + ekka_mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivatedAt), NActivatedAt = mnesia:dirty_next(?DEACTIVATED_ALARM, ActivatedAt), delete_expired_deactivated_alarms(NActivatedAt, Checkpoint); false -> diff --git a/apps/emqx_sn/src/emqx_sn_registry.erl b/apps/emqx_sn/src/emqx_sn_registry.erl index 4d2e656b3..903f61c70 100644 --- a/apps/emqx_sn/src/emqx_sn_registry.erl +++ b/apps/emqx_sn/src/emqx_sn_registry.erl @@ -77,7 +77,7 @@ mnesia(copy) -> -spec(start_link(list()) -> {ok, pid()} | ignore | {error, Reason :: term()}). start_link(PredefTopics) -> - ekka_mnesia:wait_for_shards([?SN_SHARD], infinity), + ekka_rlog:wait_for_shards([?SN_SHARD], infinity), gen_server:start_link({local, ?MODULE}, ?MODULE, [PredefTopics], []). -spec(stop() -> ok). @@ -172,7 +172,7 @@ handle_call({register, ClientId, TopicName}, _From, handle_call({unregister, ClientId}, _From, State) -> Registry = mnesia:dirty_match_object({?TAB, {ClientId, '_'}, '_'}), - lists:foreach(fun(R) -> ekka_mnesia:dirty_delete_object(R) end, Registry), + lists:foreach(fun(R) -> ekka_mnesia:dirty_delete_object(?TAB, R) end, Registry), {reply, ok, State}; handle_call(Req, _From, State) ->