chore(mod_delayed): Add RLOG shard

This commit is contained in:
k32 2021-06-28 21:02:26 +02:00
parent 2bea95ff6e
commit e8e956b074
3 changed files with 6 additions and 3 deletions

View File

@ -25,6 +25,7 @@
-define(COMMON_SHARD, emqx_common_shard). -define(COMMON_SHARD, emqx_common_shard).
-define(SHARED_SUB_SHARD, emqx_shared_sub_shard). -define(SHARED_SUB_SHARD, emqx_shared_sub_shard).
-define(MOD_DELAYED_SHARD, emqx_delayed_shard).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Banner %% Banner

View File

@ -32,6 +32,7 @@
, ?COMMON_SHARD , ?COMMON_SHARD
, ?SHARED_SUB_SHARD , ?SHARED_SUB_SHARD
, ?RULE_ENGINE_SHARD , ?RULE_ENGINE_SHARD
, ?MOD_DELAYED_SHARD
]). ]).
-include("emqx_release.hrl"). -include("emqx_release.hrl").

View File

@ -58,6 +58,8 @@
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
-define(MAX_INTERVAL, 4294967). -define(MAX_INTERVAL, 4294967).
-rlog_shard({?MOD_DELAYED_SHARD, ?TAB}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Mnesia bootstrap %% Mnesia bootstrap
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -137,7 +139,7 @@ init([]) ->
ensure_publish_timer(#{timer => undefined, publish_at => 0}))}. ensure_publish_timer(#{timer => undefined, publish_at => 0}))}.
handle_call({store, DelayedMsg = #delayed_message{key = Key}}, _From, State) -> handle_call({store, DelayedMsg = #delayed_message{key = Key}}, _From, State) ->
ok = mnesia:dirty_write(?TAB, DelayedMsg), ok = ekka_mnesia:dirty_write(?TAB, DelayedMsg),
emqx_metrics:inc('messages.delayed'), emqx_metrics:inc('messages.delayed'),
{reply, ok, ensure_publish_timer(Key, State)}; {reply, ok, ensure_publish_timer(Key, State)};
@ -152,7 +154,7 @@ handle_cast(Msg, State) ->
%% Do Publish... %% Do Publish...
handle_info({timeout, TRef, do_publish}, State = #{timer := TRef}) -> handle_info({timeout, TRef, do_publish}, State = #{timer := TRef}) ->
DeletedKeys = do_publish(mnesia:dirty_first(?TAB), os:system_time(seconds)), DeletedKeys = do_publish(mnesia:dirty_first(?TAB), os:system_time(seconds)),
lists:foreach(fun(Key) -> mnesia:dirty_delete(?TAB, Key) end, DeletedKeys), lists:foreach(fun(Key) -> ekka_mnesia:dirty_delete(?TAB, Key) end, DeletedKeys),
{noreply, ensure_publish_timer(State#{timer := undefined, publish_at := 0})}; {noreply, ensure_publish_timer(State#{timer := undefined, publish_at := 0})};
handle_info(stats, State = #{stats_fun := StatsFun}) -> handle_info(stats, State = #{stats_fun := StatsFun}) ->
@ -222,4 +224,3 @@ do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now ->
-spec(delayed_count() -> non_neg_integer()). -spec(delayed_count() -> non_neg_integer()).
delayed_count() -> mnesia:table_info(?TAB, size). delayed_count() -> mnesia:table_info(?TAB, size).