Merge pull request #12266 from zhongwencool/redis-550

fix: remove redis_type from redis action
This commit is contained in:
zhongwencool 2024-01-09 09:34:52 +08:00 committed by GitHub
commit bebd6f7c57
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 322 additions and 267 deletions

View File

@ -33,7 +33,8 @@
has_custom_bridge_v1_config_to_connector_config/1, has_custom_bridge_v1_config_to_connector_config/1,
bridge_v1_config_to_action_config/3, bridge_v1_config_to_action_config/3,
has_custom_bridge_v1_config_to_action_config/1, has_custom_bridge_v1_config_to_action_config/1,
transform_bridge_v1_config_to_action_config/4 transform_bridge_v1_config_to_action_config/4,
action_convert_from_connector/3
]). ]).
-callback bridge_v1_type_name() -> -callback bridge_v1_type_name() ->
@ -142,8 +143,10 @@ action_type_to_bridge_v1_type(ActionType, ActionConf) ->
get_confs(ActionType, #{<<"connector">> := ConnectorName} = ActionConfig) -> get_confs(ActionType, #{<<"connector">> := ConnectorName} = ActionConfig) ->
ConnectorType = action_type_to_connector_type(ActionType), ConnectorType = action_type_to_connector_type(ActionType),
ConnectorConfig = emqx_conf:get_raw([connectors, ConnectorType, ConnectorName]), case emqx_conf:get_raw([connectors, ConnectorType, ConnectorName], undefined) of
{ConnectorConfig, ActionConfig}; undefined -> undefined;
ConnectorConfig -> {ConnectorConfig, ActionConfig}
end;
get_confs(_, _) -> get_confs(_, _) ->
undefined. undefined.
@ -188,6 +191,15 @@ connector_action_config_to_bridge_v1_config(ActionOrBridgeType, ConnectorConfig,
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig)
end. end.
action_convert_from_connector(ActionOrBridgeType, ConnectorConfig, ActionConfig) ->
Module = get_action_info_module(ActionOrBridgeType),
case erlang:function_exported(Module, action_convert_from_connector, 2) of
true ->
Module:action_convert_from_connector(ConnectorConfig, ActionConfig);
false ->
ActionConfig
end.
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
Merged = emqx_utils_maps:deep_merge( Merged = emqx_utils_maps:deep_merge(
maps:without( maps:without(

View File

@ -73,6 +73,8 @@
get_channels_for_connector/1 get_channels_for_connector/1
]). ]).
-export([diff_confs/2]).
%% Exported for tests %% Exported for tests
-export([ -export([
id/2, id/2,
@ -97,7 +99,8 @@
-export([ -export([
bridge_v2_type_to_connector_type/1, bridge_v2_type_to_connector_type/1,
is_bridge_v2_type/1 is_bridge_v2_type/1,
connector_type/1
]). ]).
%% Compatibility Layer API %% Compatibility Layer API
@ -248,7 +251,7 @@ list() ->
{ok, emqx_config:update_result()} | {error, any()}. {ok, emqx_config:update_result()} | {error, any()}.
create(BridgeType, BridgeName, RawConf) -> create(BridgeType, BridgeName, RawConf) ->
?SLOG(debug, #{ ?SLOG(debug, #{
brige_action => create, bridge_action => create,
bridge_version => 2, bridge_version => 2,
bridge_type => BridgeType, bridge_type => BridgeType,
bridge_name => BridgeName, bridge_name => BridgeName,
@ -265,7 +268,7 @@ remove(BridgeType, BridgeName) ->
%% NOTE: This function can cause broken references from rules but it is only %% NOTE: This function can cause broken references from rules but it is only
%% called directly from test cases. %% called directly from test cases.
?SLOG(debug, #{ ?SLOG(debug, #{
brige_action => remove, bridge_action => remove,
bridge_version => 2, bridge_version => 2,
bridge_type => BridgeType, bridge_type => BridgeType,
bridge_name => BridgeName bridge_name => BridgeName
@ -356,7 +359,7 @@ install_bridge_v2_helper(
_BridgeName, _BridgeName,
{error, Reason} = Error {error, Reason} = Error
) -> ) ->
?SLOG(error, Reason), ?SLOG(warning, Reason),
Error; Error;
install_bridge_v2_helper( install_bridge_v2_helper(
BridgeV2Type, BridgeV2Type,
@ -412,7 +415,7 @@ uninstall_bridge_v2(
CreationOpts = emqx_resource:fetch_creation_opts(Config), CreationOpts = emqx_resource:fetch_creation_opts(Config),
ok = emqx_resource_buffer_worker_sup:stop_workers(BridgeV2Id, CreationOpts), ok = emqx_resource_buffer_worker_sup:stop_workers(BridgeV2Id, CreationOpts),
ok = emqx_resource:clear_metrics(BridgeV2Id), ok = emqx_resource:clear_metrics(BridgeV2Id),
case validate_referenced_connectors(BridgeV2Type, ConnectorName, BridgeName) of case referenced_connectors_exist(BridgeV2Type, ConnectorName, BridgeName) of
{error, _} -> {error, _} ->
ok; ok;
ok -> ok ->
@ -440,9 +443,10 @@ combine_connector_and_bridge_v2_config(
BridgeV2Config#{resource_opts => CombinedCreationOpts} BridgeV2Config#{resource_opts => CombinedCreationOpts}
catch catch
_:_ -> _:_ ->
alarm_connector_not_found(BridgeV2Type, BridgeName, ConnectorName),
{error, #{ {error, #{
reason => "connector_not_found", reason => <<"connector_not_found_or_wrong_type">>,
type => BridgeV2Type, bridge_type => BridgeV2Type,
bridge_name => BridgeName, bridge_name => BridgeName,
connector_name => ConnectorName connector_name => ConnectorName
}} }}
@ -451,15 +455,14 @@ combine_connector_and_bridge_v2_config(
%%==================================================================== %%====================================================================
%% Operations %% Operations
%%==================================================================== %%====================================================================
-define(ENABLE_OR_DISABLE(A), (A =:= disable orelse A =:= enable)).
-spec disable_enable(disable | enable, bridge_v2_type(), bridge_v2_name()) -> -spec disable_enable(disable | enable, bridge_v2_type(), bridge_v2_name()) ->
{ok, any()} | {error, any()}. {ok, any()} | {error, any()}.
disable_enable(Action, BridgeType, BridgeName) when disable_enable(Action, BridgeType, BridgeName) when ?ENABLE_OR_DISABLE(Action) ->
Action =:= disable; Action =:= enable
->
emqx_conf:update( emqx_conf:update(
config_key_path() ++ [BridgeType, BridgeName], config_key_path() ++ [BridgeType, BridgeName],
{Action, BridgeType, BridgeName}, Action,
#{override_to => cluster} #{override_to => cluster}
). ).
@ -565,7 +568,7 @@ query(BridgeType, BridgeName, Message, QueryOpts0) ->
do_query_with_enabled_config( do_query_with_enabled_config(
_BridgeType, _BridgeName, _Message, _QueryOpts0, {error, Reason} = Error _BridgeType, _BridgeName, _Message, _QueryOpts0, {error, Reason} = Error
) -> ) ->
?SLOG(error, Reason), ?SLOG(warning, Reason),
Error; Error;
do_query_with_enabled_config( do_query_with_enabled_config(
BridgeType, BridgeName, Message, QueryOpts0, Config BridgeType, BridgeName, Message, QueryOpts0, Config
@ -863,118 +866,83 @@ import_config(RawConf) ->
%% Config Update Handler API %% Config Update Handler API
%%==================================================================== %%====================================================================
config_key_path() -> config_key_path() -> [?ROOT_KEY].
[?ROOT_KEY].
config_key_path_leaf() -> config_key_path_leaf() -> [?ROOT_KEY, '?', '?'].
[?ROOT_KEY, '?', '?'].
pre_config_update(_, {force_update, Conf}, _OldConf) -> %% enable or disable action
{ok, Conf}; pre_config_update([?ROOT_KEY, _Type, _Name], Oper, undefined) when ?ENABLE_OR_DISABLE(Oper) ->
%% NOTE: We depend on the `emqx_bridge:pre_config_update/3` to restart/stop the
%% underlying resources.
pre_config_update(_, {_Oper, _, _}, undefined) ->
{error, bridge_not_found}; {error, bridge_not_found};
pre_config_update(_, {Oper, _Type, _Name}, OldConfig) -> pre_config_update([?ROOT_KEY, _Type, _Name], Oper, OldAction) when ?ENABLE_OR_DISABLE(Oper) ->
%% to save the 'enable' to the config files {ok, OldAction#{<<"enable">> => operation_to_enable(Oper)}};
{ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}}; %% Updates a single action from a specific HTTP API.
pre_config_update(_Path, Conf, _OldConfig) when is_map(Conf) -> %% If the connector is not found, the update operation fails.
{ok, Conf}. pre_config_update([?ROOT_KEY, Type, Name], Conf = #{}, _OldConf) ->
action_convert_from_connector(Type, Name, Conf);
%% Batch updates actions when importing a configuration or executing a CLI command.
%% Update succeeded even if the connector is not found, alarm in post_config_update
pre_config_update([?ROOT_KEY], Conf = #{}, _OldConfs) ->
{ok, actions_convert_from_connectors(Conf)}.
operation_to_enable(disable) -> false; %% Don't crash event the bridge is not found
operation_to_enable(enable) -> true. post_config_update([?ROOT_KEY, Type, Name], '$remove', _, _OldConf, _AppEnvs) ->
AllBridges = emqx:get_config([?ROOT_KEY]),
%% A public API that can trigger this is: case emqx_utils_maps:deep_get([Type, Name], AllBridges, undefined) of
%% bin/emqx ctl conf load data/configs/cluster.hocon undefined ->
post_config_update([?ROOT_KEY], {force_update, _Req}, NewConf, OldConf, _AppEnv) -> ok;
do_post_config_update(NewConf, OldConf, #{validate_referenced_connectors => false}); Action ->
%% This top level handler will be triggered when the actions path is updated ok = uninstall_bridge_v2(Type, Name, Action),
%% with calls to emqx_conf:update([actions], BridgesConf, #{}). Bridges = emqx_utils_maps:deep_remove([Type, Name], AllBridges),
%% reload_message_publish_hook(Bridges)
post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) -> end,
do_post_config_update(NewConf, OldConf, #{validate_referenced_connectors => true}); ?tp(bridge_post_config_update_done, #{}),
post_config_update([?ROOT_KEY, BridgeType, BridgeName], '$remove', _, _OldConf, _AppEnvs) -> ok;
Conf = emqx:get_config([?ROOT_KEY, BridgeType, BridgeName]), %% Create a single bridge failed if the connector is not found(already check in pre_config_update)
ok = uninstall_bridge_v2(BridgeType, BridgeName, Conf), post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, undefined, _AppEnvs) ->
Bridges = emqx_utils_maps:deep_remove([BridgeType, BridgeName], emqx:get_config([?ROOT_KEY])), ok = install_bridge_v2(BridgeType, BridgeName, NewConf),
Bridges = emqx_utils_maps:deep_put(
[BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf
),
reload_message_publish_hook(Bridges), reload_message_publish_hook(Bridges),
?tp(bridge_post_config_update_done, #{}), ?tp(bridge_post_config_update_done, #{}),
ok; ok;
post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, undefined, _AppEnvs) -> %% update bridges failed if the connector is not found(already check in pre_config_update)
%% N.B.: all bridges must use the same field name (`connector`) to define the
%% connector name.
ConnectorName = maps:get(connector, NewConf),
case validate_referenced_connectors(BridgeType, ConnectorName, BridgeName) of
ok ->
ok = install_bridge_v2(BridgeType, BridgeName, NewConf),
Bridges = emqx_utils_maps:deep_put(
[BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf
),
reload_message_publish_hook(Bridges),
?tp(bridge_post_config_update_done, #{}),
ok;
{error, Error} ->
{error, Error}
end;
post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, OldConf, _AppEnvs) -> post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, OldConf, _AppEnvs) ->
ConnectorName = maps:get(connector, NewConf), ok = uninstall_bridge_v2(BridgeType, BridgeName, OldConf),
case validate_referenced_connectors(BridgeType, ConnectorName, BridgeName) of ok = install_bridge_v2(BridgeType, BridgeName, NewConf),
ok -> Bridges = emqx_utils_maps:deep_put(
ok = uninstall_bridge_v2(BridgeType, BridgeName, OldConf), [BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf
ok = install_bridge_v2(BridgeType, BridgeName, NewConf), ),
Bridges = emqx_utils_maps:deep_put( reload_message_publish_hook(Bridges),
[BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf ?tp(bridge_post_config_update_done, #{}),
), ok;
reload_message_publish_hook(Bridges), %% This top level handler will be triggered when the actions path is updated
?tp(bridge_post_config_update_done, #{}), %% with calls to emqx_conf:update([actions], BridgesConf, #{}).
ok; %% such as import_config/1
{error, Error} -> %% Notice ** do succeeded even if the connector is not found **
{error, Error} %% Install a non-exist connector will alarm & log(warn) in install_bridge_v2.
end. post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) ->
do_post_config_update(NewConf, OldConf, #{validate_referenced_connectors := NeedValidate}) ->
#{added := Added, removed := Removed, changed := Updated} = #{added := Added, removed := Removed, changed := Updated} =
diff_confs(NewConf, OldConf), diff_confs(NewConf, OldConf),
UpdatedConfigs = %% The config update will be failed if any task in `perform_bridge_changes` failed.
lists:map( RemoveFun = fun uninstall_bridge_v2/3,
fun({{Type, BridgeName}, {_Old, New}}) -> CreateFun = fun install_bridge_v2/3,
{Type, BridgeName, New} UpdateFun = fun(Type, Name, {OldBridgeConf, Conf}) ->
end, uninstall_bridge_v2(Type, Name, OldBridgeConf),
maps:to_list(Updated) install_bridge_v2(Type, Name, Conf)
), end,
AddedConfigs = Result = perform_bridge_changes([
lists:map( #{action => RemoveFun, data => Removed},
fun({{Type, BridgeName}, AddedConf}) -> #{
{Type, BridgeName, AddedConf} action => CreateFun,
end, data => Added,
maps:to_list(Added) on_exception_fn => fun emqx_bridge_resource:remove/4
), },
ToValidate = UpdatedConfigs ++ AddedConfigs, #{action => UpdateFun, data => Updated}
case multi_validate_referenced_connectors(NeedValidate, ToValidate) of ]),
ok -> reload_message_publish_hook(NewConf),
%% The config update will be failed if any task in `perform_bridge_changes` failed. ?tp(bridge_post_config_update_done, #{}),
RemoveFun = fun uninstall_bridge_v2/3, Result.
CreateFun = fun install_bridge_v2/3,
UpdateFun = fun(Type, Name, {OldBridgeConf, Conf}) ->
uninstall_bridge_v2(Type, Name, OldBridgeConf),
install_bridge_v2(Type, Name, Conf)
end,
Result = perform_bridge_changes([
#{action => RemoveFun, data => Removed},
#{
action => CreateFun,
data => Added,
on_exception_fn => fun emqx_bridge_resource:remove/4
},
#{action => UpdateFun, data => Updated}
]),
ok = unload_message_publish_hook(),
ok = load_message_publish_hook(NewConf),
?tp(bridge_post_config_update_done, #{}),
Result;
{error, Error} ->
{error, Error}
end.
diff_confs(NewConfs, OldConfs) -> diff_confs(NewConfs, OldConfs) ->
emqx_utils_maps:diff_maps( emqx_utils_maps:diff_maps(
@ -1066,12 +1034,10 @@ bridge_v1_is_valid(BridgeV1Type, BridgeName) ->
#{connector := ConnectorName} -> #{connector := ConnectorName} ->
ConnectorType = connector_type(BridgeV2Type), ConnectorType = connector_type(BridgeV2Type),
ConnectorResourceId = emqx_connector_resource:resource_id(ConnectorType, ConnectorName), ConnectorResourceId = emqx_connector_resource:resource_id(ConnectorType, ConnectorName),
{ok, Channels} = emqx_resource:get_channels(ConnectorResourceId), case emqx_resource:get_channels(ConnectorResourceId) of
case Channels of {ok, [_Channel]} -> true;
[_Channel] -> %% not_found, [], [_|_]
true; _ -> false
_ ->
false
end end
end. end.
@ -1148,7 +1114,7 @@ bridge_v1_lookup_and_transform_helper(
BridgeV2Status = maps:get(status, Action, undefined), BridgeV2Status = maps:get(status, Action, undefined),
BridgeV2Error = maps:get(error, Action, undefined), BridgeV2Error = maps:get(error, Action, undefined),
ResourceData1 = maps:get(resource_data, BridgeV1, #{}), ResourceData1 = maps:get(resource_data, BridgeV1, #{}),
%% Replace id in resouce data %% Replace id in resource data
BridgeV1Id = <<"bridge:", (bin(BridgeV1Type))/binary, ":", (bin(BridgeName))/binary>>, BridgeV1Id = <<"bridge:", (bin(BridgeV1Type))/binary, ":", (bin(BridgeName))/binary>>,
ResourceData2 = maps:put(id, BridgeV1Id, ResourceData1), ResourceData2 = maps:put(id, BridgeV1Id, ResourceData1),
ConnectorStatus = maps:get(status, ResourceData2, undefined), ConnectorStatus = maps:get(status, ResourceData2, undefined),
@ -1422,7 +1388,7 @@ bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps) ->
). ).
%% Bridge v1 delegated-removal in 3 steps: %% Bridge v1 delegated-removal in 3 steps:
%% 1. Delete rule actions if RmoveDeps has 'rule_actions' %% 1. Delete rule actions if RemoveDeps has 'rule_actions'
%% 2. Delete self (the bridge v2), also delete its channel in the connector %% 2. Delete self (the bridge v2), also delete its channel in the connector
%% 3. Delete the connector if the connector has no more channel left and if 'connector' is in RemoveDeps %% 3. Delete the connector if the connector has no more channel left and if 'connector' is in RemoveDeps
bridge_v1_check_deps_and_remove( bridge_v1_check_deps_and_remove(
@ -1557,6 +1523,9 @@ bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, DoHealthCheck) ->
%% Misc helper functions %% Misc helper functions
%%==================================================================== %%====================================================================
operation_to_enable(disable) -> false;
operation_to_enable(enable) -> true.
bin(Bin) when is_binary(Bin) -> Bin; bin(Bin) when is_binary(Bin) -> Bin;
bin(Str) when is_list(Str) -> list_to_binary(Str); bin(Str) when is_list(Str) -> list_to_binary(Str);
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
@ -1575,50 +1544,85 @@ to_existing_atom(X) ->
{error, _} -> throw(bad_atom) {error, _} -> throw(bad_atom)
end. end.
validate_referenced_connectors(BridgeType, ConnectorNameBin, BridgeName) -> referenced_connectors_exist(BridgeType, ConnectorNameBin, BridgeName) ->
%% N.B.: assumes that, for all bridgeV2 types, the name of the bridge type is %% N.B.: assumes that, for all bridgeV2 types, the name of the bridge type is
%% identical to its matching connector type name. %% identical to its matching connector type name.
try case get_connector_info(ConnectorNameBin, BridgeType) of
{ConnectorName, ConnectorType} = to_connector(ConnectorNameBin, BridgeType), {error, not_found} ->
case emqx_config:get([connectors, ConnectorType, ConnectorName], undefined) of
undefined ->
throw(not_found);
_ ->
ok
end
catch
throw:not_found ->
{error, #{ {error, #{
reason => "connector_not_found_or_wrong_type", reason => "connector_not_found_or_wrong_type",
connector_name => ConnectorNameBin, connector_name => ConnectorNameBin,
bridge_name => BridgeName, bridge_name => BridgeName,
bridge_type => BridgeType bridge_type => BridgeType
}};
{ok, _Connector} ->
ok
end.
actions_convert_from_connectors(Conf) ->
maps:map(
fun(ActionType, Actions) ->
maps:map(
fun(ActionName, Action) ->
case action_convert_from_connector(ActionType, ActionName, Action) of
{ok, NewAction} -> NewAction;
{error, _} -> Action
end
end,
Actions
)
end,
Conf
).
action_convert_from_connector(Type, Name, Action = #{<<"connector">> := ConnectorName}) ->
case get_connector_info(ConnectorName, Type) of
{ok, Connector} ->
Action1 = emqx_action_info:action_convert_from_connector(Type, Connector, Action),
{ok, Action1};
{error, not_found} ->
{error, #{
bridge_name => Name,
reason => <<"connector_not_found_or_wrong_type">>,
bridge_type => Type,
connector_name => ConnectorName
}} }}
end. end.
get_connector_info(ConnectorNameBin, BridgeType) ->
case to_connector(ConnectorNameBin, BridgeType) of
{error, not_found} ->
{error, not_found};
{ConnectorName, ConnectorType} ->
case emqx_config:get_raw([connectors, ConnectorType, ConnectorName], undefined) of
undefined -> {error, not_found};
Connector -> {ok, Connector}
end
end.
to_connector(ConnectorNameBin, BridgeType) -> to_connector(ConnectorNameBin, BridgeType) ->
try try
ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(to_existing_atom(BridgeType)),
ConnectorName = to_existing_atom(ConnectorNameBin), ConnectorName = to_existing_atom(ConnectorNameBin),
BridgeType1 = to_existing_atom(BridgeType),
ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeType1),
{ConnectorName, ConnectorType} {ConnectorName, ConnectorType}
catch catch
_:_ -> _:_ ->
throw(not_found) {error, not_found}
end. end.
multi_validate_referenced_connectors(false, _Configs) -> alarm_connector_not_found(ActionType, ActionName, ConnectorName) ->
ok; ConnectorType = connector_type(to_existing_atom(ActionType)),
multi_validate_referenced_connectors(true, Configs) -> ResId = emqx_connector_resource:resource_id(
Pipeline = ConnectorType, ConnectorName
lists:map( ),
fun({Type, BridgeName, #{connector := ConnectorName}}) -> _ = emqx_alarm:safe_activate(
fun(_) -> validate_referenced_connectors(Type, ConnectorName, BridgeName) end ResId,
end, #{
Configs connector_name => ConnectorName,
), connector_type => ConnectorType,
case emqx_utils:pipeline(Pipeline, unused, unused) of action_type => ActionType,
{ok, _, _} -> action_name => ActionName
ok; },
{error, Reason, _State} -> <<"connector not found">>
{error, Reason} ).
end.

View File

@ -56,6 +56,8 @@
project_to_actions_resource_opts/1 project_to_actions_resource_opts/1
]). ]).
-export([actions_convert_from_connectors/1]).
-export_type([action_type/0]). -export_type([action_type/0]).
%% Should we explicitly list them here so dialyzer may be more helpful? %% Should we explicitly list them here so dialyzer may be more helpful?
@ -305,6 +307,33 @@ project_to_actions_resource_opts(OldResourceOpts) ->
Subfields = common_resource_opts_subfields_bin(), Subfields = common_resource_opts_subfields_bin(),
maps:with(Subfields, OldResourceOpts). maps:with(Subfields, OldResourceOpts).
actions_convert_from_connectors(RawConf = #{<<"actions">> := Actions}) ->
Actions1 =
maps:map(
fun(ActionType, ActionMap) ->
maps:map(
fun(_ActionName, Action) ->
#{<<"connector">> := ConnName} = Action,
ConnType = atom_to_binary(emqx_bridge_v2:connector_type(ActionType)),
ConnPath = [<<"connectors">>, ConnType, ConnName],
case emqx_utils_maps:deep_find(ConnPath, RawConf) of
{ok, ConnConf} ->
emqx_action_info:action_convert_from_connector(
ActionType, ConnConf, Action
);
{not_found, _KeyPath, _Data} ->
Action
end
end,
ActionMap
)
end,
Actions
),
maps:put(<<"actions">>, Actions1, RawConf);
actions_convert_from_connectors(RawConf) ->
RawConf.
-ifdef(TEST). -ifdef(TEST).
-include_lib("hocon/include/hocon_types.hrl"). -include_lib("hocon/include/hocon_types.hrl").
schema_homogeneous_test() -> schema_homogeneous_test() ->

View File

@ -610,7 +610,7 @@ t_load_no_matching_connector(_Config) ->
bridge_name := my_test_bridge_update, bridge_name := my_test_bridge_update,
connector_name := <<"unknown">>, connector_name := <<"unknown">>,
bridge_type := _, bridge_type := _,
reason := "connector_not_found_or_wrong_type" reason := <<"connector_not_found_or_wrong_type">>
}}}, }}},
update_root_config(RootConf0) update_root_config(RootConf0)
), ),
@ -627,7 +627,7 @@ t_load_no_matching_connector(_Config) ->
bridge_name := my_test_bridge_new, bridge_name := my_test_bridge_new,
connector_name := <<"unknown">>, connector_name := <<"unknown">>,
bridge_type := _, bridge_type := _,
reason := "connector_not_found_or_wrong_type" reason := <<"connector_not_found_or_wrong_type">>
}}}, }}},
update_root_config(RootConf1) update_root_config(RootConf1)
), ),
@ -696,11 +696,11 @@ t_create_no_matching_connector(_Config) ->
Conf = (bridge_config())#{<<"connector">> => <<"wrong_connector_name">>}, Conf = (bridge_config())#{<<"connector">> => <<"wrong_connector_name">>},
?assertMatch( ?assertMatch(
{error, {error,
{post_config_update, _HandlerMod, #{ {pre_config_update, _HandlerMod, #{
bridge_name := _, bridge_name := _,
connector_name := _, connector_name := _,
bridge_type := _, bridge_type := _,
reason := "connector_not_found_or_wrong_type" reason := <<"connector_not_found_or_wrong_type">>
}}}, }}},
emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf) emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf)
), ),
@ -716,11 +716,11 @@ t_create_wrong_connector_type(_Config) ->
Conf = bridge_config(), Conf = bridge_config(),
?assertMatch( ?assertMatch(
{error, {error,
{post_config_update, _HandlerMod, #{ {pre_config_update, _HandlerMod, #{
bridge_name := _, bridge_name := _,
connector_name := _, connector_name := _,
bridge_type := wrong_type, bridge_type := wrong_type,
reason := "connector_not_found_or_wrong_type" reason := <<"connector_not_found_or_wrong_type">>
}}}, }}},
emqx_bridge_v2:create(wrong_type, my_test_bridge, Conf) emqx_bridge_v2:create(wrong_type, my_test_bridge, Conf)
), ),
@ -732,11 +732,11 @@ t_update_connector_not_found(_Config) ->
BadConf = Conf#{<<"connector">> => <<"wrong_connector_name">>}, BadConf = Conf#{<<"connector">> => <<"wrong_connector_name">>},
?assertMatch( ?assertMatch(
{error, {error,
{post_config_update, _HandlerMod, #{ {pre_config_update, _HandlerMod, #{
bridge_name := _, bridge_name := _,
connector_name := _, connector_name := _,
bridge_type := _, bridge_type := _,
reason := "connector_not_found_or_wrong_type" reason := <<"connector_not_found_or_wrong_type">>
}}}, }}},
emqx_bridge_v2:create(bridge_type(), my_test_bridge, BadConf) emqx_bridge_v2:create(bridge_type(), my_test_bridge, BadConf)
), ),

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_redis, [ {application, emqx_bridge_redis, [
{description, "EMQX Enterprise Redis Bridge"}, {description, "EMQX Enterprise Redis Bridge"},
{vsn, "0.1.4"}, {vsn, "0.1.5"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -105,8 +105,12 @@ fields(action_parameters) ->
command_template(), command_template(),
{redis_type, {redis_type,
?HOCON( ?HOCON(
?ENUM([single, sentinel, cluster]), ?ENUM([single, sentinel, cluster]), #{
#{required => true, desc => ?DESC(redis_type)} required => false,
desc => ?DESC(redis_type),
hidden => true,
importance => ?IMPORTANCE_HIDDEN
}
)} )}
]; ];
fields("post_single") -> fields("post_single") ->

View File

@ -14,7 +14,8 @@
connector_action_config_to_bridge_v1_config/2, connector_action_config_to_bridge_v1_config/2,
bridge_v1_config_to_action_config/2, bridge_v1_config_to_action_config/2,
bridge_v1_config_to_connector_config/1, bridge_v1_config_to_connector_config/1,
bridge_v1_type_name_fun/1 bridge_v1_type_name_fun/1,
action_convert_from_connector/2
]). ]).
-import(emqx_utils_conv, [bin/1]). -import(emqx_utils_conv, [bin/1]).
@ -48,6 +49,23 @@ connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
end, end,
maps:without([<<"description">>], Config1). maps:without([<<"description">>], Config1).
action_convert_from_connector(ConnectorConfig, ActionConfig) ->
case ConnectorConfig of
#{<<"parameters">> := #{<<"redis_type">> := <<"redis_cluster">>}} ->
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun(Opts) ->
Opts#{
<<"batch_size">> => 1,
<<"batch_time">> => <<"0ms">>
}
end,
ActionConfig
);
_ ->
ActionConfig
end.
bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(redis_action)), ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(redis_action)),
ActionParametersKeys = schema_keys(emqx_bridge_redis:fields(action_parameters)), ActionParametersKeys = schema_keys(emqx_bridge_redis:fields(action_parameters)),

