Merge pull request #7264 from lafirest/fix/delayed_api

fix(delayed): fix that the query is not a cluster query
This commit is contained in:
JianBo He 2022-03-11 16:07:53 +08:00 committed by GitHub
commit 97e1d249ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 17 additions and 6 deletions

View File

@ -48,11 +48,14 @@
, get_delayed_message/1
, delete_delayed_message/1
, post_config_update/5
, cluster_list/1
, cluster_query/4
]).
-export([format_delayed/1]).
-record(delayed_message, {key, delayed, msg}).
-type delayed_message() :: #delayed_message{}.
%% sync ms with record change
-define(QUERY_MS(Id), [{{delayed_message, {'_', Id}, '_', '_'}, [], ['$_']}]).
@ -114,7 +117,7 @@ start_link() ->
Opts = emqx_conf:get([delayed], #{}),
gen_server:start_link({local, ?SERVER}, ?MODULE, [Opts], []).
-spec(store(#delayed_message{}) -> ok | {error, atom()}).
-spec(store(delayed_message()) -> ok | {error, atom()}).
store(DelayedMsg) ->
gen_server:call(?SERVER, {store, DelayedMsg}, infinity).
@ -130,6 +133,13 @@ set_max_delayed_messages(Max) ->
list(Params) ->
emqx_mgmt_api:paginate(?TAB, Params, ?FORMAT_FUN).
cluster_list(Params) ->
emqx_mgmt_api:cluster_query(Params, ?TAB, [], {?MODULE, cluster_query}).
cluster_query(Table, _QsSpec, Continuation, Limit) ->
Ms = [{'$1', [], ['$1']}],
emqx_mgmt_api:select_table_with_count(Table, Ms, Continuation, Limit, fun format_delayed/1).
format_delayed(Delayed) ->
format_delayed(Delayed, false).
@ -264,14 +274,15 @@ handle_info({timeout, TRef, do_publish}, State = #{timer := TRef}) ->
{noreply, ensure_publish_timer(State#{timer := undefined, publish_at := 0})};
handle_info(stats, State = #{stats_fun := StatsFun}) ->
StatsTimer = erlang:send_after(timer:seconds(1), self(), stats),
StatsFun(delayed_count()),
{noreply, State, hibernate};
{noreply, State#{stats_timer := StatsTimer}, hibernate};
handle_info(Info, State) ->
?SLOG(error, #{msg => "unexpected_info", info => Info}),
{noreply, State}.
terminate(_Reason, #{timer := TRef}) ->
terminate(_Reason, #{stats_timer := TRef}) ->
emqx_conf:remove_handler([delayed]),
emqx_misc:cancel_timer(TRef).
@ -285,7 +296,7 @@ code_change(_Vsn, State, _Extra) ->
%% Ensure the stats
ensure_stats_event(State) ->
StatsFun = emqx_stats:statsfun('delayed.count', 'delayed.max'),
{ok, StatsTimer} = timer:send_interval(timer:seconds(1), stats),
StatsTimer = erlang:send_after(timer:seconds(1), self(), stats),
State#{stats_fun => StatsFun, stats_timer => StatsTimer}.
%% Ensure publish timer
@ -322,7 +333,7 @@ do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now ->
[#delayed_message{msg = Msg}] ->
emqx_pool:async_submit(fun emqx:publish/1, [Msg])
end,
do_publish(mnesia:dirty_next(?TAB, Key), Now, [Key|Acc]).
do_publish(mnesia:dirty_next(?TAB, Key), Now, [Key | Acc]).
-spec(delayed_count() -> non_neg_integer()).
delayed_count() -> mnesia:table_info(?TAB, size).

View File

@ -160,7 +160,7 @@ status(put, #{body := Body}) ->
update_config(Body).
delayed_messages(get, #{query_string := Qs}) ->
{200, emqx_delayed:list(Qs)}.
{200, emqx_delayed:cluster_list(Qs)}.
delayed_message(get, #{bindings := #{msgid := Id}}) ->
case emqx_delayed:get_delayed_message(Id) of