refactor: remove redis_type from redis action

This commit is contained in:
zhongwencool 2024-01-08 09:45:48 +08:00
parent 3404f39fd2
commit ec34b6f41d
8 changed files with 121 additions and 116 deletions

View File

@ -151,23 +151,12 @@
%%==================================================================== %%====================================================================
load() -> load() ->
reinit_bridges(),
load_bridges(), load_bridges(),
load_message_publish_hook(), load_message_publish_hook(),
ok = emqx_config_handler:add_handler(config_key_path_leaf(), emqx_bridge_v2), ok = emqx_config_handler:add_handler(config_key_path_leaf(), emqx_bridge_v2),
ok = emqx_config_handler:add_handler(config_key_path(), emqx_bridge_v2), ok = emqx_config_handler:add_handler(config_key_path(), emqx_bridge_v2),
ok. ok.
reinit_bridges() ->
%% todo
Bridges = emqx:get_raw_config([?ROOT_KEY], #{}),
try
actions_convert_from_connectors(Bridges)
catch
_:_ -> ok
end,
ok.
load_bridges() -> load_bridges() ->
Bridges = emqx:get_config([?ROOT_KEY], #{}), Bridges = emqx:get_config([?ROOT_KEY], #{}),
lists:foreach( lists:foreach(
@ -456,8 +445,8 @@ combine_connector_and_bridge_v2_config(
_:_ -> _:_ ->
alarm_connector_not_found(BridgeV2Type, BridgeName, ConnectorName), alarm_connector_not_found(BridgeV2Type, BridgeName, ConnectorName),
{error, #{ {error, #{
reason => "connector_not_found", reason => <<"connector_not_found_or_wrong_type">>,
type => BridgeV2Type, bridge_type => BridgeV2Type,
bridge_name => BridgeName, bridge_name => BridgeName,
connector_name => ConnectorName connector_name => ConnectorName
}} }}
@ -882,19 +871,14 @@ config_key_path() -> [?ROOT_KEY].
config_key_path_leaf() -> [?ROOT_KEY, '?', '?']. config_key_path_leaf() -> [?ROOT_KEY, '?', '?'].
%% enable or disable action %% enable or disable action
pre_config_update([?ROOT_KEY, Type, Name], Oper, undefined) when ?ENABLE_OR_DISABLE(Oper) -> pre_config_update([?ROOT_KEY, _Type, _Name], Oper, undefined) when ?ENABLE_OR_DISABLE(Oper) ->
{error, #{ {error, bridge_not_found};
bridge_name => Name,
bridge_type => Type,
reason => <<"bridge_not_found">>
}};
pre_config_update([?ROOT_KEY, _Type, _Name], Oper, OldAction) when ?ENABLE_OR_DISABLE(Oper) -> pre_config_update([?ROOT_KEY, _Type, _Name], Oper, OldAction) when ?ENABLE_OR_DISABLE(Oper) ->
{ok, OldAction#{<<"enable">> => operation_to_enable(Oper)}}; {ok, OldAction#{<<"enable">> => operation_to_enable(Oper)}};
%% Updates a single action from a specific HTTP API. %% Updates a single action from a specific HTTP API.
%% If the connector is not found, the update operation fails. %% If the connector is not found, the update operation fails.
pre_config_update([?ROOT_KEY, ActionType, _Name], Conf = #{}, _OldConf) -> pre_config_update([?ROOT_KEY, Type, Name], Conf = #{}, _OldConf) ->
action_convert_from_connector(ActionType, Conf); action_convert_from_connector(Type, Name, Conf);
%% Batch updates actions when importing a configuration or executing a CLI command. %% Batch updates actions when importing a configuration or executing a CLI command.
%% Update succeeded even if the connector is not found, alarm in post_config_update %% Update succeeded even if the connector is not found, alarm in post_config_update
pre_config_update([?ROOT_KEY], Conf = #{}, _OldConfs) -> pre_config_update([?ROOT_KEY], Conf = #{}, _OldConfs) ->
@ -932,7 +916,6 @@ post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, OldConf,
reload_message_publish_hook(Bridges), reload_message_publish_hook(Bridges),
?tp(bridge_post_config_update_done, #{}), ?tp(bridge_post_config_update_done, #{}),
ok; ok;
%% This top level handler will be triggered when the actions path is updated %% This top level handler will be triggered when the actions path is updated
%% with calls to emqx_conf:update([actions], BridgesConf, #{}). %% with calls to emqx_conf:update([actions], BridgesConf, #{}).
%% such as import_config/1 %% such as import_config/1
@ -1578,10 +1561,10 @@ referenced_connectors_exist(BridgeType, ConnectorNameBin, BridgeName) ->
actions_convert_from_connectors(Conf) -> actions_convert_from_connectors(Conf) ->
maps:map( maps:map(
fun({ActionType, Actions}) -> fun(ActionType, Actions) ->
maps:map( maps:map(
fun(_Name, Action) -> fun(ActionName, Action) ->
case action_convert_from_connector(ActionType, Action) of case action_convert_from_connector(ActionType, ActionName, Action) of
{ok, NewAction} -> NewAction; {ok, NewAction} -> NewAction;
{error, _} -> Action {error, _} -> Action
end end
@ -1592,15 +1575,16 @@ actions_convert_from_connectors(Conf) ->
Conf Conf
). ).
action_convert_from_connector(ActionType, Action = #{<<"connector">> := ConnectorName}) -> action_convert_from_connector(Type, Name, Action = #{<<"connector">> := ConnectorName}) ->
case get_connector_info(ConnectorName, ActionType) of case get_connector_info(ConnectorName, Type) of
{ok, Connector} -> {ok, Connector} ->
Action1 = emqx_action_info:action_convert_from_connector(ActionType, Connector, Action), Action1 = emqx_action_info:action_convert_from_connector(Type, Connector, Action),
{ok, Action1}; {ok, Action1};
{error, not_found} -> {error, not_found} ->
{error, #{ {error, #{
reason => "connector_not_found", bridge_name => Name,
type => ActionType, reason => <<"connector_not_found_or_wrong_type">>,
bridge_type => Type,
connector_name => ConnectorName connector_name => ConnectorName
}} }}
end. end.

View File

@ -309,20 +309,30 @@ project_to_actions_resource_opts(OldResourceOpts) ->
actions_convert_from_connectors(RawConf = #{<<"actions">> := Actions}) -> actions_convert_from_connectors(RawConf = #{<<"actions">> := Actions}) ->
Actions1 = Actions1 =
maps:map(fun(ActionType, ActionMap) -> maps:map(
maps:map(fun(_ActionName, Action) -> fun(ActionType, ActionMap) ->
maps:map(
fun(_ActionName, Action) ->
#{<<"connector">> := ConnName} = Action, #{<<"connector">> := ConnName} = Action,
ConnType = atom_to_binary(emqx_bridge_v2:connector_type(ActionType)), ConnType = atom_to_binary(emqx_bridge_v2:connector_type(ActionType)),
ConnPath = [<<"connectors">>, ConnType, ConnName], ConnPath = [<<"connectors">>, ConnType, ConnName],
case emqx_utils_maps:deep_find(ConnPath, RawConf) of case emqx_utils_maps:deep_find(ConnPath, RawConf) of
{ok, ConnConf} -> {ok, ConnConf} ->
emqx_action_info:action_convert_from_connector(ActionType, ConnConf, Action); emqx_action_info:action_convert_from_connector(
{not_found, _KeyPath, _Data} -> Action ActionType, ConnConf, Action
);
{not_found, _KeyPath, _Data} ->
Action
end end
end, ActionMap) end,
end, Actions), ActionMap
)
end,
Actions
),
maps:put(<<"actions">>, Actions1, RawConf); maps:put(<<"actions">>, Actions1, RawConf);
actions_convert_from_connectors(RawConf) -> RawConf. actions_convert_from_connectors(RawConf) ->
RawConf.
-ifdef(TEST). -ifdef(TEST).
-include_lib("hocon/include/hocon_types.hrl"). -include_lib("hocon/include/hocon_types.hrl").

View File

@ -610,7 +610,7 @@ t_load_no_matching_connector(_Config) ->
bridge_name := my_test_bridge_update, bridge_name := my_test_bridge_update,
connector_name := <<"unknown">>, connector_name := <<"unknown">>,
bridge_type := _, bridge_type := _,
reason := "connector_not_found_or_wrong_type" reason := <<"connector_not_found_or_wrong_type">>
}}}, }}},
update_root_config(RootConf0) update_root_config(RootConf0)
), ),
@ -627,7 +627,7 @@ t_load_no_matching_connector(_Config) ->
bridge_name := my_test_bridge_new, bridge_name := my_test_bridge_new,
connector_name := <<"unknown">>, connector_name := <<"unknown">>,
bridge_type := _, bridge_type := _,
reason := "connector_not_found_or_wrong_type" reason := <<"connector_not_found_or_wrong_type">>
}}}, }}},
update_root_config(RootConf1) update_root_config(RootConf1)
), ),
@ -696,11 +696,11 @@ t_create_no_matching_connector(_Config) ->
Conf = (bridge_config())#{<<"connector">> => <<"wrong_connector_name">>}, Conf = (bridge_config())#{<<"connector">> => <<"wrong_connector_name">>},
?assertMatch( ?assertMatch(
{error, {error,
{post_config_update, _HandlerMod, #{ {pre_config_update, _HandlerMod, #{
bridge_name := _, bridge_name := _,
connector_name := _, connector_name := _,
bridge_type := _, bridge_type := _,
reason := "connector_not_found_or_wrong_type" reason := <<"connector_not_found_or_wrong_type">>
}}}, }}},
emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf) emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf)
), ),
@ -716,11 +716,11 @@ t_create_wrong_connector_type(_Config) ->
Conf = bridge_config(), Conf = bridge_config(),
?assertMatch( ?assertMatch(
{error, {error,
{post_config_update, _HandlerMod, #{ {pre_config_update, _HandlerMod, #{
bridge_name := _, bridge_name := _,
connector_name := _, connector_name := _,
bridge_type := wrong_type, bridge_type := wrong_type,
reason := "connector_not_found_or_wrong_type" reason := <<"connector_not_found_or_wrong_type">>
}}}, }}},
emqx_bridge_v2:create(wrong_type, my_test_bridge, Conf) emqx_bridge_v2:create(wrong_type, my_test_bridge, Conf)
), ),
@ -732,11 +732,11 @@ t_update_connector_not_found(_Config) ->
BadConf = Conf#{<<"connector">> => <<"wrong_connector_name">>}, BadConf = Conf#{<<"connector">> => <<"wrong_connector_name">>},
?assertMatch( ?assertMatch(
{error, {error,
{post_config_update, _HandlerMod, #{ {pre_config_update, _HandlerMod, #{
bridge_name := _, bridge_name := _,
connector_name := _, connector_name := _,
bridge_type := _, bridge_type := _,
reason := "connector_not_found_or_wrong_type" reason := <<"connector_not_found_or_wrong_type">>
}}}, }}},
emqx_bridge_v2:create(bridge_type(), my_test_bridge, BadConf) emqx_bridge_v2:create(bridge_type(), my_test_bridge, BadConf)
), ),