View File

@ -13,8 +13,7 @@
namespace/0, namespace/0,
roots/0, roots/0,
fields/1, fields/1,
desc/1, desc/1
resource_opts_converter/2
]). ]).
%% `emqx_bridge_v2_schema' "unofficial" API %% `emqx_bridge_v2_schema' "unofficial" API
@ -60,7 +59,6 @@ fields(action) ->
?MAP(name, ?R_REF(redis_action)), ?MAP(name, ?R_REF(redis_action)),
#{ #{
desc => <<"Redis Action Config">>, desc => <<"Redis Action Config">>,
converter => fun ?MODULE:resource_opts_converter/2,
required => false required => false
} }
)}; )};
@ -118,26 +116,6 @@ desc(action_resource_opts) ->
desc(_Name) -> desc(_Name) ->
undefined. undefined.
resource_opts_converter(undefined, _Opts) ->
undefined;
resource_opts_converter(Conf, _Opts) ->
maps:map(
fun(_Name, SubConf) ->
case SubConf of
#{<<"parameters">> := #{<<"redis_type">> := <<"cluster">>}} ->
ResOpts = maps:get(<<"resource_opts">>, SubConf, #{}),
%% cluster don't support batch
SubConf#{
<<"resource_opts">> =>
ResOpts#{<<"batch_size">> => 1, <<"batch_time">> => <<"0ms">>}
};
_ ->
SubConf
end
end,
Conf
).
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
%% `emqx_bridge_v2_schema' "unofficial" API %% `emqx_bridge_v2_schema' "unofficial" API
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
@ -145,21 +123,9 @@ resource_opts_converter(Conf, _Opts) ->
bridge_v2_examples(Method) -> bridge_v2_examples(Method) ->
[ [
#{ #{
<<"redis_single_producer">> => #{ <<"redis">> => #{
summary => <<"Redis Single Producer Action">>, summary => <<"Redis Action">>,
value => action_example(single, Method) value => action_example(Method)
}
},
#{
<<"redis_sentinel_producer">> => #{
summary => <<"Redis Sentinel Producer Action">>,
value => action_example(sentinel, Method)
}
},
#{
<<"redis_cluster_producer">> => #{
summary => <<"Redis Cluster Producer Action">>,
value => action_example(cluster, Method)
} }
} }
]. ].
@ -189,17 +155,17 @@ connector_examples(Method) ->
conn_bridge_examples(Method) -> conn_bridge_examples(Method) ->
emqx_bridge_redis:conn_bridge_examples(Method). emqx_bridge_redis:conn_bridge_examples(Method).
action_example(RedisType, post) -> action_example(post) ->
maps:merge( maps:merge(
action_example(RedisType, put), action_example(put),
#{ #{
type => <<"redis">>, type => <<"redis">>,
name => <<"my_action">> name => <<"my_action">>
} }
); );
action_example(RedisType, get) -> action_example(get) ->
maps:merge( maps:merge(
action_example(RedisType, put), action_example(put),
#{ #{
status => <<"connected">>, status => <<"connected">>,
node_status => [ node_status => [
@ -210,14 +176,13 @@ action_example(RedisType, get) ->
] ]
} }
); );
action_example(RedisType, put) -> action_example(put) ->
#{ #{
enable => true, enable => true,
connector => <<"my_connector_name">>, connector => <<"my_connector_name">>,
description => <<"My action">>, description => <<"My action">>,
parameters => #{ parameters => #{
command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>], command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>]
redis_type => RedisType
}, },
resource_opts => #{batch_size => 1} resource_opts => #{batch_size => 1}
}. }.
@ -248,20 +213,33 @@ connector_example(RedisType, put) ->
enable => true, enable => true,
description => <<"My redis ", (atom_to_binary(RedisType))/binary, " connector">>, description => <<"My redis ", (atom_to_binary(RedisType))/binary, " connector">>,
parameters => connector_parameter(RedisType), parameters => connector_parameter(RedisType),
pool_size => 8,
database => 1,
username => <<"test">>,
password => <<"******">>,
ssl => #{enable => false} ssl => #{enable => false}
}. }.
connector_parameter(single) -> connector_parameter(single) ->
#{redis_type => single, server => <<"127.0.0.1:6379">>}; #{
redis_type => single,
server => <<"127.0.0.1:6379">>,
pool_size => 8,
database => 1,
username => <<"test">>,
password => <<"******">>
};
connector_parameter(cluster) -> connector_parameter(cluster) ->
#{redis_type => cluster, servers => <<"127.0.0.1:6379,127.0.0.2:6379">>}; #{
redis_type => cluster,
servers => <<"127.0.0.1:6379,127.0.0.2:6379">>,
pool_size => 8,
username => <<"test">>,
password => <<"******">>
};
connector_parameter(sentinel) -> connector_parameter(sentinel) ->
#{ #{
redis_type => sentinel, redis_type => sentinel,
servers => <<"127.0.0.1:6379,127.0.0.2:6379">>, servers => <<"127.0.0.1:6379,127.0.0.2:6379">>,
sentinel => <<"myredismaster">> sentinel => <<"myredismaster">>,
pool_size => 8,
database => 1,
username => <<"test">>,
password => <<"******">>
}. }.

