refactor: action schema retrival after PR feedback
This commit is contained in:
parent
ab078647a5
commit
5e8e407017
|
@ -1,169 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% This function is called to register a bridge V2. It should be called before
|
||||
%% the system boots in a function triggered by an -on_load() directive
|
||||
%% since it should be called before the system boots because the config
|
||||
%% system depends on that.
|
||||
%%
|
||||
%% It is placed in an hrl file instead of in emqx_bridge_v2.erl because emqx_bridge_v2
|
||||
%% might not be loaded when the bridge module is loaded.
|
||||
-spec emqx_bridge_v2_register_bridge_type(#{
|
||||
%% Should be provided by all bridges. Even if the bridge_v2_type_name is
|
||||
%% the same as the bridge_v1_type_named.
|
||||
'bridge_v1_type_name' := atom(),
|
||||
'bridge_v2_type_name' := atom(),
|
||||
'connector_type' := atom(),
|
||||
'schema_module' := atom(),
|
||||
'schema_struct_field' := atom() | binary()
|
||||
}) -> ok.
|
||||
emqx_bridge_v2_register_bridge_type(BridgeTypeInfo) ->
|
||||
try
|
||||
%% We must prevent overwriting so we take a lock when writing to persistent_term
|
||||
global:set_lock(
|
||||
{
|
||||
internal_emqx_bridge_v2_persistent_term_info_key(),
|
||||
internal_emqx_bridge_v2_persistent_term_info_key()
|
||||
},
|
||||
[node()],
|
||||
infinity
|
||||
),
|
||||
internal_maybe_create_initial_bridge_v2_info_map(),
|
||||
internal_register_bridge_type_with_lock(BridgeTypeInfo)
|
||||
catch
|
||||
ErrorType:Reason:Stacktrace ->
|
||||
%% Print the error on standard output as logger might not be
|
||||
%% started yet
|
||||
io:format("~p~n", [
|
||||
#{
|
||||
'error_type' => ErrorType,
|
||||
'reason' => Reason,
|
||||
'stacktrace' => Stacktrace,
|
||||
'msg' => "Failed to register bridge V2 type"
|
||||
}
|
||||
]),
|
||||
erlang:raise(ErrorType, Reason, Stacktrace)
|
||||
after
|
||||
global:del_lock(
|
||||
{
|
||||
internal_emqx_bridge_v2_persistent_term_info_key(),
|
||||
internal_emqx_bridge_v2_persistent_term_info_key()
|
||||
},
|
||||
[node()]
|
||||
)
|
||||
end,
|
||||
ok.
|
||||
|
||||
internal_register_bridge_type_with_lock(BridgeTypeInfo) ->
|
||||
InfoMap0 = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()),
|
||||
%% The Bridge V1 type is also a bridge V2 type due to backwards compatibility
|
||||
InfoMap1 = emqx_utils_maps:deep_force_put(
|
||||
[
|
||||
bridge_v2_type_names,
|
||||
maps:get(bridge_v1_type_name, BridgeTypeInfo)
|
||||
],
|
||||
InfoMap0,
|
||||
true
|
||||
),
|
||||
InfoMap2 = emqx_utils_maps:deep_force_put(
|
||||
[
|
||||
bridge_v2_type_names,
|
||||
maps:get(bridge_v2_type_name, BridgeTypeInfo)
|
||||
],
|
||||
InfoMap1,
|
||||
true
|
||||
),
|
||||
InfoMap3 = emqx_utils_maps:deep_force_put(
|
||||
[
|
||||
bridge_v1_type_to_bridge_v2_type,
|
||||
maps:get(bridge_v1_type_name, BridgeTypeInfo)
|
||||
],
|
||||
InfoMap2,
|
||||
maps:get(bridge_v2_type_name, BridgeTypeInfo)
|
||||
),
|
||||
%% Backwards compatibility
|
||||
InfoMap4 = emqx_utils_maps:deep_force_put(
|
||||
[
|
||||
bridge_v1_type_to_bridge_v2_type,
|
||||
maps:get(bridge_v2_type_name, BridgeTypeInfo)
|
||||
],
|
||||
InfoMap3,
|
||||
maps:get(bridge_v2_type_name, BridgeTypeInfo)
|
||||
),
|
||||
InfoMap5 = emqx_utils_maps:deep_force_put(
|
||||
[
|
||||
bridge_v2_type_to_connector_type,
|
||||
maps:get(bridge_v2_type_name, BridgeTypeInfo)
|
||||
],
|
||||
InfoMap4,
|
||||
maps:get(connector_type, BridgeTypeInfo)
|
||||
),
|
||||
%% Backwards compatibility
|
||||
InfoMap6 = emqx_utils_maps:deep_force_put(
|
||||
[
|
||||
bridge_v2_type_to_connector_type,
|
||||
maps:get(bridge_v1_type_name, BridgeTypeInfo)
|
||||
],
|
||||
InfoMap5,
|
||||
maps:get(connector_type, BridgeTypeInfo)
|
||||
),
|
||||
InfoMap7 = emqx_utils_maps:deep_force_put(
|
||||
[
|
||||
bridge_v2_type_to_schema_module,
|
||||
maps:get(bridge_v2_type_name, BridgeTypeInfo)
|
||||
],
|
||||
InfoMap6,
|
||||
maps:get(schema_module, BridgeTypeInfo)
|
||||
),
|
||||
InfoMap8 = emqx_utils_maps:deep_force_put(
|
||||
[
|
||||
bridge_v2_type_to_schema_struct_field,
|
||||
maps:get(bridge_v2_type_name, BridgeTypeInfo)
|
||||
],
|
||||
InfoMap7,
|
||||
maps:get(schema_struct_field, BridgeTypeInfo)
|
||||
),
|
||||
InfoMap9 = emqx_utils_maps:deep_force_put(
|
||||
[
|
||||
bridge_v2_type_to_bridge_v1_type,
|
||||
maps:get(bridge_v2_type_name, BridgeTypeInfo)
|
||||
],
|
||||
InfoMap8,
|
||||
maps:get(bridge_v1_type_name, BridgeTypeInfo)
|
||||
),
|
||||
ok = persistent_term:put(internal_emqx_bridge_v2_persistent_term_info_key(), InfoMap9).
|
||||
|
||||
internal_maybe_create_initial_bridge_v2_info_map() ->
|
||||
case persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key(), undefined) of
|
||||
undefined ->
|
||||
ok = persistent_term:put(
|
||||
internal_emqx_bridge_v2_persistent_term_info_key(),
|
||||
#{
|
||||
bridge_v2_type_names => #{},
|
||||
bridge_v1_type_to_bridge_v2_type => #{},
|
||||
bridge_v2_type_to_bridge_v1_type => #{},
|
||||
bridge_v2_type_to_connector_type => #{},
|
||||
bridge_v2_type_to_schema_module => #{},
|
||||
bridge_v2_type_to_schema_struct_field => #{}
|
||||
}
|
||||
),
|
||||
ok;
|
||||
_ ->
|
||||
ok
|
||||
end.
|
||||
|
||||
internal_emqx_bridge_v2_persistent_term_info_key() ->
|
||||
?FUNCTION_NAME.
|
|
@ -0,0 +1,225 @@
|
|||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc The module which knows everything about actions.
|
||||
|
||||
%% NOTE: it does not cover the V1 bridges.
|
||||
|
||||
-module(emqx_action_info).
|
||||
|
||||
-export([
|
||||
action_type_to_connector_type/1,
|
||||
action_type_to_bridge_v1_type/1,
|
||||
bridge_v1_type_to_action_type/1,
|
||||
is_action_type/1,
|
||||
registered_schema_modules/0
|
||||
]).
|
||||
|
||||
-callback bridge_v1_type_name() -> atom().
|
||||
-callback action_type_name() -> atom().
|
||||
-callback connector_type_name() -> atom().
|
||||
-callback schema_module() -> atom().
|
||||
|
||||
-optional_callbacks([bridge_v1_type_name/0]).
|
||||
|
||||
%% ====================================================================
|
||||
%% Hadcoded list of info modules for actions
|
||||
%% TODO: Remove this list once we have made sure that all relevants
|
||||
%% apps are loaded before this module is called.
|
||||
%% ====================================================================
|
||||
|
||||
-if(?EMQX_RELEASE_EDITION == ee).
|
||||
hard_coded_action_info_modules_ee() ->
|
||||
[emqx_bridge_kafka_action_info].
|
||||
-else.
|
||||
hard_coded_action_info_modules_ee() ->
|
||||
[].
|
||||
-endif.
|
||||
|
||||
hard_coded_action_info_modules_common() ->
|
||||
[].
|
||||
|
||||
hard_coded_action_info_modules() ->
|
||||
hard_coded_action_info_modules_common() ++ hard_coded_action_info_modules_ee().
|
||||
|
||||
%% ====================================================================
|
||||
%% API
|
||||
%% ====================================================================
|
||||
|
||||
action_type_to_connector_type(Type) when not is_atom(Type) ->
|
||||
action_type_to_connector_type(binary_to_existing_atom(iolist_to_binary(Type)));
|
||||
action_type_to_connector_type(Type) ->
|
||||
ActionInfoMap = info_map(),
|
||||
ActionTypeToConnectorTypeMap = maps:get(action_type_to_connector_type, ActionInfoMap),
|
||||
case maps:get(Type, ActionTypeToConnectorTypeMap, undefined) of
|
||||
undefined -> action_type_to_connector_type_old(Type);
|
||||
ConnectorType -> ConnectorType
|
||||
end.
|
||||
|
||||
% action_type_to_connector_type_old(kafka) ->
|
||||
% %% backward compatible
|
||||
% kafka_producer;
|
||||
% action_type_to_connector_type_old(kafka_producer) ->
|
||||
% kafka_producer;
|
||||
action_type_to_connector_type_old(azure_event_hub_producer) ->
|
||||
azure_event_hub_producer.
|
||||
|
||||
bridge_v1_type_to_action_type(Bin) when is_binary(Bin) ->
|
||||
bridge_v1_type_to_action_type(binary_to_existing_atom(Bin));
|
||||
bridge_v1_type_to_action_type(Type) ->
|
||||
ActionInfoMap = info_map(),
|
||||
BridgeV1TypeToActionType = maps:get(bridge_v1_type_to_action_type, ActionInfoMap),
|
||||
case maps:get(Type, BridgeV1TypeToActionType, undefined) of
|
||||
undefined -> bridge_v1_type_to_action_type_old(Type);
|
||||
ActionType -> ActionType
|
||||
end.
|
||||
|
||||
% bridge_v1_type_to_action_type_old(kafka) ->
|
||||
% kafka_producer;
|
||||
% bridge_v1_type_to_action_type_old(kafka_producer) ->
|
||||
% kafka_producer;
|
||||
bridge_v1_type_to_action_type_old(azure_event_hub_producer) ->
|
||||
azure_event_hub_producer;
|
||||
bridge_v1_type_to_action_type_old(Type) ->
|
||||
Type.
|
||||
|
||||
action_type_to_bridge_v1_type(Bin) when is_binary(Bin) ->
|
||||
action_type_to_bridge_v1_type(binary_to_existing_atom(Bin));
|
||||
action_type_to_bridge_v1_type(Type) ->
|
||||
ActionInfoMap = info_map(),
|
||||
ActionTypeToBridgeV1Type = maps:get(action_type_to_bridge_v1_type, ActionInfoMap),
|
||||
case maps:get(Type, ActionTypeToBridgeV1Type, undefined) of
|
||||
undefined -> action_type_to_bridge_v1_type_old(Type);
|
||||
BridgeV1Type -> BridgeV1Type
|
||||
end.
|
||||
|
||||
action_type_to_bridge_v1_type_old(azure_event_hub_producer) ->
|
||||
azure_event_hub_producer;
|
||||
action_type_to_bridge_v1_type_old(Type) ->
|
||||
Type.
|
||||
|
||||
%% This function should return true for all inputs that are bridge V1 types for
|
||||
%% bridges that have been refactored to bridge V2s, and for all all bridge V2
|
||||
%% types. For everything else the function should return false.
|
||||
is_action_type(Bin) when is_binary(Bin) ->
|
||||
is_action_type(binary_to_existing_atom(Bin));
|
||||
is_action_type(Type) ->
|
||||
ActionInfoMap = info_map(),
|
||||
ActionTypes = maps:get(action_type_names, ActionInfoMap),
|
||||
case maps:get(Type, ActionTypes, undefined) of
|
||||
undefined -> is_action_type_old(Type);
|
||||
_ -> true
|
||||
end.
|
||||
|
||||
% is_action_type_old(kafka_producer) ->
|
||||
% true;
|
||||
% is_action_type_old(kafka) ->
|
||||
% true;
|
||||
is_action_type_old(azure_event_hub_producer) ->
|
||||
true;
|
||||
is_action_type_old(_) ->
|
||||
false.
|
||||
|
||||
registered_schema_modules() ->
|
||||
InfoMap = info_map(),
|
||||
Schemas = maps:get(action_type_to_schema_module, InfoMap),
|
||||
maps:to_list(Schemas).
|
||||
|
||||
%% ====================================================================
|
||||
%% Internal functions for building the info map and accessing it
|
||||
%% ====================================================================
|
||||
|
||||
internal_emqx_action_persistent_term_info_key() ->
|
||||
?FUNCTION_NAME.
|
||||
|
||||
info_map() ->
|
||||
case persistent_term:get(internal_emqx_action_persistent_term_info_key(), not_found) of
|
||||
not_found ->
|
||||
build_cache();
|
||||
ActionInfoMap ->
|
||||
ActionInfoMap
|
||||
end.
|
||||
|
||||
build_cache() ->
|
||||
ActionInfoModules = action_info_modules(),
|
||||
ActionInfoMap =
|
||||
lists:foldl(
|
||||
fun(Module, InfoMapSoFar) ->
|
||||
ModuleInfoMap = get_info_map(Module),
|
||||
emqx_utils_maps:deep_merge(InfoMapSoFar, ModuleInfoMap)
|
||||
end,
|
||||
initial_info_map(),
|
||||
ActionInfoModules
|
||||
),
|
||||
%% Update the persistent term with the new info map
|
||||
persistent_term:put(internal_emqx_action_persistent_term_info_key(), ActionInfoMap),
|
||||
ActionInfoMap.
|
||||
|
||||
action_info_modules() ->
|
||||
ActionInfoModules = [
|
||||
action_info_modules(App)
|
||||
|| {App, _, _} <- application:loaded_applications()
|
||||
],
|
||||
lists:usort(lists:flatten(ActionInfoModules) ++ hard_coded_action_info_modules()).
|
||||
|
||||
action_info_modules(App) ->
|
||||
case application:get_env(App, emqx, action_info_module) of
|
||||
{ok, Module} ->
|
||||
[Module];
|
||||
_ ->
|
||||
[]
|
||||
end.
|
||||
|
||||
initial_info_map() ->
|
||||
#{
|
||||
action_type_names => #{},
|
||||
bridge_v1_type_to_action_type => #{},
|
||||
action_type_to_bridge_v1_type => #{},
|
||||
action_type_to_connector_type => #{},
|
||||
action_type_to_schema_module => #{}
|
||||
}.
|
||||
|
||||
get_info_map(Module) ->
|
||||
%% Force the module to get loaded
|
||||
_ = code:ensure_loaded(Module),
|
||||
ActionType = Module:action_type_name(),
|
||||
BridgeV1Type =
|
||||
case erlang:function_exported(Module, bridge_v1_type_name, 0) of
|
||||
true ->
|
||||
Module:bridge_v1_type_name();
|
||||
false ->
|
||||
Module:action_type_name()
|
||||
end,
|
||||
#{
|
||||
action_type_names => #{
|
||||
ActionType => true,
|
||||
BridgeV1Type => true
|
||||
},
|
||||
bridge_v1_type_to_action_type => #{
|
||||
BridgeV1Type => ActionType,
|
||||
%% Alias the bridge V1 type to the action type
|
||||
ActionType => ActionType
|
||||
},
|
||||
action_type_to_bridge_v1_type => #{
|
||||
ActionType => BridgeV1Type
|
||||
},
|
||||
action_type_to_connector_type => #{
|
||||
ActionType => Module:connector_type_name(),
|
||||
%% Alias the bridge V1 type to the action type
|
||||
BridgeV1Type => Module:connector_type_name()
|
||||
},
|
||||
action_type_to_schema_module => #{
|
||||
ActionType => Module:schema_module()
|
||||
}
|
||||
}.
|
|
@ -18,15 +18,6 @@
|
|||
-behaviour(emqx_config_handler).
|
||||
-behaviour(emqx_config_backup).
|
||||
|
||||
-compile([
|
||||
{nowarn_unused_function, [
|
||||
{internal_register_bridge_type_with_lock, 1},
|
||||
{emqx_bridge_v2_register_bridge_type, 1}
|
||||
]}
|
||||
]).
|
||||
-ignore_xref(emqx_bridge_v2_register_bridge_type/1).
|
||||
-ignore_xref(internal_register_bridge_type_with_lock/1).
|
||||
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||
|
@ -37,15 +28,6 @@
|
|||
%% refactored into a new module/application with appropriate name.
|
||||
-define(ROOT_KEY, actions).
|
||||
|
||||
-on_load(on_load/0).
|
||||
|
||||
%% Getting registered bridge schemas:
|
||||
|
||||
-export([
|
||||
bridge_v2_type_to_schame_struct_field/1,
|
||||
registered_schema_modules/0
|
||||
]).
|
||||
|
||||
%% Loading and unloading config when EMQX starts and stops
|
||||
-export([
|
||||
load/0,
|
||||
|
@ -158,53 +140,6 @@
|
|||
|
||||
%%====================================================================
|
||||
|
||||
-include_lib("emqx_bridge/include/emqx_bridge_v2_register.hrl").
|
||||
|
||||
on_load() ->
|
||||
try
|
||||
%% We must prevent overwriting so we take a lock when writing to persistent_term
|
||||
global:set_lock(
|
||||
{
|
||||
internal_emqx_bridge_v2_persistent_term_info_key(),
|
||||
internal_emqx_bridge_v2_persistent_term_info_key()
|
||||
},
|
||||
[node()],
|
||||
infinity
|
||||
),
|
||||
internal_maybe_create_initial_bridge_v2_info_map()
|
||||
catch
|
||||
ErrorType:Reason:Stacktrace ->
|
||||
%% Logger may not be started so print to stdout
|
||||
io:format("~p~n", [
|
||||
#{
|
||||
'error_type' => ErrorType,
|
||||
'reason' => Reason,
|
||||
'stacktrace' => Stacktrace,
|
||||
'msg' => "Failed to register bridge V2 type"
|
||||
}
|
||||
]),
|
||||
erlang:raise(ErrorType, Reason, Stacktrace)
|
||||
after
|
||||
global:del_lock(
|
||||
{
|
||||
internal_emqx_bridge_v2_persistent_term_info_key(),
|
||||
internal_emqx_bridge_v2_persistent_term_info_key()
|
||||
},
|
||||
[node()]
|
||||
)
|
||||
end,
|
||||
ok.
|
||||
|
||||
bridge_v2_type_to_schame_struct_field(BridgeV2Type) ->
|
||||
InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()),
|
||||
Map = maps:get(bridge_v2_type_to_schema_struct_field, InfoMap),
|
||||
maps:get(BridgeV2Type, Map).
|
||||
|
||||
registered_schema_modules() ->
|
||||
InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()),
|
||||
Schemas = maps:get(bridge_v2_type_to_schema_module, InfoMap),
|
||||
maps:to_list(Schemas).
|
||||
|
||||
%%====================================================================
|
||||
%% Loading and unloading config when EMQX starts and stops
|
||||
%%====================================================================
|
||||
|
@ -891,23 +826,8 @@ connector_type(Type) ->
|
|||
%% remote call so it can be mocked
|
||||
?MODULE:bridge_v2_type_to_connector_type(Type).
|
||||
|
||||
bridge_v2_type_to_connector_type(Type) when not is_atom(Type) ->
|
||||
bridge_v2_type_to_connector_type(binary_to_existing_atom(iolist_to_binary(Type)));
|
||||
bridge_v2_type_to_connector_type(Type) ->
|
||||
BridgeV2InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()),
|
||||
BridgeV2TypeToConnectorTypeMap = maps:get(bridge_v2_type_to_connector_type, BridgeV2InfoMap),
|
||||
case maps:get(Type, BridgeV2TypeToConnectorTypeMap, undefined) of
|
||||
undefined -> bridge_v2_type_to_connector_type_old(Type);
|
||||
ConnectorType -> ConnectorType
|
||||
end.
|
||||
|
||||
% bridge_v2_type_to_connector_type_old(kafka) ->
|
||||
% %% backward compatible
|
||||
% kafka_producer;
|
||||
% bridge_v2_type_to_connector_type_old(kafka_producer) ->
|
||||
% kafka_producer;
|
||||
bridge_v2_type_to_connector_type_old(azure_event_hub_producer) ->
|
||||
azure_event_hub_producer.
|
||||
emqx_action_info:action_type_to_connector_type(Type).
|
||||
|
||||
%%====================================================================
|
||||
%% Data backup API
|
||||
|
@ -1127,61 +1047,14 @@ bridge_v1_is_valid(BridgeV1Type, BridgeName) ->
|
|||
end
|
||||
end.
|
||||
|
||||
bridge_v1_type_to_bridge_v2_type(Bin) when is_binary(Bin) ->
|
||||
?MODULE:bridge_v1_type_to_bridge_v2_type(binary_to_existing_atom(Bin));
|
||||
bridge_v1_type_to_bridge_v2_type(Type) ->
|
||||
BridgeV2InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()),
|
||||
BridgeV1TypeToBridgeV2Type = maps:get(bridge_v1_type_to_bridge_v2_type, BridgeV2InfoMap),
|
||||
case maps:get(Type, BridgeV1TypeToBridgeV2Type, undefined) of
|
||||
undefined -> bridge_v1_type_to_bridge_v2_type_old(Type);
|
||||
BridgeV2Type -> BridgeV2Type
|
||||
end.
|
||||
emqx_action_info:bridge_v1_type_to_action_type(Type).
|
||||
|
||||
% bridge_v1_type_to_bridge_v2_type_old(kafka) ->
|
||||
% kafka_producer;
|
||||
% bridge_v1_type_to_bridge_v2_type_old(kafka_producer) ->
|
||||
% kafka_producer;
|
||||
bridge_v1_type_to_bridge_v2_type_old(azure_event_hub_producer) ->
|
||||
azure_event_hub_producer;
|
||||
bridge_v1_type_to_bridge_v2_type_old(Type) ->
|
||||
Type.
|
||||
|
||||
bridge_v2_type_to_bridge_v1_type(Bin) when is_binary(Bin) ->
|
||||
?MODULE:bridge_v2_type_to_bridge_v1_type(binary_to_existing_atom(Bin));
|
||||
bridge_v2_type_to_bridge_v1_type(Type) ->
|
||||
BridgeV2InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()),
|
||||
BridgeV2TypeToBridgeV1Type = maps:get(bridge_v2_type_to_bridge_v1_type, BridgeV2InfoMap),
|
||||
case maps:get(Type, BridgeV2TypeToBridgeV1Type, undefined) of
|
||||
undefined -> bridge_v2_type_to_bridge_v1_type_old(Type);
|
||||
BridgeV1Type -> BridgeV1Type
|
||||
end.
|
||||
emqx_action_info:action_type_to_bridge_v1_type(Type).
|
||||
|
||||
bridge_v2_type_to_bridge_v1_type_old(azure_event_hub_producer) ->
|
||||
azure_event_hub_producer;
|
||||
bridge_v2_type_to_bridge_v1_type_old(Type) ->
|
||||
Type.
|
||||
|
||||
%% This function should return true for all inputs that are bridge V1 types for
|
||||
%% bridges that have been refactored to bridge V2s, and for all all bridge V2
|
||||
%% types. For everything else the function should return false.
|
||||
is_bridge_v2_type(Bin) when is_binary(Bin) ->
|
||||
is_bridge_v2_type(binary_to_existing_atom(Bin));
|
||||
is_bridge_v2_type(Type) ->
|
||||
BridgeV2InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()),
|
||||
BridgeV2Types = maps:get(bridge_v2_type_names, BridgeV2InfoMap),
|
||||
case maps:get(Type, BridgeV2Types, undefined) of
|
||||
undefined -> is_bridge_v2_type_old(Type);
|
||||
_ -> true
|
||||
end.
|
||||
|
||||
% is_bridge_v2_type_old(kafka_producer) ->
|
||||
% true;
|
||||
% is_bridge_v2_type_old(kafka) ->
|
||||
% true;
|
||||
is_bridge_v2_type_old(azure_event_hub_producer) ->
|
||||
true;
|
||||
is_bridge_v2_type_old(_) ->
|
||||
false.
|
||||
emqx_action_info:is_action_type(Type).
|
||||
|
||||
bridge_v1_list_and_transform() ->
|
||||
Bridges = list_with_lookup_fun(fun bridge_v1_lookup_and_transform/2),
|
||||
|
|
|
@ -93,7 +93,7 @@ registered_api_schemas(Method) ->
|
|||
%% We *must* do this to ensure the module is really loaded, especially when we use
|
||||
%% `call_hocon' from `nodetool' to generate initial configurations.
|
||||
_ = emqx_bridge_v2:module_info(),
|
||||
RegistredSchmeas = emqx_bridge_v2:registered_schema_modules(),
|
||||
RegistredSchmeas = emqx_action_info:registered_schema_modules(),
|
||||
[
|
||||
api_ref(SchemaModule, atom_to_binary(BridgeV2Type), Method ++ "_bridge_v2")
|
||||
|| {BridgeV2Type, SchemaModule} <- RegistredSchmeas
|
||||
|
@ -156,8 +156,8 @@ fields(actions) ->
|
|||
|
||||
registered_schema_fields() ->
|
||||
[
|
||||
Module:fields(emqx_bridge_v2:bridge_v2_type_to_schame_struct_field(BridgeV2Type))
|
||||
|| {BridgeV2Type, Module} <- emqx_bridge_v2:registered_schema_modules()
|
||||
Module:fields(action)
|
||||
|| {_BridgeV2Type, Module} <- emqx_action_info:registered_schema_modules()
|
||||
].
|
||||
|
||||
desc(actions) ->
|
||||
|
@ -183,7 +183,7 @@ examples(Method) ->
|
|||
ConnectorExamples = erlang:apply(Module, bridge_v2_examples, [Method]),
|
||||
lists:foldl(MergeFun, Examples, ConnectorExamples)
|
||||
end,
|
||||
SchemaModules = [Mod || {_, Mod} <- emqx_bridge_v2:registered_schema_modules()],
|
||||
SchemaModules = [Mod || {_, Mod} <- emqx_action_info:registered_schema_modules()],
|
||||
lists:foldl(Fun, #{}, SchemaModules).
|
||||
|
||||
-ifdef(TEST).
|
||||
|
|
|
@ -12,7 +12,7 @@
|
|||
brod,
|
||||
brod_gssapi
|
||||
]},
|
||||
{env, []},
|
||||
{env, [{emqx_action_info_module, emqx_bridge_kafka_action_info}]},
|
||||
{modules, []},
|
||||
|
||||
{links, []}
|
||||
|
|
|
@ -16,8 +16,6 @@
|
|||
]).
|
||||
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
||||
|
||||
-on_load(register_bridge_v2/0).
|
||||
|
||||
-export([
|
||||
bridge_v2_examples/1,
|
||||
conn_bridge_examples/1,
|
||||
|
@ -537,7 +535,9 @@ fields(bridge_v2_field) ->
|
|||
desc => <<"Kafka Producer Bridge V2 Config">>,
|
||||
required => false
|
||||
}
|
||||
)}.
|
||||
)};
|
||||
fields(action) ->
|
||||
fields(bridge_v2_field).
|
||||
|
||||
desc("config_connector") ->
|
||||
?DESC("desc_config");
|
||||
|
@ -725,17 +725,3 @@ kafka_ext_header_value_validator(Value) ->
|
|||
"placeholder like ${foo}, or a simple string."
|
||||
}
|
||||
end.
|
||||
|
||||
-include_lib("emqx_bridge/include/emqx_bridge_v2_register.hrl").
|
||||
|
||||
register_bridge_v2() ->
|
||||
emqx_bridge_v2_register_bridge_type(#{
|
||||
%% Should be provided by all bridges. Even if the bridge_v2_type_name is
|
||||
%% the same as the bridge_v1_type_named.
|
||||
'bridge_v1_type_name' => kafka,
|
||||
'bridge_v2_type_name' => kafka_producer,
|
||||
'connector_type' => kafka_producer,
|
||||
'schema_module' => ?MODULE,
|
||||
'schema_struct_field' => bridge_v2_field
|
||||
}),
|
||||
ok.
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
-module(emqx_bridge_kafka_action_info).
|
||||
|
||||
-behaviour(emqx_action_info).
|
||||
|
||||
-export([
|
||||
bridge_v1_type_name/0,
|
||||
action_type_name/0,
|
||||
connector_type_name/0,
|
||||
schema_module/0
|
||||
]).
|
||||
|
||||
bridge_v1_type_name() -> kafka.
|
||||
|
||||
action_type_name() -> kafka_producer.
|
||||
|
||||
connector_type_name() -> kafka_producer.
|
||||
|
||||
schema_module() -> emqx_bridge_kafka.
|
|
@ -151,6 +151,9 @@ reset(Node, KeyPath, Opts) ->
|
|||
%% @doc Called from build script.
|
||||
%% TODO: move to a external escript after all refactoring is done
|
||||
dump_schema(Dir, SchemaModule) ->
|
||||
%% TODO: Load all apps instead of only emqx_dashboard
|
||||
%% as this will help schemas that searches for apps with
|
||||
%% relevant schema definitions
|
||||
_ = application:load(emqx_dashboard),
|
||||
ok = emqx_dashboard_desc_cache:init(),
|
||||
lists:foreach(
|
||||
|
|
Loading…
Reference in New Issue