View File

@ -105,8 +105,12 @@ fields(action_parameters) ->
command_template(), command_template(),
{redis_type, {redis_type,
?HOCON( ?HOCON(
?ENUM([single, sentinel, cluster]), ?ENUM([single, sentinel, cluster]), #{
#{required => true, desc => ?DESC(redis_type)} required => false,
desc => ?DESC(redis_type),
hidden => true,
importance => ?IMPORTANCE_HIDDEN
}
)} )}
]; ];
fields("post_single") -> fields("post_single") ->

View File

@ -13,8 +13,7 @@
namespace/0, namespace/0,
roots/0, roots/0,
fields/1, fields/1,
desc/1, desc/1
resource_opts_converter/2
]). ]).
%% `emqx_bridge_v2_schema' "unofficial" API %% `emqx_bridge_v2_schema' "unofficial" API
@ -60,7 +59,6 @@ fields(action) ->
?MAP(name, ?R_REF(redis_action)), ?MAP(name, ?R_REF(redis_action)),
#{ #{
desc => <<"Redis Action Config">>, desc => <<"Redis Action Config">>,
converter => fun ?MODULE:resource_opts_converter/2,
required => false required => false
} }
)}; )};
@ -118,26 +116,6 @@ desc(action_resource_opts) ->
desc(_Name) -> desc(_Name) ->
undefined. undefined.
resource_opts_converter(undefined, _Opts) ->
undefined;
resource_opts_converter(Conf, _Opts) ->
maps:map(
fun(_Name, SubConf) ->
case SubConf of
#{<<"parameters">> := #{<<"redis_type">> := <<"cluster">>}} ->
ResOpts = maps:get(<<"resource_opts">>, SubConf, #{}),
%% cluster don't support batch
SubConf#{
<<"resource_opts">> =>
ResOpts#{<<"batch_size">> => 1, <<"batch_time">> => <<"0ms">>}
};
_ ->
SubConf
end
end,
Conf
).
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
%% `emqx_bridge_v2_schema' "unofficial" API %% `emqx_bridge_v2_schema' "unofficial" API
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
@ -147,7 +125,7 @@ bridge_v2_examples(Method) ->
#{ #{
<<"redis">> => #{ <<"redis">> => #{
summary => <<"Redis Action">>, summary => <<"Redis Action">>,
value => action_example(single, Method) value => action_example(Method)
} }
} }
]. ].
@ -177,17 +155,17 @@ connector_examples(Method) ->
conn_bridge_examples(Method) -> conn_bridge_examples(Method) ->
emqx_bridge_redis:conn_bridge_examples(Method). emqx_bridge_redis:conn_bridge_examples(Method).
action_example(RedisType, post) -> action_example(post) ->
maps:merge( maps:merge(
action_example(RedisType, put), action_example(put),
#{ #{
type => <<"redis">>, type => <<"redis">>,
name => <<"my_action">> name => <<"my_action">>
} }
); );
action_example(RedisType, get) -> action_example(get) ->
maps:merge( maps:merge(
action_example(RedisType, put), action_example(put),
#{ #{
status => <<"connected">>, status => <<"connected">>,
node_status => [ node_status => [
@ -198,14 +176,13 @@ action_example(RedisType, get) ->
] ]
} }
); );
action_example(RedisType, put) -> action_example(put) ->
#{ #{
enable => true, enable => true,
connector => <<"my_connector_name">>, connector => <<"my_connector_name">>,
description => <<"My action">>, description => <<"My action">>,
parameters => #{ parameters => #{
command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>], command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>]
redis_type => RedisType
}, },
resource_opts => #{batch_size => 1} resource_opts => #{batch_size => 1}
}. }.
@ -236,20 +213,33 @@ connector_example(RedisType, put) ->
enable => true, enable => true,
description => <<"My redis ", (atom_to_binary(RedisType))/binary, " connector">>, description => <<"My redis ", (atom_to_binary(RedisType))/binary, " connector">>,
parameters => connector_parameter(RedisType), parameters => connector_parameter(RedisType),
pool_size => 8,
database => 1,
username => <<"test">>,
password => <<"******">>,
ssl => #{enable => false} ssl => #{enable => false}
}. }.
connector_parameter(single) -> connector_parameter(single) ->
#{redis_type => single, server => <<"127.0.0.1:6379">>}; #{
redis_type => single,
server => <<"127.0.0.1:6379">>,
pool_size => 8,
database => 1,
username => <<"test">>,
password => <<"******">>
};
connector_parameter(cluster) -> connector_parameter(cluster) ->
#{redis_type => cluster, servers => <<"127.0.0.1:6379,127.0.0.2:6379">>}; #{
redis_type => cluster,
servers => <<"127.0.0.1:6379,127.0.0.2:6379">>,
pool_size => 8,
username => <<"test">>,
password => <<"******">>
};
connector_parameter(sentinel) -> connector_parameter(sentinel) ->
#{ #{
redis_type => sentinel, redis_type => sentinel,
servers => <<"127.0.0.1:6379,127.0.0.2:6379">>, servers => <<"127.0.0.1:6379,127.0.0.2:6379">>,
sentinel => <<"myredismaster">> sentinel => <<"myredismaster">>,
pool_size => 8,
database => 1,
username => <<"test">>,
password => <<"******">>
}. }.

