From 3404f39fd25b920183b04eacdf315410ea2d7f54 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 3 Jan 2024 09:53:18 +0800 Subject: [PATCH 1/2] chore: remove redundant redis action swagger example --- apps/emqx_bridge/src/emqx_action_info.erl | 18 +- apps/emqx_bridge/src/emqx_bridge_v2.erl | 316 ++++++++++-------- .../src/schema/emqx_bridge_v2_schema.erl | 19 ++ .../src/emqx_bridge_redis.app.src | 2 +- .../src/emqx_bridge_redis_action_info.erl | 20 +- .../src/emqx_bridge_redis_schema.erl | 16 +- apps/emqx_conf/src/emqx_conf_schema.erl | 6 +- apps/emqx_connector/src/emqx_connector.erl | 57 ++-- .../src/emqx_connector_resource.erl | 4 +- 9 files changed, 254 insertions(+), 204 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 1f2a3d1f3..ceba8e202 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -33,7 +33,8 @@ has_custom_bridge_v1_config_to_connector_config/1, bridge_v1_config_to_action_config/3, has_custom_bridge_v1_config_to_action_config/1, - transform_bridge_v1_config_to_action_config/4 + transform_bridge_v1_config_to_action_config/4, + action_convert_from_connector/3 ]). -callback bridge_v1_type_name() -> @@ -142,8 +143,10 @@ action_type_to_bridge_v1_type(ActionType, ActionConf) -> get_confs(ActionType, #{<<"connector">> := ConnectorName} = ActionConfig) -> ConnectorType = action_type_to_connector_type(ActionType), - ConnectorConfig = emqx_conf:get_raw([connectors, ConnectorType, ConnectorName]), - {ConnectorConfig, ActionConfig}; + case emqx_conf:get_raw([connectors, ConnectorType, ConnectorName], undefined) of + undefined -> undefined; + ConnectorConfig -> {ConnectorConfig, ActionConfig} + end; get_confs(_, _) -> undefined. @@ -188,6 +191,15 @@ connector_action_config_to_bridge_v1_config(ActionOrBridgeType, ConnectorConfig, connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) end. +action_convert_from_connector(ActionOrBridgeType, ConnectorConfig, ActionConfig) -> + Module = get_action_info_module(ActionOrBridgeType), + case erlang:function_exported(Module, action_convert_from_connector, 2) of + true -> + Module:action_convert_from_connector(ConnectorConfig, ActionConfig); + false -> + ActionConfig + end. + connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> Merged = emqx_utils_maps:deep_merge( maps:without( diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index cd6172eda..455bee5eb 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -73,6 +73,8 @@ get_channels_for_connector/1 ]). +-export([diff_confs/2]). + %% Exported for tests -export([ id/2, @@ -97,7 +99,8 @@ -export([ bridge_v2_type_to_connector_type/1, - is_bridge_v2_type/1 + is_bridge_v2_type/1, + connector_type/1 ]). %% Compatibility Layer API @@ -148,12 +151,23 @@ %%==================================================================== load() -> + reinit_bridges(), load_bridges(), load_message_publish_hook(), ok = emqx_config_handler:add_handler(config_key_path_leaf(), emqx_bridge_v2), ok = emqx_config_handler:add_handler(config_key_path(), emqx_bridge_v2), ok. +reinit_bridges() -> + %% todo + Bridges = emqx:get_raw_config([?ROOT_KEY], #{}), + try + actions_convert_from_connectors(Bridges) + catch + _:_ -> ok + end, + ok. + load_bridges() -> Bridges = emqx:get_config([?ROOT_KEY], #{}), lists:foreach( @@ -248,7 +262,7 @@ list() -> {ok, emqx_config:update_result()} | {error, any()}. create(BridgeType, BridgeName, RawConf) -> ?SLOG(debug, #{ - brige_action => create, + bridge_action => create, bridge_version => 2, bridge_type => BridgeType, bridge_name => BridgeName, @@ -265,7 +279,7 @@ remove(BridgeType, BridgeName) -> %% NOTE: This function can cause broken references from rules but it is only %% called directly from test cases. ?SLOG(debug, #{ - brige_action => remove, + bridge_action => remove, bridge_version => 2, bridge_type => BridgeType, bridge_name => BridgeName @@ -356,7 +370,7 @@ install_bridge_v2_helper( _BridgeName, {error, Reason} = Error ) -> - ?SLOG(error, Reason), + ?SLOG(warning, Reason), Error; install_bridge_v2_helper( BridgeV2Type, @@ -412,7 +426,7 @@ uninstall_bridge_v2( CreationOpts = emqx_resource:fetch_creation_opts(Config), ok = emqx_resource_buffer_worker_sup:stop_workers(BridgeV2Id, CreationOpts), ok = emqx_resource:clear_metrics(BridgeV2Id), - case validate_referenced_connectors(BridgeV2Type, ConnectorName, BridgeName) of + case referenced_connectors_exist(BridgeV2Type, ConnectorName, BridgeName) of {error, _} -> ok; ok -> @@ -440,6 +454,7 @@ combine_connector_and_bridge_v2_config( BridgeV2Config#{resource_opts => CombinedCreationOpts} catch _:_ -> + alarm_connector_not_found(BridgeV2Type, BridgeName, ConnectorName), {error, #{ reason => "connector_not_found", type => BridgeV2Type, @@ -451,15 +466,14 @@ combine_connector_and_bridge_v2_config( %%==================================================================== %% Operations %%==================================================================== +-define(ENABLE_OR_DISABLE(A), (A =:= disable orelse A =:= enable)). -spec disable_enable(disable | enable, bridge_v2_type(), bridge_v2_name()) -> {ok, any()} | {error, any()}. -disable_enable(Action, BridgeType, BridgeName) when - Action =:= disable; Action =:= enable --> +disable_enable(Action, BridgeType, BridgeName) when ?ENABLE_OR_DISABLE(Action) -> emqx_conf:update( config_key_path() ++ [BridgeType, BridgeName], - {Action, BridgeType, BridgeName}, + Action, #{override_to => cluster} ). @@ -565,7 +579,7 @@ query(BridgeType, BridgeName, Message, QueryOpts0) -> do_query_with_enabled_config( _BridgeType, _BridgeName, _Message, _QueryOpts0, {error, Reason} = Error ) -> - ?SLOG(error, Reason), + ?SLOG(warning, Reason), Error; do_query_with_enabled_config( BridgeType, BridgeName, Message, QueryOpts0, Config @@ -863,118 +877,89 @@ import_config(RawConf) -> %% Config Update Handler API %%==================================================================== -config_key_path() -> - [?ROOT_KEY]. +config_key_path() -> [?ROOT_KEY]. -config_key_path_leaf() -> - [?ROOT_KEY, '?', '?']. +config_key_path_leaf() -> [?ROOT_KEY, '?', '?']. -pre_config_update(_, {force_update, Conf}, _OldConf) -> - {ok, Conf}; -%% NOTE: We depend on the `emqx_bridge:pre_config_update/3` to restart/stop the -%% underlying resources. -pre_config_update(_, {_Oper, _, _}, undefined) -> - {error, bridge_not_found}; -pre_config_update(_, {Oper, _Type, _Name}, OldConfig) -> - %% to save the 'enable' to the config files - {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}}; -pre_config_update(_Path, Conf, _OldConfig) when is_map(Conf) -> - {ok, Conf}. +%% enable or disable action +pre_config_update([?ROOT_KEY, Type, Name], Oper, undefined) when ?ENABLE_OR_DISABLE(Oper) -> + {error, #{ + bridge_name => Name, + bridge_type => Type, + reason => <<"bridge_not_found">> + }}; +pre_config_update([?ROOT_KEY, _Type, _Name], Oper, OldAction) when ?ENABLE_OR_DISABLE(Oper) -> + {ok, OldAction#{<<"enable">> => operation_to_enable(Oper)}}; -operation_to_enable(disable) -> false; -operation_to_enable(enable) -> true. +%% Updates a single action from a specific HTTP API. +%% If the connector is not found, the update operation fails. +pre_config_update([?ROOT_KEY, ActionType, _Name], Conf = #{}, _OldConf) -> + action_convert_from_connector(ActionType, Conf); +%% Batch updates actions when importing a configuration or executing a CLI command. +%% Update succeeded even if the connector is not found, alarm in post_config_update +pre_config_update([?ROOT_KEY], Conf = #{}, _OldConfs) -> + {ok, actions_convert_from_connectors(Conf)}. -%% A public API that can trigger this is: -%% bin/emqx ctl conf load data/configs/cluster.hocon -post_config_update([?ROOT_KEY], {force_update, _Req}, NewConf, OldConf, _AppEnv) -> - do_post_config_update(NewConf, OldConf, #{validate_referenced_connectors => false}); -%% This top level handler will be triggered when the actions path is updated -%% with calls to emqx_conf:update([actions], BridgesConf, #{}). -%% -post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) -> - do_post_config_update(NewConf, OldConf, #{validate_referenced_connectors => true}); -post_config_update([?ROOT_KEY, BridgeType, BridgeName], '$remove', _, _OldConf, _AppEnvs) -> - Conf = emqx:get_config([?ROOT_KEY, BridgeType, BridgeName]), - ok = uninstall_bridge_v2(BridgeType, BridgeName, Conf), - Bridges = emqx_utils_maps:deep_remove([BridgeType, BridgeName], emqx:get_config([?ROOT_KEY])), +%% Don't crash event the bridge is not found +post_config_update([?ROOT_KEY, Type, Name], '$remove', _, _OldConf, _AppEnvs) -> + AllBridges = emqx:get_config([?ROOT_KEY]), + case emqx_utils_maps:deep_get([Type, Name], AllBridges, undefined) of + undefined -> + ok; + Action -> + ok = uninstall_bridge_v2(Type, Name, Action), + Bridges = emqx_utils_maps:deep_remove([Type, Name], AllBridges), + reload_message_publish_hook(Bridges) + end, + ?tp(bridge_post_config_update_done, #{}), + ok; +%% Create a single bridge failed if the connector is not found(already check in pre_config_update) +post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, undefined, _AppEnvs) -> + ok = install_bridge_v2(BridgeType, BridgeName, NewConf), + Bridges = emqx_utils_maps:deep_put( + [BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf + ), reload_message_publish_hook(Bridges), ?tp(bridge_post_config_update_done, #{}), ok; -post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, undefined, _AppEnvs) -> - %% N.B.: all bridges must use the same field name (`connector`) to define the - %% connector name. - ConnectorName = maps:get(connector, NewConf), - case validate_referenced_connectors(BridgeType, ConnectorName, BridgeName) of - ok -> - ok = install_bridge_v2(BridgeType, BridgeName, NewConf), - Bridges = emqx_utils_maps:deep_put( - [BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf - ), - reload_message_publish_hook(Bridges), - ?tp(bridge_post_config_update_done, #{}), - ok; - {error, Error} -> - {error, Error} - end; +%% update bridges failed if the connector is not found(already check in pre_config_update) post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, OldConf, _AppEnvs) -> - ConnectorName = maps:get(connector, NewConf), - case validate_referenced_connectors(BridgeType, ConnectorName, BridgeName) of - ok -> - ok = uninstall_bridge_v2(BridgeType, BridgeName, OldConf), - ok = install_bridge_v2(BridgeType, BridgeName, NewConf), - Bridges = emqx_utils_maps:deep_put( - [BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf - ), - reload_message_publish_hook(Bridges), - ?tp(bridge_post_config_update_done, #{}), - ok; - {error, Error} -> - {error, Error} - end. + ok = uninstall_bridge_v2(BridgeType, BridgeName, OldConf), + ok = install_bridge_v2(BridgeType, BridgeName, NewConf), + Bridges = emqx_utils_maps:deep_put( + [BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf + ), + reload_message_publish_hook(Bridges), + ?tp(bridge_post_config_update_done, #{}), + ok; -do_post_config_update(NewConf, OldConf, #{validate_referenced_connectors := NeedValidate}) -> +%% This top level handler will be triggered when the actions path is updated +%% with calls to emqx_conf:update([actions], BridgesConf, #{}). +%% such as import_config/1 +%% Notice ** do succeeded even if the connector is not found ** +%% Install a non-exist connector will alarm & log(warn) in install_bridge_v2. +post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) -> #{added := Added, removed := Removed, changed := Updated} = diff_confs(NewConf, OldConf), - UpdatedConfigs = - lists:map( - fun({{Type, BridgeName}, {_Old, New}}) -> - {Type, BridgeName, New} - end, - maps:to_list(Updated) - ), - AddedConfigs = - lists:map( - fun({{Type, BridgeName}, AddedConf}) -> - {Type, BridgeName, AddedConf} - end, - maps:to_list(Added) - ), - ToValidate = UpdatedConfigs ++ AddedConfigs, - case multi_validate_referenced_connectors(NeedValidate, ToValidate) of - ok -> - %% The config update will be failed if any task in `perform_bridge_changes` failed. - RemoveFun = fun uninstall_bridge_v2/3, - CreateFun = fun install_bridge_v2/3, - UpdateFun = fun(Type, Name, {OldBridgeConf, Conf}) -> - uninstall_bridge_v2(Type, Name, OldBridgeConf), - install_bridge_v2(Type, Name, Conf) - end, - Result = perform_bridge_changes([ - #{action => RemoveFun, data => Removed}, - #{ - action => CreateFun, - data => Added, - on_exception_fn => fun emqx_bridge_resource:remove/4 - }, - #{action => UpdateFun, data => Updated} - ]), - ok = unload_message_publish_hook(), - ok = load_message_publish_hook(NewConf), - ?tp(bridge_post_config_update_done, #{}), - Result; - {error, Error} -> - {error, Error} - end. + %% The config update will be failed if any task in `perform_bridge_changes` failed. + RemoveFun = fun uninstall_bridge_v2/3, + CreateFun = fun install_bridge_v2/3, + UpdateFun = fun(Type, Name, {OldBridgeConf, Conf}) -> + uninstall_bridge_v2(Type, Name, OldBridgeConf), + install_bridge_v2(Type, Name, Conf) + end, + Result = perform_bridge_changes([ + #{action => RemoveFun, data => Removed}, + #{ + action => CreateFun, + data => Added, + on_exception_fn => fun emqx_bridge_resource:remove/4 + }, + #{action => UpdateFun, data => Updated} + ]), + reload_message_publish_hook(NewConf), + ?tp(bridge_post_config_update_done, #{}), + Result. diff_confs(NewConfs, OldConfs) -> emqx_utils_maps:diff_maps( @@ -1066,12 +1051,10 @@ bridge_v1_is_valid(BridgeV1Type, BridgeName) -> #{connector := ConnectorName} -> ConnectorType = connector_type(BridgeV2Type), ConnectorResourceId = emqx_connector_resource:resource_id(ConnectorType, ConnectorName), - {ok, Channels} = emqx_resource:get_channels(ConnectorResourceId), - case Channels of - [_Channel] -> - true; - _ -> - false + case emqx_resource:get_channels(ConnectorResourceId) of + {ok, [_Channel]} -> true; + %% not_found, [], [_|_] + _ -> false end end. @@ -1148,7 +1131,7 @@ bridge_v1_lookup_and_transform_helper( BridgeV2Status = maps:get(status, Action, undefined), BridgeV2Error = maps:get(error, Action, undefined), ResourceData1 = maps:get(resource_data, BridgeV1, #{}), - %% Replace id in resouce data + %% Replace id in resource data BridgeV1Id = <<"bridge:", (bin(BridgeV1Type))/binary, ":", (bin(BridgeName))/binary>>, ResourceData2 = maps:put(id, BridgeV1Id, ResourceData1), ConnectorStatus = maps:get(status, ResourceData2, undefined), @@ -1422,7 +1405,7 @@ bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps) -> ). %% Bridge v1 delegated-removal in 3 steps: -%% 1. Delete rule actions if RmoveDeps has 'rule_actions' +%% 1. Delete rule actions if RemoveDeps has 'rule_actions' %% 2. Delete self (the bridge v2), also delete its channel in the connector %% 3. Delete the connector if the connector has no more channel left and if 'connector' is in RemoveDeps bridge_v1_check_deps_and_remove( @@ -1557,6 +1540,9 @@ bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, DoHealthCheck) -> %% Misc helper functions %%==================================================================== +operation_to_enable(disable) -> false; +operation_to_enable(enable) -> true. + bin(Bin) when is_binary(Bin) -> Bin; bin(Str) when is_list(Str) -> list_to_binary(Str); bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). @@ -1575,50 +1561,84 @@ to_existing_atom(X) -> {error, _} -> throw(bad_atom) end. -validate_referenced_connectors(BridgeType, ConnectorNameBin, BridgeName) -> +referenced_connectors_exist(BridgeType, ConnectorNameBin, BridgeName) -> %% N.B.: assumes that, for all bridgeV2 types, the name of the bridge type is %% identical to its matching connector type name. - try - {ConnectorName, ConnectorType} = to_connector(ConnectorNameBin, BridgeType), - case emqx_config:get([connectors, ConnectorType, ConnectorName], undefined) of - undefined -> - throw(not_found); - _ -> - ok - end - catch - throw:not_found -> + case get_connector_info(ConnectorNameBin, BridgeType) of + {error, not_found} -> {error, #{ reason => "connector_not_found_or_wrong_type", connector_name => ConnectorNameBin, bridge_name => BridgeName, bridge_type => BridgeType + }}; + {ok, _Connector} -> + ok + end. + +actions_convert_from_connectors(Conf) -> + maps:map( + fun({ActionType, Actions}) -> + maps:map( + fun(_Name, Action) -> + case action_convert_from_connector(ActionType, Action) of + {ok, NewAction} -> NewAction; + {error, _} -> Action + end + end, + Actions + ) + end, + Conf + ). + +action_convert_from_connector(ActionType, Action = #{<<"connector">> := ConnectorName}) -> + case get_connector_info(ConnectorName, ActionType) of + {ok, Connector} -> + Action1 = emqx_action_info:action_convert_from_connector(ActionType, Connector, Action), + {ok, Action1}; + {error, not_found} -> + {error, #{ + reason => "connector_not_found", + type => ActionType, + connector_name => ConnectorName }} end. +get_connector_info(ConnectorNameBin, BridgeType) -> + case to_connector(ConnectorNameBin, BridgeType) of + {error, not_found} -> + {error, not_found}; + {ConnectorName, ConnectorType} -> + case emqx_config:get_raw([connectors, ConnectorType, ConnectorName], undefined) of + undefined -> {error, not_found}; + Connector -> {ok, Connector} + end + end. + to_connector(ConnectorNameBin, BridgeType) -> try - ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(to_existing_atom(BridgeType)), ConnectorName = to_existing_atom(ConnectorNameBin), + BridgeType1 = to_existing_atom(BridgeType), + ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeType1), {ConnectorName, ConnectorType} catch _:_ -> - throw(not_found) + {error, not_found} end. -multi_validate_referenced_connectors(false, _Configs) -> - ok; -multi_validate_referenced_connectors(true, Configs) -> - Pipeline = - lists:map( - fun({Type, BridgeName, #{connector := ConnectorName}}) -> - fun(_) -> validate_referenced_connectors(Type, ConnectorName, BridgeName) end - end, - Configs - ), - case emqx_utils:pipeline(Pipeline, unused, unused) of - {ok, _, _} -> - ok; - {error, Reason, _State} -> - {error, Reason} - end. +alarm_connector_not_found(ActionType, ActionName, ConnectorName) -> + ConnectorType = connector_type(to_existing_atom(ActionType)), + ResId = emqx_connector_resource:resource_id( + ConnectorType, ConnectorName + ), + _ = emqx_alarm:safe_activate( + ResId, + #{ + connector_name => ConnectorName, + connector_type => ConnectorType, + action_type => ActionType, + action_name => ActionName + }, + <<"connector not found">> + ). diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index 5c3ca5759..1f4390eef 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -56,6 +56,8 @@ project_to_actions_resource_opts/1 ]). +-export([actions_convert_from_connectors/1]). + -export_type([action_type/0]). %% Should we explicitly list them here so dialyzer may be more helpful? @@ -305,6 +307,23 @@ project_to_actions_resource_opts(OldResourceOpts) -> Subfields = common_resource_opts_subfields_bin(), maps:with(Subfields, OldResourceOpts). +actions_convert_from_connectors(RawConf = #{<<"actions">> := Actions}) -> + Actions1 = + maps:map(fun(ActionType, ActionMap) -> + maps:map(fun(_ActionName, Action) -> + #{<<"connector">> := ConnName} = Action, + ConnType = atom_to_binary(emqx_bridge_v2:connector_type(ActionType)), + ConnPath = [<<"connectors">>, ConnType, ConnName], + case emqx_utils_maps:deep_find(ConnPath, RawConf) of + {ok, ConnConf} -> + emqx_action_info:action_convert_from_connector(ActionType, ConnConf, Action); + {not_found, _KeyPath, _Data} -> Action + end + end, ActionMap) + end, Actions), + maps:put(<<"actions">>, Actions1, RawConf); +actions_convert_from_connectors(RawConf) -> RawConf. + -ifdef(TEST). -include_lib("hocon/include/hocon_types.hrl"). schema_homogeneous_test() -> diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src b/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src index 53130d188..a2e006443 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_redis, [ {description, "EMQX Enterprise Redis Bridge"}, - {vsn, "0.1.4"}, + {vsn, "0.1.5"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_action_info.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_action_info.erl index 6a268c931..e8b841048 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis_action_info.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_action_info.erl @@ -14,7 +14,8 @@ connector_action_config_to_bridge_v1_config/2, bridge_v1_config_to_action_config/2, bridge_v1_config_to_connector_config/1, - bridge_v1_type_name_fun/1 + bridge_v1_type_name_fun/1, + action_convert_from_connector/2 ]). -import(emqx_utils_conv, [bin/1]). @@ -48,6 +49,23 @@ connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> end, maps:without([<<"description">>], Config1). +action_convert_from_connector(ConnectorConfig, ActionConfig) -> + case ConnectorConfig of + #{<<"parameters">> := #{<<"redis_type">> := <<"redis_cluster">>}} -> + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun(Opts) -> + Opts#{ + <<"batch_size">> => 1, + <<"batch_time">> => <<"0ms">> + } + end, + ActionConfig + ); + _ -> + ActionConfig + end. + bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(redis_action)), ActionParametersKeys = schema_keys(emqx_bridge_redis:fields(action_parameters)), diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl index 086332658..83ef0d1cd 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl @@ -145,22 +145,10 @@ resource_opts_converter(Conf, _Opts) -> bridge_v2_examples(Method) -> [ #{ - <<"redis_single_producer">> => #{ - summary => <<"Redis Single Producer Action">>, + <<"redis">> => #{ + summary => <<"Redis Action">>, value => action_example(single, Method) } - }, - #{ - <<"redis_sentinel_producer">> => #{ - summary => <<"Redis Sentinel Producer Action">>, - value => action_example(sentinel, Method) - } - }, - #{ - <<"redis_cluster_producer">> => #{ - summary => <<"Redis Cluster Producer Action">>, - value => action_example(cluster, Method) - } } ]. diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index 8cfa9a8ea..4a9f6f24a 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -76,8 +76,10 @@ -define(DEFAULT_MAX_PORTS, 1024 * 1024). %% Callback to upgrade config after loaded from config file but before validation. -upgrade_raw_conf(RawConf) -> - emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(RawConf). +upgrade_raw_conf(Raw0) -> + Raw1 = emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(Raw0), + emqx_bridge_v2_schema:actions_convert_from_connectors(Raw1). + namespace() -> emqx. diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index dac85273a..70e4513a7 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -50,6 +50,7 @@ ]). -define(ROOT_KEY, connectors). +-define(ENABLE_OR_DISABLE(A), (A =:= disable orelse A =:= enable)). load() -> Connectors = emqx:get_config([?ROOT_KEY], #{}), @@ -107,22 +108,24 @@ config_key_path() -> pre_config_update([?ROOT_KEY], RawConf, RawConf) -> {ok, RawConf}; -pre_config_update([?ROOT_KEY], {force_update, NewConf}, RawConf) -> - pre_config_update([?ROOT_KEY], NewConf, RawConf); pre_config_update([?ROOT_KEY], NewConf, _RawConf) -> case multi_validate_connector_names(NewConf) of - ok -> - {ok, convert_certs(NewConf)}; - Error -> - Error + ok -> {ok, convert_certs(NewConf)}; + Error -> Error end; -pre_config_update(_, {_Oper, _, _}, undefined) -> - {error, connector_not_found}; -pre_config_update(_, {Oper, _Type, _Name}, OldConfig) -> +pre_config_update([?ROOT_KEY, Type, Name], Oper, undefined) + when ?ENABLE_OR_DISABLE(Oper) -> + {error, #{ + reason => <<"connector_not_found">>, + connector_name => Name, + connector_type => Type + }}; +pre_config_update([?ROOT_KEY, _Type, _Name], Oper, OldConfig) + when ?ENABLE_OR_DISABLE(Oper) -> %% to save the 'enable' to the config files {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}}; -pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) -> - case validate_connector_name_in_config(Path) of +pre_config_update([?ROOT_KEY, _Type, Name] = Path, Conf = #{}, _OldConfig) -> + case validate_connector_name(Name) of ok -> case emqx_connector_ssl:convert_certs(filename:join(Path), Conf) of {error, Reason} -> @@ -137,18 +140,11 @@ pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) -> operation_to_enable(disable) -> false; operation_to_enable(enable) -> true. -post_config_update([?ROOT_KEY], {force_update, _}, NewConf, OldConf, _AppEnv) -> - #{added := Added, removed := Removed, changed := Updated} = - diff_confs(NewConf, OldConf), - perform_connector_changes(Removed, Added, Updated); post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) -> - #{added := Added, removed := Removed, changed := Updated} = - diff_confs(NewConf, OldConf), + #{added := Added, removed := Removed, changed := Updated} = diff_confs(NewConf, OldConf), case ensure_no_channels(Removed) of - ok -> - perform_connector_changes(Removed, Added, Updated); - {error, Error} -> - {error, Error} + ok -> perform_connector_changes(Removed, Added, Updated); + {error, Error} -> {error, Error} end; post_config_update([?ROOT_KEY, Type, Name], '$remove', _, _OldConf, _AppEnvs) -> case emqx_connector_resource:get_channels(Type, Name) of @@ -159,11 +155,13 @@ post_config_update([?ROOT_KEY, Type, Name], '$remove', _, _OldConf, _AppEnvs) -> {ok, Channels} -> {error, {active_channels, Channels}} end; +%% create a new connector post_config_update([?ROOT_KEY, Type, Name], _Req, NewConf, undefined, _AppEnvs) -> ResOpts = emqx_resource:fetch_creation_opts(NewConf), ok = emqx_connector_resource:create(Type, Name, NewConf, ResOpts), ?tp(connector_post_config_update_done, #{}), ok; +%% update an existing connector post_config_update([?ROOT_KEY, Type, Name], _Req, NewConf, OldConf, _AppEnvs) -> ResOpts = emqx_resource:fetch_creation_opts(NewConf), ok = emqx_connector_resource:update(Type, Name, {OldConf, NewConf}, ResOpts), @@ -226,12 +224,10 @@ lookup(Type, Name, RawConf) -> get_metrics(Type, Name) -> emqx_resource:get_metrics(emqx_connector_resource:resource_id(Type, Name)). -disable_enable(Action, ConnectorType, ConnectorName) when - Action =:= disable; Action =:= enable --> +disable_enable(Action, ConnectorType, ConnectorName) when ?ENABLE_OR_DISABLE(Action) -> emqx_conf:update( config_key_path() ++ [ConnectorType, ConnectorName], - {Action, ConnectorType, ConnectorName}, + Action, #{override_to => cluster} ). @@ -250,7 +246,7 @@ create(ConnectorType, ConnectorName, RawConf) -> remove(ConnectorType, ConnectorName) -> ?SLOG(debug, #{ - brige_action => remove, + bridge_action => remove, connector_type => ConnectorType, connector_name => ConnectorName }), @@ -293,6 +289,7 @@ import_config(RawConf) -> ConnectorsConf = maps:get(<<"connectors">>, RawConf, #{}), OldConnectorsConf = emqx:get_raw_config(RootKeyPath, #{}), MergedConf = merge_confs(OldConnectorsConf, ConnectorsConf), + %% using merge strategy, deletions should not be performed within the post_config_update/5. case emqx_conf:update(RootKeyPath, MergedConf, #{override_to => cluster}) of {ok, #{raw_config := NewRawConf}} -> {ok, #{root_key => ?ROOT_KEY, changed => changed_paths(OldConnectorsConf, NewRawConf)}}; @@ -490,14 +487,6 @@ validate_connector_name(ConnectorName) -> {error, Error} end. -validate_connector_name_in_config(Path) -> - case Path of - [?ROOT_KEY, _ConnectorType, ConnectorName] -> - validate_connector_name(ConnectorName); - _ -> - ok - end. - multi_validate_connector_names(Conf) -> ConnectorTypeAndNames = [ diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index 08c320490..a58a1ef3d 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -136,14 +136,16 @@ create(Type, Name, Conf0, Opts) -> config => emqx_utils:redact(Conf0) }), TypeBin = bin(Type), + ResourceId = resource_id(Type, Name), Conf = Conf0#{connector_type => TypeBin, connector_name => Name}, {ok, _Data} = emqx_resource:create_local( - resource_id(Type, Name), + ResourceId, <<"emqx_connector">>, ?MODULE:connector_to_resource_type(Type), parse_confs(TypeBin, Name, Conf), parse_opts(Conf, Opts) ), + _ = emqx_alarm:ensure_deactivated(ResourceId), ok. update(ConnectorId, {OldConf, Conf}) -> From ec34b6f41d503bd1d2ca8b37ba0ccd5713d73258 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Mon, 8 Jan 2024 09:45:48 +0800 Subject: [PATCH 2/2] refactor: remove redis_type from redis action --- apps/emqx_bridge/src/emqx_bridge_v2.erl | 48 +++++--------- .../src/schema/emqx_bridge_v2_schema.erl | 36 ++++++---- .../emqx_bridge/test/emqx_bridge_v2_SUITE.erl | 16 ++--- .../src/emqx_bridge_redis.erl | 8 ++- .../src/emqx_bridge_redis_schema.erl | 66 ++++++++----------- apps/emqx_conf/src/emqx_conf_cli.erl | 46 +++++++++---- apps/emqx_conf/src/emqx_conf_schema.erl | 1 - apps/emqx_connector/src/emqx_connector.erl | 16 ++--- 8 files changed, 121 insertions(+), 116 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 455bee5eb..723808919 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -151,23 +151,12 @@ %%==================================================================== load() -> - reinit_bridges(), load_bridges(), load_message_publish_hook(), ok = emqx_config_handler:add_handler(config_key_path_leaf(), emqx_bridge_v2), ok = emqx_config_handler:add_handler(config_key_path(), emqx_bridge_v2), ok. -reinit_bridges() -> - %% todo - Bridges = emqx:get_raw_config([?ROOT_KEY], #{}), - try - actions_convert_from_connectors(Bridges) - catch - _:_ -> ok - end, - ok. - load_bridges() -> Bridges = emqx:get_config([?ROOT_KEY], #{}), lists:foreach( @@ -456,8 +445,8 @@ combine_connector_and_bridge_v2_config( _:_ -> alarm_connector_not_found(BridgeV2Type, BridgeName, ConnectorName), {error, #{ - reason => "connector_not_found", - type => BridgeV2Type, + reason => <<"connector_not_found_or_wrong_type">>, + bridge_type => BridgeV2Type, bridge_name => BridgeName, connector_name => ConnectorName }} @@ -882,19 +871,14 @@ config_key_path() -> [?ROOT_KEY]. config_key_path_leaf() -> [?ROOT_KEY, '?', '?']. %% enable or disable action -pre_config_update([?ROOT_KEY, Type, Name], Oper, undefined) when ?ENABLE_OR_DISABLE(Oper) -> - {error, #{ - bridge_name => Name, - bridge_type => Type, - reason => <<"bridge_not_found">> - }}; +pre_config_update([?ROOT_KEY, _Type, _Name], Oper, undefined) when ?ENABLE_OR_DISABLE(Oper) -> + {error, bridge_not_found}; pre_config_update([?ROOT_KEY, _Type, _Name], Oper, OldAction) when ?ENABLE_OR_DISABLE(Oper) -> {ok, OldAction#{<<"enable">> => operation_to_enable(Oper)}}; - %% Updates a single action from a specific HTTP API. %% If the connector is not found, the update operation fails. -pre_config_update([?ROOT_KEY, ActionType, _Name], Conf = #{}, _OldConf) -> - action_convert_from_connector(ActionType, Conf); +pre_config_update([?ROOT_KEY, Type, Name], Conf = #{}, _OldConf) -> + action_convert_from_connector(Type, Name, Conf); %% Batch updates actions when importing a configuration or executing a CLI command. %% Update succeeded even if the connector is not found, alarm in post_config_update pre_config_update([?ROOT_KEY], Conf = #{}, _OldConfs) -> @@ -932,7 +916,6 @@ post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, OldConf, reload_message_publish_hook(Bridges), ?tp(bridge_post_config_update_done, #{}), ok; - %% This top level handler will be triggered when the actions path is updated %% with calls to emqx_conf:update([actions], BridgesConf, #{}). %% such as import_config/1 @@ -947,7 +930,7 @@ post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) -> UpdateFun = fun(Type, Name, {OldBridgeConf, Conf}) -> uninstall_bridge_v2(Type, Name, OldBridgeConf), install_bridge_v2(Type, Name, Conf) - end, + end, Result = perform_bridge_changes([ #{action => RemoveFun, data => Removed}, #{ @@ -1578,10 +1561,10 @@ referenced_connectors_exist(BridgeType, ConnectorNameBin, BridgeName) -> actions_convert_from_connectors(Conf) -> maps:map( - fun({ActionType, Actions}) -> + fun(ActionType, Actions) -> maps:map( - fun(_Name, Action) -> - case action_convert_from_connector(ActionType, Action) of + fun(ActionName, Action) -> + case action_convert_from_connector(ActionType, ActionName, Action) of {ok, NewAction} -> NewAction; {error, _} -> Action end @@ -1592,15 +1575,16 @@ actions_convert_from_connectors(Conf) -> Conf ). -action_convert_from_connector(ActionType, Action = #{<<"connector">> := ConnectorName}) -> - case get_connector_info(ConnectorName, ActionType) of +action_convert_from_connector(Type, Name, Action = #{<<"connector">> := ConnectorName}) -> + case get_connector_info(ConnectorName, Type) of {ok, Connector} -> - Action1 = emqx_action_info:action_convert_from_connector(ActionType, Connector, Action), + Action1 = emqx_action_info:action_convert_from_connector(Type, Connector, Action), {ok, Action1}; {error, not_found} -> {error, #{ - reason => "connector_not_found", - type => ActionType, + bridge_name => Name, + reason => <<"connector_not_found_or_wrong_type">>, + bridge_type => Type, connector_name => ConnectorName }} end. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index 1f4390eef..74239ffc0 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -309,20 +309,30 @@ project_to_actions_resource_opts(OldResourceOpts) -> actions_convert_from_connectors(RawConf = #{<<"actions">> := Actions}) -> Actions1 = - maps:map(fun(ActionType, ActionMap) -> - maps:map(fun(_ActionName, Action) -> - #{<<"connector">> := ConnName} = Action, - ConnType = atom_to_binary(emqx_bridge_v2:connector_type(ActionType)), - ConnPath = [<<"connectors">>, ConnType, ConnName], - case emqx_utils_maps:deep_find(ConnPath, RawConf) of - {ok, ConnConf} -> - emqx_action_info:action_convert_from_connector(ActionType, ConnConf, Action); - {not_found, _KeyPath, _Data} -> Action - end - end, ActionMap) - end, Actions), + maps:map( + fun(ActionType, ActionMap) -> + maps:map( + fun(_ActionName, Action) -> + #{<<"connector">> := ConnName} = Action, + ConnType = atom_to_binary(emqx_bridge_v2:connector_type(ActionType)), + ConnPath = [<<"connectors">>, ConnType, ConnName], + case emqx_utils_maps:deep_find(ConnPath, RawConf) of + {ok, ConnConf} -> + emqx_action_info:action_convert_from_connector( + ActionType, ConnConf, Action + ); + {not_found, _KeyPath, _Data} -> + Action + end + end, + ActionMap + ) + end, + Actions + ), maps:put(<<"actions">>, Actions1, RawConf); -actions_convert_from_connectors(RawConf) -> RawConf. +actions_convert_from_connectors(RawConf) -> + RawConf. -ifdef(TEST). -include_lib("hocon/include/hocon_types.hrl"). diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index cc9f505c2..f3b8a29d7 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -610,7 +610,7 @@ t_load_no_matching_connector(_Config) -> bridge_name := my_test_bridge_update, connector_name := <<"unknown">>, bridge_type := _, - reason := "connector_not_found_or_wrong_type" + reason := <<"connector_not_found_or_wrong_type">> }}}, update_root_config(RootConf0) ), @@ -627,7 +627,7 @@ t_load_no_matching_connector(_Config) -> bridge_name := my_test_bridge_new, connector_name := <<"unknown">>, bridge_type := _, - reason := "connector_not_found_or_wrong_type" + reason := <<"connector_not_found_or_wrong_type">> }}}, update_root_config(RootConf1) ), @@ -696,11 +696,11 @@ t_create_no_matching_connector(_Config) -> Conf = (bridge_config())#{<<"connector">> => <<"wrong_connector_name">>}, ?assertMatch( {error, - {post_config_update, _HandlerMod, #{ + {pre_config_update, _HandlerMod, #{ bridge_name := _, connector_name := _, bridge_type := _, - reason := "connector_not_found_or_wrong_type" + reason := <<"connector_not_found_or_wrong_type">> }}}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf) ), @@ -716,11 +716,11 @@ t_create_wrong_connector_type(_Config) -> Conf = bridge_config(), ?assertMatch( {error, - {post_config_update, _HandlerMod, #{ + {pre_config_update, _HandlerMod, #{ bridge_name := _, connector_name := _, bridge_type := wrong_type, - reason := "connector_not_found_or_wrong_type" + reason := <<"connector_not_found_or_wrong_type">> }}}, emqx_bridge_v2:create(wrong_type, my_test_bridge, Conf) ), @@ -732,11 +732,11 @@ t_update_connector_not_found(_Config) -> BadConf = Conf#{<<"connector">> => <<"wrong_connector_name">>}, ?assertMatch( {error, - {post_config_update, _HandlerMod, #{ + {pre_config_update, _HandlerMod, #{ bridge_name := _, connector_name := _, bridge_type := _, - reason := "connector_not_found_or_wrong_type" + reason := <<"connector_not_found_or_wrong_type">> }}}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, BadConf) ), diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl index 75419570f..c9882e616 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl @@ -105,8 +105,12 @@ fields(action_parameters) -> command_template(), {redis_type, ?HOCON( - ?ENUM([single, sentinel, cluster]), - #{required => true, desc => ?DESC(redis_type)} + ?ENUM([single, sentinel, cluster]), #{ + required => false, + desc => ?DESC(redis_type), + hidden => true, + importance => ?IMPORTANCE_HIDDEN + } )} ]; fields("post_single") -> diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl index 83ef0d1cd..9373fe8bd 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl @@ -13,8 +13,7 @@ namespace/0, roots/0, fields/1, - desc/1, - resource_opts_converter/2 + desc/1 ]). %% `emqx_bridge_v2_schema' "unofficial" API @@ -60,7 +59,6 @@ fields(action) -> ?MAP(name, ?R_REF(redis_action)), #{ desc => <<"Redis Action Config">>, - converter => fun ?MODULE:resource_opts_converter/2, required => false } )}; @@ -118,26 +116,6 @@ desc(action_resource_opts) -> desc(_Name) -> undefined. -resource_opts_converter(undefined, _Opts) -> - undefined; -resource_opts_converter(Conf, _Opts) -> - maps:map( - fun(_Name, SubConf) -> - case SubConf of - #{<<"parameters">> := #{<<"redis_type">> := <<"cluster">>}} -> - ResOpts = maps:get(<<"resource_opts">>, SubConf, #{}), - %% cluster don't support batch - SubConf#{ - <<"resource_opts">> => - ResOpts#{<<"batch_size">> => 1, <<"batch_time">> => <<"0ms">>} - }; - _ -> - SubConf - end - end, - Conf - ). - %%------------------------------------------------------------------------------------------------- %% `emqx_bridge_v2_schema' "unofficial" API %%------------------------------------------------------------------------------------------------- @@ -147,7 +125,7 @@ bridge_v2_examples(Method) -> #{ <<"redis">> => #{ summary => <<"Redis Action">>, - value => action_example(single, Method) + value => action_example(Method) } } ]. @@ -177,17 +155,17 @@ connector_examples(Method) -> conn_bridge_examples(Method) -> emqx_bridge_redis:conn_bridge_examples(Method). -action_example(RedisType, post) -> +action_example(post) -> maps:merge( - action_example(RedisType, put), + action_example(put), #{ type => <<"redis">>, name => <<"my_action">> } ); -action_example(RedisType, get) -> +action_example(get) -> maps:merge( - action_example(RedisType, put), + action_example(put), #{ status => <<"connected">>, node_status => [ @@ -198,14 +176,13 @@ action_example(RedisType, get) -> ] } ); -action_example(RedisType, put) -> +action_example(put) -> #{ enable => true, connector => <<"my_connector_name">>, description => <<"My action">>, parameters => #{ - command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>], - redis_type => RedisType + command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>] }, resource_opts => #{batch_size => 1} }. @@ -236,20 +213,33 @@ connector_example(RedisType, put) -> enable => true, description => <<"My redis ", (atom_to_binary(RedisType))/binary, " connector">>, parameters => connector_parameter(RedisType), - pool_size => 8, - database => 1, - username => <<"test">>, - password => <<"******">>, ssl => #{enable => false} }. connector_parameter(single) -> - #{redis_type => single, server => <<"127.0.0.1:6379">>}; + #{ + redis_type => single, + server => <<"127.0.0.1:6379">>, + pool_size => 8, + database => 1, + username => <<"test">>, + password => <<"******">> + }; connector_parameter(cluster) -> - #{redis_type => cluster, servers => <<"127.0.0.1:6379,127.0.0.2:6379">>}; + #{ + redis_type => cluster, + servers => <<"127.0.0.1:6379,127.0.0.2:6379">>, + pool_size => 8, + username => <<"test">>, + password => <<"******">> + }; connector_parameter(sentinel) -> #{ redis_type => sentinel, servers => <<"127.0.0.1:6379,127.0.0.2:6379">>, - sentinel => <<"myredismaster">> + sentinel => <<"myredismaster">>, + pool_size => 8, + database => 1, + username => <<"test">>, + password => <<"******">> }. diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index b6ba0dfc9..d519e2e05 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -239,6 +239,11 @@ load_config_from_raw(RawConf0, Opts) -> RawConf = emqx_config:fill_defaults(RawConf1), case check_config(RawConf) of ok -> + %% It has been ensured that the connector is always the first configuration to be updated. + %% However, when deleting the connector, we need to clean up the dependent actions first; + %% otherwise, the deletion will fail. + %% notice: we can't create a action before connector. + uninstall_actions(RawConf, Opts), Error = lists:filtermap( fun({K, V}) -> @@ -272,6 +277,29 @@ load_config_from_raw(RawConf0, Opts) -> {error, Errors} end. +uninstall_actions(#{<<"actions">> := New}, #{mode := replace}) -> + Old = emqx_conf:get_raw([<<"actions">>], #{}), + #{removed := Removed} = emqx_bridge_v2:diff_confs(New, Old), + maps:foreach( + fun({Type, Name}, _) -> + case emqx_bridge_v2:remove(Type, Name) of + ok -> + ok; + {error, Reason} -> + ?SLOG(error, #{ + msg => "failed_to_remove_action", + type => Type, + name => Name, + error => Reason + }) + end + end, + Removed + ); +%% we don't delete things when in merge mode or without actions key. +uninstall_actions(_RawConf, _) -> + ok. + update_config_cluster( ?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, @@ -286,16 +314,9 @@ update_config_cluster( check_res(Key, emqx_authn:merge_config(Conf), Conf, Opts); update_config_cluster(Key, NewConf, #{mode := merge} = Opts) -> Merged = merge_conf(Key, NewConf), - Request = make_request(Key, Merged), - check_res(Key, emqx_conf:update([Key], Request, ?OPTIONS), NewConf, Opts); + check_res(Key, emqx_conf:update([Key], Merged, ?OPTIONS), NewConf, Opts); update_config_cluster(Key, Value, #{mode := replace} = Opts) -> - Request = make_request(Key, Value), - check_res(Key, emqx_conf:update([Key], Request, ?OPTIONS), Value, Opts). - -make_request(Key, Value) when Key =:= <<"connectors">> orelse Key =:= <<"actions">> -> - {force_update, Value}; -make_request(_Key, Value) -> - Value. + check_res(Key, emqx_conf:update([Key], Value, ?OPTIONS), Value, Opts). -define(LOCAL_OPTIONS, #{rawconf_with_defaults => true, persistent => false}). update_config_local( @@ -312,11 +333,9 @@ update_config_local( check_res(node(), Key, emqx_authn:merge_config_local(Conf, ?LOCAL_OPTIONS), Conf, Opts); update_config_local(Key, NewConf, #{mode := merge} = Opts) -> Merged = merge_conf(Key, NewConf), - Request = make_request(Key, Merged), - check_res(node(), Key, emqx:update_config([Key], Request, ?LOCAL_OPTIONS), NewConf, Opts); + check_res(node(), Key, emqx:update_config([Key], Merged, ?LOCAL_OPTIONS), NewConf, Opts); update_config_local(Key, Value, #{mode := replace} = Opts) -> - Request = make_request(Key, Value), - check_res(node(), Key, emqx:update_config([Key], Request, ?LOCAL_OPTIONS), Value, Opts). + check_res(node(), Key, emqx:update_config([Key], Value, ?LOCAL_OPTIONS), Value, Opts). check_res(Key, Res, Conf, Opts) -> check_res(cluster, Key, Res, Conf, Opts). check_res(Node, Key, {ok, _}, _Conf, Opts) -> @@ -442,6 +461,7 @@ filter_readonly_config(Raw) -> end. reload_config(AllConf, Opts) -> + uninstall_actions(AllConf, Opts), Fold = fun({Key, Conf}, Acc) -> case update_config_local(Key, Conf, Opts) of ok -> diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index 4a9f6f24a..6614b24e2 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -80,7 +80,6 @@ upgrade_raw_conf(Raw0) -> Raw1 = emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(Raw0), emqx_bridge_v2_schema:actions_convert_from_connectors(Raw1). - namespace() -> emqx. tags() -> diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index 70e4513a7..27dfcba2a 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -113,15 +113,13 @@ pre_config_update([?ROOT_KEY], NewConf, _RawConf) -> ok -> {ok, convert_certs(NewConf)}; Error -> Error end; -pre_config_update([?ROOT_KEY, Type, Name], Oper, undefined) - when ?ENABLE_OR_DISABLE(Oper) -> - {error, #{ - reason => <<"connector_not_found">>, - connector_name => Name, - connector_type => Type - }}; -pre_config_update([?ROOT_KEY, _Type, _Name], Oper, OldConfig) - when ?ENABLE_OR_DISABLE(Oper) -> +pre_config_update([?ROOT_KEY, _Type, _Name], Oper, undefined) when + ?ENABLE_OR_DISABLE(Oper) +-> + {error, connector_not_found}; +pre_config_update([?ROOT_KEY, _Type, _Name], Oper, OldConfig) when + ?ENABLE_OR_DISABLE(Oper) +-> %% to save the 'enable' to the config files {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}}; pre_config_update([?ROOT_KEY, _Type, Name] = Path, Conf = #{}, _OldConfig) ->