diff --git a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl index 558e5005c..81b0d70a4 100644 --- a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl +++ b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl @@ -80,8 +80,15 @@ format(Rule = #{topic := Topic}) when is_map(Rule) -> }. update_(Topics) when length(Topics) =< ?MAX_AUTO_SUBSCRIBE -> - {ok, _} = emqx:update_config([auto_subscribe, topics], Topics), - update_hook(); + case emqx_conf:update([auto_subscribe, topics], + Topics, + #{rawconf_with_defaults => true, override_to => cluster}) of + {ok, #{raw_config := NewTopics}} -> + ok = update_hook(), + {ok, NewTopics}; + {error, Reason} -> + {error, Reason} + end; update_(_Topics) -> {error, quota_exceeded}. diff --git a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_api.erl b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_api.erl index d1207544a..cb5372d5d 100644 --- a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_api.erl +++ b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_api.erl @@ -22,6 +22,7 @@ -export([auto_subscribe/2]). +-define(INTERNAL_ERROR, 'INTERNAL_ERROR'). -define(EXCEED_LIMIT, 'EXCEED_LIMIT'). -define(BAD_REQUEST, 'BAD_REQUEST'). @@ -90,6 +91,9 @@ auto_subscribe(put, #{body := Params}) -> Message = list_to_binary(io_lib:format("Max auto subscribe topic count is ~p", [emqx_auto_subscribe:max_limit()])), {409, #{code => ?EXCEED_LIMIT, message => Message}}; - ok -> - {200, emqx_auto_subscribe:list()} + {error, Reason} -> + Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])), + {500, #{code => ?INTERNAL_ERROR, message => Message}}; + {ok, NewTopics} -> + {200, NewTopics} end. diff --git a/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl b/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl index 0e5022533..92eb9a9ab 100644 --- a/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl +++ b/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl @@ -85,7 +85,7 @@ init_per_suite(Config) -> } ] }">>), - emqx_common_test_helpers:start_apps([emqx_dashboard, ?APP], fun set_special_configs/1), + emqx_common_test_helpers:start_apps([emqx_dashboard, emqx_conf, ?APP], fun set_special_configs/1), Config. set_special_configs(emqx_dashboard) -> @@ -113,15 +113,17 @@ topic_config(T) -> end_per_suite(_) -> application:unload(emqx_management), + application:unload(emqx_conf), application:unload(?APP), meck:unload(emqx_resource), meck:unload(emqx_schema), - emqx_common_test_helpers:stop_apps([emqx_dashboard, ?APP]). + emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_conf, ?APP]). t_auto_subscribe(_) -> + emqx_auto_subscribe:update([#{<<"topic">> => Topic} || Topic <- ?TOPICS]), {ok, Client} = emqtt:start_link(#{username => ?CLIENT_USERNAME, clientid => ?CLIENT_ID}), {ok, _} = emqtt:connect(Client), - timer:sleep(100), + timer:sleep(200), ?assertEqual(check_subs(length(?TOPICS)), ok), emqtt:disconnect(Client), ok. @@ -148,6 +150,7 @@ t_update(_) -> check_subs(Count) -> Subs = ets:tab2list(emqx_suboption), + ct:pal("---> ~p ~p ~n", [Subs, Count]), ?assert(length(Subs) >= Count), check_subs((Subs), ?ENSURE_TOPICS). diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 69e9ebb5b..fef0a1ae1 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -187,7 +187,7 @@ delete_delayed_message(Id0) -> mria:dirty_delete(?TAB, {Timestamp, Id}) end. update_config(Config) -> - {ok, _} = emqx:update_config([delayed], Config). + emqx_conf:update([delayed], Config, #{rawconf_with_defaults => true, override_to => cluster}). %%-------------------------------------------------------------------- %% gen_server callback diff --git a/apps/emqx_modules/src/emqx_delayed_api.erl b/apps/emqx_modules/src/emqx_delayed_api.erl index 9199d7b2c..5caad8aa1 100644 --- a/apps/emqx_modules/src/emqx_delayed_api.erl +++ b/apps/emqx_modules/src/emqx_delayed_api.erl @@ -25,12 +25,14 @@ -define(MAX_PAYLOAD_LENGTH, 2048). -define(PAYLOAD_TOO_LARGE, 'PAYLOAD_TOO_LARGE'). --export([status/2 - , delayed_messages/2 - , delayed_message/2 -]). +-export([ status/2 + , delayed_messages/2 + , delayed_message/2 + ]). --export([paths/0, fields/1, schema/1]). +-export([ paths/0 + , fields/1 + , schema/1]). %% for rpc -export([update_config_/1]). @@ -40,6 +42,7 @@ -define(ALREADY_ENABLED, 'ALREADY_ENABLED'). -define(ALREADY_DISABLED, 'ALREADY_DISABLED'). +-define(INTERNAL_ERROR, 'INTERNAL_ERROR'). -define(BAD_REQUEST, 'BAD_REQUEST'). -define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND'). @@ -49,7 +52,11 @@ api_spec() -> emqx_dashboard_swagger:spec(?MODULE). -paths() -> ["/mqtt/delayed", "/mqtt/delayed/messages", "/mqtt/delayed/messages/:msgid"]. +paths() -> + [ "/mqtt/delayed" + , "/mqtt/delayed/messages" + , "/mqtt/delayed/messages/:msgid" + ]. schema("/mqtt/delayed") -> #{ @@ -189,8 +196,7 @@ get_status() -> update_config(Config) -> case generate_config(Config) of {ok, Config} -> - update_config_(Config), - {200, get_status()}; + update_config_(Config); {error, {Code, Message}} -> {400, #{code => Code, message => Message}} end. @@ -215,29 +221,28 @@ generate_max_delayed_messages(Config) -> {ok, Config}. update_config_(Config) -> - lists:foreach(fun(Node) -> - update_config_(Node, Config) - end, mria_mnesia:running_nodes()). - -update_config_(Node, Config) when Node =:= node() -> - _ = emqx_delayed:update_config(Config), - case maps:get(<<"enable">>, Config, undefined) of - undefined -> - ignore; - true -> - emqx_delayed:enable(); - false -> - emqx_delayed:disable() - end, - case maps:get(<<"max_delayed_messages">>, Config, undefined) of - undefined -> - ignore; - Max -> - ok = emqx_delayed:set_max_delayed_messages(Max) - end; - -update_config_(Node, Config) -> - rpc_call(Node, ?MODULE, ?FUNCTION_NAME, [Node, Config]). + case emqx_delayed:update_config(Config) of + {ok, #{raw_config := NewDelayed}} -> + case maps:get(<<"enable">>, Config, undefined) of + undefined -> + ignore; + true -> + emqx_delayed:enable(); + false -> + emqx_delayed:disable() + end, + case maps:get(<<"max_delayed_messages">>, Config, undefined) of + undefined -> + ignore; + Max -> + ok = emqx_delayed:set_max_delayed_messages(Max) + end, + {200, NewDelayed}; + {error, Reason} -> + Message = list_to_binary( + io_lib:format("Update config failed ~p", [Reason])), + {500, ?INTERNAL_ERROR, Message} + end. generate_http_code_map(id_schema_error, Id) -> #{code => ?MESSAGE_ID_SCHEMA_ERROR, message => @@ -245,9 +250,3 @@ generate_http_code_map(id_schema_error, Id) -> generate_http_code_map(not_found, Id) -> #{code => ?MESSAGE_ID_NOT_FOUND, message => iolist_to_binary(io_lib:format("Message ID ~p not found", [Id]))}. - -rpc_call(Node, Module, Fun, Args) -> - case rpc:call(Node, Module, Fun, Args) of - {badrpc, Reason} -> {error, Reason}; - Result -> Result - end. diff --git a/apps/emqx_modules/src/emqx_event_message.erl b/apps/emqx_modules/src/emqx_event_message.erl index ccdb75ccb..3af57a38d 100644 --- a/apps/emqx_modules/src/emqx_event_message.erl +++ b/apps/emqx_modules/src/emqx_event_message.erl @@ -44,8 +44,15 @@ list() -> update(Params) -> disable(), - {ok, _} = emqx:update_config([event_message], Params), - enable(). + case emqx_conf:update([event_message], + Params, + #{rawconf_with_defaults => true, override_to => cluster}) of + {ok, #{raw_config := NewEventMessage}} -> + enable(), + {ok, NewEventMessage}; + {error, Reason} -> + {error, Reason} + end. enable() -> lists:foreach(fun({_Topic, false}) -> ok; diff --git a/apps/emqx_modules/src/emqx_event_message_api.erl b/apps/emqx_modules/src/emqx_event_message_api.erl index 80e5825d1..e27311e15 100644 --- a/apps/emqx_modules/src/emqx_event_message_api.erl +++ b/apps/emqx_modules/src/emqx_event_message_api.erl @@ -53,5 +53,10 @@ event_message(get, _Params) -> {200, emqx_event_message:list()}; event_message(put, #{body := Body}) -> - _ = emqx_event_message:update(Body), - {200, emqx_event_message:list()}. + case emqx_event_message:update(Body) of + {ok, NewConfig} -> + {200, NewConfig}; + {error, Reason} -> + Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])), + {500, 'INTERNAL_ERROR', Message} + end.