View File

@ -239,6 +239,11 @@ load_config_from_raw(RawConf0, Opts) ->
RawConf = emqx_config:fill_defaults(RawConf1), RawConf = emqx_config:fill_defaults(RawConf1),
case check_config(RawConf) of case check_config(RawConf) of
ok -> ok ->
%% It has been ensured that the connector is always the first configuration to be updated.
%% However, when deleting the connector, we need to clean up the dependent actions first;
%% otherwise, the deletion will fail.
%% notice: we can't create a action before connector.
uninstall_actions(RawConf, Opts),
Error = Error =
lists:filtermap( lists:filtermap(
fun({K, V}) -> fun({K, V}) ->
@ -272,6 +277,29 @@ load_config_from_raw(RawConf0, Opts) ->
{error, Errors} {error, Errors}
end. end.
uninstall_actions(#{<<"actions">> := New}, #{mode := replace}) ->
Old = emqx_conf:get_raw([<<"actions">>], #{}),
#{removed := Removed} = emqx_bridge_v2:diff_confs(New, Old),
maps:foreach(
fun({Type, Name}, _) ->
case emqx_bridge_v2:remove(Type, Name) of
ok ->
ok;
{error, Reason} ->
?SLOG(error, #{
msg => "failed_to_remove_action",
type => Type,
name => Name,
error => Reason
})
end
end,
Removed
);
%% we don't delete things when in merge mode or without actions key.
uninstall_actions(_RawConf, _) ->
ok.
update_config_cluster( update_config_cluster(
?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_BINARY = Key, ?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_BINARY = Key,
Conf, Conf,
@ -286,16 +314,9 @@ update_config_cluster(
check_res(Key, emqx_authn:merge_config(Conf), Conf, Opts); check_res(Key, emqx_authn:merge_config(Conf), Conf, Opts);
update_config_cluster(Key, NewConf, #{mode := merge} = Opts) -> update_config_cluster(Key, NewConf, #{mode := merge} = Opts) ->
Merged = merge_conf(Key, NewConf), Merged = merge_conf(Key, NewConf),
Request = make_request(Key, Merged), check_res(Key, emqx_conf:update([Key], Merged, ?OPTIONS), NewConf, Opts);
check_res(Key, emqx_conf:update([Key], Request, ?OPTIONS), NewConf, Opts);
update_config_cluster(Key, Value, #{mode := replace} = Opts) -> update_config_cluster(Key, Value, #{mode := replace} = Opts) ->
Request = make_request(Key, Value), check_res(Key, emqx_conf:update([Key], Value, ?OPTIONS), Value, Opts).
check_res(Key, emqx_conf:update([Key], Request, ?OPTIONS), Value, Opts).
make_request(Key, Value) when Key =:= <<"connectors">> orelse Key =:= <<"actions">> ->
{force_update, Value};
make_request(_Key, Value) ->
Value.
-define(LOCAL_OPTIONS, #{rawconf_with_defaults => true, persistent => false}). -define(LOCAL_OPTIONS, #{rawconf_with_defaults => true, persistent => false}).
update_config_local( update_config_local(
@ -312,11 +333,9 @@ update_config_local(
check_res(node(), Key, emqx_authn:merge_config_local(Conf, ?LOCAL_OPTIONS), Conf, Opts); check_res(node(), Key, emqx_authn:merge_config_local(Conf, ?LOCAL_OPTIONS), Conf, Opts);
update_config_local(Key, NewConf, #{mode := merge} = Opts) -> update_config_local(Key, NewConf, #{mode := merge} = Opts) ->
Merged = merge_conf(Key, NewConf), Merged = merge_conf(Key, NewConf),
Request = make_request(Key, Merged), check_res(node(), Key, emqx:update_config([Key], Merged, ?LOCAL_OPTIONS), NewConf, Opts);
check_res(node(), Key, emqx:update_config([Key], Request, ?LOCAL_OPTIONS), NewConf, Opts);
update_config_local(Key, Value, #{mode := replace} = Opts) -> update_config_local(Key, Value, #{mode := replace} = Opts) ->
Request = make_request(Key, Value), check_res(node(), Key, emqx:update_config([Key], Value, ?LOCAL_OPTIONS), Value, Opts).
check_res(node(), Key, emqx:update_config([Key], Request, ?LOCAL_OPTIONS), Value, Opts).
check_res(Key, Res, Conf, Opts) -> check_res(cluster, Key, Res, Conf, Opts). check_res(Key, Res, Conf, Opts) -> check_res(cluster, Key, Res, Conf, Opts).
check_res(Node, Key, {ok, _}, _Conf, Opts) -> check_res(Node, Key, {ok, _}, _Conf, Opts) ->
@ -442,6 +461,7 @@ filter_readonly_config(Raw) ->
end. end.
reload_config(AllConf, Opts) -> reload_config(AllConf, Opts) ->
uninstall_actions(AllConf, Opts),
Fold = fun({Key, Conf}, Acc) -> Fold = fun({Key, Conf}, Acc) ->
case update_config_local(Key, Conf, Opts) of case update_config_local(Key, Conf, Opts) of
ok -> ok ->

View File

@ -80,7 +80,6 @@ upgrade_raw_conf(Raw0) ->
Raw1 = emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(Raw0), Raw1 = emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(Raw0),
emqx_bridge_v2_schema:actions_convert_from_connectors(Raw1). emqx_bridge_v2_schema:actions_convert_from_connectors(Raw1).
namespace() -> emqx. namespace() -> emqx.
tags() -> tags() ->

View File

@ -113,15 +113,13 @@ pre_config_update([?ROOT_KEY], NewConf, _RawConf) ->
ok -> {ok, convert_certs(NewConf)}; ok -> {ok, convert_certs(NewConf)};
Error -> Error Error -> Error
end; end;
pre_config_update([?ROOT_KEY, Type, Name], Oper, undefined) pre_config_update([?ROOT_KEY, _Type, _Name], Oper, undefined) when
when ?ENABLE_OR_DISABLE(Oper) -> ?ENABLE_OR_DISABLE(Oper)
{error, #{ ->
reason => <<"connector_not_found">>, {error, connector_not_found};
connector_name => Name, pre_config_update([?ROOT_KEY, _Type, _Name], Oper, OldConfig) when
connector_type => Type ?ENABLE_OR_DISABLE(Oper)
}}; ->
pre_config_update([?ROOT_KEY, _Type, _Name], Oper, OldConfig)
when ?ENABLE_OR_DISABLE(Oper) ->
%% to save the 'enable' to the config files %% to save the 'enable' to the config files
{ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}}; {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}};
pre_config_update([?ROOT_KEY, _Type, Name] = Path, Conf = #{}, _OldConfig) -> pre_config_update([?ROOT_KEY, _Type, Name] = Path, Conf = #{}, _OldConfig) ->