From ec34b6f41d503bd1d2ca8b37ba0ccd5713d73258 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Mon, 8 Jan 2024 09:45:48 +0800 Subject: [PATCH] refactor: remove redis_type from redis action --- apps/emqx_bridge/src/emqx_bridge_v2.erl | 48 +++++--------- .../src/schema/emqx_bridge_v2_schema.erl | 36 ++++++---- .../emqx_bridge/test/emqx_bridge_v2_SUITE.erl | 16 ++--- .../src/emqx_bridge_redis.erl | 8 ++- .../src/emqx_bridge_redis_schema.erl | 66 ++++++++----------- apps/emqx_conf/src/emqx_conf_cli.erl | 46 +++++++++---- apps/emqx_conf/src/emqx_conf_schema.erl | 1 - apps/emqx_connector/src/emqx_connector.erl | 16 ++--- 8 files changed, 121 insertions(+), 116 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 455bee5eb..723808919 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -151,23 +151,12 @@ %%==================================================================== load() -> - reinit_bridges(), load_bridges(), load_message_publish_hook(), ok = emqx_config_handler:add_handler(config_key_path_leaf(), emqx_bridge_v2), ok = emqx_config_handler:add_handler(config_key_path(), emqx_bridge_v2), ok. -reinit_bridges() -> - %% todo - Bridges = emqx:get_raw_config([?ROOT_KEY], #{}), - try - actions_convert_from_connectors(Bridges) - catch - _:_ -> ok - end, - ok. - load_bridges() -> Bridges = emqx:get_config([?ROOT_KEY], #{}), lists:foreach( @@ -456,8 +445,8 @@ combine_connector_and_bridge_v2_config( _:_ -> alarm_connector_not_found(BridgeV2Type, BridgeName, ConnectorName), {error, #{ - reason => "connector_not_found", - type => BridgeV2Type, + reason => <<"connector_not_found_or_wrong_type">>, + bridge_type => BridgeV2Type, bridge_name => BridgeName, connector_name => ConnectorName }} @@ -882,19 +871,14 @@ config_key_path() -> [?ROOT_KEY]. config_key_path_leaf() -> [?ROOT_KEY, '?', '?']. %% enable or disable action -pre_config_update([?ROOT_KEY, Type, Name], Oper, undefined) when ?ENABLE_OR_DISABLE(Oper) -> - {error, #{ - bridge_name => Name, - bridge_type => Type, - reason => <<"bridge_not_found">> - }}; +pre_config_update([?ROOT_KEY, _Type, _Name], Oper, undefined) when ?ENABLE_OR_DISABLE(Oper) -> + {error, bridge_not_found}; pre_config_update([?ROOT_KEY, _Type, _Name], Oper, OldAction) when ?ENABLE_OR_DISABLE(Oper) -> {ok, OldAction#{<<"enable">> => operation_to_enable(Oper)}}; - %% Updates a single action from a specific HTTP API. %% If the connector is not found, the update operation fails. -pre_config_update([?ROOT_KEY, ActionType, _Name], Conf = #{}, _OldConf) -> - action_convert_from_connector(ActionType, Conf); +pre_config_update([?ROOT_KEY, Type, Name], Conf = #{}, _OldConf) -> + action_convert_from_connector(Type, Name, Conf); %% Batch updates actions when importing a configuration or executing a CLI command. %% Update succeeded even if the connector is not found, alarm in post_config_update pre_config_update([?ROOT_KEY], Conf = #{}, _OldConfs) -> @@ -932,7 +916,6 @@ post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, OldConf, reload_message_publish_hook(Bridges), ?tp(bridge_post_config_update_done, #{}), ok; - %% This top level handler will be triggered when the actions path is updated %% with calls to emqx_conf:update([actions], BridgesConf, #{}). %% such as import_config/1 @@ -947,7 +930,7 @@ post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) -> UpdateFun = fun(Type, Name, {OldBridgeConf, Conf}) -> uninstall_bridge_v2(Type, Name, OldBridgeConf), install_bridge_v2(Type, Name, Conf) - end, + end, Result = perform_bridge_changes([ #{action => RemoveFun, data => Removed}, #{ @@ -1578,10 +1561,10 @@ referenced_connectors_exist(BridgeType, ConnectorNameBin, BridgeName) -> actions_convert_from_connectors(Conf) -> maps:map( - fun({ActionType, Actions}) -> + fun(ActionType, Actions) -> maps:map( - fun(_Name, Action) -> - case action_convert_from_connector(ActionType, Action) of + fun(ActionName, Action) -> + case action_convert_from_connector(ActionType, ActionName, Action) of {ok, NewAction} -> NewAction; {error, _} -> Action end @@ -1592,15 +1575,16 @@ actions_convert_from_connectors(Conf) -> Conf ). -action_convert_from_connector(ActionType, Action = #{<<"connector">> := ConnectorName}) -> - case get_connector_info(ConnectorName, ActionType) of +action_convert_from_connector(Type, Name, Action = #{<<"connector">> := ConnectorName}) -> + case get_connector_info(ConnectorName, Type) of {ok, Connector} -> - Action1 = emqx_action_info:action_convert_from_connector(ActionType, Connector, Action), + Action1 = emqx_action_info:action_convert_from_connector(Type, Connector, Action), {ok, Action1}; {error, not_found} -> {error, #{ - reason => "connector_not_found", - type => ActionType, + bridge_name => Name, + reason => <<"connector_not_found_or_wrong_type">>, + bridge_type => Type, connector_name => ConnectorName }} end. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index 1f4390eef..74239ffc0 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -309,20 +309,30 @@ project_to_actions_resource_opts(OldResourceOpts) -> actions_convert_from_connectors(RawConf = #{<<"actions">> := Actions}) -> Actions1 = - maps:map(fun(ActionType, ActionMap) -> - maps:map(fun(_ActionName, Action) -> - #{<<"connector">> := ConnName} = Action, - ConnType = atom_to_binary(emqx_bridge_v2:connector_type(ActionType)), - ConnPath = [<<"connectors">>, ConnType, ConnName], - case emqx_utils_maps:deep_find(ConnPath, RawConf) of - {ok, ConnConf} -> - emqx_action_info:action_convert_from_connector(ActionType, ConnConf, Action); - {not_found, _KeyPath, _Data} -> Action - end - end, ActionMap) - end, Actions), + maps:map( + fun(ActionType, ActionMap) -> + maps:map( + fun(_ActionName, Action) -> + #{<<"connector">> := ConnName} = Action, + ConnType = atom_to_binary(emqx_bridge_v2:connector_type(ActionType)), + ConnPath = [<<"connectors">>, ConnType, ConnName], + case emqx_utils_maps:deep_find(ConnPath, RawConf) of + {ok, ConnConf} -> + emqx_action_info:action_convert_from_connector( + ActionType, ConnConf, Action + ); + {not_found, _KeyPath, _Data} -> + Action + end + end, + ActionMap + ) + end, + Actions + ), maps:put(<<"actions">>, Actions1, RawConf); -actions_convert_from_connectors(RawConf) -> RawConf. +actions_convert_from_connectors(RawConf) -> + RawConf. -ifdef(TEST). -include_lib("hocon/include/hocon_types.hrl"). diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index cc9f505c2..f3b8a29d7 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -610,7 +610,7 @@ t_load_no_matching_connector(_Config) -> bridge_name := my_test_bridge_update, connector_name := <<"unknown">>, bridge_type := _, - reason := "connector_not_found_or_wrong_type" + reason := <<"connector_not_found_or_wrong_type">> }}}, update_root_config(RootConf0) ), @@ -627,7 +627,7 @@ t_load_no_matching_connector(_Config) -> bridge_name := my_test_bridge_new, connector_name := <<"unknown">>, bridge_type := _, - reason := "connector_not_found_or_wrong_type" + reason := <<"connector_not_found_or_wrong_type">> }}}, update_root_config(RootConf1) ), @@ -696,11 +696,11 @@ t_create_no_matching_connector(_Config) -> Conf = (bridge_config())#{<<"connector">> => <<"wrong_connector_name">>}, ?assertMatch( {error, - {post_config_update, _HandlerMod, #{ + {pre_config_update, _HandlerMod, #{ bridge_name := _, connector_name := _, bridge_type := _, - reason := "connector_not_found_or_wrong_type" + reason := <<"connector_not_found_or_wrong_type">> }}}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf) ), @@ -716,11 +716,11 @@ t_create_wrong_connector_type(_Config) -> Conf = bridge_config(), ?assertMatch( {error, - {post_config_update, _HandlerMod, #{ + {pre_config_update, _HandlerMod, #{ bridge_name := _, connector_name := _, bridge_type := wrong_type, - reason := "connector_not_found_or_wrong_type" + reason := <<"connector_not_found_or_wrong_type">> }}}, emqx_bridge_v2:create(wrong_type, my_test_bridge, Conf) ), @@ -732,11 +732,11 @@ t_update_connector_not_found(_Config) -> BadConf = Conf#{<<"connector">> => <<"wrong_connector_name">>}, ?assertMatch( {error, - {post_config_update, _HandlerMod, #{ + {pre_config_update, _HandlerMod, #{ bridge_name := _, connector_name := _, bridge_type := _, - reason := "connector_not_found_or_wrong_type" + reason := <<"connector_not_found_or_wrong_type">> }}}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, BadConf) ), diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl index 75419570f..c9882e616 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis.erl @@ -105,8 +105,12 @@ fields(action_parameters) -> command_template(), {redis_type, ?HOCON( - ?ENUM([single, sentinel, cluster]), - #{required => true, desc => ?DESC(redis_type)} + ?ENUM([single, sentinel, cluster]), #{ + required => false, + desc => ?DESC(redis_type), + hidden => true, + importance => ?IMPORTANCE_HIDDEN + } )} ]; fields("post_single") -> diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl index 83ef0d1cd..9373fe8bd 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl @@ -13,8 +13,7 @@ namespace/0, roots/0, fields/1, - desc/1, - resource_opts_converter/2 + desc/1 ]). %% `emqx_bridge_v2_schema' "unofficial" API @@ -60,7 +59,6 @@ fields(action) -> ?MAP(name, ?R_REF(redis_action)), #{ desc => <<"Redis Action Config">>, - converter => fun ?MODULE:resource_opts_converter/2, required => false } )}; @@ -118,26 +116,6 @@ desc(action_resource_opts) -> desc(_Name) -> undefined. -resource_opts_converter(undefined, _Opts) -> - undefined; -resource_opts_converter(Conf, _Opts) -> - maps:map( - fun(_Name, SubConf) -> - case SubConf of - #{<<"parameters">> := #{<<"redis_type">> := <<"cluster">>}} -> - ResOpts = maps:get(<<"resource_opts">>, SubConf, #{}), - %% cluster don't support batch - SubConf#{ - <<"resource_opts">> => - ResOpts#{<<"batch_size">> => 1, <<"batch_time">> => <<"0ms">>} - }; - _ -> - SubConf - end - end, - Conf - ). - %%------------------------------------------------------------------------------------------------- %% `emqx_bridge_v2_schema' "unofficial" API %%------------------------------------------------------------------------------------------------- @@ -147,7 +125,7 @@ bridge_v2_examples(Method) -> #{ <<"redis">> => #{ summary => <<"Redis Action">>, - value => action_example(single, Method) + value => action_example(Method) } } ]. @@ -177,17 +155,17 @@ connector_examples(Method) -> conn_bridge_examples(Method) -> emqx_bridge_redis:conn_bridge_examples(Method). -action_example(RedisType, post) -> +action_example(post) -> maps:merge( - action_example(RedisType, put), + action_example(put), #{ type => <<"redis">>, name => <<"my_action">> } ); -action_example(RedisType, get) -> +action_example(get) -> maps:merge( - action_example(RedisType, put), + action_example(put), #{ status => <<"connected">>, node_status => [ @@ -198,14 +176,13 @@ action_example(RedisType, get) -> ] } ); -action_example(RedisType, put) -> +action_example(put) -> #{ enable => true, connector => <<"my_connector_name">>, description => <<"My action">>, parameters => #{ - command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>], - redis_type => RedisType + command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>] }, resource_opts => #{batch_size => 1} }. @@ -236,20 +213,33 @@ connector_example(RedisType, put) -> enable => true, description => <<"My redis ", (atom_to_binary(RedisType))/binary, " connector">>, parameters => connector_parameter(RedisType), - pool_size => 8, - database => 1, - username => <<"test">>, - password => <<"******">>, ssl => #{enable => false} }. connector_parameter(single) -> - #{redis_type => single, server => <<"127.0.0.1:6379">>}; + #{ + redis_type => single, + server => <<"127.0.0.1:6379">>, + pool_size => 8, + database => 1, + username => <<"test">>, + password => <<"******">> + }; connector_parameter(cluster) -> - #{redis_type => cluster, servers => <<"127.0.0.1:6379,127.0.0.2:6379">>}; + #{ + redis_type => cluster, + servers => <<"127.0.0.1:6379,127.0.0.2:6379">>, + pool_size => 8, + username => <<"test">>, + password => <<"******">> + }; connector_parameter(sentinel) -> #{ redis_type => sentinel, servers => <<"127.0.0.1:6379,127.0.0.2:6379">>, - sentinel => <<"myredismaster">> + sentinel => <<"myredismaster">>, + pool_size => 8, + database => 1, + username => <<"test">>, + password => <<"******">> }. diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index b6ba0dfc9..d519e2e05 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -239,6 +239,11 @@ load_config_from_raw(RawConf0, Opts) -> RawConf = emqx_config:fill_defaults(RawConf1), case check_config(RawConf) of ok -> + %% It has been ensured that the connector is always the first configuration to be updated. + %% However, when deleting the connector, we need to clean up the dependent actions first; + %% otherwise, the deletion will fail. + %% notice: we can't create a action before connector. + uninstall_actions(RawConf, Opts), Error = lists:filtermap( fun({K, V}) -> @@ -272,6 +277,29 @@ load_config_from_raw(RawConf0, Opts) -> {error, Errors} end. +uninstall_actions(#{<<"actions">> := New}, #{mode := replace}) -> + Old = emqx_conf:get_raw([<<"actions">>], #{}), + #{removed := Removed} = emqx_bridge_v2:diff_confs(New, Old), + maps:foreach( + fun({Type, Name}, _) -> + case emqx_bridge_v2:remove(Type, Name) of + ok -> + ok; + {error, Reason} -> + ?SLOG(error, #{ + msg => "failed_to_remove_action", + type => Type, + name => Name, + error => Reason + }) + end + end, + Removed + ); +%% we don't delete things when in merge mode or without actions key. +uninstall_actions(_RawConf, _) -> + ok. + update_config_cluster( ?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, @@ -286,16 +314,9 @@ update_config_cluster( check_res(Key, emqx_authn:merge_config(Conf), Conf, Opts); update_config_cluster(Key, NewConf, #{mode := merge} = Opts) -> Merged = merge_conf(Key, NewConf), - Request = make_request(Key, Merged), - check_res(Key, emqx_conf:update([Key], Request, ?OPTIONS), NewConf, Opts); + check_res(Key, emqx_conf:update([Key], Merged, ?OPTIONS), NewConf, Opts); update_config_cluster(Key, Value, #{mode := replace} = Opts) -> - Request = make_request(Key, Value), - check_res(Key, emqx_conf:update([Key], Request, ?OPTIONS), Value, Opts). - -make_request(Key, Value) when Key =:= <<"connectors">> orelse Key =:= <<"actions">> -> - {force_update, Value}; -make_request(_Key, Value) -> - Value. + check_res(Key, emqx_conf:update([Key], Value, ?OPTIONS), Value, Opts). -define(LOCAL_OPTIONS, #{rawconf_with_defaults => true, persistent => false}). update_config_local( @@ -312,11 +333,9 @@ update_config_local( check_res(node(), Key, emqx_authn:merge_config_local(Conf, ?LOCAL_OPTIONS), Conf, Opts); update_config_local(Key, NewConf, #{mode := merge} = Opts) -> Merged = merge_conf(Key, NewConf), - Request = make_request(Key, Merged), - check_res(node(), Key, emqx:update_config([Key], Request, ?LOCAL_OPTIONS), NewConf, Opts); + check_res(node(), Key, emqx:update_config([Key], Merged, ?LOCAL_OPTIONS), NewConf, Opts); update_config_local(Key, Value, #{mode := replace} = Opts) -> - Request = make_request(Key, Value), - check_res(node(), Key, emqx:update_config([Key], Request, ?LOCAL_OPTIONS), Value, Opts). + check_res(node(), Key, emqx:update_config([Key], Value, ?LOCAL_OPTIONS), Value, Opts). check_res(Key, Res, Conf, Opts) -> check_res(cluster, Key, Res, Conf, Opts). check_res(Node, Key, {ok, _}, _Conf, Opts) -> @@ -442,6 +461,7 @@ filter_readonly_config(Raw) -> end. reload_config(AllConf, Opts) -> + uninstall_actions(AllConf, Opts), Fold = fun({Key, Conf}, Acc) -> case update_config_local(Key, Conf, Opts) of ok -> diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index 4a9f6f24a..6614b24e2 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -80,7 +80,6 @@ upgrade_raw_conf(Raw0) -> Raw1 = emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(Raw0), emqx_bridge_v2_schema:actions_convert_from_connectors(Raw1). - namespace() -> emqx. tags() -> diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index 70e4513a7..27dfcba2a 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -113,15 +113,13 @@ pre_config_update([?ROOT_KEY], NewConf, _RawConf) -> ok -> {ok, convert_certs(NewConf)}; Error -> Error end; -pre_config_update([?ROOT_KEY, Type, Name], Oper, undefined) - when ?ENABLE_OR_DISABLE(Oper) -> - {error, #{ - reason => <<"connector_not_found">>, - connector_name => Name, - connector_type => Type - }}; -pre_config_update([?ROOT_KEY, _Type, _Name], Oper, OldConfig) - when ?ENABLE_OR_DISABLE(Oper) -> +pre_config_update([?ROOT_KEY, _Type, _Name], Oper, undefined) when + ?ENABLE_OR_DISABLE(Oper) +-> + {error, connector_not_found}; +pre_config_update([?ROOT_KEY, _Type, _Name], Oper, OldConfig) when + ?ENABLE_OR_DISABLE(Oper) +-> %% to save the 'enable' to the config files {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}}; pre_config_update([?ROOT_KEY, _Type, Name] = Path, Conf = #{}, _OldConfig) ->