View File

@ -239,6 +239,11 @@ load_config_from_raw(RawConf0, Opts) ->
RawConf = emqx_config:fill_defaults(RawConf1), RawConf = emqx_config:fill_defaults(RawConf1),
case check_config(RawConf) of case check_config(RawConf) of
ok -> ok ->
%% It has been ensured that the connector is always the first configuration to be updated.
%% However, when deleting the connector, we need to clean up the dependent actions first;
%% otherwise, the deletion will fail.
%% notice: we can't create a action before connector.
uninstall_actions(RawConf, Opts),
Error = Error =
lists:filtermap( lists:filtermap(
fun({K, V}) -> fun({K, V}) ->
@ -272,6 +277,29 @@ load_config_from_raw(RawConf0, Opts) ->
{error, Errors} {error, Errors}
end. end.
uninstall_actions(#{<<"actions">> := New}, #{mode := replace}) ->
Old = emqx_conf:get_raw([<<"actions">>], #{}),
#{removed := Removed} = emqx_bridge_v2:diff_confs(New, Old),
maps:foreach(
fun({Type, Name}, _) ->
case emqx_bridge_v2:remove(Type, Name) of
ok ->
ok;
{error, Reason} ->
?SLOG(error, #{
msg => "failed_to_remove_action",
type => Type,
name => Name,
error => Reason
})
end
end,
Removed
);
%% we don't delete things when in merge mode or without actions key.
uninstall_actions(_RawConf, _) ->
ok.
update_config_cluster( update_config_cluster(
?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_BINARY = Key, ?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_BINARY = Key,
Conf, Conf,
@ -286,16 +314,9 @@ update_config_cluster(
check_res(Key, emqx_authn:merge_config(Conf), Conf, Opts); check_res(Key, emqx_authn:merge_config(Conf), Conf, Opts);
update_config_cluster(Key, NewConf, #{mode := merge} = Opts) -> update_config_cluster(Key, NewConf, #{mode := merge} = Opts) ->
Merged = merge_conf(Key, NewConf), Merged = merge_conf(Key, NewConf),
Request = make_request(Key, Merged), check_res(Key, emqx_conf:update([Key], Merged, ?OPTIONS), NewConf, Opts);
check_res(Key, emqx_conf:update([Key], Request, ?OPTIONS), NewConf, Opts);
update_config_cluster(Key, Value, #{mode := replace} = Opts) -> update_config_cluster(Key, Value, #{mode := replace} = Opts) ->
Request = make_request(Key, Value), check_res(Key, emqx_conf:update([Key], Value, ?OPTIONS), Value, Opts).
check_res(Key, emqx_conf:update([Key], Request, ?OPTIONS), Value, Opts).
make_request(Key, Value) when Key =:= <<"connectors">> orelse Key =:= <<"actions">> ->
{force_update, Value};
make_request(_Key, Value) ->
Value.
-define(LOCAL_OPTIONS, #{rawconf_with_defaults => true, persistent => false}). -define(LOCAL_OPTIONS, #{rawconf_with_defaults => true, persistent => false}).
update_config_local( update_config_local(
@ -312,11 +333,9 @@ update_config_local(
check_res(node(), Key, emqx_authn:merge_config_local(Conf, ?LOCAL_OPTIONS), Conf, Opts); check_res(node(), Key, emqx_authn:merge_config_local(Conf, ?LOCAL_OPTIONS), Conf, Opts);
update_config_local(Key, NewConf, #{mode := merge} = Opts) -> update_config_local(Key, NewConf, #{mode := merge} = Opts) ->
Merged = merge_conf(Key, NewConf), Merged = merge_conf(Key, NewConf),
Request = make_request(Key, Merged), check_res(node(), Key, emqx:update_config([Key], Merged, ?LOCAL_OPTIONS), NewConf, Opts);
check_res(node(), Key, emqx:update_config([Key], Request, ?LOCAL_OPTIONS), NewConf, Opts);
update_config_local(Key, Value, #{mode := replace} = Opts) -> update_config_local(Key, Value, #{mode := replace} = Opts) ->
Request = make_request(Key, Value), check_res(node(), Key, emqx:update_config([Key], Value, ?LOCAL_OPTIONS), Value, Opts).
check_res(node(), Key, emqx:update_config([Key], Request, ?LOCAL_OPTIONS), Value, Opts).
check_res(Key, Res, Conf, Opts) -> check_res(cluster, Key, Res, Conf, Opts). check_res(Key, Res, Conf, Opts) -> check_res(cluster, Key, Res, Conf, Opts).
check_res(Node, Key, {ok, _}, _Conf, Opts) -> check_res(Node, Key, {ok, _}, _Conf, Opts) ->
@ -442,6 +461,7 @@ filter_readonly_config(Raw) ->
end. end.
reload_config(AllConf, Opts) -> reload_config(AllConf, Opts) ->
uninstall_actions(AllConf, Opts),
Fold = fun({Key, Conf}, Acc) -> Fold = fun({Key, Conf}, Acc) ->
case update_config_local(Key, Conf, Opts) of case update_config_local(Key, Conf, Opts) of
ok -> ok ->

View File

@ -76,8 +76,9 @@
-define(DEFAULT_MAX_PORTS, 1024 * 1024). -define(DEFAULT_MAX_PORTS, 1024 * 1024).
%% Callback to upgrade config after loaded from config file but before validation. %% Callback to upgrade config after loaded from config file but before validation.
upgrade_raw_conf(RawConf) -> upgrade_raw_conf(Raw0) ->
emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(RawConf). Raw1 = emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(Raw0),
emqx_bridge_v2_schema:actions_convert_from_connectors(Raw1).
namespace() -> emqx. namespace() -> emqx.

View File

@ -50,6 +50,7 @@
]). ]).
-define(ROOT_KEY, connectors). -define(ROOT_KEY, connectors).
-define(ENABLE_OR_DISABLE(A), (A =:= disable orelse A =:= enable)).
load() -> load() ->
Connectors = emqx:get_config([?ROOT_KEY], #{}), Connectors = emqx:get_config([?ROOT_KEY], #{}),
@ -107,22 +108,22 @@ config_key_path() ->
pre_config_update([?ROOT_KEY], RawConf, RawConf) -> pre_config_update([?ROOT_KEY], RawConf, RawConf) ->
{ok, RawConf}; {ok, RawConf};
pre_config_update([?ROOT_KEY], {force_update, NewConf}, RawConf) ->
pre_config_update([?ROOT_KEY], NewConf, RawConf);
pre_config_update([?ROOT_KEY], NewConf, _RawConf) -> pre_config_update([?ROOT_KEY], NewConf, _RawConf) ->
case multi_validate_connector_names(NewConf) of case multi_validate_connector_names(NewConf) of
ok -> ok -> {ok, convert_certs(NewConf)};
{ok, convert_certs(NewConf)}; Error -> Error
Error ->
Error
end; end;
pre_config_update(_, {_Oper, _, _}, undefined) -> pre_config_update([?ROOT_KEY, _Type, _Name], Oper, undefined) when
?ENABLE_OR_DISABLE(Oper)
->
{error, connector_not_found}; {error, connector_not_found};
pre_config_update(_, {Oper, _Type, _Name}, OldConfig) -> pre_config_update([?ROOT_KEY, _Type, _Name], Oper, OldConfig) when
?ENABLE_OR_DISABLE(Oper)
->
%% to save the 'enable' to the config files %% to save the 'enable' to the config files
{ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}}; {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}};
pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) -> pre_config_update([?ROOT_KEY, _Type, Name] = Path, Conf = #{}, _OldConfig) ->
case validate_connector_name_in_config(Path) of case validate_connector_name(Name) of
ok -> ok ->
case emqx_connector_ssl:convert_certs(filename:join(Path), Conf) of case emqx_connector_ssl:convert_certs(filename:join(Path), Conf) of
{error, Reason} -> {error, Reason} ->
@ -137,18 +138,11 @@ pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) ->
operation_to_enable(disable) -> false; operation_to_enable(disable) -> false;
operation_to_enable(enable) -> true. operation_to_enable(enable) -> true.
post_config_update([?ROOT_KEY], {force_update, _}, NewConf, OldConf, _AppEnv) ->
#{added := Added, removed := Removed, changed := Updated} =
diff_confs(NewConf, OldConf),
perform_connector_changes(Removed, Added, Updated);
post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) -> post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) ->
#{added := Added, removed := Removed, changed := Updated} = #{added := Added, removed := Removed, changed := Updated} = diff_confs(NewConf, OldConf),
diff_confs(NewConf, OldConf),
case ensure_no_channels(Removed) of case ensure_no_channels(Removed) of
ok -> ok -> perform_connector_changes(Removed, Added, Updated);
perform_connector_changes(Removed, Added, Updated); {error, Error} -> {error, Error}
{error, Error} ->
{error, Error}
end; end;
post_config_update([?ROOT_KEY, Type, Name], '$remove', _, _OldConf, _AppEnvs) -> post_config_update([?ROOT_KEY, Type, Name], '$remove', _, _OldConf, _AppEnvs) ->
case emqx_connector_resource:get_channels(Type, Name) of case emqx_connector_resource:get_channels(Type, Name) of
@ -159,11 +153,13 @@ post_config_update([?ROOT_KEY, Type, Name], '$remove', _, _OldConf, _AppEnvs) ->
{ok, Channels} -> {ok, Channels} ->
{error, {active_channels, Channels}} {error, {active_channels, Channels}}
end; end;
%% create a new connector
post_config_update([?ROOT_KEY, Type, Name], _Req, NewConf, undefined, _AppEnvs) -> post_config_update([?ROOT_KEY, Type, Name], _Req, NewConf, undefined, _AppEnvs) ->
ResOpts = emqx_resource:fetch_creation_opts(NewConf), ResOpts = emqx_resource:fetch_creation_opts(NewConf),
ok = emqx_connector_resource:create(Type, Name, NewConf, ResOpts), ok = emqx_connector_resource:create(Type, Name, NewConf, ResOpts),
?tp(connector_post_config_update_done, #{}), ?tp(connector_post_config_update_done, #{}),
ok; ok;
%% update an existing connector
post_config_update([?ROOT_KEY, Type, Name], _Req, NewConf, OldConf, _AppEnvs) -> post_config_update([?ROOT_KEY, Type, Name], _Req, NewConf, OldConf, _AppEnvs) ->
ResOpts = emqx_resource:fetch_creation_opts(NewConf), ResOpts = emqx_resource:fetch_creation_opts(NewConf),
ok = emqx_connector_resource:update(Type, Name, {OldConf, NewConf}, ResOpts), ok = emqx_connector_resource:update(Type, Name, {OldConf, NewConf}, ResOpts),
@ -226,12 +222,10 @@ lookup(Type, Name, RawConf) ->
get_metrics(Type, Name) -> get_metrics(Type, Name) ->
emqx_resource:get_metrics(emqx_connector_resource:resource_id(Type, Name)). emqx_resource:get_metrics(emqx_connector_resource:resource_id(Type, Name)).
disable_enable(Action, ConnectorType, ConnectorName) when disable_enable(Action, ConnectorType, ConnectorName) when ?ENABLE_OR_DISABLE(Action) ->
Action =:= disable; Action =:= enable
->
emqx_conf:update( emqx_conf:update(
config_key_path() ++ [ConnectorType, ConnectorName], config_key_path() ++ [ConnectorType, ConnectorName],
{Action, ConnectorType, ConnectorName}, Action,
#{override_to => cluster} #{override_to => cluster}
). ).
@ -250,7 +244,7 @@ create(ConnectorType, ConnectorName, RawConf) ->
remove(ConnectorType, ConnectorName) -> remove(ConnectorType, ConnectorName) ->
?SLOG(debug, #{ ?SLOG(debug, #{
brige_action => remove, bridge_action => remove,
connector_type => ConnectorType, connector_type => ConnectorType,
connector_name => ConnectorName connector_name => ConnectorName
}), }),
@ -293,6 +287,7 @@ import_config(RawConf) ->
ConnectorsConf = maps:get(<<"connectors">>, RawConf, #{}), ConnectorsConf = maps:get(<<"connectors">>, RawConf, #{}),
OldConnectorsConf = emqx:get_raw_config(RootKeyPath, #{}), OldConnectorsConf = emqx:get_raw_config(RootKeyPath, #{}),
MergedConf = merge_confs(OldConnectorsConf, ConnectorsConf), MergedConf = merge_confs(OldConnectorsConf, ConnectorsConf),
%% using merge strategy, deletions should not be performed within the post_config_update/5.
case emqx_conf:update(RootKeyPath, MergedConf, #{override_to => cluster}) of case emqx_conf:update(RootKeyPath, MergedConf, #{override_to => cluster}) of
{ok, #{raw_config := NewRawConf}} -> {ok, #{raw_config := NewRawConf}} ->
{ok, #{root_key => ?ROOT_KEY, changed => changed_paths(OldConnectorsConf, NewRawConf)}}; {ok, #{root_key => ?ROOT_KEY, changed => changed_paths(OldConnectorsConf, NewRawConf)}};
@ -490,14 +485,6 @@ validate_connector_name(ConnectorName) ->
{error, Error} {error, Error}
end. end.
validate_connector_name_in_config(Path) ->
case Path of
[?ROOT_KEY, _ConnectorType, ConnectorName] ->
validate_connector_name(ConnectorName);
_ ->
ok
end.
multi_validate_connector_names(Conf) -> multi_validate_connector_names(Conf) ->
ConnectorTypeAndNames = ConnectorTypeAndNames =
[ [

View File

@ -136,14 +136,16 @@ create(Type, Name, Conf0, Opts) ->
config => emqx_utils:redact(Conf0) config => emqx_utils:redact(Conf0)
}), }),
TypeBin = bin(Type), TypeBin = bin(Type),
ResourceId = resource_id(Type, Name),
Conf = Conf0#{connector_type => TypeBin, connector_name => Name}, Conf = Conf0#{connector_type => TypeBin, connector_name => Name},
{ok, _Data} = emqx_resource:create_local( {ok, _Data} = emqx_resource:create_local(
resource_id(Type, Name), ResourceId,
<<"emqx_connector">>, <<"emqx_connector">>,
?MODULE:connector_to_resource_type(Type), ?MODULE:connector_to_resource_type(Type),
parse_confs(TypeBin, Name, Conf), parse_confs(TypeBin, Name, Conf),
parse_opts(Conf, Opts) parse_opts(Conf, Opts)
), ),
_ = emqx_alarm:ensure_deactivated(ResourceId),
ok. ok.
update(ConnectorId, {OldConf, Conf}) -> update(ConnectorId, {OldConf, Conf}) ->