Merge pull request #6582 from DDDHuang/config_bugfix
fix(config): update configs in cluster
This commit is contained in:
commit
54ea7a5871
|
@ -80,8 +80,15 @@ format(Rule = #{topic := Topic}) when is_map(Rule) ->
|
||||||
}.
|
}.
|
||||||
|
|
||||||
update_(Topics) when length(Topics) =< ?MAX_AUTO_SUBSCRIBE ->
|
update_(Topics) when length(Topics) =< ?MAX_AUTO_SUBSCRIBE ->
|
||||||
{ok, _} = emqx:update_config([auto_subscribe, topics], Topics),
|
case emqx_conf:update([auto_subscribe, topics],
|
||||||
update_hook();
|
Topics,
|
||||||
|
#{rawconf_with_defaults => true, override_to => cluster}) of
|
||||||
|
{ok, #{raw_config := NewTopics}} ->
|
||||||
|
ok = update_hook(),
|
||||||
|
{ok, NewTopics};
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end;
|
||||||
update_(_Topics) ->
|
update_(_Topics) ->
|
||||||
{error, quota_exceeded}.
|
{error, quota_exceeded}.
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,7 @@
|
||||||
|
|
||||||
-export([auto_subscribe/2]).
|
-export([auto_subscribe/2]).
|
||||||
|
|
||||||
|
-define(INTERNAL_ERROR, 'INTERNAL_ERROR').
|
||||||
-define(EXCEED_LIMIT, 'EXCEED_LIMIT').
|
-define(EXCEED_LIMIT, 'EXCEED_LIMIT').
|
||||||
-define(BAD_REQUEST, 'BAD_REQUEST').
|
-define(BAD_REQUEST, 'BAD_REQUEST').
|
||||||
|
|
||||||
|
@ -90,6 +91,9 @@ auto_subscribe(put, #{body := Params}) ->
|
||||||
Message = list_to_binary(io_lib:format("Max auto subscribe topic count is ~p",
|
Message = list_to_binary(io_lib:format("Max auto subscribe topic count is ~p",
|
||||||
[emqx_auto_subscribe:max_limit()])),
|
[emqx_auto_subscribe:max_limit()])),
|
||||||
{409, #{code => ?EXCEED_LIMIT, message => Message}};
|
{409, #{code => ?EXCEED_LIMIT, message => Message}};
|
||||||
ok ->
|
{error, Reason} ->
|
||||||
{200, emqx_auto_subscribe:list()}
|
Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),
|
||||||
|
{500, #{code => ?INTERNAL_ERROR, message => Message}};
|
||||||
|
{ok, NewTopics} ->
|
||||||
|
{200, NewTopics}
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -85,7 +85,7 @@ init_per_suite(Config) ->
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}">>),
|
}">>),
|
||||||
emqx_common_test_helpers:start_apps([emqx_dashboard, ?APP], fun set_special_configs/1),
|
emqx_common_test_helpers:start_apps([emqx_dashboard, emqx_conf, ?APP], fun set_special_configs/1),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
set_special_configs(emqx_dashboard) ->
|
set_special_configs(emqx_dashboard) ->
|
||||||
|
@ -113,15 +113,17 @@ topic_config(T) ->
|
||||||
|
|
||||||
end_per_suite(_) ->
|
end_per_suite(_) ->
|
||||||
application:unload(emqx_management),
|
application:unload(emqx_management),
|
||||||
|
application:unload(emqx_conf),
|
||||||
application:unload(?APP),
|
application:unload(?APP),
|
||||||
meck:unload(emqx_resource),
|
meck:unload(emqx_resource),
|
||||||
meck:unload(emqx_schema),
|
meck:unload(emqx_schema),
|
||||||
emqx_common_test_helpers:stop_apps([emqx_dashboard, ?APP]).
|
emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_conf, ?APP]).
|
||||||
|
|
||||||
t_auto_subscribe(_) ->
|
t_auto_subscribe(_) ->
|
||||||
|
emqx_auto_subscribe:update([#{<<"topic">> => Topic} || Topic <- ?TOPICS]),
|
||||||
{ok, Client} = emqtt:start_link(#{username => ?CLIENT_USERNAME, clientid => ?CLIENT_ID}),
|
{ok, Client} = emqtt:start_link(#{username => ?CLIENT_USERNAME, clientid => ?CLIENT_ID}),
|
||||||
{ok, _} = emqtt:connect(Client),
|
{ok, _} = emqtt:connect(Client),
|
||||||
timer:sleep(100),
|
timer:sleep(200),
|
||||||
?assertEqual(check_subs(length(?TOPICS)), ok),
|
?assertEqual(check_subs(length(?TOPICS)), ok),
|
||||||
emqtt:disconnect(Client),
|
emqtt:disconnect(Client),
|
||||||
ok.
|
ok.
|
||||||
|
@ -148,6 +150,7 @@ t_update(_) ->
|
||||||
|
|
||||||
check_subs(Count) ->
|
check_subs(Count) ->
|
||||||
Subs = ets:tab2list(emqx_suboption),
|
Subs = ets:tab2list(emqx_suboption),
|
||||||
|
ct:pal("---> ~p ~p ~n", [Subs, Count]),
|
||||||
?assert(length(Subs) >= Count),
|
?assert(length(Subs) >= Count),
|
||||||
check_subs((Subs), ?ENSURE_TOPICS).
|
check_subs((Subs), ?ENSURE_TOPICS).
|
||||||
|
|
||||||
|
|
|
@ -187,7 +187,7 @@ delete_delayed_message(Id0) ->
|
||||||
mria:dirty_delete(?TAB, {Timestamp, Id})
|
mria:dirty_delete(?TAB, {Timestamp, Id})
|
||||||
end.
|
end.
|
||||||
update_config(Config) ->
|
update_config(Config) ->
|
||||||
{ok, _} = emqx:update_config([delayed], Config).
|
emqx_conf:update([delayed], Config, #{rawconf_with_defaults => true, override_to => cluster}).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% gen_server callback
|
%% gen_server callback
|
||||||
|
|
|
@ -30,7 +30,9 @@
|
||||||
, delayed_message/2
|
, delayed_message/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([paths/0, fields/1, schema/1]).
|
-export([ paths/0
|
||||||
|
, fields/1
|
||||||
|
, schema/1]).
|
||||||
|
|
||||||
%% for rpc
|
%% for rpc
|
||||||
-export([update_config_/1]).
|
-export([update_config_/1]).
|
||||||
|
@ -40,6 +42,7 @@
|
||||||
-define(ALREADY_ENABLED, 'ALREADY_ENABLED').
|
-define(ALREADY_ENABLED, 'ALREADY_ENABLED').
|
||||||
-define(ALREADY_DISABLED, 'ALREADY_DISABLED').
|
-define(ALREADY_DISABLED, 'ALREADY_DISABLED').
|
||||||
|
|
||||||
|
-define(INTERNAL_ERROR, 'INTERNAL_ERROR').
|
||||||
-define(BAD_REQUEST, 'BAD_REQUEST').
|
-define(BAD_REQUEST, 'BAD_REQUEST').
|
||||||
|
|
||||||
-define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND').
|
-define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND').
|
||||||
|
@ -49,7 +52,11 @@
|
||||||
api_spec() ->
|
api_spec() ->
|
||||||
emqx_dashboard_swagger:spec(?MODULE).
|
emqx_dashboard_swagger:spec(?MODULE).
|
||||||
|
|
||||||
paths() -> ["/mqtt/delayed", "/mqtt/delayed/messages", "/mqtt/delayed/messages/:msgid"].
|
paths() ->
|
||||||
|
[ "/mqtt/delayed"
|
||||||
|
, "/mqtt/delayed/messages"
|
||||||
|
, "/mqtt/delayed/messages/:msgid"
|
||||||
|
].
|
||||||
|
|
||||||
schema("/mqtt/delayed") ->
|
schema("/mqtt/delayed") ->
|
||||||
#{
|
#{
|
||||||
|
@ -189,8 +196,7 @@ get_status() ->
|
||||||
update_config(Config) ->
|
update_config(Config) ->
|
||||||
case generate_config(Config) of
|
case generate_config(Config) of
|
||||||
{ok, Config} ->
|
{ok, Config} ->
|
||||||
update_config_(Config),
|
update_config_(Config);
|
||||||
{200, get_status()};
|
|
||||||
{error, {Code, Message}} ->
|
{error, {Code, Message}} ->
|
||||||
{400, #{code => Code, message => Message}}
|
{400, #{code => Code, message => Message}}
|
||||||
end.
|
end.
|
||||||
|
@ -215,12 +221,8 @@ generate_max_delayed_messages(Config) ->
|
||||||
{ok, Config}.
|
{ok, Config}.
|
||||||
|
|
||||||
update_config_(Config) ->
|
update_config_(Config) ->
|
||||||
lists:foreach(fun(Node) ->
|
case emqx_delayed:update_config(Config) of
|
||||||
update_config_(Node, Config)
|
{ok, #{raw_config := NewDelayed}} ->
|
||||||
end, mria_mnesia:running_nodes()).
|
|
||||||
|
|
||||||
update_config_(Node, Config) when Node =:= node() ->
|
|
||||||
_ = emqx_delayed:update_config(Config),
|
|
||||||
case maps:get(<<"enable">>, Config, undefined) of
|
case maps:get(<<"enable">>, Config, undefined) of
|
||||||
undefined ->
|
undefined ->
|
||||||
ignore;
|
ignore;
|
||||||
|
@ -234,10 +236,13 @@ update_config_(Node, Config) when Node =:= node() ->
|
||||||
ignore;
|
ignore;
|
||||||
Max ->
|
Max ->
|
||||||
ok = emqx_delayed:set_max_delayed_messages(Max)
|
ok = emqx_delayed:set_max_delayed_messages(Max)
|
||||||
end;
|
end,
|
||||||
|
{200, NewDelayed};
|
||||||
update_config_(Node, Config) ->
|
{error, Reason} ->
|
||||||
rpc_call(Node, ?MODULE, ?FUNCTION_NAME, [Node, Config]).
|
Message = list_to_binary(
|
||||||
|
io_lib:format("Update config failed ~p", [Reason])),
|
||||||
|
{500, ?INTERNAL_ERROR, Message}
|
||||||
|
end.
|
||||||
|
|
||||||
generate_http_code_map(id_schema_error, Id) ->
|
generate_http_code_map(id_schema_error, Id) ->
|
||||||
#{code => ?MESSAGE_ID_SCHEMA_ERROR, message =>
|
#{code => ?MESSAGE_ID_SCHEMA_ERROR, message =>
|
||||||
|
@ -245,9 +250,3 @@ generate_http_code_map(id_schema_error, Id) ->
|
||||||
generate_http_code_map(not_found, Id) ->
|
generate_http_code_map(not_found, Id) ->
|
||||||
#{code => ?MESSAGE_ID_NOT_FOUND, message =>
|
#{code => ?MESSAGE_ID_NOT_FOUND, message =>
|
||||||
iolist_to_binary(io_lib:format("Message ID ~p not found", [Id]))}.
|
iolist_to_binary(io_lib:format("Message ID ~p not found", [Id]))}.
|
||||||
|
|
||||||
rpc_call(Node, Module, Fun, Args) ->
|
|
||||||
case rpc:call(Node, Module, Fun, Args) of
|
|
||||||
{badrpc, Reason} -> {error, Reason};
|
|
||||||
Result -> Result
|
|
||||||
end.
|
|
||||||
|
|
|
@ -44,8 +44,15 @@ list() ->
|
||||||
|
|
||||||
update(Params) ->
|
update(Params) ->
|
||||||
disable(),
|
disable(),
|
||||||
{ok, _} = emqx:update_config([event_message], Params),
|
case emqx_conf:update([event_message],
|
||||||
enable().
|
Params,
|
||||||
|
#{rawconf_with_defaults => true, override_to => cluster}) of
|
||||||
|
{ok, #{raw_config := NewEventMessage}} ->
|
||||||
|
enable(),
|
||||||
|
{ok, NewEventMessage};
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
enable() ->
|
enable() ->
|
||||||
lists:foreach(fun({_Topic, false}) -> ok;
|
lists:foreach(fun({_Topic, false}) -> ok;
|
||||||
|
|
|
@ -53,5 +53,10 @@ event_message(get, _Params) ->
|
||||||
{200, emqx_event_message:list()};
|
{200, emqx_event_message:list()};
|
||||||
|
|
||||||
event_message(put, #{body := Body}) ->
|
event_message(put, #{body := Body}) ->
|
||||||
_ = emqx_event_message:update(Body),
|
case emqx_event_message:update(Body) of
|
||||||
{200, emqx_event_message:list()}.
|
{ok, NewConfig} ->
|
||||||
|
{200, NewConfig};
|
||||||
|
{error, Reason} ->
|
||||||
|
Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),
|
||||||
|
{500, 'INTERNAL_ERROR', Message}
|
||||||
|
end.
|
||||||
|
|
Loading…
Reference in New Issue