diff --git a/apps/emqx_retainer/include/emqx_retainer.hrl b/apps/emqx_retainer/include/emqx_retainer.hrl index a1e229cfb..a9978e206 100644 --- a/apps/emqx_retainer/include/emqx_retainer.hrl +++ b/apps/emqx_retainer/include/emqx_retainer.hrl @@ -17,4 +17,4 @@ -define(APP, emqx_retainer). -define(TAB, ?APP). -record(retained, {topic, msg, expiry_time}). - +-define(RETAINER_SHARD, emqx_retainer_shard). diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 9e6f60013..94c561b39 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -51,6 +51,8 @@ -record(state, {stats_fun, stats_timer, expiry_timer}). +-rlog_shard({?RETAINER_SHARD, ?TAB}). + %%-------------------------------------------------------------------- %% Load/Unload %%-------------------------------------------------------------------- @@ -84,7 +86,7 @@ dispatch(Pid, Topic) -> on_message_publish(Msg = #message{flags = #{retain := true}, topic = Topic, payload = <<>>}, _Env) -> - mnesia:dirty_delete(?TAB, topic2tokens(Topic)), + ekka_mnesia:dirty_delete(?TAB, topic2tokens(Topic)), {ok, Msg}; on_message_publish(Msg = #message{flags = #{retain := true}}, Env) -> @@ -115,7 +117,7 @@ clean(Topic) when is_binary(Topic) -> [_M] -> mnesia:delete({?TAB, Tokens}), 1 end end, - {atomic, N} = mnesia:transaction(Fun), N + {atomic, N} = ekka_mnesia:transaction(?RETAINER_SHARD, Fun), N end. %%-------------------------------------------------------------------- @@ -139,6 +141,7 @@ init([Env]) -> {attributes, record_info(fields, retained)}, {storage_properties, StoreProps}]), ok = ekka_mnesia:copy_table(?TAB, Copies), + ok = ekka_rlog:wait_for_shards([?RETAINER_SHARD], infinity), case mnesia:table_info(?TAB, storage_type) of Copies -> ok; _Other -> @@ -201,11 +204,11 @@ store_retained(Msg = #message{topic = Topic, payload = Payload}, Env) -> case {is_table_full(Env), is_too_big(size(Payload), Env)} of {false, false} -> ok = emqx_metrics:inc('messages.retained'), - mnesia:dirty_write(?TAB, #retained{topic = topic2tokens(Topic), - msg = Msg, - expiry_time = get_expiry_time(Msg, Env)}); + ekka_mnesia:dirty_write(?TAB, #retained{topic = topic2tokens(Topic), + msg = Msg, + expiry_time = get_expiry_time(Msg, Env)}); {true, false} -> - {atomic, _} = mnesia:transaction( + {atomic, _} = ekka_mnesia:transaction(?RETAINER_SHARD, fun() -> case mnesia:read(?TAB, Topic) of [_] -> @@ -256,7 +259,7 @@ expire_messages() -> NowMs = erlang:system_time(millisecond), MsHd = #retained{topic = '$1', msg = '_', expiry_time = '$3'}, Ms = [{MsHd, [{'=/=','$3',0}, {'<','$3',NowMs}], ['$1']}], - {atomic, _} = mnesia:transaction( + {atomic, _} = ekka_mnesia:transaction(?RETAINER_SHARD, fun() -> Keys = mnesia:select(?TAB, Ms, write), lists:foreach(fun(Key) -> mnesia:delete({?TAB, Key}) end, Keys) @@ -293,7 +296,7 @@ match_delete_messages(Filter) -> MsHd = #retained{topic = Cond, msg = '_', expiry_time = '_'}, Ms = [{MsHd, [], ['$_']}], Rs = mnesia:dirty_select(?TAB, Ms), - lists:foreach(fun(R) -> mnesia:dirty_delete_object(?TAB, R) end, Rs), + lists:foreach(fun(R) -> ekka_mnesia:dirty_delete_object(?TAB, R) end, Rs), length(Rs). %% @private diff --git a/apps/emqx_retainer/src/emqx_retainer_cli.erl b/apps/emqx_retainer/src/emqx_retainer_cli.erl index fe8fa9578..1e965946f 100644 --- a/apps/emqx_retainer/src/emqx_retainer_cli.erl +++ b/apps/emqx_retainer/src/emqx_retainer_cli.erl @@ -38,7 +38,7 @@ cmd(["topics"]) -> cmd(["clean"]) -> Size = mnesia:table_info(?TAB, size), - case mnesia:clear_table(?TAB) of + case ekka_mnesia:clear_table(?TAB) of {atomic, ok} -> emqx_ctl:print("Cleaned ~p retained messages~n", [Size]); {aborted, R} -> emqx_ctl:print("Aborted ~p~n", [R]) end; @@ -55,4 +55,3 @@ cmd(_) -> unload() -> emqx_ctl:unregister_command(retainer). -