diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 8be7e1419..db6226c1f 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -48,11 +48,14 @@ , get_delayed_message/1 , delete_delayed_message/1 , post_config_update/5 + , cluster_list/1 + , cluster_query/4 ]). -export([format_delayed/1]). -record(delayed_message, {key, delayed, msg}). +-type delayed_message() :: #delayed_message{}. %% sync ms with record change -define(QUERY_MS(Id), [{{delayed_message, {'_', Id}, '_', '_'}, [], ['$_']}]). @@ -114,7 +117,7 @@ start_link() -> Opts = emqx_conf:get([delayed], #{}), gen_server:start_link({local, ?SERVER}, ?MODULE, [Opts], []). --spec(store(#delayed_message{}) -> ok | {error, atom()}). +-spec(store(delayed_message()) -> ok | {error, atom()}). store(DelayedMsg) -> gen_server:call(?SERVER, {store, DelayedMsg}, infinity). @@ -130,6 +133,13 @@ set_max_delayed_messages(Max) -> list(Params) -> emqx_mgmt_api:paginate(?TAB, Params, ?FORMAT_FUN). +cluster_list(Params) -> + emqx_mgmt_api:cluster_query(Params, ?TAB, [], {?MODULE, cluster_query}). + +cluster_query(Table, _QsSpec, Continuation, Limit) -> + Ms = [{'$1', [], ['$1']}], + emqx_mgmt_api:select_table_with_count(Table, Ms, Continuation, Limit, fun format_delayed/1). + format_delayed(Delayed) -> format_delayed(Delayed, false). @@ -264,14 +274,15 @@ handle_info({timeout, TRef, do_publish}, State = #{timer := TRef}) -> {noreply, ensure_publish_timer(State#{timer := undefined, publish_at := 0})}; handle_info(stats, State = #{stats_fun := StatsFun}) -> + StatsTimer = erlang:send_after(timer:seconds(1), self(), stats), StatsFun(delayed_count()), - {noreply, State, hibernate}; + {noreply, State#{stats_timer := StatsTimer}, hibernate}; handle_info(Info, State) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. -terminate(_Reason, #{timer := TRef}) -> +terminate(_Reason, #{stats_timer := TRef}) -> emqx_conf:remove_handler([delayed]), emqx_misc:cancel_timer(TRef). @@ -285,7 +296,7 @@ code_change(_Vsn, State, _Extra) -> %% Ensure the stats ensure_stats_event(State) -> StatsFun = emqx_stats:statsfun('delayed.count', 'delayed.max'), - {ok, StatsTimer} = timer:send_interval(timer:seconds(1), stats), + StatsTimer = erlang:send_after(timer:seconds(1), self(), stats), State#{stats_fun => StatsFun, stats_timer => StatsTimer}. %% Ensure publish timer @@ -322,7 +333,7 @@ do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now -> [#delayed_message{msg = Msg}] -> emqx_pool:async_submit(fun emqx:publish/1, [Msg]) end, - do_publish(mnesia:dirty_next(?TAB, Key), Now, [Key|Acc]). + do_publish(mnesia:dirty_next(?TAB, Key), Now, [Key | Acc]). -spec(delayed_count() -> non_neg_integer()). delayed_count() -> mnesia:table_info(?TAB, size). diff --git a/apps/emqx_modules/src/emqx_delayed_api.erl b/apps/emqx_modules/src/emqx_delayed_api.erl index cee9298ea..b6756c695 100644 --- a/apps/emqx_modules/src/emqx_delayed_api.erl +++ b/apps/emqx_modules/src/emqx_delayed_api.erl @@ -160,7 +160,7 @@ status(put, #{body := Body}) -> update_config(Body). delayed_messages(get, #{query_string := Qs}) -> - {200, emqx_delayed:list(Qs)}. + {200, emqx_delayed:cluster_list(Qs)}. delayed_message(get, #{bindings := #{msgid := Id}}) -> case emqx_delayed:get_delayed_message(Id) of