fix(delayed): update config in cluster

This commit is contained in:
DDDHuang 2021-12-30 16:43:09 +08:00
parent f19ccdfcde
commit 23cf74d829
2 changed files with 37 additions and 38 deletions

View File

@ -187,7 +187,7 @@ delete_delayed_message(Id0) ->
mria:dirty_delete(?TAB, {Timestamp, Id}) mria:dirty_delete(?TAB, {Timestamp, Id})
end. end.
update_config(Config) -> update_config(Config) ->
{ok, _} = emqx:update_config([delayed], Config). emqx_conf:update([delayed], Config, #{rawconf_with_defaults => true, override_to => cluster}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server callback %% gen_server callback

View File

@ -25,12 +25,14 @@
-define(MAX_PAYLOAD_LENGTH, 2048). -define(MAX_PAYLOAD_LENGTH, 2048).
-define(PAYLOAD_TOO_LARGE, 'PAYLOAD_TOO_LARGE'). -define(PAYLOAD_TOO_LARGE, 'PAYLOAD_TOO_LARGE').
-export([status/2 -export([ status/2
, delayed_messages/2 , delayed_messages/2
, delayed_message/2 , delayed_message/2
]). ]).
-export([paths/0, fields/1, schema/1]). -export([ paths/0
, fields/1
, schema/1]).
%% for rpc %% for rpc
-export([update_config_/1]). -export([update_config_/1]).
@ -40,6 +42,7 @@
-define(ALREADY_ENABLED, 'ALREADY_ENABLED'). -define(ALREADY_ENABLED, 'ALREADY_ENABLED').
-define(ALREADY_DISABLED, 'ALREADY_DISABLED'). -define(ALREADY_DISABLED, 'ALREADY_DISABLED').
-define(INTERNAL_ERROR, 'INTERNAL_ERROR').
-define(BAD_REQUEST, 'BAD_REQUEST'). -define(BAD_REQUEST, 'BAD_REQUEST').
-define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND'). -define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND').
@ -49,7 +52,11 @@
api_spec() -> api_spec() ->
emqx_dashboard_swagger:spec(?MODULE). emqx_dashboard_swagger:spec(?MODULE).
paths() -> ["/mqtt/delayed", "/mqtt/delayed/messages", "/mqtt/delayed/messages/:msgid"]. paths() ->
[ "/mqtt/delayed"
, "/mqtt/delayed/messages"
, "/mqtt/delayed/messages/:msgid"
].
schema("/mqtt/delayed") -> schema("/mqtt/delayed") ->
#{ #{
@ -189,8 +196,7 @@ get_status() ->
update_config(Config) -> update_config(Config) ->
case generate_config(Config) of case generate_config(Config) of
{ok, Config} -> {ok, Config} ->
update_config_(Config), update_config_(Config);
{200, get_status()};
{error, {Code, Message}} -> {error, {Code, Message}} ->
{400, #{code => Code, message => Message}} {400, #{code => Code, message => Message}}
end. end.
@ -215,29 +221,28 @@ generate_max_delayed_messages(Config) ->
{ok, Config}. {ok, Config}.
update_config_(Config) -> update_config_(Config) ->
lists:foreach(fun(Node) -> case emqx_delayed:update_config(Config) of
update_config_(Node, Config) {ok, #{config := NewDelayed}} ->
end, mria_mnesia:running_nodes()). case maps:get(<<"enable">>, Config, undefined) of
undefined ->
update_config_(Node, Config) when Node =:= node() -> ignore;
_ = emqx_delayed:update_config(Config), true ->
case maps:get(<<"enable">>, Config, undefined) of emqx_delayed:enable();
undefined -> false ->
ignore; emqx_delayed:disable()
true -> end,
emqx_delayed:enable(); case maps:get(<<"max_delayed_messages">>, Config, undefined) of
false -> undefined ->
emqx_delayed:disable() ignore;
end, Max ->
case maps:get(<<"max_delayed_messages">>, Config, undefined) of ok = emqx_delayed:set_max_delayed_messages(Max)
undefined -> end,
ignore; {200, NewDelayed};
Max -> {error, Reason} ->
ok = emqx_delayed:set_max_delayed_messages(Max) Message = list_to_binary(
end; io_lib:format("Update delayed message config failed ~p", [Reason])),
{500, ?INTERNAL_ERROR, Message}
update_config_(Node, Config) -> end.
rpc_call(Node, ?MODULE, ?FUNCTION_NAME, [Node, Config]).
generate_http_code_map(id_schema_error, Id) -> generate_http_code_map(id_schema_error, Id) ->
#{code => ?MESSAGE_ID_SCHEMA_ERROR, message => #{code => ?MESSAGE_ID_SCHEMA_ERROR, message =>
@ -245,9 +250,3 @@ generate_http_code_map(id_schema_error, Id) ->
generate_http_code_map(not_found, Id) -> generate_http_code_map(not_found, Id) ->
#{code => ?MESSAGE_ID_NOT_FOUND, message => #{code => ?MESSAGE_ID_NOT_FOUND, message =>
iolist_to_binary(io_lib:format("Message ID ~p not found", [Id]))}. iolist_to_binary(io_lib:format("Message ID ~p not found", [Id]))}.
rpc_call(Node, Module, Fun, Args) ->
case rpc:call(Node, Module, Fun, Args) of
{badrpc, Reason} -> {error, Reason};
Result -> Result
end.