Merge pull request #11967 from sstrigler/bridge-v1-type-fun
feat(emqx_bridge): action_info with dynamic lookup
This commit is contained in:
commit
d99f033d74
|
@ -22,7 +22,7 @@
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
action_type_to_connector_type/1,
|
action_type_to_connector_type/1,
|
||||||
action_type_to_bridge_v1_type/1,
|
action_type_to_bridge_v1_type/2,
|
||||||
bridge_v1_type_to_action_type/1,
|
bridge_v1_type_to_action_type/1,
|
||||||
is_action_type/1,
|
is_action_type/1,
|
||||||
registered_schema_modules/0,
|
registered_schema_modules/0,
|
||||||
|
@ -30,7 +30,12 @@
|
||||||
bridge_v1_to_action_fixup/2
|
bridge_v1_to_action_fixup/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-callback bridge_v1_type_name() -> atom().
|
-callback bridge_v1_type_name() ->
|
||||||
|
atom()
|
||||||
|
| {
|
||||||
|
fun(({ActionConfig :: map(), ConnectorConfig :: map()}) -> Type :: atom()),
|
||||||
|
TypeList :: [atom()]
|
||||||
|
}.
|
||||||
-callback action_type_name() -> atom().
|
-callback action_type_name() -> atom().
|
||||||
-callback connector_type_name() -> atom().
|
-callback connector_type_name() -> atom().
|
||||||
-callback schema_module() -> atom().
|
-callback schema_module() -> atom().
|
||||||
|
@ -93,16 +98,22 @@ bridge_v1_type_to_action_type(Type) ->
|
||||||
ActionType -> ActionType
|
ActionType -> ActionType
|
||||||
end.
|
end.
|
||||||
|
|
||||||
action_type_to_bridge_v1_type(Bin) when is_binary(Bin) ->
|
action_type_to_bridge_v1_type(Bin, Conf) when is_binary(Bin) ->
|
||||||
action_type_to_bridge_v1_type(binary_to_existing_atom(Bin));
|
action_type_to_bridge_v1_type(binary_to_existing_atom(Bin), Conf);
|
||||||
action_type_to_bridge_v1_type(Type) ->
|
action_type_to_bridge_v1_type(ActionType, Conf) ->
|
||||||
ActionInfoMap = info_map(),
|
ActionInfoMap = info_map(),
|
||||||
ActionTypeToBridgeV1Type = maps:get(action_type_to_bridge_v1_type, ActionInfoMap),
|
ActionTypeToBridgeV1Type = maps:get(action_type_to_bridge_v1_type, ActionInfoMap),
|
||||||
case maps:get(Type, ActionTypeToBridgeV1Type, undefined) of
|
case maps:get(ActionType, ActionTypeToBridgeV1Type, undefined) of
|
||||||
undefined -> Type;
|
undefined -> ActionType;
|
||||||
|
BridgeV1TypeFun when is_function(BridgeV1TypeFun) -> BridgeV1TypeFun(get_confs(Conf));
|
||||||
BridgeV1Type -> BridgeV1Type
|
BridgeV1Type -> BridgeV1Type
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
get_confs(#{connector := ConnectorName, type := ActionType} = ActionConfig) ->
|
||||||
|
ConnectorType = action_type_to_connector_type(ActionType),
|
||||||
|
ConnectorConfig = emqx_conf:get_raw([connectors, ConnectorType, ConnectorName]),
|
||||||
|
{ActionConfig, ConnectorConfig}.
|
||||||
|
|
||||||
%% This function should return true for all inputs that are bridge V1 types for
|
%% This function should return true for all inputs that are bridge V1 types for
|
||||||
%% bridges that have been refactored to bridge V2s, and for all all bridge V2
|
%% bridges that have been refactored to bridge V2s, and for all all bridge V2
|
||||||
%% types. For everything else the function should return false.
|
%% types. For everything else the function should return false.
|
||||||
|
@ -226,36 +237,56 @@ get_info_map(Module) ->
|
||||||
%% Force the module to get loaded
|
%% Force the module to get loaded
|
||||||
_ = code:ensure_loaded(Module),
|
_ = code:ensure_loaded(Module),
|
||||||
ActionType = Module:action_type_name(),
|
ActionType = Module:action_type_name(),
|
||||||
BridgeV1Type =
|
{BridgeV1TypeOrFun, BridgeV1Types} =
|
||||||
case erlang:function_exported(Module, bridge_v1_type_name, 0) of
|
case erlang:function_exported(Module, bridge_v1_type_name, 0) of
|
||||||
true ->
|
true ->
|
||||||
Module:bridge_v1_type_name();
|
case Module:bridge_v1_type_name() of
|
||||||
|
{_BridgeV1TypeFun, _BridgeV1Types} = BridgeV1TypeTuple ->
|
||||||
|
BridgeV1TypeTuple;
|
||||||
|
BridgeV1Type0 ->
|
||||||
|
{BridgeV1Type0, [BridgeV1Type0]}
|
||||||
|
end;
|
||||||
false ->
|
false ->
|
||||||
Module:action_type_name()
|
{ActionType, [ActionType]}
|
||||||
end,
|
end,
|
||||||
#{
|
#{
|
||||||
action_type_names => #{
|
action_type_names =>
|
||||||
ActionType => true,
|
lists:foldl(
|
||||||
BridgeV1Type => true
|
fun(BridgeV1Type, M) ->
|
||||||
},
|
M#{BridgeV1Type => true}
|
||||||
bridge_v1_type_to_action_type => #{
|
end,
|
||||||
BridgeV1Type => ActionType,
|
#{ActionType => true},
|
||||||
%% Alias the bridge V1 type to the action type
|
BridgeV1Types
|
||||||
ActionType => ActionType
|
),
|
||||||
},
|
bridge_v1_type_to_action_type =>
|
||||||
|
lists:foldl(
|
||||||
|
fun(BridgeV1Type, M) ->
|
||||||
|
%% Alias the bridge V1 type to the action type
|
||||||
|
M#{BridgeV1Type => ActionType}
|
||||||
|
end,
|
||||||
|
#{ActionType => ActionType},
|
||||||
|
BridgeV1Types
|
||||||
|
),
|
||||||
action_type_to_bridge_v1_type => #{
|
action_type_to_bridge_v1_type => #{
|
||||||
ActionType => BridgeV1Type
|
ActionType => BridgeV1TypeOrFun
|
||||||
},
|
|
||||||
action_type_to_connector_type => #{
|
|
||||||
ActionType => Module:connector_type_name(),
|
|
||||||
%% Alias the bridge V1 type to the action type
|
|
||||||
BridgeV1Type => Module:connector_type_name()
|
|
||||||
},
|
},
|
||||||
|
action_type_to_connector_type =>
|
||||||
|
lists:foldl(
|
||||||
|
fun(BridgeV1Type, M) ->
|
||||||
|
M#{BridgeV1Type => Module:connector_type_name()}
|
||||||
|
end,
|
||||||
|
#{ActionType => Module:connector_type_name()},
|
||||||
|
BridgeV1Types
|
||||||
|
),
|
||||||
action_type_to_schema_module => #{
|
action_type_to_schema_module => #{
|
||||||
ActionType => Module:schema_module()
|
ActionType => Module:schema_module()
|
||||||
},
|
},
|
||||||
action_type_to_info_module => #{
|
action_type_to_info_module =>
|
||||||
ActionType => Module,
|
lists:foldl(
|
||||||
BridgeV1Type => Module
|
fun(BridgeV1Type, M) ->
|
||||||
}
|
M#{BridgeV1Type => Module}
|
||||||
|
end,
|
||||||
|
#{ActionType => Module},
|
||||||
|
BridgeV1Types
|
||||||
|
)
|
||||||
}.
|
}.
|
||||||
|
|
|
@ -907,7 +907,7 @@ format_resource(
|
||||||
redact(
|
redact(
|
||||||
maps:merge(
|
maps:merge(
|
||||||
RawConfFull#{
|
RawConfFull#{
|
||||||
type => downgrade_type(Type),
|
type => downgrade_type(Type, RawConf),
|
||||||
name => maps:get(<<"name">>, RawConf, BridgeName),
|
name => maps:get(<<"name">>, RawConf, BridgeName),
|
||||||
node => Node
|
node => Node
|
||||||
},
|
},
|
||||||
|
@ -1162,5 +1162,5 @@ non_compat_bridge_msg() ->
|
||||||
upgrade_type(Type) ->
|
upgrade_type(Type) ->
|
||||||
emqx_bridge_lib:upgrade_type(Type).
|
emqx_bridge_lib:upgrade_type(Type).
|
||||||
|
|
||||||
downgrade_type(Type) ->
|
downgrade_type(Type, Conf) ->
|
||||||
emqx_bridge_lib:downgrade_type(Type).
|
emqx_bridge_lib:downgrade_type(Type, Conf).
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
-export([
|
-export([
|
||||||
maybe_withdraw_rule_action/3,
|
maybe_withdraw_rule_action/3,
|
||||||
upgrade_type/1,
|
upgrade_type/1,
|
||||||
downgrade_type/1
|
downgrade_type/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% @doc A bridge can be used as a rule action.
|
%% @doc A bridge can be used as a rule action.
|
||||||
|
@ -61,17 +61,17 @@ upgrade_type(Type) when is_list(Type) ->
|
||||||
atom_to_list(emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(list_to_binary(Type))).
|
atom_to_list(emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(list_to_binary(Type))).
|
||||||
|
|
||||||
%% @doc Kafka producer bridge type renamed from 'kafka' to 'kafka_bridge' since 5.3.1
|
%% @doc Kafka producer bridge type renamed from 'kafka' to 'kafka_bridge' since 5.3.1
|
||||||
downgrade_type(Type) when is_atom(Type) ->
|
downgrade_type(Type, Conf) when is_atom(Type) ->
|
||||||
emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type);
|
emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type, Conf);
|
||||||
downgrade_type(Type) when is_binary(Type) ->
|
downgrade_type(Type, Conf) when is_binary(Type) ->
|
||||||
atom_to_binary(emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type));
|
atom_to_binary(emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type, Conf));
|
||||||
downgrade_type(Type) when is_list(Type) ->
|
downgrade_type(Type, Conf) when is_list(Type) ->
|
||||||
atom_to_list(emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(list_to_binary(Type))).
|
atom_to_list(emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(list_to_binary(Type), Conf)).
|
||||||
|
|
||||||
%% A rule might be referencing an old version bridge type name
|
%% A rule might be referencing an old version bridge type name
|
||||||
%% i.e. 'kafka' instead of 'kafka_producer' so we need to try both
|
%% i.e. 'kafka' instead of 'kafka_producer' so we need to try both
|
||||||
external_ids(Type, Name) ->
|
external_ids(Type, Name) ->
|
||||||
case downgrade_type(Type) of
|
case downgrade_type(Type, get_conf(Type, Name)) of
|
||||||
Type ->
|
Type ->
|
||||||
[external_id(Type, Name)];
|
[external_id(Type, Name)];
|
||||||
Type0 ->
|
Type0 ->
|
||||||
|
@ -87,3 +87,9 @@ external_id(BridgeType, BridgeName) ->
|
||||||
|
|
||||||
bin(Bin) when is_binary(Bin) -> Bin;
|
bin(Bin) when is_binary(Bin) -> Bin;
|
||||||
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
|
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
|
||||||
|
|
||||||
|
get_conf(BridgeType, BridgeName) ->
|
||||||
|
case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
|
||||||
|
true -> emqx_conf:get_raw([actions, BridgeType, BridgeName]);
|
||||||
|
false -> emqx_conf:get_raw([bridges, BridgeType, BridgeName])
|
||||||
|
end.
|
||||||
|
|
|
@ -111,7 +111,7 @@
|
||||||
bridge_v1_create_dry_run/2,
|
bridge_v1_create_dry_run/2,
|
||||||
bridge_v1_type_to_bridge_v2_type/1,
|
bridge_v1_type_to_bridge_v2_type/1,
|
||||||
%% Exception from the naming convention:
|
%% Exception from the naming convention:
|
||||||
bridge_v2_type_to_bridge_v1_type/1,
|
bridge_v2_type_to_bridge_v1_type/2,
|
||||||
bridge_v1_id_to_connector_resource_id/1,
|
bridge_v1_id_to_connector_resource_id/1,
|
||||||
bridge_v1_enable_disable/3,
|
bridge_v1_enable_disable/3,
|
||||||
bridge_v1_restart/2,
|
bridge_v1_restart/2,
|
||||||
|
@ -1050,8 +1050,8 @@ bridge_v1_is_valid(BridgeV1Type, BridgeName) ->
|
||||||
bridge_v1_type_to_bridge_v2_type(Type) ->
|
bridge_v1_type_to_bridge_v2_type(Type) ->
|
||||||
emqx_action_info:bridge_v1_type_to_action_type(Type).
|
emqx_action_info:bridge_v1_type_to_action_type(Type).
|
||||||
|
|
||||||
bridge_v2_type_to_bridge_v1_type(Type) ->
|
bridge_v2_type_to_bridge_v1_type(Type, Conf) ->
|
||||||
emqx_action_info:action_type_to_bridge_v1_type(Type).
|
emqx_action_info:action_type_to_bridge_v1_type(Type, Conf).
|
||||||
|
|
||||||
is_bridge_v2_type(Type) ->
|
is_bridge_v2_type(Type) ->
|
||||||
emqx_action_info:is_action_type(Type).
|
emqx_action_info:is_action_type(Type).
|
||||||
|
|
|
@ -621,8 +621,13 @@ partitioner(random) -> random;
|
||||||
partitioner(key_dispatch) -> first_key_dispatch.
|
partitioner(key_dispatch) -> first_key_dispatch.
|
||||||
|
|
||||||
replayq_dir(BridgeType, BridgeName) ->
|
replayq_dir(BridgeType, BridgeName) ->
|
||||||
|
RawConf = emqx_conf:get_raw([actions, BridgeType, BridgeName]),
|
||||||
DirName = iolist_to_binary([
|
DirName = iolist_to_binary([
|
||||||
emqx_bridge_lib:downgrade_type(BridgeType), ":", BridgeName, ":", atom_to_list(node())
|
emqx_bridge_lib:downgrade_type(BridgeType, RawConf),
|
||||||
|
":",
|
||||||
|
BridgeName,
|
||||||
|
":",
|
||||||
|
atom_to_list(node())
|
||||||
]),
|
]),
|
||||||
filename:join([emqx:data_dir(), "kafka", DirName]).
|
filename:join([emqx:data_dir(), "kafka", DirName]).
|
||||||
|
|
||||||
|
|
|
@ -521,8 +521,9 @@ format_action(Actions) ->
|
||||||
|
|
||||||
do_format_action({bridge, BridgeType, BridgeName, _ResId}) ->
|
do_format_action({bridge, BridgeType, BridgeName, _ResId}) ->
|
||||||
emqx_bridge_resource:bridge_id(BridgeType, BridgeName);
|
emqx_bridge_resource:bridge_id(BridgeType, BridgeName);
|
||||||
do_format_action({bridge_v2, BridgeType, BridgeName}) ->
|
do_format_action({bridge_v2, BridgeType0, BridgeName}) ->
|
||||||
emqx_bridge_resource:bridge_id(emqx_bridge_lib:downgrade_type(BridgeType), BridgeName);
|
BridgeType = try_downgrade(BridgeType0, BridgeName),
|
||||||
|
emqx_bridge_resource:bridge_id(BridgeType, BridgeName);
|
||||||
do_format_action(#{mod := Mod, func := Func, args := Args}) ->
|
do_format_action(#{mod := Mod, func := Func, args := Args}) ->
|
||||||
#{
|
#{
|
||||||
function => printable_function_name(Mod, Func),
|
function => printable_function_name(Mod, Func),
|
||||||
|
@ -533,6 +534,25 @@ do_format_action(#{mod := Mod, func := Func}) ->
|
||||||
function => printable_function_name(Mod, Func)
|
function => printable_function_name(Mod, Func)
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
try_downgrade(BridgeType, BridgeName) ->
|
||||||
|
Conf = try_get_conf(BridgeType, BridgeName),
|
||||||
|
try emqx_bridge_lib:downgrade_type(BridgeType, Conf) of
|
||||||
|
DowngradedBridgeType ->
|
||||||
|
DowngradedBridgeType
|
||||||
|
catch
|
||||||
|
error:{config_not_found, _} ->
|
||||||
|
BridgeType
|
||||||
|
end.
|
||||||
|
|
||||||
|
try_get_conf(BridgeType, BridgeName) ->
|
||||||
|
try emqx_conf:get_raw([actions, BridgeType, BridgeName]) of
|
||||||
|
RawConf ->
|
||||||
|
RawConf
|
||||||
|
catch
|
||||||
|
error:{config_not_found, _} ->
|
||||||
|
#{}
|
||||||
|
end.
|
||||||
|
|
||||||
printable_function_name(emqx_rule_actions, Func) ->
|
printable_function_name(emqx_rule_actions, Func) ->
|
||||||
Func;
|
Func;
|
||||||
printable_function_name(Mod, Func) ->
|
printable_function_name(Mod, Func) ->
|
||||||
|
|
Loading…
Reference in New Issue