Merge pull request #5126 from k32/dev/delayed-mod-shard
chore(mod_delayed): Add RLOG shard
This commit is contained in:
commit
dae4c771d0
|
@ -25,6 +25,7 @@
|
||||||
|
|
||||||
-define(COMMON_SHARD, emqx_common_shard).
|
-define(COMMON_SHARD, emqx_common_shard).
|
||||||
-define(SHARED_SUB_SHARD, emqx_shared_sub_shard).
|
-define(SHARED_SUB_SHARD, emqx_shared_sub_shard).
|
||||||
|
-define(MOD_DELAYED_SHARD, emqx_delayed_shard).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Banner
|
%% Banner
|
||||||
|
|
|
@ -32,6 +32,7 @@
|
||||||
, ?COMMON_SHARD
|
, ?COMMON_SHARD
|
||||||
, ?SHARED_SUB_SHARD
|
, ?SHARED_SUB_SHARD
|
||||||
, ?RULE_ENGINE_SHARD
|
, ?RULE_ENGINE_SHARD
|
||||||
|
, ?MOD_DELAYED_SHARD
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-include("emqx_release.hrl").
|
-include("emqx_release.hrl").
|
||||||
|
|
|
@ -58,6 +58,8 @@
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
-define(MAX_INTERVAL, 4294967).
|
-define(MAX_INTERVAL, 4294967).
|
||||||
|
|
||||||
|
-rlog_shard({?MOD_DELAYED_SHARD, ?TAB}).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Mnesia bootstrap
|
%% Mnesia bootstrap
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -137,7 +139,7 @@ init([]) ->
|
||||||
ensure_publish_timer(#{timer => undefined, publish_at => 0}))}.
|
ensure_publish_timer(#{timer => undefined, publish_at => 0}))}.
|
||||||
|
|
||||||
handle_call({store, DelayedMsg = #delayed_message{key = Key}}, _From, State) ->
|
handle_call({store, DelayedMsg = #delayed_message{key = Key}}, _From, State) ->
|
||||||
ok = mnesia:dirty_write(?TAB, DelayedMsg),
|
ok = ekka_mnesia:dirty_write(?TAB, DelayedMsg),
|
||||||
emqx_metrics:inc('messages.delayed'),
|
emqx_metrics:inc('messages.delayed'),
|
||||||
{reply, ok, ensure_publish_timer(Key, State)};
|
{reply, ok, ensure_publish_timer(Key, State)};
|
||||||
|
|
||||||
|
@ -152,7 +154,7 @@ handle_cast(Msg, State) ->
|
||||||
%% Do Publish...
|
%% Do Publish...
|
||||||
handle_info({timeout, TRef, do_publish}, State = #{timer := TRef}) ->
|
handle_info({timeout, TRef, do_publish}, State = #{timer := TRef}) ->
|
||||||
DeletedKeys = do_publish(mnesia:dirty_first(?TAB), os:system_time(seconds)),
|
DeletedKeys = do_publish(mnesia:dirty_first(?TAB), os:system_time(seconds)),
|
||||||
lists:foreach(fun(Key) -> mnesia:dirty_delete(?TAB, Key) end, DeletedKeys),
|
lists:foreach(fun(Key) -> ekka_mnesia:dirty_delete(?TAB, Key) end, DeletedKeys),
|
||||||
{noreply, ensure_publish_timer(State#{timer := undefined, publish_at := 0})};
|
{noreply, ensure_publish_timer(State#{timer := undefined, publish_at := 0})};
|
||||||
|
|
||||||
handle_info(stats, State = #{stats_fun := StatsFun}) ->
|
handle_info(stats, State = #{stats_fun := StatsFun}) ->
|
||||||
|
@ -222,4 +224,3 @@ do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now ->
|
||||||
|
|
||||||
-spec(delayed_count() -> non_neg_integer()).
|
-spec(delayed_count() -> non_neg_integer()).
|
||||||
delayed_count() -> mnesia:table_info(?TAB, size).
|
delayed_count() -> mnesia:table_info(?TAB, size).
|
||||||
|
|
||||||
|
|
|
@ -15,6 +15,15 @@
|
||||||
{xref_checks,[undefined_function_calls,undefined_functions,locals_not_used,
|
{xref_checks,[undefined_function_calls,undefined_functions,locals_not_used,
|
||||||
deprecated_function_calls,warnings_as_errors,deprecated_functions]}.
|
deprecated_function_calls,warnings_as_errors,deprecated_functions]}.
|
||||||
|
|
||||||
|
%% Check for the mnesia calls forbidden by Ekka:
|
||||||
|
{xref_queries,
|
||||||
|
[ {"E || \"mnesia\":\"dirty_write\"/\".*\" : Fun", []}
|
||||||
|
, {"E || \"mnesia\":\"dirty_delete.*\"/\".*\" : Fun", []}
|
||||||
|
, {"E || \"mnesia\":\"transaction\"/\".*\" : Fun", []}
|
||||||
|
, {"E || \"mnesia\":\"async_dirty\"/\".*\" : Fun", []}
|
||||||
|
, {"E || \"mnesia\":\"clear_table\"/\".*\" : Fun", []}
|
||||||
|
]}.
|
||||||
|
|
||||||
{dialyzer, [
|
{dialyzer, [
|
||||||
{warnings, [unmatched_returns, error_handling, race_conditions]},
|
{warnings, [unmatched_returns, error_handling, race_conditions]},
|
||||||
{plt_location, "."},
|
{plt_location, "."},
|
||||||
|
|
Loading…
Reference in New Issue