diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 36569d52e..73a21d3e6 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -187,7 +187,7 @@ delete_delayed_message(Id0) -> mria:dirty_delete(?TAB, {Timestamp, Id}) end. update_config(Config) -> - {ok, _} = emqx:update_config([delayed], Config). + emqx_conf:update([delayed], Config, #{rawconf_with_defaults => true, override_to => cluster}). %%-------------------------------------------------------------------- %% gen_server callback diff --git a/apps/emqx_modules/src/emqx_delayed_api.erl b/apps/emqx_modules/src/emqx_delayed_api.erl index 9199d7b2c..e582d9189 100644 --- a/apps/emqx_modules/src/emqx_delayed_api.erl +++ b/apps/emqx_modules/src/emqx_delayed_api.erl @@ -25,12 +25,14 @@ -define(MAX_PAYLOAD_LENGTH, 2048). -define(PAYLOAD_TOO_LARGE, 'PAYLOAD_TOO_LARGE'). --export([status/2 - , delayed_messages/2 - , delayed_message/2 -]). +-export([ status/2 + , delayed_messages/2 + , delayed_message/2 + ]). --export([paths/0, fields/1, schema/1]). +-export([ paths/0 + , fields/1 + , schema/1]). %% for rpc -export([update_config_/1]). @@ -40,6 +42,7 @@ -define(ALREADY_ENABLED, 'ALREADY_ENABLED'). -define(ALREADY_DISABLED, 'ALREADY_DISABLED'). +-define(INTERNAL_ERROR, 'INTERNAL_ERROR'). -define(BAD_REQUEST, 'BAD_REQUEST'). -define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND'). @@ -49,7 +52,11 @@ api_spec() -> emqx_dashboard_swagger:spec(?MODULE). -paths() -> ["/mqtt/delayed", "/mqtt/delayed/messages", "/mqtt/delayed/messages/:msgid"]. +paths() -> + [ "/mqtt/delayed" + , "/mqtt/delayed/messages" + , "/mqtt/delayed/messages/:msgid" + ]. schema("/mqtt/delayed") -> #{ @@ -189,8 +196,7 @@ get_status() -> update_config(Config) -> case generate_config(Config) of {ok, Config} -> - update_config_(Config), - {200, get_status()}; + update_config_(Config); {error, {Code, Message}} -> {400, #{code => Code, message => Message}} end. @@ -215,29 +221,28 @@ generate_max_delayed_messages(Config) -> {ok, Config}. update_config_(Config) -> - lists:foreach(fun(Node) -> - update_config_(Node, Config) - end, mria_mnesia:running_nodes()). - -update_config_(Node, Config) when Node =:= node() -> - _ = emqx_delayed:update_config(Config), - case maps:get(<<"enable">>, Config, undefined) of - undefined -> - ignore; - true -> - emqx_delayed:enable(); - false -> - emqx_delayed:disable() - end, - case maps:get(<<"max_delayed_messages">>, Config, undefined) of - undefined -> - ignore; - Max -> - ok = emqx_delayed:set_max_delayed_messages(Max) - end; - -update_config_(Node, Config) -> - rpc_call(Node, ?MODULE, ?FUNCTION_NAME, [Node, Config]). + case emqx_delayed:update_config(Config) of + {ok, #{config := NewDelayed}} -> + case maps:get(<<"enable">>, Config, undefined) of + undefined -> + ignore; + true -> + emqx_delayed:enable(); + false -> + emqx_delayed:disable() + end, + case maps:get(<<"max_delayed_messages">>, Config, undefined) of + undefined -> + ignore; + Max -> + ok = emqx_delayed:set_max_delayed_messages(Max) + end, + {200, NewDelayed}; + {error, Reason} -> + Message = list_to_binary( + io_lib:format("Update delayed message config failed ~p", [Reason])), + {500, ?INTERNAL_ERROR, Message} + end. generate_http_code_map(id_schema_error, Id) -> #{code => ?MESSAGE_ID_SCHEMA_ERROR, message => @@ -245,9 +250,3 @@ generate_http_code_map(id_schema_error, Id) -> generate_http_code_map(not_found, Id) -> #{code => ?MESSAGE_ID_NOT_FOUND, message => iolist_to_binary(io_lib:format("Message ID ~p not found", [Id]))}. - -rpc_call(Node, Module, Fun, Args) -> - case rpc:call(Node, Module, Fun, Args) of - {badrpc, Reason} -> {error, Reason}; - Result -> Result - end.