diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl new file mode 100644 index 000000000..8e8d51aff --- /dev/null +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -0,0 +1,199 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc The module which knows everything about actions. + +%% NOTE: it does not cover the V1 bridges. + +-module(emqx_action_info). + +-export([ + action_type_to_connector_type/1, + action_type_to_bridge_v1_type/1, + bridge_v1_type_to_action_type/1, + is_action_type/1, + registered_schema_modules/0 +]). + +-callback bridge_v1_type_name() -> atom(). +-callback action_type_name() -> atom(). +-callback connector_type_name() -> atom(). +-callback schema_module() -> atom(). + +-optional_callbacks([bridge_v1_type_name/0]). + +%% ==================================================================== +%% Hadcoded list of info modules for actions +%% TODO: Remove this list once we have made sure that all relevants +%% apps are loaded before this module is called. +%% ==================================================================== + +-if(?EMQX_RELEASE_EDITION == ee). +hard_coded_action_info_modules_ee() -> + [ + emqx_bridge_kafka_action_info, + emqx_bridge_azure_event_hub_action_info + ]. +-else. +hard_coded_action_info_modules_ee() -> + []. +-endif. + +hard_coded_action_info_modules_common() -> + []. + +hard_coded_action_info_modules() -> + hard_coded_action_info_modules_common() ++ hard_coded_action_info_modules_ee(). + +%% ==================================================================== +%% API +%% ==================================================================== + +action_type_to_connector_type(Type) when not is_atom(Type) -> + action_type_to_connector_type(binary_to_existing_atom(iolist_to_binary(Type))); +action_type_to_connector_type(Type) -> + ActionInfoMap = info_map(), + ActionTypeToConnectorTypeMap = maps:get(action_type_to_connector_type, ActionInfoMap), + case maps:get(Type, ActionTypeToConnectorTypeMap, undefined) of + undefined -> Type; + ConnectorType -> ConnectorType + end. + +bridge_v1_type_to_action_type(Bin) when is_binary(Bin) -> + bridge_v1_type_to_action_type(binary_to_existing_atom(Bin)); +bridge_v1_type_to_action_type(Type) -> + ActionInfoMap = info_map(), + BridgeV1TypeToActionType = maps:get(bridge_v1_type_to_action_type, ActionInfoMap), + case maps:get(Type, BridgeV1TypeToActionType, undefined) of + undefined -> Type; + ActionType -> ActionType + end. + +action_type_to_bridge_v1_type(Bin) when is_binary(Bin) -> + action_type_to_bridge_v1_type(binary_to_existing_atom(Bin)); +action_type_to_bridge_v1_type(Type) -> + ActionInfoMap = info_map(), + ActionTypeToBridgeV1Type = maps:get(action_type_to_bridge_v1_type, ActionInfoMap), + case maps:get(Type, ActionTypeToBridgeV1Type, undefined) of + undefined -> Type; + BridgeV1Type -> BridgeV1Type + end. + +%% This function should return true for all inputs that are bridge V1 types for +%% bridges that have been refactored to bridge V2s, and for all all bridge V2 +%% types. For everything else the function should return false. +is_action_type(Bin) when is_binary(Bin) -> + is_action_type(binary_to_existing_atom(Bin)); +is_action_type(Type) -> + ActionInfoMap = info_map(), + ActionTypes = maps:get(action_type_names, ActionInfoMap), + case maps:get(Type, ActionTypes, undefined) of + undefined -> false; + _ -> true + end. + +registered_schema_modules() -> + InfoMap = info_map(), + Schemas = maps:get(action_type_to_schema_module, InfoMap), + maps:to_list(Schemas). + +%% ==================================================================== +%% Internal functions for building the info map and accessing it +%% ==================================================================== + +internal_emqx_action_persistent_term_info_key() -> + ?FUNCTION_NAME. + +info_map() -> + case persistent_term:get(internal_emqx_action_persistent_term_info_key(), not_found) of + not_found -> + build_cache(); + ActionInfoMap -> + ActionInfoMap + end. + +build_cache() -> + ActionInfoModules = action_info_modules(), + ActionInfoMap = + lists:foldl( + fun(Module, InfoMapSoFar) -> + ModuleInfoMap = get_info_map(Module), + emqx_utils_maps:deep_merge(InfoMapSoFar, ModuleInfoMap) + end, + initial_info_map(), + ActionInfoModules + ), + %% Update the persistent term with the new info map + persistent_term:put(internal_emqx_action_persistent_term_info_key(), ActionInfoMap), + ActionInfoMap. + +action_info_modules() -> + ActionInfoModules = [ + action_info_modules(App) + || {App, _, _} <- application:loaded_applications() + ], + lists:usort(lists:flatten(ActionInfoModules) ++ hard_coded_action_info_modules()). + +action_info_modules(App) -> + case application:get_env(App, emqx_action_info_module) of + {ok, Module} -> + [Module]; + _ -> + [] + end. + +initial_info_map() -> + #{ + action_type_names => #{}, + bridge_v1_type_to_action_type => #{}, + action_type_to_bridge_v1_type => #{}, + action_type_to_connector_type => #{}, + action_type_to_schema_module => #{} + }. + +get_info_map(Module) -> + %% Force the module to get loaded + _ = code:ensure_loaded(Module), + ActionType = Module:action_type_name(), + BridgeV1Type = + case erlang:function_exported(Module, bridge_v1_type_name, 0) of + true -> + Module:bridge_v1_type_name(); + false -> + Module:action_type_name() + end, + #{ + action_type_names => #{ + ActionType => true, + BridgeV1Type => true + }, + bridge_v1_type_to_action_type => #{ + BridgeV1Type => ActionType, + %% Alias the bridge V1 type to the action type + ActionType => ActionType + }, + action_type_to_bridge_v1_type => #{ + ActionType => BridgeV1Type + }, + action_type_to_connector_type => #{ + ActionType => Module:connector_type_name(), + %% Alias the bridge V1 type to the action type + BridgeV1Type => Module:connector_type_name() + }, + action_type_to_schema_module => #{ + ActionType => Module:schema_module() + } + }. diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index c2387fe99..f829b12df 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "EMQX bridges"}, - {vsn, "0.1.29"}, + {vsn, "0.1.30"}, {registered, [emqx_bridge_sup]}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 8098072c0..51bdfb084 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -307,7 +307,7 @@ list() -> emqx:get_raw_config([bridges], #{}) ), BridgeV2Bridges = - emqx_bridge_v2:list_and_transform_to_bridge_v1(), + emqx_bridge_v2:bridge_v1_list_and_transform(), BridgeV1Bridges ++ BridgeV2Bridges. %%BridgeV2Bridges = emqx_bridge_v2:list(). @@ -318,7 +318,7 @@ lookup(Id) -> lookup(Type, Name) -> case emqx_bridge_v2:is_bridge_v2_type(Type) of true -> - emqx_bridge_v2:lookup_and_transform_to_bridge_v1(Type, Name); + emqx_bridge_v2:bridge_v1_lookup_and_transform(Type, Name); false -> RawConf = emqx:get_raw_config([bridges, Type, Name], #{}), lookup(Type, Name, RawConf) @@ -340,7 +340,7 @@ lookup(Type, Name, RawConf) -> get_metrics(Type, Name) -> case emqx_bridge_v2:is_bridge_v2_type(Type) of true -> - case emqx_bridge_v2:is_valid_bridge_v1(Type, Name) of + case emqx_bridge_v2:bridge_v1_is_valid(Type, Name) of true -> BridgeV2Type = emqx_bridge_v2:bridge_v2_type_to_connector_type(Type), emqx_bridge_v2:get_metrics(BridgeV2Type, Name); @@ -383,7 +383,7 @@ create(BridgeType0, BridgeName, RawConf) -> }), case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of true -> - emqx_bridge_v2:split_bridge_v1_config_and_create(BridgeType, BridgeName, RawConf); + emqx_bridge_v2:bridge_v1_split_config_and_create(BridgeType, BridgeName, RawConf); false -> emqx_conf:update( emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 0b6f4c187..d263817bf 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -627,7 +627,7 @@ create_bridge(BridgeType, BridgeName, Conf) -> update_bridge(BridgeType, BridgeName, Conf) -> case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of true -> - case emqx_bridge_v2:is_valid_bridge_v1(BridgeType, BridgeName) of + case emqx_bridge_v2:bridge_v1_is_valid(BridgeType, BridgeName) of true -> create_or_update_bridge(BridgeType, BridgeName, Conf, 200); false -> diff --git a/apps/emqx_bridge/src/emqx_bridge_lib.erl b/apps/emqx_bridge/src/emqx_bridge_lib.erl index b11344ee1..4be605745 100644 --- a/apps/emqx_bridge/src/emqx_bridge_lib.erl +++ b/apps/emqx_bridge/src/emqx_bridge_lib.erl @@ -53,20 +53,20 @@ maybe_withdraw_rule_action_loop([BridgeId | More], DeleteActions) -> end. %% @doc Kafka producer bridge renamed from 'kafka' to 'kafka_bridge' since 5.3.1. -upgrade_type(kafka) -> - kafka_producer; -upgrade_type(<<"kafka">>) -> - <<"kafka_producer">>; -upgrade_type(Other) -> - Other. +upgrade_type(Type) when is_atom(Type) -> + emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(Type); +upgrade_type(Type) when is_binary(Type) -> + atom_to_binary(emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(Type)); +upgrade_type(Type) when is_list(Type) -> + atom_to_list(emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(list_to_binary(Type))). %% @doc Kafka producer bridge type renamed from 'kafka' to 'kafka_bridge' since 5.3.1 -downgrade_type(kafka_producer) -> - kafka; -downgrade_type(<<"kafka_producer">>) -> - <<"kafka">>; -downgrade_type(Other) -> - Other. +downgrade_type(Type) when is_atom(Type) -> + emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type); +downgrade_type(Type) when is_binary(Type) -> + atom_to_binary(emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type)); +downgrade_type(Type) when is_list(Type) -> + atom_to_list(emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(list_to_binary(Type))). %% A rule might be referencing an old version bridge type name %% i.e. 'kafka' instead of 'kafka_producer' so we need to try both diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index c7646faf4..674eceb81 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -130,7 +130,7 @@ reset_metrics(ResourceId) -> false -> emqx_resource:reset_metrics(ResourceId); true -> - case emqx_bridge_v2:is_valid_bridge_v1(Type, Name) of + case emqx_bridge_v2:bridge_v1_is_valid(Type, Name) of true -> BridgeV2Type = emqx_bridge_v2:bridge_v2_type_to_connector_type(Type), emqx_bridge_v2:reset_metrics(BridgeV2Type, Name); diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index d8646f952..70e248e56 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -40,6 +40,8 @@ list/0, lookup/2, create/3, + %% The remove/2 function is only for internal use as it may create + %% rules with broken dependencies remove/2, %% The following is the remove function that is called by the HTTP API %% It also checks for rule action dependencies and optionally removes @@ -48,6 +50,7 @@ ]). %% Operations + -export([ disable_enable/3, health_check/2, @@ -73,7 +76,8 @@ -export([ id/2, id/3, - is_valid_bridge_v1/2 + bridge_v1_is_valid/2, + extract_connector_id_from_bridge_v2_id/1 ]). %% Config Update Handler API @@ -88,18 +92,26 @@ import_config/1 ]). -%% Compatibility API +%% Bridge V2 Types and Conversions -export([ bridge_v2_type_to_connector_type/1, - is_bridge_v2_type/1, - lookup_and_transform_to_bridge_v1/2, - list_and_transform_to_bridge_v1/0, + is_bridge_v2_type/1 +]). + +%% Compatibility Layer API +%% All public functions for the compatibility layer should be prefixed with +%% bridge_v1_ + +-export([ + bridge_v1_lookup_and_transform/2, + bridge_v1_list_and_transform/0, bridge_v1_check_deps_and_remove/3, - split_bridge_v1_config_and_create/3, + bridge_v1_split_config_and_create/3, bridge_v1_create_dry_run/2, - extract_connector_id_from_bridge_v2_id/1, bridge_v1_type_to_bridge_v2_type/1, + %% Exception from the naming convention: + bridge_v2_type_to_bridge_v1_type/1, bridge_v1_id_to_connector_resource_id/1, bridge_v1_enable_disable/3, bridge_v1_restart/2, @@ -107,6 +119,27 @@ bridge_v1_start/2 ]). +%%==================================================================== +%% Types +%%==================================================================== + +-type bridge_v2_info() :: #{ + type := binary(), + name := binary(), + raw_config := map(), + resource_data := map(), + status := emqx_resource:resource_status(), + %% Explanation of the status if the status is not connected + error := term() +}. + +-type bridge_v2_type() :: binary() | atom() | [byte()]. +-type bridge_v2_name() :: binary() | atom() | [byte()]. + +%%==================================================================== + +%%==================================================================== + %%==================================================================== %% Loading and unloading config when EMQX starts and stops %%==================================================================== @@ -157,6 +190,7 @@ unload_bridges() -> %% CRUD API %%==================================================================== +-spec lookup(bridge_v2_type(), bridge_v2_name()) -> {ok, bridge_v2_info()} | {error, not_found}. lookup(Type, Name) -> case emqx:get_raw_config([?ROOT_KEY, Type, Name], not_found) of not_found -> @@ -191,8 +225,8 @@ lookup(Type, Name) -> {disconnected, <<"Pending installation">>} end, {ok, #{ - type => Type, - name => Name, + type => bin(Type), + name => bin(Name), raw_config => RawConf, resource_data => InstanceData, status => DisplayBridgeV2Status, @@ -200,9 +234,12 @@ lookup(Type, Name) -> }} end. +-spec list() -> [bridge_v2_info()] | {error, term()}. list() -> list_with_lookup_fun(fun lookup/2). +-spec create(bridge_v2_type(), bridge_v2_name(), map()) -> + {ok, emqx_config:update_result()} | {error, any()}. create(BridgeType, BridgeName, RawConf) -> ?SLOG(debug, #{ brige_action => create, @@ -217,9 +254,10 @@ create(BridgeType, BridgeName, RawConf) -> #{override_to => cluster} ). -%% NOTE: This function can cause broken references but it is only called from -%% test cases. --spec remove(atom() | binary(), binary()) -> ok | {error, any()}. +%% NOTE: This function can cause broken references from rules but it is only +%% called directly from test cases. + +-spec remove(bridge_v2_type(), bridge_v2_name()) -> ok | {error, any()}. remove(BridgeType, BridgeName) -> ?SLOG(debug, #{ brige_action => remove, @@ -237,6 +275,7 @@ remove(BridgeType, BridgeName) -> {error, Reason} -> {error, Reason} end. +-spec check_deps_and_remove(bridge_v2_type(), bridge_v2_name(), boolean()) -> ok | {error, any()}. check_deps_and_remove(BridgeType, BridgeName, AlsoDeleteActions) -> AlsoDelete = case AlsoDeleteActions of @@ -408,6 +447,8 @@ combine_connector_and_bridge_v2_config( %% Operations %%==================================================================== +-spec disable_enable(disable | enable, bridge_v2_type(), bridge_v2_name()) -> + {ok, any()} | {error, any()}. disable_enable(Action, BridgeType, BridgeName) when Action =:= disable; Action =:= enable -> @@ -485,6 +526,7 @@ connector_operation_helper_with_conf( end end. +-spec reset_metrics(bridge_v2_type(), bridge_v2_name()) -> ok | {error, not_found}. reset_metrics(Type, Name) -> reset_metrics_helper(Type, Name, lookup_conf(Type, Name)). @@ -492,7 +534,9 @@ reset_metrics_helper(_Type, _Name, #{enable := false}) -> ok; reset_metrics_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName}) -> BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName), - ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, BridgeV2Id). + ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, BridgeV2Id); +reset_metrics_helper(_, _, _) -> + {error, not_found}. get_query_mode(BridgeV2Type, Config) -> CreationOpts = emqx_resource:fetch_creation_opts(Config), @@ -500,6 +544,8 @@ get_query_mode(BridgeV2Type, Config) -> ResourceType = emqx_connector_resource:connector_to_resource_type(ConnectorType), emqx_resource:query_mode(ResourceType, Config, CreationOpts). +-spec send_message(bridge_v2_type(), bridge_v2_name(), Message :: term(), QueryOpts :: map()) -> + term() | {error, term()}. send_message(BridgeType, BridgeName, Message, QueryOpts0) -> case lookup_conf(BridgeType, BridgeName) of #{enable := true} = Config0 -> @@ -533,8 +579,7 @@ do_send_msg_with_enabled_config( emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts). -spec health_check(BridgeType :: term(), BridgeName :: term()) -> - #{status := term(), error := term()} | {error, Reason :: term()}. - + #{status := emqx_resource:resource_status(), error := term()} | {error, Reason :: term()}. health_check(BridgeType, BridgeName) -> case lookup_conf(BridgeType, BridgeName) of #{ @@ -553,6 +598,34 @@ health_check(BridgeType, BridgeName) -> Error end. +-spec create_dry_run(bridge_v2_type(), Config :: map()) -> ok | {error, term()}. +create_dry_run(Type, Conf0) -> + Conf1 = maps:without([<<"name">>], Conf0), + TypeBin = bin(Type), + RawConf = #{<<"actions">> => #{TypeBin => #{<<"temp_name">> => Conf1}}}, + %% Check config + try + _ = + hocon_tconf:check_plain( + emqx_bridge_v2_schema, + RawConf, + #{atom_key => true, required => false} + ), + #{<<"connector">> := ConnectorName} = Conf1, + %% Check that the connector exists and do the dry run if it exists + ConnectorType = connector_type(Type), + case emqx:get_raw_config([connectors, ConnectorType, ConnectorName], not_found) of + not_found -> + {error, iolist_to_binary(io_lib:format("Connector ~p not found", [ConnectorName]))}; + ConnectorRawConf -> + create_dry_run_helper(Type, ConnectorRawConf, Conf1) + end + catch + %% validation errors + throw:Reason1 -> + {error, Reason1} + end. + create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) -> BridgeName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]), ConnectorType = connector_type(BridgeType), @@ -584,33 +657,7 @@ create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) -> end, emqx_connector_resource:create_dry_run(ConnectorType, ConnectorRawConf, OnReadyCallback). -create_dry_run(Type, Conf0) -> - Conf1 = maps:without([<<"name">>], Conf0), - TypeBin = bin(Type), - RawConf = #{<<"actions">> => #{TypeBin => #{<<"temp_name">> => Conf1}}}, - %% Check config - try - _ = - hocon_tconf:check_plain( - emqx_bridge_v2_schema, - RawConf, - #{atom_key => true, required => false} - ), - #{<<"connector">> := ConnectorName} = Conf1, - %% Check that the connector exists and do the dry run if it exists - ConnectorType = connector_type(Type), - case emqx:get_raw_config([connectors, ConnectorType, ConnectorName], not_found) of - not_found -> - {error, iolist_to_binary(io_lib:format("Connector ~p not found", [ConnectorName]))}; - ConnectorRawConf -> - create_dry_run_helper(Type, ConnectorRawConf, Conf1) - end - catch - %% validation errors - throw:Reason1 -> - {error, Reason1} - end. - +-spec get_metrics(bridge_v2_type(), bridge_v2_name()) -> emqx_metrics_worker:metrics(). get_metrics(Type, Name) -> emqx_resource:get_metrics(id(Type, Name)). @@ -779,15 +826,8 @@ connector_type(Type) -> %% remote call so it can be mocked ?MODULE:bridge_v2_type_to_connector_type(Type). -bridge_v2_type_to_connector_type(Type) when not is_atom(Type) -> - bridge_v2_type_to_connector_type(binary_to_existing_atom(iolist_to_binary(Type))); -bridge_v2_type_to_connector_type(kafka) -> - %% backward compatible - kafka_producer; -bridge_v2_type_to_connector_type(kafka_producer) -> - kafka_producer; -bridge_v2_type_to_connector_type(azure_event_hub_producer) -> - azure_event_hub_producer. +bridge_v2_type_to_connector_type(Type) -> + emqx_action_info:action_type_to_connector_type(Type). %%==================================================================== %% Data backup API @@ -989,7 +1029,7 @@ unpack_bridge_conf(Type, PackedConf, TopLevelConf) -> %% %% * The corresponding bridge v2 should exist %% * The connector for the bridge v2 should have exactly one channel -is_valid_bridge_v1(BridgeV1Type, BridgeName) -> +bridge_v1_is_valid(BridgeV1Type, BridgeName) -> BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), case lookup_conf(BridgeV2Type, BridgeName) of {error, _} -> @@ -1007,35 +1047,21 @@ is_valid_bridge_v1(BridgeV1Type, BridgeName) -> end end. -bridge_v1_type_to_bridge_v2_type(Bin) when is_binary(Bin) -> - ?MODULE:bridge_v1_type_to_bridge_v2_type(binary_to_existing_atom(Bin)); -bridge_v1_type_to_bridge_v2_type(kafka) -> - kafka_producer; -bridge_v1_type_to_bridge_v2_type(kafka_producer) -> - kafka_producer; -bridge_v1_type_to_bridge_v2_type(azure_event_hub_producer) -> - azure_event_hub_producer. +bridge_v1_type_to_bridge_v2_type(Type) -> + emqx_action_info:bridge_v1_type_to_action_type(Type). -%% This function should return true for all inputs that are bridge V1 types for -%% bridges that have been refactored to bridge V2s, and for all all bridge V2 -%% types. For everything else the function should return false. -is_bridge_v2_type(Atom) when is_atom(Atom) -> - is_bridge_v2_type(atom_to_binary(Atom, utf8)); -is_bridge_v2_type(<<"kafka_producer">>) -> - true; -is_bridge_v2_type(<<"kafka">>) -> - true; -is_bridge_v2_type(<<"azure_event_hub_producer">>) -> - true; -is_bridge_v2_type(_) -> - false. +bridge_v2_type_to_bridge_v1_type(Type) -> + emqx_action_info:action_type_to_bridge_v1_type(Type). -list_and_transform_to_bridge_v1() -> - Bridges = list_with_lookup_fun(fun lookup_and_transform_to_bridge_v1/2), +is_bridge_v2_type(Type) -> + emqx_action_info:is_action_type(Type). + +bridge_v1_list_and_transform() -> + Bridges = list_with_lookup_fun(fun bridge_v1_lookup_and_transform/2), [B || B <- Bridges, B =/= not_bridge_v1_compatible_error()]. -lookup_and_transform_to_bridge_v1(BridgeV1Type, Name) -> - case ?MODULE:is_valid_bridge_v1(BridgeV1Type, Name) of +bridge_v1_lookup_and_transform(BridgeV1Type, Name) -> + case ?MODULE:bridge_v1_is_valid(BridgeV1Type, Name) of true -> Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), case lookup(Type, Name) of @@ -1043,7 +1069,7 @@ lookup_and_transform_to_bridge_v1(BridgeV1Type, Name) -> ConnectorType = connector_type(Type), case emqx_connector:lookup(ConnectorType, ConnectorName) of {ok, Connector} -> - lookup_and_transform_to_bridge_v1_helper( + bridge_v1_lookup_and_transform_helper( BridgeV1Type, Name, Type, BridgeV2, ConnectorType, Connector ); Error -> @@ -1059,7 +1085,7 @@ lookup_and_transform_to_bridge_v1(BridgeV1Type, Name) -> not_bridge_v1_compatible_error() -> {error, not_bridge_v1_compatible}. -lookup_and_transform_to_bridge_v1_helper( +bridge_v1_lookup_and_transform_helper( BridgeV1Type, BridgeName, BridgeV2Type, BridgeV2, ConnectorType, Connector ) -> ConnectorRawConfig1 = maps:get(raw_config, Connector), @@ -1112,7 +1138,7 @@ lookup_conf(Type, Name) -> Config end. -split_bridge_v1_config_and_create(BridgeV1Type, BridgeName, RawConf) -> +bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf) -> BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), %% Check if the bridge v2 exists case lookup_conf(BridgeV2Type, BridgeName) of @@ -1123,7 +1149,7 @@ split_bridge_v1_config_and_create(BridgeV1Type, BridgeName, RawConf) -> BridgeV1Type, BridgeName, RawConf, PreviousRawConf ); _Conf -> - case ?MODULE:is_valid_bridge_v1(BridgeV1Type, BridgeName) of + case ?MODULE:bridge_v1_is_valid(BridgeV1Type, BridgeName) of true -> %% Using remove + create as update, hence do not delete deps. RemoveDeps = [], @@ -1358,7 +1384,7 @@ bridge_v1_id_to_connector_resource_id(BridgeId) -> end. bridge_v1_enable_disable(Action, BridgeType, BridgeName) -> - case emqx_bridge_v2:is_valid_bridge_v1(BridgeType, BridgeName) of + case emqx_bridge_v2:bridge_v1_is_valid(BridgeType, BridgeName) of true -> bridge_v1_enable_disable_helper( Action, @@ -1403,7 +1429,7 @@ bridge_v1_start(BridgeV1Type, Name) -> bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, DoHealthCheck) -> BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), - case emqx_bridge_v2:is_valid_bridge_v1(BridgeV1Type, Name) of + case emqx_bridge_v2:bridge_v1_is_valid(BridgeV1Type, Name) of true -> connector_operation_helper_with_conf( BridgeV2Type, diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index 634337d99..d5fd09631 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -98,21 +98,11 @@ get_response_body_schema() -> ). bridge_info_examples(Method) -> - maps:merge( - #{}, - emqx_enterprise_bridge_examples(Method) - ). + emqx_bridge_v2_schema:examples(Method). bridge_info_array_example(Method) -> lists:map(fun(#{value := Config}) -> Config end, maps:values(bridge_info_examples(Method))). --if(?EMQX_RELEASE_EDITION == ee). -emqx_enterprise_bridge_examples(Method) -> - emqx_bridge_v2_enterprise:examples(Method). --else. -emqx_enterprise_bridge_examples(_Method) -> #{}. --endif. - param_path_id() -> {id, mk( diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl index 5cbc709a5..9456575d4 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl @@ -82,6 +82,11 @@ schema_modules() -> ]. examples(Method) -> + ActionExamples = emqx_bridge_v2_schema:examples(Method), + RegisteredExamples = registered_examples(Method), + maps:merge(ActionExamples, RegisteredExamples). + +registered_examples(Method) -> MergeFun = fun(Example, Examples) -> maps:merge(Examples, Example) diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl deleted file mode 100644 index 54448f07d..000000000 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl +++ /dev/null @@ -1,68 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- --module(emqx_bridge_v2_enterprise). - --if(?EMQX_RELEASE_EDITION == ee). - --import(hoconsc, [mk/2, enum/1, ref/2]). - --export([ - api_schemas/1, - examples/1, - fields/1 -]). - -examples(Method) -> - MergeFun = - fun(Example, Examples) -> - maps:merge(Examples, Example) - end, - Fun = - fun(Module, Examples) -> - ConnectorExamples = erlang:apply(Module, bridge_v2_examples, [Method]), - lists:foldl(MergeFun, Examples, ConnectorExamples) - end, - lists:foldl(Fun, #{}, schema_modules()). - -schema_modules() -> - [ - emqx_bridge_kafka, - emqx_bridge_azure_event_hub - ]. - -fields(actions) -> - action_structs(). - -action_structs() -> - [ - {kafka_producer, - mk( - hoconsc:map(name, ref(emqx_bridge_kafka, kafka_producer_action)), - #{ - desc => <<"Kafka Producer Actions Config">>, - required => false - } - )}, - {azure_event_hub_producer, - mk( - hoconsc:map(name, ref(emqx_bridge_azure_event_hub, actions)), - #{ - desc => <<"Azure Event Hub Actions Config">>, - required => false - } - )} - ]. - -api_schemas(Method) -> - [ - api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_bridge_v2"), - api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_bridge_v2") - ]. - -api_ref(Module, Type, Method) -> - {Type, ref(Module, Method)}. - --else. - --endif. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index 1d059903a..ede783e97 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -27,51 +27,24 @@ -export([ get_response/0, put_request/0, - post_request/0 + post_request/0, + examples/1 +]). + +%% Exported for mocking +%% TODO: refactor emqx_bridge_v1_compatibility_layer_SUITE so we don't need to +%% export this +-export([ + registered_api_schemas/1 ]). -export([types/0, types_sc/0]). --export([enterprise_api_schemas/1]). - -export_type([action_type/0]). %% Should we explicitly list them here so dialyzer may be more helpful? -type action_type() :: atom(). --if(?EMQX_RELEASE_EDITION == ee). --spec enterprise_api_schemas(Method) -> [{_Type :: binary(), ?R_REF(module(), Method)}] when - Method :: string(). -enterprise_api_schemas(Method) -> - %% We *must* do this to ensure the module is really loaded, especially when we use - %% `call_hocon' from `nodetool' to generate initial configurations. - _ = emqx_bridge_v2_enterprise:module_info(), - case erlang:function_exported(emqx_bridge_v2_enterprise, api_schemas, 1) of - true -> emqx_bridge_v2_enterprise:api_schemas(Method); - false -> [] - end. - -enterprise_fields_actions() -> - %% We *must* do this to ensure the module is really loaded, especially when we use - %% `call_hocon' from `nodetool' to generate initial configurations. - _ = emqx_bridge_v2_enterprise:module_info(), - case erlang:function_exported(emqx_bridge_v2_enterprise, fields, 1) of - true -> - emqx_bridge_v2_enterprise:fields(actions); - false -> - [] - end. - --else. - --spec enterprise_api_schemas(Method) -> [{_Type :: binary(), ?R_REF(module(), Method)}] when - Method :: string(). -enterprise_api_schemas(_Method) -> []. - -enterprise_fields_actions() -> []. - --endif. - %%====================================================================================== %% For HTTP APIs get_response() -> @@ -84,8 +57,18 @@ post_request() -> api_schema("post"). api_schema(Method) -> - EE = ?MODULE:enterprise_api_schemas(Method), - hoconsc:union(bridge_api_union(EE)). + APISchemas = ?MODULE:registered_api_schemas(Method), + hoconsc:union(bridge_api_union(APISchemas)). + +registered_api_schemas(Method) -> + RegisteredSchemas = emqx_action_info:registered_schema_modules(), + [ + api_ref(SchemaModule, atom_to_binary(BridgeV2Type), Method ++ "_bridge_v2") + || {BridgeV2Type, SchemaModule} <- RegisteredSchemas + ]. + +api_ref(Module, Type, Method) -> + {Type, ref(Module, Method)}. bridge_api_union(Refs) -> Index = maps:from_list(Refs), @@ -133,7 +116,13 @@ roots() -> end. fields(actions) -> - [] ++ enterprise_fields_actions(). + registered_schema_fields(). + +registered_schema_fields() -> + [ + Module:fields(action) + || {_BridgeV2Type, Module} <- emqx_action_info:registered_schema_modules() + ]. desc(actions) -> ?DESC("desc_bridges_v2"); @@ -148,6 +137,19 @@ types() -> types_sc() -> hoconsc:enum(types()). +examples(Method) -> + MergeFun = + fun(Example, Examples) -> + maps:merge(Examples, Example) + end, + Fun = + fun(Module, Examples) -> + ConnectorExamples = erlang:apply(Module, bridge_v2_examples, [Method]), + lists:foldl(MergeFun, Examples, ConnectorExamples) + end, + SchemaModules = [Mod || {_, Mod} <- emqx_action_info:registered_schema_modules()], + lists:foldl(Fun, #{}, SchemaModules). + -ifdef(TEST). -include_lib("hocon/include/hocon_types.hrl"). schema_homogeneous_test() -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl index 8227e7993..f3b7fb685 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl @@ -111,7 +111,7 @@ setup_mocks() -> catch meck:new(emqx_bridge_v2_schema, MeckOpts), meck:expect( emqx_bridge_v2_schema, - enterprise_api_schemas, + registered_api_schemas, 1, fun(Method) -> [{bridge_type_bin(), hoconsc:ref(?MODULE, "api_" ++ Method)}] end ), diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index 367e95784..2766088a1 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -264,17 +264,17 @@ t_create_dry_run_connector_does_not_exist(_) -> BridgeConf = (bridge_config())#{<<"connector">> => <<"connector_does_not_exist">>}, {error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), BridgeConf). -t_is_valid_bridge_v1(_) -> +t_bridge_v1_is_valid(_) -> {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, bridge_config()), - true = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge), + true = emqx_bridge_v2:bridge_v1_is_valid(bridge_v1_type, my_test_bridge), %% Add another channel/bridge to the connector {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge_2, bridge_config()), - false = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge), + false = emqx_bridge_v2:bridge_v1_is_valid(bridge_v1_type, my_test_bridge), ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), - true = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge_2), + true = emqx_bridge_v2:bridge_v1_is_valid(bridge_v1_type, my_test_bridge_2), ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge_2), %% Non existing bridge is a valid Bridge V1 - true = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge), + true = emqx_bridge_v2:bridge_v1_is_valid(bridge_v1_type, my_test_bridge), ok. t_manual_health_check(_) -> @@ -647,10 +647,12 @@ t_load_config_success(_Config) -> {ok, _}, update_root_config(RootConf0) ), + BridgeTypeBin = bin(BridgeType), + BridgeNameBin = bin(BridgeName), ?assertMatch( {ok, #{ - type := BridgeType, - name := BridgeName, + type := BridgeTypeBin, + name := BridgeNameBin, raw_config := #{}, resource_data := #{} }}, @@ -665,8 +667,8 @@ t_load_config_success(_Config) -> ), ?assertMatch( {ok, #{ - type := BridgeType, - name := BridgeName, + type := BridgeTypeBin, + name := BridgeNameBin, raw_config := #{<<"some_key">> := <<"new_value">>}, resource_data := #{} }}, @@ -860,3 +862,7 @@ wait_until(Fun, Timeout) when Timeout >= 0 -> end; wait_until(_, _) -> ct:fail("Wait until event did not happen"). + +bin(Bin) when is_binary(Bin) -> Bin; +bin(Str) when is_list(Str) -> list_to_binary(Str); +bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src index ece0495f9..40ea79334 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_azure_event_hub, [ {description, "EMQX Enterprise Azure Event Hub Bridge"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl index 6ae550a7f..eb364bdff 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl @@ -114,6 +114,15 @@ fields(kafka_message) -> Fields0 = emqx_bridge_kafka:fields(kafka_message), Fields = proplists:delete(timestamp, Fields0), override_documentations(Fields); +fields(action) -> + {azure_event_hub_producer, + mk( + hoconsc:map(name, ref(emqx_bridge_azure_event_hub, actions)), + #{ + desc => <<"Azure Event Hub Actions Config">>, + required => false + } + )}; fields(actions) -> Fields = override( diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub_action_info.erl b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub_action_info.erl new file mode 100644 index 000000000..8ebdb2435 --- /dev/null +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub_action_info.erl @@ -0,0 +1,22 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_azure_event_hub_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +bridge_v1_type_name() -> azure_event_hub_producer. + +action_type_name() -> azure_event_hub_producer. + +connector_type_name() -> azure_event_hub_producer. + +schema_module() -> emqx_bridge_azure_event_hub. diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src index 88fa6b7bd..00b9d8968 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_kafka, [ {description, "EMQX Enterprise Kafka Bridge"}, - {vsn, "0.1.11"}, + {vsn, "0.1.12"}, {registered, [emqx_bridge_kafka_consumer_sup]}, {applications, [ kernel, @@ -12,7 +12,7 @@ brod, brod_gssapi ]}, - {env, []}, + {env, [{emqx_action_info_module, emqx_bridge_kafka_action_info}]}, {modules, []}, {links, []} diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 55ac36d1a..0eb015cd3 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -526,7 +526,18 @@ fields(consumer_kafka_opts) -> fields(resource_opts) -> SupportedFields = [health_check_interval], CreationOpts = emqx_resource_schema:create_opts(_Overrides = []), - lists:filter(fun({Field, _}) -> lists:member(Field, SupportedFields) end, CreationOpts). + lists:filter(fun({Field, _}) -> lists:member(Field, SupportedFields) end, CreationOpts); +fields(action_field) -> + {kafka_producer, + mk( + hoconsc:map(name, ref(emqx_bridge_kafka, kafka_producer_action)), + #{ + desc => <<"Kafka Producer Action Config">>, + required => false + } + )}; +fields(action) -> + fields(action_field). desc("config_connector") -> ?DESC("desc_config"); diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl new file mode 100644 index 000000000..50d4f0c63 --- /dev/null +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl @@ -0,0 +1,22 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_kafka_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +bridge_v1_type_name() -> kafka. + +action_type_name() -> kafka_producer. + +connector_type_name() -> kafka_producer. + +schema_module() -> emqx_bridge_kafka. diff --git a/apps/emqx_conf/src/emqx_conf.app.src b/apps/emqx_conf/src/emqx_conf.app.src index fda3e4759..3856a882c 100644 --- a/apps/emqx_conf/src/emqx_conf.app.src +++ b/apps/emqx_conf/src/emqx_conf.app.src @@ -1,6 +1,6 @@ {application, emqx_conf, [ {description, "EMQX configuration management"}, - {vsn, "0.1.30"}, + {vsn, "0.1.31"}, {registered, []}, {mod, {emqx_conf_app, []}}, {applications, [kernel, stdlib, emqx_ctl]}, diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl index 78a39f5dd..4e00c1c57 100644 --- a/apps/emqx_conf/src/emqx_conf.erl +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -151,6 +151,9 @@ reset(Node, KeyPath, Opts) -> %% @doc Called from build script. %% TODO: move to a external escript after all refactoring is done dump_schema(Dir, SchemaModule) -> + %% TODO: Load all apps instead of only emqx_dashboard + %% as this will help schemas that searches for apps with + %% relevant schema definitions _ = application:load(emqx_dashboard), ok = emqx_dashboard_desc_cache:init(), lists:foreach( diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index 8092fadc8..9edd03078 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_resource, [ {description, "Manager for all external resources"}, - {vsn, "0.1.24"}, + {vsn, "0.1.25"}, {registered, []}, {mod, {emqx_resource_app, []}}, {applications, [ diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 60e94d7e3..f5bf65c0f 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -447,7 +447,7 @@ health_check(ResId) -> emqx_resource_manager:health_check(ResId). -spec channel_health_check(resource_id(), channel_id()) -> - #{status := channel_status(), error := term(), any() => any()}. + #{status := resource_status(), error := term()}. channel_health_check(ResId, ChannelId) -> emqx_resource_manager:channel_health_check(ResId, ChannelId). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index a030080b7..11391fb2b 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -309,7 +309,7 @@ health_check(ResId) -> safe_call(ResId, health_check, ?T_OPERATION). -spec channel_health_check(resource_id(), channel_id()) -> - #{status := channel_status(), error := term(), any() => any()}. + #{status := resource_status(), error := term()}. channel_health_check(ResId, ChannelId) -> %% Do normal health check first to trigger health checks for channels %% and update the cached health status for the channels diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl index a4e659ce6..c2c52b6a6 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl @@ -311,6 +311,15 @@ t_rule_engine(_) -> {400, _} = emqx_rule_engine_api:'/rule_engine'(put, #{body => #{<<"something">> => <<"weird">>}}). t_downgrade_bridge_type(_) -> + case emqx_release:edition() of + ee -> + do_test_downgrade_bridge_type(); + ce -> + %% downgrade is not supported in CE + ok + end. + +do_test_downgrade_bridge_type() -> #{id := RuleId} = create_rule((?SIMPLE_RULE(<<>>))#{<<"actions">> => [<<"kafka:name">>]}), ?assertMatch( %% returns a bridges_v2 ID