Merge pull request #11945 from kjellwinblad/kjell/shared_con/better_names_and_specs

emqx_bridge_v2 module improvements
This commit is contained in:
Ivan Dyachkov 2023-11-14 09:56:17 +01:00 committed by GitHub
commit aded4a57b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 474 additions and 238 deletions

View File

@ -0,0 +1,199 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc The module which knows everything about actions.
%% NOTE: it does not cover the V1 bridges.
-module(emqx_action_info).
-export([
action_type_to_connector_type/1,
action_type_to_bridge_v1_type/1,
bridge_v1_type_to_action_type/1,
is_action_type/1,
registered_schema_modules/0
]).
-callback bridge_v1_type_name() -> atom().
-callback action_type_name() -> atom().
-callback connector_type_name() -> atom().
-callback schema_module() -> atom().
-optional_callbacks([bridge_v1_type_name/0]).
%% ====================================================================
%% Hadcoded list of info modules for actions
%% TODO: Remove this list once we have made sure that all relevants
%% apps are loaded before this module is called.
%% ====================================================================
-if(?EMQX_RELEASE_EDITION == ee).
hard_coded_action_info_modules_ee() ->
[
emqx_bridge_kafka_action_info,
emqx_bridge_azure_event_hub_action_info
].
-else.
hard_coded_action_info_modules_ee() ->
[].
-endif.
hard_coded_action_info_modules_common() ->
[].
hard_coded_action_info_modules() ->
hard_coded_action_info_modules_common() ++ hard_coded_action_info_modules_ee().
%% ====================================================================
%% API
%% ====================================================================
action_type_to_connector_type(Type) when not is_atom(Type) ->
action_type_to_connector_type(binary_to_existing_atom(iolist_to_binary(Type)));
action_type_to_connector_type(Type) ->
ActionInfoMap = info_map(),
ActionTypeToConnectorTypeMap = maps:get(action_type_to_connector_type, ActionInfoMap),
case maps:get(Type, ActionTypeToConnectorTypeMap, undefined) of
undefined -> Type;
ConnectorType -> ConnectorType
end.
bridge_v1_type_to_action_type(Bin) when is_binary(Bin) ->
bridge_v1_type_to_action_type(binary_to_existing_atom(Bin));
bridge_v1_type_to_action_type(Type) ->
ActionInfoMap = info_map(),
BridgeV1TypeToActionType = maps:get(bridge_v1_type_to_action_type, ActionInfoMap),
case maps:get(Type, BridgeV1TypeToActionType, undefined) of
undefined -> Type;
ActionType -> ActionType
end.
action_type_to_bridge_v1_type(Bin) when is_binary(Bin) ->
action_type_to_bridge_v1_type(binary_to_existing_atom(Bin));
action_type_to_bridge_v1_type(Type) ->
ActionInfoMap = info_map(),
ActionTypeToBridgeV1Type = maps:get(action_type_to_bridge_v1_type, ActionInfoMap),
case maps:get(Type, ActionTypeToBridgeV1Type, undefined) of
undefined -> Type;
BridgeV1Type -> BridgeV1Type
end.
%% This function should return true for all inputs that are bridge V1 types for
%% bridges that have been refactored to bridge V2s, and for all all bridge V2
%% types. For everything else the function should return false.
is_action_type(Bin) when is_binary(Bin) ->
is_action_type(binary_to_existing_atom(Bin));
is_action_type(Type) ->
ActionInfoMap = info_map(),
ActionTypes = maps:get(action_type_names, ActionInfoMap),
case maps:get(Type, ActionTypes, undefined) of
undefined -> false;
_ -> true
end.
registered_schema_modules() ->
InfoMap = info_map(),
Schemas = maps:get(action_type_to_schema_module, InfoMap),
maps:to_list(Schemas).
%% ====================================================================
%% Internal functions for building the info map and accessing it
%% ====================================================================
internal_emqx_action_persistent_term_info_key() ->
?FUNCTION_NAME.
info_map() ->
case persistent_term:get(internal_emqx_action_persistent_term_info_key(), not_found) of
not_found ->
build_cache();
ActionInfoMap ->
ActionInfoMap
end.
build_cache() ->
ActionInfoModules = action_info_modules(),
ActionInfoMap =
lists:foldl(
fun(Module, InfoMapSoFar) ->
ModuleInfoMap = get_info_map(Module),
emqx_utils_maps:deep_merge(InfoMapSoFar, ModuleInfoMap)
end,
initial_info_map(),
ActionInfoModules
),
%% Update the persistent term with the new info map
persistent_term:put(internal_emqx_action_persistent_term_info_key(), ActionInfoMap),
ActionInfoMap.
action_info_modules() ->
ActionInfoModules = [
action_info_modules(App)
|| {App, _, _} <- application:loaded_applications()
],
lists:usort(lists:flatten(ActionInfoModules) ++ hard_coded_action_info_modules()).
action_info_modules(App) ->
case application:get_env(App, emqx_action_info_module) of
{ok, Module} ->
[Module];
_ ->
[]
end.
initial_info_map() ->
#{
action_type_names => #{},
bridge_v1_type_to_action_type => #{},
action_type_to_bridge_v1_type => #{},
action_type_to_connector_type => #{},
action_type_to_schema_module => #{}
}.
get_info_map(Module) ->
%% Force the module to get loaded
_ = code:ensure_loaded(Module),
ActionType = Module:action_type_name(),
BridgeV1Type =
case erlang:function_exported(Module, bridge_v1_type_name, 0) of
true ->
Module:bridge_v1_type_name();
false ->
Module:action_type_name()
end,
#{
action_type_names => #{
ActionType => true,
BridgeV1Type => true
},
bridge_v1_type_to_action_type => #{
BridgeV1Type => ActionType,
%% Alias the bridge V1 type to the action type
ActionType => ActionType
},
action_type_to_bridge_v1_type => #{
ActionType => BridgeV1Type
},
action_type_to_connector_type => #{
ActionType => Module:connector_type_name(),
%% Alias the bridge V1 type to the action type
BridgeV1Type => Module:connector_type_name()
},
action_type_to_schema_module => #{
ActionType => Module:schema_module()
}
}.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge, [
{description, "EMQX bridges"},
{vsn, "0.1.29"},
{vsn, "0.1.30"},
{registered, [emqx_bridge_sup]},
{mod, {emqx_bridge_app, []}},
{applications, [

View File

@ -307,7 +307,7 @@ list() ->
emqx:get_raw_config([bridges], #{})
),
BridgeV2Bridges =
emqx_bridge_v2:list_and_transform_to_bridge_v1(),
emqx_bridge_v2:bridge_v1_list_and_transform(),
BridgeV1Bridges ++ BridgeV2Bridges.
%%BridgeV2Bridges = emqx_bridge_v2:list().
@ -318,7 +318,7 @@ lookup(Id) ->
lookup(Type, Name) ->
case emqx_bridge_v2:is_bridge_v2_type(Type) of
true ->
emqx_bridge_v2:lookup_and_transform_to_bridge_v1(Type, Name);
emqx_bridge_v2:bridge_v1_lookup_and_transform(Type, Name);
false ->
RawConf = emqx:get_raw_config([bridges, Type, Name], #{}),
lookup(Type, Name, RawConf)
@ -340,7 +340,7 @@ lookup(Type, Name, RawConf) ->
get_metrics(Type, Name) ->
case emqx_bridge_v2:is_bridge_v2_type(Type) of
true ->
case emqx_bridge_v2:is_valid_bridge_v1(Type, Name) of
case emqx_bridge_v2:bridge_v1_is_valid(Type, Name) of
true ->
BridgeV2Type = emqx_bridge_v2:bridge_v2_type_to_connector_type(Type),
emqx_bridge_v2:get_metrics(BridgeV2Type, Name);
@ -383,7 +383,7 @@ create(BridgeType0, BridgeName, RawConf) ->
}),
case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
true ->
emqx_bridge_v2:split_bridge_v1_config_and_create(BridgeType, BridgeName, RawConf);
emqx_bridge_v2:bridge_v1_split_config_and_create(BridgeType, BridgeName, RawConf);
false ->
emqx_conf:update(
emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],

View File

@ -627,7 +627,7 @@ create_bridge(BridgeType, BridgeName, Conf) ->
update_bridge(BridgeType, BridgeName, Conf) ->
case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
true ->
case emqx_bridge_v2:is_valid_bridge_v1(BridgeType, BridgeName) of
case emqx_bridge_v2:bridge_v1_is_valid(BridgeType, BridgeName) of
true ->
create_or_update_bridge(BridgeType, BridgeName, Conf, 200);
false ->

View File

@ -53,20 +53,20 @@ maybe_withdraw_rule_action_loop([BridgeId | More], DeleteActions) ->
end.
%% @doc Kafka producer bridge renamed from 'kafka' to 'kafka_bridge' since 5.3.1.
upgrade_type(kafka) ->
kafka_producer;
upgrade_type(<<"kafka">>) ->
<<"kafka_producer">>;
upgrade_type(Other) ->
Other.
upgrade_type(Type) when is_atom(Type) ->
emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(Type);
upgrade_type(Type) when is_binary(Type) ->
atom_to_binary(emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(Type));
upgrade_type(Type) when is_list(Type) ->
atom_to_list(emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(list_to_binary(Type))).
%% @doc Kafka producer bridge type renamed from 'kafka' to 'kafka_bridge' since 5.3.1
downgrade_type(kafka_producer) ->
kafka;
downgrade_type(<<"kafka_producer">>) ->
<<"kafka">>;
downgrade_type(Other) ->
Other.
downgrade_type(Type) when is_atom(Type) ->
emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type);
downgrade_type(Type) when is_binary(Type) ->
atom_to_binary(emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type));
downgrade_type(Type) when is_list(Type) ->
atom_to_list(emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(list_to_binary(Type))).
%% A rule might be referencing an old version bridge type name
%% i.e. 'kafka' instead of 'kafka_producer' so we need to try both

