Merge pull request #12715 from zhongwencool/fix-source-replace-error
fix: cant replace source conf
This commit is contained in:
commit
24c04e715d
|
@ -12,6 +12,7 @@
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
-include_lib("stdlib/include/assert.hrl").
|
-include_lib("stdlib/include/assert.hrl").
|
||||||
-include_lib("amqp_client/include/amqp_client.hrl").
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||||
|
-import(emqx_config_SUITE, [prepare_conf_file/3]).
|
||||||
|
|
||||||
-import(emqx_bridge_rabbitmq_test_utils, [
|
-import(emqx_bridge_rabbitmq_test_utils, [
|
||||||
rabbit_mq_exchange/0,
|
rabbit_mq_exchange/0,
|
||||||
|
@ -317,6 +318,60 @@ t_action_not_exist_exchange(_Config) ->
|
||||||
?assertNot(lists:any(Any, ActionsAfterDelete), ActionsAfterDelete),
|
?assertNot(lists:any(Any, ActionsAfterDelete), ActionsAfterDelete),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_replace_action_source(Config) ->
|
||||||
|
Action = #{<<"rabbitmq">> => #{<<"my_action">> => rabbitmq_action()}},
|
||||||
|
Source = #{<<"rabbitmq">> => #{<<"my_source">> => rabbitmq_source()}},
|
||||||
|
ConnectorName = atom_to_binary(?MODULE),
|
||||||
|
Connector = #{<<"rabbitmq">> => #{ConnectorName => rabbitmq_connector(get_rabbitmq(Config))}},
|
||||||
|
Rabbitmq = #{
|
||||||
|
<<"actions">> => Action,
|
||||||
|
<<"sources">> => Source,
|
||||||
|
<<"connectors">> => Connector
|
||||||
|
},
|
||||||
|
ConfBin0 = hocon_pp:do(Rabbitmq, #{}),
|
||||||
|
ConfFile0 = prepare_conf_file(?FUNCTION_NAME, ConfBin0, Config),
|
||||||
|
?assertMatch(ok, emqx_conf_cli:conf(["load", "--replace", ConfFile0])),
|
||||||
|
?assertMatch(
|
||||||
|
#{<<"rabbitmq">> := #{<<"my_action">> := _}},
|
||||||
|
emqx_config:get_raw([<<"actions">>]),
|
||||||
|
Action
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
#{<<"rabbitmq">> := #{<<"my_source">> := _}},
|
||||||
|
emqx_config:get_raw([<<"sources">>]),
|
||||||
|
Source
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
#{<<"rabbitmq">> := #{ConnectorName := _}},
|
||||||
|
emqx_config:get_raw([<<"connectors">>]),
|
||||||
|
Connector
|
||||||
|
),
|
||||||
|
|
||||||
|
Empty = #{
|
||||||
|
<<"actions">> => #{},
|
||||||
|
<<"sources">> => #{},
|
||||||
|
<<"connectors">> => #{}
|
||||||
|
},
|
||||||
|
ConfBin1 = hocon_pp:do(Empty, #{}),
|
||||||
|
ConfFile1 = prepare_conf_file(?FUNCTION_NAME, ConfBin1, Config),
|
||||||
|
?assertMatch(ok, emqx_conf_cli:conf(["load", "--replace", ConfFile1])),
|
||||||
|
|
||||||
|
?assertEqual(#{}, emqx_config:get_raw([<<"actions">>])),
|
||||||
|
?assertEqual(#{}, emqx_config:get_raw([<<"sources">>])),
|
||||||
|
?assertMatch(#{}, emqx_config:get_raw([<<"connectors">>])),
|
||||||
|
|
||||||
|
%% restore connectors
|
||||||
|
Rabbitmq2 = #{<<"connectors">> => Connector},
|
||||||
|
ConfBin2 = hocon_pp:do(Rabbitmq2, #{}),
|
||||||
|
ConfFile2 = prepare_conf_file(?FUNCTION_NAME, ConfBin2, Config),
|
||||||
|
?assertMatch(ok, emqx_conf_cli:conf(["load", "--replace", ConfFile2])),
|
||||||
|
?assertMatch(
|
||||||
|
#{<<"rabbitmq">> := #{ConnectorName := _}},
|
||||||
|
emqx_config:get_raw([<<"connectors">>]),
|
||||||
|
Connector
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
waiting_for_disconnected_alarms(InstanceId) ->
|
waiting_for_disconnected_alarms(InstanceId) ->
|
||||||
waiting_for_disconnected_alarms(InstanceId, 0).
|
waiting_for_disconnected_alarms(InstanceId, 0).
|
||||||
|
|
||||||
|
|
|
@ -245,10 +245,11 @@ load_config_from_raw(RawConf0, Opts) ->
|
||||||
case check_config(RawConf1) of
|
case check_config(RawConf1) of
|
||||||
{ok, RawConf} ->
|
{ok, RawConf} ->
|
||||||
%% It has been ensured that the connector is always the first configuration to be updated.
|
%% It has been ensured that the connector is always the first configuration to be updated.
|
||||||
%% However, when deleting the connector, we need to clean up the dependent actions first;
|
%% However, when deleting the connector, we need to clean up the dependent actions/sources first;
|
||||||
%% otherwise, the deletion will fail.
|
%% otherwise, the deletion will fail.
|
||||||
%% notice: we can't create a action before connector.
|
%% notice: we can't create a action/sources before connector.
|
||||||
uninstall_actions(RawConf, Opts),
|
uninstall(<<"actions">>, RawConf, Opts),
|
||||||
|
uninstall(<<"sources">>, RawConf, Opts),
|
||||||
Error =
|
Error =
|
||||||
lists:filtermap(
|
lists:filtermap(
|
||||||
fun({K, V}) ->
|
fun({K, V}) ->
|
||||||
|
@ -288,27 +289,33 @@ load_config_from_raw(RawConf0, Opts) ->
|
||||||
{error, Errors}
|
{error, Errors}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
uninstall_actions(#{<<"actions">> := New}, #{mode := replace}) ->
|
uninstall(ActionOrSource, Conf, #{mode := replace}) ->
|
||||||
Old = emqx_conf:get_raw([<<"actions">>], #{}),
|
case maps:find(ActionOrSource, Conf) of
|
||||||
#{removed := Removed} = emqx_bridge_v2:diff_confs(New, Old),
|
{ok, New} ->
|
||||||
maps:foreach(
|
Old = emqx_conf:get_raw([ActionOrSource], #{}),
|
||||||
fun({Type, Name}, _) ->
|
ActionOrSourceAtom = binary_to_existing_atom(ActionOrSource),
|
||||||
case emqx_bridge_v2:remove(Type, Name) of
|
#{removed := Removed} = emqx_bridge_v2:diff_confs(New, Old),
|
||||||
ok ->
|
maps:foreach(
|
||||||
ok;
|
fun({Type, Name}, _) ->
|
||||||
{error, Reason} ->
|
case emqx_bridge_v2:remove(ActionOrSourceAtom, Type, Name) of
|
||||||
?SLOG(error, #{
|
ok ->
|
||||||
msg => "failed_to_remove_action",
|
ok;
|
||||||
type => Type,
|
{error, Reason} ->
|
||||||
name => Name,
|
?SLOG(error, #{
|
||||||
error => Reason
|
msg => "failed_to_remove",
|
||||||
})
|
type => Type,
|
||||||
end
|
name => Name,
|
||||||
end,
|
error => Reason
|
||||||
Removed
|
})
|
||||||
);
|
end
|
||||||
%% we don't delete things when in merge mode or without actions key.
|
end,
|
||||||
uninstall_actions(_RawConf, _) ->
|
Removed
|
||||||
|
);
|
||||||
|
error ->
|
||||||
|
ok
|
||||||
|
end;
|
||||||
|
%% we don't delete things when in merge mode or without actions/sources key.
|
||||||
|
uninstall(_, _RawConf, _) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
update_config_cluster(
|
update_config_cluster(
|
||||||
|
@ -481,7 +488,8 @@ filter_readonly_config(Raw) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
reload_config(AllConf, Opts) ->
|
reload_config(AllConf, Opts) ->
|
||||||
uninstall_actions(AllConf, Opts),
|
uninstall(<<"actions">>, AllConf, Opts),
|
||||||
|
uninstall(<<"sources">>, AllConf, Opts),
|
||||||
Fold = fun({Key, Conf}, Acc) ->
|
Fold = fun({Key, Conf}, Acc) ->
|
||||||
case update_config_local(Key, Conf, Opts) of
|
case update_config_local(Key, Conf, Opts) of
|
||||||
ok ->
|
ok ->
|
||||||
|
|
|
@ -473,6 +473,8 @@ ensure_no_channels(Configs) ->
|
||||||
fun({Type, ConnectorName}) ->
|
fun({Type, ConnectorName}) ->
|
||||||
fun(_) ->
|
fun(_) ->
|
||||||
case emqx_connector_resource:get_channels(Type, ConnectorName) of
|
case emqx_connector_resource:get_channels(Type, ConnectorName) of
|
||||||
|
{error, not_found} ->
|
||||||
|
ok;
|
||||||
{ok, []} ->
|
{ok, []} ->
|
||||||
ok;
|
ok;
|
||||||
{ok, Channels} ->
|
{ok, Channels} ->
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Fix replacing sources crash if connector has active channels
|
Loading…
Reference in New Issue