feat: add update/3

This commit is contained in:
Stefan Strigler 2023-10-03 15:22:47 +02:00 committed by Zaiming (Stone) Shi
parent 9195838c17
commit cc864f4821
2 changed files with 35 additions and 19 deletions

View File

@ -27,7 +27,6 @@
]). ]).
-export([ -export([
check_deps_and_remove/3,
create/3, create/3,
disable_enable/3, disable_enable/3,
get_metrics/2, get_metrics/2,
@ -36,7 +35,8 @@
lookup/1, lookup/1,
lookup/2, lookup/2,
remove/2, remove/2,
unload/0 unload/0,
update/3
]). ]).
-export([config_key_path/0]). -export([config_key_path/0]).
@ -148,7 +148,7 @@ post_config_update([?ROOT_KEY, Type, Name], _Req, NewConf, undefined, _AppEnvs)
ok = emqx_connector_resource:create(Type, Name, NewConf), ok = emqx_connector_resource:create(Type, Name, NewConf),
?tp(connector_post_config_update_done, #{}), ?tp(connector_post_config_update_done, #{}),
ok; ok;
post_config_update([connectors, Type, Name], _Req, NewConf, OldConf, _AppEnvs) -> post_config_update([?ROOT_KEY, Type, Name], _Req, NewConf, OldConf, _AppEnvs) ->
ResOpts = emqx_resource:fetch_creation_opts(NewConf), ResOpts = emqx_resource:fetch_creation_opts(NewConf),
ok = emqx_connector_resource:update(Type, Name, {OldConf, NewConf}, ResOpts), ok = emqx_connector_resource:update(Type, Name, {OldConf, NewConf}, ResOpts),
?tp(connector_post_config_update_done, #{}), ?tp(connector_post_config_update_done, #{}),
@ -229,22 +229,22 @@ remove(ConnectorType, ConnectorName) ->
#{override_to => cluster} #{override_to => cluster}
). ).
check_deps_and_remove(ConnectorType, ConnectorName, RemoveDeps) -> update(ConnectorType, ConnectorName, RawConf) ->
ConnectorId = emqx_connector_resource:connector_id(ConnectorType, ConnectorName), ?SLOG(debug, #{
%% NOTE: This violates the design: Rule depends on data-connector but not vice versa. connector_action => update,
case emqx_rule_engine:get_rule_ids_by_action(ConnectorId) of connector_type => ConnectorType,
[] -> connector_name => ConnectorName,
remove(ConnectorType, ConnectorName); connector_raw_config => emqx_utils:redact(RawConf)
RuleIds when RemoveDeps =:= false -> }),
{error, {rules_deps_on_this_connector, RuleIds}}; case lookup(ConnectorType, ConnectorName) of
RuleIds when RemoveDeps =:= true -> {ok, _Conf} ->
lists:foreach( emqx_conf:update(
fun(R) -> emqx_connector:config_key_path() ++ [ConnectorType, ConnectorName],
emqx_rule_engine:ensure_action_removed(R, ConnectorId) RawConf,
end, #{override_to => cluster}
RuleIds );
), Error ->
remove(ConnectorType, ConnectorName) Error
end. end.
%%---------------------------------------------------------------------------------------- %%----------------------------------------------------------------------------------------

View File

@ -105,6 +105,18 @@ t_connector_lifecycle(_Config) ->
emqx_connector:lookup(kafka, my_connector) emqx_connector:lookup(kafka, my_connector)
), ),
?assertMatch(
{ok, #{config := #{connect_timeout := 10000}}},
emqx_connector:update(kafka, my_connector, (connector_config())#{
<<"connect_timeout">> => <<"10s">>
})
),
?assertMatch(
{ok, #{resource_data := #{config := #{connect_timeout := 10000}}}},
emqx_connector:lookup(kafka, my_connector)
),
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},
emqx_connector:remove(kafka, my_connector) emqx_connector:remove(kafka, my_connector)
@ -125,6 +137,10 @@ t_connector_lifecycle(_Config) ->
{_, {?CONNECTOR, on_stop, [_, connector_state]}, ok}, {_, {?CONNECTOR, on_stop, [_, connector_state]}, ok},
{_, {?CONNECTOR, on_start, [_, _]}, {ok, connector_state}}, {_, {?CONNECTOR, on_start, [_, _]}, {ok, connector_state}},
{_, {?CONNECTOR, on_get_status, [_, connector_state]}, connected}, {_, {?CONNECTOR, on_get_status, [_, connector_state]}, connected},
{_, {?CONNECTOR, on_stop, [_, connector_state]}, ok},
{_, {?CONNECTOR, callback_mode, []}, _},
{_, {?CONNECTOR, on_start, [_, _]}, {ok, connector_state}},
{_, {?CONNECTOR, on_get_status, [_, connector_state]}, connected},
{_, {?CONNECTOR, on_stop, [_, connector_state]}, ok} {_, {?CONNECTOR, on_stop, [_, connector_state]}, ok}
], ],
meck:history(?CONNECTOR) meck:history(?CONNECTOR)