View File

@ -130,7 +130,7 @@ reset_metrics(ResourceId) ->
false ->
emqx_resource:reset_metrics(ResourceId);
true ->
case emqx_bridge_v2:is_valid_bridge_v1(Type, Name) of
case emqx_bridge_v2:bridge_v1_is_valid(Type, Name) of
true ->
BridgeV2Type = emqx_bridge_v2:bridge_v2_type_to_connector_type(Type),
emqx_bridge_v2:reset_metrics(BridgeV2Type, Name);

View File

@ -40,6 +40,8 @@
list/0,
lookup/2,
create/3,
%% The remove/2 function is only for internal use as it may create
%% rules with broken dependencies
remove/2,
%% The following is the remove function that is called by the HTTP API
%% It also checks for rule action dependencies and optionally removes
@ -48,6 +50,7 @@
]).
%% Operations
-export([
disable_enable/3,
health_check/2,
@ -73,7 +76,8 @@
-export([
id/2,
id/3,
is_valid_bridge_v1/2
bridge_v1_is_valid/2,
extract_connector_id_from_bridge_v2_id/1
]).
%% Config Update Handler API
@ -88,18 +92,26 @@
import_config/1
]).
%% Compatibility API
%% Bridge V2 Types and Conversions
-export([
bridge_v2_type_to_connector_type/1,
is_bridge_v2_type/1,
lookup_and_transform_to_bridge_v1/2,
list_and_transform_to_bridge_v1/0,
is_bridge_v2_type/1
]).
%% Compatibility Layer API
%% All public functions for the compatibility layer should be prefixed with
%% bridge_v1_
-export([
bridge_v1_lookup_and_transform/2,
bridge_v1_list_and_transform/0,
bridge_v1_check_deps_and_remove/3,
split_bridge_v1_config_and_create/3,
bridge_v1_split_config_and_create/3,
bridge_v1_create_dry_run/2,
extract_connector_id_from_bridge_v2_id/1,
bridge_v1_type_to_bridge_v2_type/1,
%% Exception from the naming convention:
bridge_v2_type_to_bridge_v1_type/1,
bridge_v1_id_to_connector_resource_id/1,
bridge_v1_enable_disable/3,
bridge_v1_restart/2,
@ -107,6 +119,27 @@
bridge_v1_start/2
]).
%%====================================================================
%% Types
%%====================================================================
-type bridge_v2_info() :: #{
type := binary(),
name := binary(),
raw_config := map(),
resource_data := map(),
status := emqx_resource:resource_status(),
%% Explanation of the status if the status is not connected
error := term()
}.
-type bridge_v2_type() :: binary() | atom() | [byte()].
-type bridge_v2_name() :: binary() | atom() | [byte()].
%%====================================================================
%%====================================================================
%%====================================================================
%% Loading and unloading config when EMQX starts and stops
%%====================================================================
@ -157,6 +190,7 @@ unload_bridges() ->
%% CRUD API
%%====================================================================
-spec lookup(bridge_v2_type(), bridge_v2_name()) -> {ok, bridge_v2_info()} | {error, not_found}.
lookup(Type, Name) ->
case emqx:get_raw_config([?ROOT_KEY, Type, Name], not_found) of
not_found ->
@ -191,8 +225,8 @@ lookup(Type, Name) ->
{disconnected, <<"Pending installation">>}
end,
{ok, #{
type => Type,
name => Name,
type => bin(Type),
name => bin(Name),
raw_config => RawConf,
resource_data => InstanceData,
status => DisplayBridgeV2Status,
@ -200,9 +234,12 @@ lookup(Type, Name) ->
}}
end.
-spec list() -> [bridge_v2_info()] | {error, term()}.
list() ->
list_with_lookup_fun(fun lookup/2).
-spec create(bridge_v2_type(), bridge_v2_name(), map()) ->
{ok, emqx_config:update_result()} | {error, any()}.
create(BridgeType, BridgeName, RawConf) ->
?SLOG(debug, #{
brige_action => create,
@ -217,9 +254,10 @@ create(BridgeType, BridgeName, RawConf) ->
#{override_to => cluster}
).
%% NOTE: This function can cause broken references but it is only called from
%% test cases.
-spec remove(atom() | binary(), binary()) -> ok | {error, any()}.
%% NOTE: This function can cause broken references from rules but it is only
%% called directly from test cases.
-spec remove(bridge_v2_type(), bridge_v2_name()) -> ok | {error, any()}.
remove(BridgeType, BridgeName) ->
?SLOG(debug, #{
brige_action => remove,
@ -237,6 +275,7 @@ remove(BridgeType, BridgeName) ->
{error, Reason} -> {error, Reason}
end.
-spec check_deps_and_remove(bridge_v2_type(), bridge_v2_name(), boolean()) -> ok | {error, any()}.
check_deps_and_remove(BridgeType, BridgeName, AlsoDeleteActions) ->
AlsoDelete =
case AlsoDeleteActions of
@ -408,6 +447,8 @@ combine_connector_and_bridge_v2_config(
%% Operations
%%====================================================================
-spec disable_enable(disable | enable, bridge_v2_type(), bridge_v2_name()) ->
{ok, any()} | {error, any()}.
disable_enable(Action, BridgeType, BridgeName) when
Action =:= disable; Action =:= enable
->
@ -485,6 +526,7 @@ connector_operation_helper_with_conf(
end
end.
-spec reset_metrics(bridge_v2_type(), bridge_v2_name()) -> ok | {error, not_found}.
reset_metrics(Type, Name) ->
reset_metrics_helper(Type, Name, lookup_conf(Type, Name)).
@ -492,7 +534,9 @@ reset_metrics_helper(_Type, _Name, #{enable := false}) ->
ok;
reset_metrics_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName}) ->
BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName),
ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, BridgeV2Id).
ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, BridgeV2Id);
reset_metrics_helper(_, _, _) ->
{error, not_found}.
get_query_mode(BridgeV2Type, Config) ->
CreationOpts = emqx_resource:fetch_creation_opts(Config),
@ -500,6 +544,8 @@ get_query_mode(BridgeV2Type, Config) ->
ResourceType = emqx_connector_resource:connector_to_resource_type(ConnectorType),
emqx_resource:query_mode(ResourceType, Config, CreationOpts).
-spec send_message(bridge_v2_type(), bridge_v2_name(), Message :: term(), QueryOpts :: map()) ->
term() | {error, term()}.
send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
case lookup_conf(BridgeType, BridgeName) of
#{enable := true} = Config0 ->
@ -533,8 +579,7 @@ do_send_msg_with_enabled_config(
emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts).
-spec health_check(BridgeType :: term(), BridgeName :: term()) ->
#{status := term(), error := term()} | {error, Reason :: term()}.
#{status := emqx_resource:resource_status(), error := term()} | {error, Reason :: term()}.
health_check(BridgeType, BridgeName) ->
case lookup_conf(BridgeType, BridgeName) of
#{
@ -553,6 +598,34 @@ health_check(BridgeType, BridgeName) ->
Error
end.
-spec create_dry_run(bridge_v2_type(), Config :: map()) -> ok | {error, term()}.
create_dry_run(Type, Conf0) ->
Conf1 = maps:without([<<"name">>], Conf0),
TypeBin = bin(Type),
RawConf = #{<<"actions">> => #{TypeBin => #{<<"temp_name">> => Conf1}}},
%% Check config
try
_ =
hocon_tconf:check_plain(
emqx_bridge_v2_schema,
RawConf,
#{atom_key => true, required => false}
),
#{<<"connector">> := ConnectorName} = Conf1,
%% Check that the connector exists and do the dry run if it exists
ConnectorType = connector_type(Type),
case emqx:get_raw_config([connectors, ConnectorType, ConnectorName], not_found) of
not_found ->
{error, iolist_to_binary(io_lib:format("Connector ~p not found", [ConnectorName]))};
ConnectorRawConf ->
create_dry_run_helper(Type, ConnectorRawConf, Conf1)
end
catch
%% validation errors
throw:Reason1 ->
{error, Reason1}
end.
create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) ->
BridgeName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
ConnectorType = connector_type(BridgeType),
@ -584,33 +657,7 @@ create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) ->
end,
emqx_connector_resource:create_dry_run(ConnectorType, ConnectorRawConf, OnReadyCallback).
create_dry_run(Type, Conf0) ->
Conf1 = maps:without([<<"name">>], Conf0),
TypeBin = bin(Type),
RawConf = #{<<"actions">> => #{TypeBin => #{<<"temp_name">> => Conf1}}},
%% Check config
try
_ =
hocon_tconf:check_plain(
emqx_bridge_v2_schema,
RawConf,
#{atom_key => true, required => false}
),
#{<<"connector">> := ConnectorName} = Conf1,
%% Check that the connector exists and do the dry run if it exists
ConnectorType = connector_type(Type),
case emqx:get_raw_config([connectors, ConnectorType, ConnectorName], not_found) of
not_found ->
{error, iolist_to_binary(io_lib:format("Connector ~p not found", [ConnectorName]))};
ConnectorRawConf ->
create_dry_run_helper(Type, ConnectorRawConf, Conf1)
end
catch
%% validation errors
throw:Reason1 ->
{error, Reason1}
end.
-spec get_metrics(bridge_v2_type(), bridge_v2_name()) -> emqx_metrics_worker:metrics().
get_metrics(Type, Name) ->
emqx_resource:get_metrics(id(Type, Name)).
@ -779,15 +826,8 @@ connector_type(Type) ->
%% remote call so it can be mocked
?MODULE:bridge_v2_type_to_connector_type(Type).
bridge_v2_type_to_connector_type(Type) when not is_atom(Type) ->
bridge_v2_type_to_connector_type(binary_to_existing_atom(iolist_to_binary(Type)));
bridge_v2_type_to_connector_type(kafka) ->
%% backward compatible
kafka_producer;
bridge_v2_type_to_connector_type(kafka_producer) ->
kafka_producer;
bridge_v2_type_to_connector_type(azure_event_hub_producer) ->
azure_event_hub_producer.
bridge_v2_type_to_connector_type(Type) ->
emqx_action_info:action_type_to_connector_type(Type).
%%====================================================================
%% Data backup API
@ -989,7 +1029,7 @@ unpack_bridge_conf(Type, PackedConf, TopLevelConf) ->
%%
%% * The corresponding bridge v2 should exist
%% * The connector for the bridge v2 should have exactly one channel
is_valid_bridge_v1(BridgeV1Type, BridgeName) ->
bridge_v1_is_valid(BridgeV1Type, BridgeName) ->
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
case lookup_conf(BridgeV2Type, BridgeName) of
{error, _} ->
@ -1007,35 +1047,21 @@ is_valid_bridge_v1(BridgeV1Type, BridgeName) ->
end
end.
bridge_v1_type_to_bridge_v2_type(Bin) when is_binary(Bin) ->
?MODULE:bridge_v1_type_to_bridge_v2_type(binary_to_existing_atom(Bin));
bridge_v1_type_to_bridge_v2_type(kafka) ->
kafka_producer;
bridge_v1_type_to_bridge_v2_type(kafka_producer) ->
kafka_producer;
bridge_v1_type_to_bridge_v2_type(azure_event_hub_producer) ->
azure_event_hub_producer.
bridge_v1_type_to_bridge_v2_type(Type) ->
emqx_action_info:bridge_v1_type_to_action_type(Type).
%% This function should return true for all inputs that are bridge V1 types for
%% bridges that have been refactored to bridge V2s, and for all all bridge V2
%% types. For everything else the function should return false.
is_bridge_v2_type(Atom) when is_atom(Atom) ->
is_bridge_v2_type(atom_to_binary(Atom, utf8));
is_bridge_v2_type(<<"kafka_producer">>) ->
true;
is_bridge_v2_type(<<"kafka">>) ->
true;
is_bridge_v2_type(<<"azure_event_hub_producer">>) ->
true;
is_bridge_v2_type(_) ->
false.
bridge_v2_type_to_bridge_v1_type(Type) ->
emqx_action_info:action_type_to_bridge_v1_type(Type).
list_and_transform_to_bridge_v1() ->
Bridges = list_with_lookup_fun(fun lookup_and_transform_to_bridge_v1/2),
is_bridge_v2_type(Type) ->
emqx_action_info:is_action_type(Type).
bridge_v1_list_and_transform() ->
Bridges = list_with_lookup_fun(fun bridge_v1_lookup_and_transform/2),
[B || B <- Bridges, B =/= not_bridge_v1_compatible_error()].
lookup_and_transform_to_bridge_v1(BridgeV1Type, Name) ->
case ?MODULE:is_valid_bridge_v1(BridgeV1Type, Name) of
bridge_v1_lookup_and_transform(BridgeV1Type, Name) ->
case ?MODULE:bridge_v1_is_valid(BridgeV1Type, Name) of
true ->
Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
case lookup(Type, Name) of
@ -1043,7 +1069,7 @@ lookup_and_transform_to_bridge_v1(BridgeV1Type, Name) ->
ConnectorType = connector_type(Type),
case emqx_connector:lookup(ConnectorType, ConnectorName) of
{ok, Connector} ->
lookup_and_transform_to_bridge_v1_helper(
bridge_v1_lookup_and_transform_helper(
BridgeV1Type, Name, Type, BridgeV2, ConnectorType, Connector
);
Error ->
@ -1059,7 +1085,7 @@ lookup_and_transform_to_bridge_v1(BridgeV1Type, Name) ->
not_bridge_v1_compatible_error() ->
{error, not_bridge_v1_compatible}.
lookup_and_transform_to_bridge_v1_helper(
bridge_v1_lookup_and_transform_helper(
BridgeV1Type, BridgeName, BridgeV2Type, BridgeV2, ConnectorType, Connector
) ->
ConnectorRawConfig1 = maps:get(raw_config, Connector),
@ -1112,7 +1138,7 @@ lookup_conf(Type, Name) ->
Config
end.
split_bridge_v1_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
%% Check if the bridge v2 exists
case lookup_conf(BridgeV2Type, BridgeName) of
@ -1123,7 +1149,7 @@ split_bridge_v1_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
BridgeV1Type, BridgeName, RawConf, PreviousRawConf
);
_Conf ->
case ?MODULE:is_valid_bridge_v1(BridgeV1Type, BridgeName) of
case ?MODULE:bridge_v1_is_valid(BridgeV1Type, BridgeName) of
true ->
%% Using remove + create as update, hence do not delete deps.
RemoveDeps = [],
@ -1358,7 +1384,7 @@ bridge_v1_id_to_connector_resource_id(BridgeId) ->
end.
bridge_v1_enable_disable(Action, BridgeType, BridgeName) ->
case emqx_bridge_v2:is_valid_bridge_v1(BridgeType, BridgeName) of
case emqx_bridge_v2:bridge_v1_is_valid(BridgeType, BridgeName) of
true ->
bridge_v1_enable_disable_helper(
Action,
@ -1403,7 +1429,7 @@ bridge_v1_start(BridgeV1Type, Name) ->
bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, DoHealthCheck) ->
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
case emqx_bridge_v2:is_valid_bridge_v1(BridgeV1Type, Name) of
case emqx_bridge_v2:bridge_v1_is_valid(BridgeV1Type, Name) of
true ->
connector_operation_helper_with_conf(
BridgeV2Type,

View File

@ -98,21 +98,11 @@ get_response_body_schema() ->
).
bridge_info_examples(Method) ->
maps:merge(
#{},
emqx_enterprise_bridge_examples(Method)
).
emqx_bridge_v2_schema:examples(Method).
bridge_info_array_example(Method) ->
lists:map(fun(#{value := Config}) -> Config end, maps:values(bridge_info_examples(Method))).
-if(?EMQX_RELEASE_EDITION == ee).
emqx_enterprise_bridge_examples(Method) ->
emqx_bridge_v2_enterprise:examples(Method).
-else.
emqx_enterprise_bridge_examples(_Method) -> #{}.
-endif.
param_path_id() ->
{id,
mk(

View File

@ -82,6 +82,11 @@ schema_modules() ->
].
examples(Method) ->
ActionExamples = emqx_bridge_v2_schema:examples(Method),
RegisteredExamples = registered_examples(Method),
maps:merge(ActionExamples, RegisteredExamples).
registered_examples(Method) ->
MergeFun =
fun(Example, Examples) ->
maps:merge(Examples, Example)

View File

@ -1,68 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_v2_enterprise).
-if(?EMQX_RELEASE_EDITION == ee).
-import(hoconsc, [mk/2, enum/1, ref/2]).
-export([
api_schemas/1,
examples/1,
fields/1
]).
examples(Method) ->
MergeFun =
fun(Example, Examples) ->
maps:merge(Examples, Example)
end,
Fun =
fun(Module, Examples) ->
ConnectorExamples = erlang:apply(Module, bridge_v2_examples, [Method]),
lists:foldl(MergeFun, Examples, ConnectorExamples)
end,
lists:foldl(Fun, #{}, schema_modules()).
schema_modules() ->
[
emqx_bridge_kafka,
emqx_bridge_azure_event_hub
].
fields(actions) ->
action_structs().
action_structs() ->
[
{kafka_producer,
mk(
hoconsc:map(name, ref(emqx_bridge_kafka, kafka_producer_action)),
#{
desc => <<"Kafka Producer Actions Config">>,
required => false
}
)},
{azure_event_hub_producer,
mk(
hoconsc:map(name, ref(emqx_bridge_azure_event_hub, actions)),
#{
desc => <<"Azure Event Hub Actions Config">>,
required => false
}
)}
].
api_schemas(Method) ->
[
api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_bridge_v2"),
api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_bridge_v2")
].
api_ref(Module, Type, Method) ->
{Type, ref(Module, Method)}.
-else.
-endif.

View File

@ -27,51 +27,24 @@
-export([
get_response/0,
put_request/0,
post_request/0
post_request/0,
examples/1
]).
%% Exported for mocking
%% TODO: refactor emqx_bridge_v1_compatibility_layer_SUITE so we don't need to
%% export this
-export([
registered_api_schemas/1
]).
-export([types/0, types_sc/0]).
-export([enterprise_api_schemas/1]).
-export_type([action_type/0]).
%% Should we explicitly list them here so dialyzer may be more helpful?
-type action_type() :: atom().
-if(?EMQX_RELEASE_EDITION == ee).
-spec enterprise_api_schemas(Method) -> [{_Type :: binary(), ?R_REF(module(), Method)}] when
Method :: string().
enterprise_api_schemas(Method) ->
%% We *must* do this to ensure the module is really loaded, especially when we use
%% `call_hocon' from `nodetool' to generate initial configurations.
_ = emqx_bridge_v2_enterprise:module_info(),
case erlang:function_exported(emqx_bridge_v2_enterprise, api_schemas, 1) of
true -> emqx_bridge_v2_enterprise:api_schemas(Method);
false -> []
end.
enterprise_fields_actions() ->
%% We *must* do this to ensure the module is really loaded, especially when we use
%% `call_hocon' from `nodetool' to generate initial configurations.
_ = emqx_bridge_v2_enterprise:module_info(),
case erlang:function_exported(emqx_bridge_v2_enterprise, fields, 1) of
true ->
emqx_bridge_v2_enterprise:fields(actions);
false ->
[]
end.
-else.
-spec enterprise_api_schemas(Method) -> [{_Type :: binary(), ?R_REF(module(), Method)}] when
Method :: string().
enterprise_api_schemas(_Method) -> [].
enterprise_fields_actions() -> [].
-endif.
%%======================================================================================
%% For HTTP APIs
get_response() ->
@ -84,8 +57,18 @@ post_request() ->
api_schema("post").
api_schema(Method) ->
EE = ?MODULE:enterprise_api_schemas(Method),
hoconsc:union(bridge_api_union(EE)).
APISchemas = ?MODULE:registered_api_schemas(Method),
hoconsc:union(bridge_api_union(APISchemas)).
registered_api_schemas(Method) ->
RegisteredSchemas = emqx_action_info:registered_schema_modules(),
[
api_ref(SchemaModule, atom_to_binary(BridgeV2Type), Method ++ "_bridge_v2")
|| {BridgeV2Type, SchemaModule} <- RegisteredSchemas
].
api_ref(Module, Type, Method) ->
{Type, ref(Module, Method)}.
bridge_api_union(Refs) ->
Index = maps:from_list(Refs),
@ -133,7 +116,13 @@ roots() ->
end.
fields(actions) ->
[] ++ enterprise_fields_actions().
registered_schema_fields().
registered_schema_fields() ->
[
Module:fields(action)
|| {_BridgeV2Type, Module} <- emqx_action_info:registered_schema_modules()
].
desc(actions) ->
?DESC("desc_bridges_v2");
@ -148,6 +137,19 @@ types() ->
types_sc() ->
hoconsc:enum(types()).
examples(Method) ->
MergeFun =
fun(Example, Examples) ->
maps:merge(Examples, Example)
end,
Fun =
fun(Module, Examples) ->
ConnectorExamples = erlang:apply(Module, bridge_v2_examples, [Method]),
lists:foldl(MergeFun, Examples, ConnectorExamples)
end,
SchemaModules = [Mod || {_, Mod} <- emqx_action_info:registered_schema_modules()],
lists:foldl(Fun, #{}, SchemaModules).
-ifdef(TEST).
-include_lib("hocon/include/hocon_types.hrl").
schema_homogeneous_test() ->

View File

@ -111,7 +111,7 @@ setup_mocks() ->
catch meck:new(emqx_bridge_v2_schema, MeckOpts),
meck:expect(
emqx_bridge_v2_schema,
enterprise_api_schemas,
registered_api_schemas,
1,
fun(Method) -> [{bridge_type_bin(), hoconsc:ref(?MODULE, "api_" ++ Method)}] end
),

View File

@ -264,17 +264,17 @@ t_create_dry_run_connector_does_not_exist(_) ->
BridgeConf = (bridge_config())#{<<"connector">> => <<"connector_does_not_exist">>},
{error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), BridgeConf).
t_is_valid_bridge_v1(_) ->
t_bridge_v1_is_valid(_) ->
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, bridge_config()),
true = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge),
true = emqx_bridge_v2:bridge_v1_is_valid(bridge_v1_type, my_test_bridge),
%% Add another channel/bridge to the connector
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge_2, bridge_config()),
false = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge),
false = emqx_bridge_v2:bridge_v1_is_valid(bridge_v1_type, my_test_bridge),
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
true = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge_2),
true = emqx_bridge_v2:bridge_v1_is_valid(bridge_v1_type, my_test_bridge_2),
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge_2),
%% Non existing bridge is a valid Bridge V1
true = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge),
true = emqx_bridge_v2:bridge_v1_is_valid(bridge_v1_type, my_test_bridge),
ok.
t_manual_health_check(_) ->
@ -647,10 +647,12 @@ t_load_config_success(_Config) ->
{ok, _},
update_root_config(RootConf0)
),
BridgeTypeBin = bin(BridgeType),
BridgeNameBin = bin(BridgeName),
?assertMatch(
{ok, #{
type := BridgeType,
name := BridgeName,
type := BridgeTypeBin,
name := BridgeNameBin,
raw_config := #{},
resource_data := #{}
}},
@ -665,8 +667,8 @@ t_load_config_success(_Config) ->
),
?assertMatch(
{ok, #{
type := BridgeType,
name := BridgeName,
type := BridgeTypeBin,
name := BridgeNameBin,
raw_config := #{<<"some_key">> := <<"new_value">>},
resource_data := #{}
}},
@ -860,3 +862,7 @@ wait_until(Fun, Timeout) when Timeout >= 0 ->
end;
wait_until(_, _) ->
ct:fail("Wait until event did not happen").
bin(Bin) when is_binary(Bin) -> Bin;
bin(Str) when is_list(Str) -> list_to_binary(Str);
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_azure_event_hub, [
{description, "EMQX Enterprise Azure Event Hub Bridge"},
{vsn, "0.1.3"},
{vsn, "0.1.4"},
{registered, []},
{applications, [
kernel,

View File

@ -114,6 +114,15 @@ fields(kafka_message) ->
Fields0 = emqx_bridge_kafka:fields(kafka_message),
Fields = proplists:delete(timestamp, Fields0),
override_documentations(Fields);
fields(action) ->
{azure_event_hub_producer,
mk(
hoconsc:map(name, ref(emqx_bridge_azure_event_hub, actions)),
#{
desc => <<"Azure Event Hub Actions Config">>,
required => false
}
)};
fields(actions) ->
Fields =
override(

View File

@ -0,0 +1,22 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_azure_event_hub_action_info).
-behaviour(emqx_action_info).
-export([
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0
]).
bridge_v1_type_name() -> azure_event_hub_producer.
action_type_name() -> azure_event_hub_producer.
connector_type_name() -> azure_event_hub_producer.
schema_module() -> emqx_bridge_azure_event_hub.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge_kafka, [
{description, "EMQX Enterprise Kafka Bridge"},
{vsn, "0.1.11"},
{vsn, "0.1.12"},
{registered, [emqx_bridge_kafka_consumer_sup]},
{applications, [
kernel,
@ -12,7 +12,7 @@
brod,
brod_gssapi
]},
{env, []},
{env, [{emqx_action_info_module, emqx_bridge_kafka_action_info}]},
{modules, []},
{links, []}

View File

@ -526,7 +526,18 @@ fields(consumer_kafka_opts) ->
fields(resource_opts) ->
SupportedFields = [health_check_interval],
CreationOpts = emqx_resource_schema:create_opts(_Overrides = []),
lists:filter(fun({Field, _}) -> lists:member(Field, SupportedFields) end, CreationOpts).
lists:filter(fun({Field, _}) -> lists:member(Field, SupportedFields) end, CreationOpts);
fields(action_field) ->
{kafka_producer,
mk(
hoconsc:map(name, ref(emqx_bridge_kafka, kafka_producer_action)),
#{
desc => <<"Kafka Producer Action Config">>,
required => false
}
)};
fields(action) ->
fields(action_field).
desc("config_connector") ->
?DESC("desc_config");

View File

@ -0,0 +1,22 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_kafka_action_info).
-behaviour(emqx_action_info).
-export([
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0
]).
bridge_v1_type_name() -> kafka.
action_type_name() -> kafka_producer.
connector_type_name() -> kafka_producer.
schema_module() -> emqx_bridge_kafka.

View File

@ -1,6 +1,6 @@
{application, emqx_conf, [
{description, "EMQX configuration management"},
{vsn, "0.1.30"},
{vsn, "0.1.31"},
{registered, []},
{mod, {emqx_conf_app, []}},
{applications, [kernel, stdlib, emqx_ctl]},

View File

@ -151,6 +151,9 @@ reset(Node, KeyPath, Opts) ->
%% @doc Called from build script.
%% TODO: move to a external escript after all refactoring is done
dump_schema(Dir, SchemaModule) ->
%% TODO: Load all apps instead of only emqx_dashboard
%% as this will help schemas that searches for apps with
%% relevant schema definitions
_ = application:load(emqx_dashboard),
ok = emqx_dashboard_desc_cache:init(),
lists:foreach(

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_resource, [
{description, "Manager for all external resources"},
{vsn, "0.1.24"},
{vsn, "0.1.25"},
{registered, []},
{mod, {emqx_resource_app, []}},
{applications, [

View File

@ -447,7 +447,7 @@ health_check(ResId) ->
emqx_resource_manager:health_check(ResId).
-spec channel_health_check(resource_id(), channel_id()) ->
#{status := channel_status(), error := term(), any() => any()}.
#{status := resource_status(), error := term()}.
channel_health_check(ResId, ChannelId) ->
emqx_resource_manager:channel_health_check(ResId, ChannelId).

View File

@ -309,7 +309,7 @@ health_check(ResId) ->
safe_call(ResId, health_check, ?T_OPERATION).
-spec channel_health_check(resource_id(), channel_id()) ->
#{status := channel_status(), error := term(), any() => any()}.
#{status := resource_status(), error := term()}.
channel_health_check(ResId, ChannelId) ->
%% Do normal health check first to trigger health checks for channels
%% and update the cached health status for the channels

View File

@ -311,6 +311,15 @@ t_rule_engine(_) ->
{400, _} = emqx_rule_engine_api:'/rule_engine'(put, #{body => #{<<"something">> => <<"weird">>}}).
t_downgrade_bridge_type(_) ->
case emqx_release:edition() of
ee ->
do_test_downgrade_bridge_type();
ce ->
%% downgrade is not supported in CE
ok
end.
do_test_downgrade_bridge_type() ->
#{id := RuleId} = create_rule((?SIMPLE_RULE(<<>>))#{<<"actions">> => [<<"kafka:name">>]}),
?assertMatch(
%% returns a bridges_v2 ID