diff --git a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl index cf68c20e6..952d53b5e 100644 --- a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl +++ b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl @@ -12,6 +12,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("stdlib/include/assert.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-import(emqx_config_SUITE, [prepare_conf_file/3]). -import(emqx_bridge_rabbitmq_test_utils, [ rabbit_mq_exchange/0, @@ -317,6 +318,60 @@ t_action_not_exist_exchange(_Config) -> ?assertNot(lists:any(Any, ActionsAfterDelete), ActionsAfterDelete), ok. +t_replace_action_source(Config) -> + Action = #{<<"rabbitmq">> => #{<<"my_action">> => rabbitmq_action()}}, + Source = #{<<"rabbitmq">> => #{<<"my_source">> => rabbitmq_source()}}, + ConnectorName = atom_to_binary(?MODULE), + Connector = #{<<"rabbitmq">> => #{ConnectorName => rabbitmq_connector(get_rabbitmq(Config))}}, + Rabbitmq = #{ + <<"actions">> => Action, + <<"sources">> => Source, + <<"connectors">> => Connector + }, + ConfBin0 = hocon_pp:do(Rabbitmq, #{}), + ConfFile0 = prepare_conf_file(?FUNCTION_NAME, ConfBin0, Config), + ?assertMatch(ok, emqx_conf_cli:conf(["load", "--replace", ConfFile0])), + ?assertMatch( + #{<<"rabbitmq">> := #{<<"my_action">> := _}}, + emqx_config:get_raw([<<"actions">>]), + Action + ), + ?assertMatch( + #{<<"rabbitmq">> := #{<<"my_source">> := _}}, + emqx_config:get_raw([<<"sources">>]), + Source + ), + ?assertMatch( + #{<<"rabbitmq">> := #{ConnectorName := _}}, + emqx_config:get_raw([<<"connectors">>]), + Connector + ), + + Empty = #{ + <<"actions">> => #{}, + <<"sources">> => #{}, + <<"connectors">> => #{} + }, + ConfBin1 = hocon_pp:do(Empty, #{}), + ConfFile1 = prepare_conf_file(?FUNCTION_NAME, ConfBin1, Config), + ?assertMatch(ok, emqx_conf_cli:conf(["load", "--replace", ConfFile1])), + + ?assertEqual(#{}, emqx_config:get_raw([<<"actions">>])), + ?assertEqual(#{}, emqx_config:get_raw([<<"sources">>])), + ?assertMatch(#{}, emqx_config:get_raw([<<"connectors">>])), + + %% restore connectors + Rabbitmq2 = #{<<"connectors">> => Connector}, + ConfBin2 = hocon_pp:do(Rabbitmq2, #{}), + ConfFile2 = prepare_conf_file(?FUNCTION_NAME, ConfBin2, Config), + ?assertMatch(ok, emqx_conf_cli:conf(["load", "--replace", ConfFile2])), + ?assertMatch( + #{<<"rabbitmq">> := #{ConnectorName := _}}, + emqx_config:get_raw([<<"connectors">>]), + Connector + ), + ok. + waiting_for_disconnected_alarms(InstanceId) -> waiting_for_disconnected_alarms(InstanceId, 0). diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index c50495b3e..d6462a0b6 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -245,10 +245,11 @@ load_config_from_raw(RawConf0, Opts) -> case check_config(RawConf1) of {ok, RawConf} -> %% It has been ensured that the connector is always the first configuration to be updated. - %% However, when deleting the connector, we need to clean up the dependent actions first; + %% However, when deleting the connector, we need to clean up the dependent actions/sources first; %% otherwise, the deletion will fail. - %% notice: we can't create a action before connector. - uninstall_actions(RawConf, Opts), + %% notice: we can't create a action/sources before connector. + uninstall(<<"actions">>, RawConf, Opts), + uninstall(<<"sources">>, RawConf, Opts), Error = lists:filtermap( fun({K, V}) -> @@ -288,27 +289,33 @@ load_config_from_raw(RawConf0, Opts) -> {error, Errors} end. -uninstall_actions(#{<<"actions">> := New}, #{mode := replace}) -> - Old = emqx_conf:get_raw([<<"actions">>], #{}), - #{removed := Removed} = emqx_bridge_v2:diff_confs(New, Old), - maps:foreach( - fun({Type, Name}, _) -> - case emqx_bridge_v2:remove(Type, Name) of - ok -> - ok; - {error, Reason} -> - ?SLOG(error, #{ - msg => "failed_to_remove_action", - type => Type, - name => Name, - error => Reason - }) - end - end, - Removed - ); -%% we don't delete things when in merge mode or without actions key. -uninstall_actions(_RawConf, _) -> +uninstall(ActionOrSource, Conf, #{mode := replace}) -> + case maps:find(ActionOrSource, Conf) of + {ok, New} -> + Old = emqx_conf:get_raw([ActionOrSource], #{}), + ActionOrSourceAtom = binary_to_existing_atom(ActionOrSource), + #{removed := Removed} = emqx_bridge_v2:diff_confs(New, Old), + maps:foreach( + fun({Type, Name}, _) -> + case emqx_bridge_v2:remove(ActionOrSourceAtom, Type, Name) of + ok -> + ok; + {error, Reason} -> + ?SLOG(error, #{ + msg => "failed_to_remove", + type => Type, + name => Name, + error => Reason + }) + end + end, + Removed + ); + error -> + ok + end; +%% we don't delete things when in merge mode or without actions/sources key. +uninstall(_, _RawConf, _) -> ok. update_config_cluster( @@ -481,7 +488,8 @@ filter_readonly_config(Raw) -> end. reload_config(AllConf, Opts) -> - uninstall_actions(AllConf, Opts), + uninstall(<<"actions">>, AllConf, Opts), + uninstall(<<"sources">>, AllConf, Opts), Fold = fun({Key, Conf}, Acc) -> case update_config_local(Key, Conf, Opts) of ok -> diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index bf9a960d5..159e05f9b 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -473,6 +473,8 @@ ensure_no_channels(Configs) -> fun({Type, ConnectorName}) -> fun(_) -> case emqx_connector_resource:get_channels(Type, ConnectorName) of + {error, not_found} -> + ok; {ok, []} -> ok; {ok, Channels} -> diff --git a/changes/ce/fix-12715.en.md b/changes/ce/fix-12715.en.md new file mode 100644 index 000000000..ed3ec38fc --- /dev/null +++ b/changes/ce/fix-12715.en.md @@ -0,0 +1 @@ +Fix replacing sources crash if connector has active channels