From 729e7df0d5c6c53d2330967b5479f1177bebd0d1 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Sat, 4 Nov 2023 06:46:30 +0100 Subject: [PATCH 01/21] docs(bridge_v2): add specs for CRUD functions --- apps/emqx_bridge/src/emqx_bridge_v2.erl | 30 ++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index d8646f952..3ba84c435 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -107,6 +107,20 @@ bridge_v1_start/2 ]). +%%==================================================================== +%% Types +%%==================================================================== + +-type bridge_v2_info() :: #{ + type := binary(), + name := binary(), + raw_config := map(), + resource_data := map(), + status := emqx_resource:resource_status(), + %% Explanation of the status if the status is not connected + error := term() +}. + %%==================================================================== %% Loading and unloading config when EMQX starts and stops %%==================================================================== @@ -157,6 +171,7 @@ unload_bridges() -> %% CRUD API %%==================================================================== +-spec lookup(binary() | atom(), binary() | atom()) -> {ok, bridge_v2_info()} | {error, not_found}. lookup(Type, Name) -> case emqx:get_raw_config([?ROOT_KEY, Type, Name], not_found) of not_found -> @@ -191,8 +206,8 @@ lookup(Type, Name) -> {disconnected, <<"Pending installation">>} end, {ok, #{ - type => Type, - name => Name, + type => bin(Type), + name => bin(Name), raw_config => RawConf, resource_data => InstanceData, status => DisplayBridgeV2Status, @@ -200,9 +215,12 @@ lookup(Type, Name) -> }} end. +-spec list() -> [bridge_v2_info() | {error, term()}]. list() -> list_with_lookup_fun(fun lookup/2). +-spec create(atom() | binary(), binary(), map()) -> + {ok, emqx_config:update_result()} | {error, any()}. create(BridgeType, BridgeName, RawConf) -> ?SLOG(debug, #{ brige_action => create, @@ -217,9 +235,10 @@ create(BridgeType, BridgeName, RawConf) -> #{override_to => cluster} ). -%% NOTE: This function can cause broken references but it is only called from -%% test cases. --spec remove(atom() | binary(), binary()) -> ok | {error, any()}. +%% NOTE: This function can cause broken references from rules but it is only +%% called directly from test cases. + +-spec remove(atom() | binary(), atom() | binary()) -> ok | {error, any()}. remove(BridgeType, BridgeName) -> ?SLOG(debug, #{ brige_action => remove, @@ -237,6 +256,7 @@ remove(BridgeType, BridgeName) -> {error, Reason} -> {error, Reason} end. +-spec check_deps_and_remove(atom() | binary(), atom() | binary(), boolean()) -> ok | {error, any()}. check_deps_and_remove(BridgeType, BridgeName, AlsoDeleteActions) -> AlsoDelete = case AlsoDeleteActions of From 99031f0dae8045188375dbf0a86811ad46b0bfe5 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Sat, 4 Nov 2023 07:31:27 +0100 Subject: [PATCH 02/21] refactor(bridge_v2): prefix compatibility functions with bridge_v1 --- apps/emqx_bridge/src/emqx_bridge.erl | 8 ++-- apps/emqx_bridge/src/emqx_bridge_api.erl | 2 +- apps/emqx_bridge/src/emqx_bridge_resource.erl | 2 +- apps/emqx_bridge/src/emqx_bridge_v2.erl | 47 +++++++++++-------- .../emqx_bridge/test/emqx_bridge_v2_SUITE.erl | 10 ++-- 5 files changed, 39 insertions(+), 30 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 8098072c0..51bdfb084 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -307,7 +307,7 @@ list() -> emqx:get_raw_config([bridges], #{}) ), BridgeV2Bridges = - emqx_bridge_v2:list_and_transform_to_bridge_v1(), + emqx_bridge_v2:bridge_v1_list_and_transform(), BridgeV1Bridges ++ BridgeV2Bridges. %%BridgeV2Bridges = emqx_bridge_v2:list(). @@ -318,7 +318,7 @@ lookup(Id) -> lookup(Type, Name) -> case emqx_bridge_v2:is_bridge_v2_type(Type) of true -> - emqx_bridge_v2:lookup_and_transform_to_bridge_v1(Type, Name); + emqx_bridge_v2:bridge_v1_lookup_and_transform(Type, Name); false -> RawConf = emqx:get_raw_config([bridges, Type, Name], #{}), lookup(Type, Name, RawConf) @@ -340,7 +340,7 @@ lookup(Type, Name, RawConf) -> get_metrics(Type, Name) -> case emqx_bridge_v2:is_bridge_v2_type(Type) of true -> - case emqx_bridge_v2:is_valid_bridge_v1(Type, Name) of + case emqx_bridge_v2:bridge_v1_is_valid(Type, Name) of true -> BridgeV2Type = emqx_bridge_v2:bridge_v2_type_to_connector_type(Type), emqx_bridge_v2:get_metrics(BridgeV2Type, Name); @@ -383,7 +383,7 @@ create(BridgeType0, BridgeName, RawConf) -> }), case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of true -> - emqx_bridge_v2:split_bridge_v1_config_and_create(BridgeType, BridgeName, RawConf); + emqx_bridge_v2:bridge_v1_split_config_and_create(BridgeType, BridgeName, RawConf); false -> emqx_conf:update( emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 0b6f4c187..d263817bf 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -627,7 +627,7 @@ create_bridge(BridgeType, BridgeName, Conf) -> update_bridge(BridgeType, BridgeName, Conf) -> case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of true -> - case emqx_bridge_v2:is_valid_bridge_v1(BridgeType, BridgeName) of + case emqx_bridge_v2:bridge_v1_is_valid(BridgeType, BridgeName) of true -> create_or_update_bridge(BridgeType, BridgeName, Conf, 200); false -> diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index c7646faf4..674eceb81 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -130,7 +130,7 @@ reset_metrics(ResourceId) -> false -> emqx_resource:reset_metrics(ResourceId); true -> - case emqx_bridge_v2:is_valid_bridge_v1(Type, Name) of + case emqx_bridge_v2:bridge_v1_is_valid(Type, Name) of true -> BridgeV2Type = emqx_bridge_v2:bridge_v2_type_to_connector_type(Type), emqx_bridge_v2:reset_metrics(BridgeV2Type, Name); diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 3ba84c435..7e0851669 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -40,6 +40,8 @@ list/0, lookup/2, create/3, + %% The remove/2 function is only for internal use as it may create + %% rules with broken dependencies remove/2, %% The following is the remove function that is called by the HTTP API %% It also checks for rule action dependencies and optionally removes @@ -73,7 +75,8 @@ -export([ id/2, id/3, - is_valid_bridge_v1/2 + bridge_v1_is_valid/2, + extract_connector_id_from_bridge_v2_id/1 ]). %% Config Update Handler API @@ -88,17 +91,23 @@ import_config/1 ]). -%% Compatibility API +%% Bridge V2 Types and Conversions -export([ bridge_v2_type_to_connector_type/1, - is_bridge_v2_type/1, - lookup_and_transform_to_bridge_v1/2, - list_and_transform_to_bridge_v1/0, + is_bridge_v2_type/1 +]). + +%% Compatibility Layer API +%% All public functions for the compatibility layer should be prefixed with +%% birdge_v1_ + +-export([ + bridge_v1_lookup_and_transform/2, + bridge_v1_list_and_transform/0, bridge_v1_check_deps_and_remove/3, - split_bridge_v1_config_and_create/3, + bridge_v1_split_config_and_create/3, bridge_v1_create_dry_run/2, - extract_connector_id_from_bridge_v2_id/1, bridge_v1_type_to_bridge_v2_type/1, bridge_v1_id_to_connector_resource_id/1, bridge_v1_enable_disable/3, @@ -215,7 +224,7 @@ lookup(Type, Name) -> }} end. --spec list() -> [bridge_v2_info() | {error, term()}]. +-spec list() -> [bridge_v2_info()] | {error, term()}. list() -> list_with_lookup_fun(fun lookup/2). @@ -1009,7 +1018,7 @@ unpack_bridge_conf(Type, PackedConf, TopLevelConf) -> %% %% * The corresponding bridge v2 should exist %% * The connector for the bridge v2 should have exactly one channel -is_valid_bridge_v1(BridgeV1Type, BridgeName) -> +bridge_v1_is_valid(BridgeV1Type, BridgeName) -> BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), case lookup_conf(BridgeV2Type, BridgeName) of {error, _} -> @@ -1050,12 +1059,12 @@ is_bridge_v2_type(<<"azure_event_hub_producer">>) -> is_bridge_v2_type(_) -> false. -list_and_transform_to_bridge_v1() -> - Bridges = list_with_lookup_fun(fun lookup_and_transform_to_bridge_v1/2), +bridge_v1_list_and_transform() -> + Bridges = list_with_lookup_fun(fun bridge_v1_lookup_and_transform/2), [B || B <- Bridges, B =/= not_bridge_v1_compatible_error()]. -lookup_and_transform_to_bridge_v1(BridgeV1Type, Name) -> - case ?MODULE:is_valid_bridge_v1(BridgeV1Type, Name) of +bridge_v1_lookup_and_transform(BridgeV1Type, Name) -> + case ?MODULE:bridge_v1_is_valid(BridgeV1Type, Name) of true -> Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), case lookup(Type, Name) of @@ -1063,7 +1072,7 @@ lookup_and_transform_to_bridge_v1(BridgeV1Type, Name) -> ConnectorType = connector_type(Type), case emqx_connector:lookup(ConnectorType, ConnectorName) of {ok, Connector} -> - lookup_and_transform_to_bridge_v1_helper( + bridge_v1_lookup_and_transform_helper( BridgeV1Type, Name, Type, BridgeV2, ConnectorType, Connector ); Error -> @@ -1079,7 +1088,7 @@ lookup_and_transform_to_bridge_v1(BridgeV1Type, Name) -> not_bridge_v1_compatible_error() -> {error, not_bridge_v1_compatible}. -lookup_and_transform_to_bridge_v1_helper( +bridge_v1_lookup_and_transform_helper( BridgeV1Type, BridgeName, BridgeV2Type, BridgeV2, ConnectorType, Connector ) -> ConnectorRawConfig1 = maps:get(raw_config, Connector), @@ -1132,7 +1141,7 @@ lookup_conf(Type, Name) -> Config end. -split_bridge_v1_config_and_create(BridgeV1Type, BridgeName, RawConf) -> +bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf) -> BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), %% Check if the bridge v2 exists case lookup_conf(BridgeV2Type, BridgeName) of @@ -1143,7 +1152,7 @@ split_bridge_v1_config_and_create(BridgeV1Type, BridgeName, RawConf) -> BridgeV1Type, BridgeName, RawConf, PreviousRawConf ); _Conf -> - case ?MODULE:is_valid_bridge_v1(BridgeV1Type, BridgeName) of + case ?MODULE:bridge_v1_is_valid(BridgeV1Type, BridgeName) of true -> %% Using remove + create as update, hence do not delete deps. RemoveDeps = [], @@ -1378,7 +1387,7 @@ bridge_v1_id_to_connector_resource_id(BridgeId) -> end. bridge_v1_enable_disable(Action, BridgeType, BridgeName) -> - case emqx_bridge_v2:is_valid_bridge_v1(BridgeType, BridgeName) of + case emqx_bridge_v2:bridge_v1_is_valid(BridgeType, BridgeName) of true -> bridge_v1_enable_disable_helper( Action, @@ -1423,7 +1432,7 @@ bridge_v1_start(BridgeV1Type, Name) -> bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, DoHealthCheck) -> BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), - case emqx_bridge_v2:is_valid_bridge_v1(BridgeV1Type, Name) of + case emqx_bridge_v2:bridge_v1_is_valid(BridgeV1Type, Name) of true -> connector_operation_helper_with_conf( BridgeV2Type, diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index 367e95784..f652e105b 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -264,17 +264,17 @@ t_create_dry_run_connector_does_not_exist(_) -> BridgeConf = (bridge_config())#{<<"connector">> => <<"connector_does_not_exist">>}, {error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), BridgeConf). -t_is_valid_bridge_v1(_) -> +t_bridge_v1_is_valid(_) -> {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, bridge_config()), - true = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge), + true = emqx_bridge_v2:bridge_v1_is_valid(bridge_v1_type, my_test_bridge), %% Add another channel/bridge to the connector {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge_2, bridge_config()), - false = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge), + false = emqx_bridge_v2:bridge_v1_is_valid(bridge_v1_type, my_test_bridge), ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), - true = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge_2), + true = emqx_bridge_v2:bridge_v1_is_valid(bridge_v1_type, my_test_bridge_2), ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge_2), %% Non existing bridge is a valid Bridge V1 - true = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge), + true = emqx_bridge_v2:bridge_v1_is_valid(bridge_v1_type, my_test_bridge), ok. t_manual_health_check(_) -> From cd5b1f9b96046559400c8e945bf6d923aa953909 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Sat, 4 Nov 2023 08:11:37 +0100 Subject: [PATCH 03/21] docs(bridge_V2): type specs for operations --- apps/emqx_bridge/src/emqx_bridge_v2.erl | 80 +++++++++++-------- .../emqx_bridge/test/emqx_bridge_v2_SUITE.erl | 10 ++- apps/emqx_resource/src/emqx_resource.erl | 2 +- .../src/emqx_resource_manager.erl | 2 +- 4 files changed, 56 insertions(+), 38 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 7e0851669..7a74ab438 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -50,6 +50,7 @@ ]). %% Operations + -export([ disable_enable/3, health_check/2, @@ -130,6 +131,9 @@ error := term() }. +-type bridge_v2_type() :: binary() | atom(). +-type bridge_v2_name() :: binary() | atom(). + %%==================================================================== %% Loading and unloading config when EMQX starts and stops %%==================================================================== @@ -180,7 +184,7 @@ unload_bridges() -> %% CRUD API %%==================================================================== --spec lookup(binary() | atom(), binary() | atom()) -> {ok, bridge_v2_info()} | {error, not_found}. +-spec lookup(bridge_v2_type(), bridge_v2_name()) -> {ok, bridge_v2_info()} | {error, not_found}. lookup(Type, Name) -> case emqx:get_raw_config([?ROOT_KEY, Type, Name], not_found) of not_found -> @@ -228,7 +232,7 @@ lookup(Type, Name) -> list() -> list_with_lookup_fun(fun lookup/2). --spec create(atom() | binary(), binary(), map()) -> +-spec create(bridge_v2_type(), bridge_v2_name(), map()) -> {ok, emqx_config:update_result()} | {error, any()}. create(BridgeType, BridgeName, RawConf) -> ?SLOG(debug, #{ @@ -247,7 +251,7 @@ create(BridgeType, BridgeName, RawConf) -> %% NOTE: This function can cause broken references from rules but it is only %% called directly from test cases. --spec remove(atom() | binary(), atom() | binary()) -> ok | {error, any()}. +-spec remove(bridge_v2_type(), bridge_v2_name()) -> ok | {error, any()}. remove(BridgeType, BridgeName) -> ?SLOG(debug, #{ brige_action => remove, @@ -265,7 +269,7 @@ remove(BridgeType, BridgeName) -> {error, Reason} -> {error, Reason} end. --spec check_deps_and_remove(atom() | binary(), atom() | binary(), boolean()) -> ok | {error, any()}. +-spec check_deps_and_remove(bridge_v2_type(), bridge_v2_name(), boolean()) -> ok | {error, any()}. check_deps_and_remove(BridgeType, BridgeName, AlsoDeleteActions) -> AlsoDelete = case AlsoDeleteActions of @@ -437,6 +441,8 @@ combine_connector_and_bridge_v2_config( %% Operations %%==================================================================== +-spec disable_enable(disable | enable, bridge_v2_type(), bridge_v2_name()) -> + {ok, any()} | {error, any()}. disable_enable(Action, BridgeType, BridgeName) when Action =:= disable; Action =:= enable -> @@ -514,6 +520,7 @@ connector_operation_helper_with_conf( end end. +-spec reset_metrics(bridge_v2_type(), bridge_v2_name()) -> ok | {error, not_found}. reset_metrics(Type, Name) -> reset_metrics_helper(Type, Name, lookup_conf(Type, Name)). @@ -521,7 +528,9 @@ reset_metrics_helper(_Type, _Name, #{enable := false}) -> ok; reset_metrics_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName}) -> BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName), - ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, BridgeV2Id). + ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, BridgeV2Id); +reset_metrics_helper(_, _, _) -> + {error, not_found}. get_query_mode(BridgeV2Type, Config) -> CreationOpts = emqx_resource:fetch_creation_opts(Config), @@ -529,6 +538,8 @@ get_query_mode(BridgeV2Type, Config) -> ResourceType = emqx_connector_resource:connector_to_resource_type(ConnectorType), emqx_resource:query_mode(ResourceType, Config, CreationOpts). +-spec send_message(bridge_v2_type(), bridge_v2_name(), Message :: term(), QueryOpts :: map()) -> + term() | {error, term()}. send_message(BridgeType, BridgeName, Message, QueryOpts0) -> case lookup_conf(BridgeType, BridgeName) of #{enable := true} = Config0 -> @@ -562,8 +573,7 @@ do_send_msg_with_enabled_config( emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts). -spec health_check(BridgeType :: term(), BridgeName :: term()) -> - #{status := term(), error := term()} | {error, Reason :: term()}. - + #{status := emqx_resource:resource_status(), error := term()} | {error, Reason :: term()}. health_check(BridgeType, BridgeName) -> case lookup_conf(BridgeType, BridgeName) of #{ @@ -582,6 +592,34 @@ health_check(BridgeType, BridgeName) -> Error end. +-spec create_dry_run(bridge_v2_type(), Config :: map()) -> ok | {error, term()}. +create_dry_run(Type, Conf0) -> + Conf1 = maps:without([<<"name">>], Conf0), + TypeBin = bin(Type), + RawConf = #{<<"actions">> => #{TypeBin => #{<<"temp_name">> => Conf1}}}, + %% Check config + try + _ = + hocon_tconf:check_plain( + emqx_bridge_v2_schema, + RawConf, + #{atom_key => true, required => false} + ), + #{<<"connector">> := ConnectorName} = Conf1, + %% Check that the connector exists and do the dry run if it exists + ConnectorType = connector_type(Type), + case emqx:get_raw_config([connectors, ConnectorType, ConnectorName], not_found) of + not_found -> + {error, iolist_to_binary(io_lib:format("Connector ~p not found", [ConnectorName]))}; + ConnectorRawConf -> + create_dry_run_helper(Type, ConnectorRawConf, Conf1) + end + catch + %% validation errors + throw:Reason1 -> + {error, Reason1} + end. + create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) -> BridgeName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]), ConnectorType = connector_type(BridgeType), @@ -613,33 +651,7 @@ create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) -> end, emqx_connector_resource:create_dry_run(ConnectorType, ConnectorRawConf, OnReadyCallback). -create_dry_run(Type, Conf0) -> - Conf1 = maps:without([<<"name">>], Conf0), - TypeBin = bin(Type), - RawConf = #{<<"actions">> => #{TypeBin => #{<<"temp_name">> => Conf1}}}, - %% Check config - try - _ = - hocon_tconf:check_plain( - emqx_bridge_v2_schema, - RawConf, - #{atom_key => true, required => false} - ), - #{<<"connector">> := ConnectorName} = Conf1, - %% Check that the connector exists and do the dry run if it exists - ConnectorType = connector_type(Type), - case emqx:get_raw_config([connectors, ConnectorType, ConnectorName], not_found) of - not_found -> - {error, iolist_to_binary(io_lib:format("Connector ~p not found", [ConnectorName]))}; - ConnectorRawConf -> - create_dry_run_helper(Type, ConnectorRawConf, Conf1) - end - catch - %% validation errors - throw:Reason1 -> - {error, Reason1} - end. - +-spec get_metrics(bridge_v2_type(), bridge_v2_name()) -> emqx_metrics_worker:metrics(). get_metrics(Type, Name) -> emqx_resource:get_metrics(id(Type, Name)). diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index f652e105b..8cb0b5590 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -647,10 +647,12 @@ t_load_config_success(_Config) -> {ok, _}, update_root_config(RootConf0) ), + BridgeTypeBin = bin(BridgeType), + BridgeNameBin = bin(BridgeName), ?assertMatch( {ok, #{ - type := BridgeType, - name := BridgeName, + type := BridgeTypeBin, + name := BridgeNameBin, raw_config := #{}, resource_data := #{} }}, @@ -860,3 +862,7 @@ wait_until(Fun, Timeout) when Timeout >= 0 -> end; wait_until(_, _) -> ct:fail("Wait until event did not happen"). + +bin(Bin) when is_binary(Bin) -> Bin; +bin(Str) when is_list(Str) -> list_to_binary(Str); +bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 60e94d7e3..f5bf65c0f 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -447,7 +447,7 @@ health_check(ResId) -> emqx_resource_manager:health_check(ResId). -spec channel_health_check(resource_id(), channel_id()) -> - #{status := channel_status(), error := term(), any() => any()}. + #{status := resource_status(), error := term()}. channel_health_check(ResId, ChannelId) -> emqx_resource_manager:channel_health_check(ResId, ChannelId). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index a030080b7..11391fb2b 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -309,7 +309,7 @@ health_check(ResId) -> safe_call(ResId, health_check, ?T_OPERATION). -spec channel_health_check(resource_id(), channel_id()) -> - #{status := channel_status(), error := term(), any() => any()}. + #{status := resource_status(), error := term()}. channel_health_check(ResId, ChannelId) -> %% Do normal health check first to trigger health checks for channels %% and update the cached health status for the channels From 9eaee8f333b22a756c28360aae85cfa9c3e1002e Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Sun, 5 Nov 2023 20:27:27 +0100 Subject: [PATCH 04/21] refactor(emqx_bridge_v2): make independent of Kafka This removes the Kafka specific knowledge from emqx_bridge_v2 and makes it possible to add new Bridge V2 bridges without modifying the emqx_bridge application. --- .../include/emqx_bridge_v2_register.hrl | 136 ++++++++++++++++++ apps/emqx_bridge/src/emqx_bridge_v2.erl | 127 +++++++++++++--- apps/emqx_bridge/src/emqx_bridge_v2_api.erl | 2 +- .../src/schema/emqx_bridge_enterprise.erl | 5 + .../src/schema/emqx_bridge_v2_enterprise.erl | 34 +---- .../src/schema/emqx_bridge_v2_schema.erl | 44 +++++- .../src/emqx_bridge_kafka.erl | 27 +++- 7 files changed, 321 insertions(+), 54 deletions(-) create mode 100644 apps/emqx_bridge/include/emqx_bridge_v2_register.hrl diff --git a/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl b/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl new file mode 100644 index 000000000..79e009fe9 --- /dev/null +++ b/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl @@ -0,0 +1,136 @@ +%% This function is called to register a bridge V2. It should be called before +%% the system boots in a function triggered by an -on_load() directive +%% since it should be called before the system boots because the config +%% system depends on that. +%% +%% It is placed in an hrl file instead of in emqx_bridge_v2.erl because emqx_bridge_v2 +%% might not be loaded when the bridge module is loaded. +-spec emqx_bridge_v2_register_bridge_type(#{ + %% Should be provided by all bridges. Even if the bridge_v2_type_name is + %% the same as the bridge_v1_type_named. + 'bridge_v1_type_name' := atom(), + 'bridge_v2_type_name' := atom(), + 'connector_type' := atom(), + 'schema_module' := atom(), + 'schema_struct_field' := atom() | binary() +}) -> ok. +emqx_bridge_v2_register_bridge_type(BridgeTypeInfo) -> + try + %% We must prevent overwriting so we take a lock when writing to persistent_term + global:set_lock( + { + internal_emqx_bridge_v2_persistent_term_info_key(), + internal_emqx_bridge_v2_persistent_term_info_key() + }, + [node()], + infinity + ), + internal_maybe_create_initial_bridge_v2_info_map(), + internal_register_bridge_type_with_lock(BridgeTypeInfo) + catch + ErrorType:Reason:Stacktrace -> + %% Print the error on standard output as logger might not be + %% started yet + io:format("~p~n", [ + #{ + 'error_type' => ErrorType, + 'reason' => Reason, + 'stacktrace' => Stacktrace, + 'msg' => "Failed to register bridge V2 type" + } + ]), + erlang:raise(ErrorType, Reason, Stacktrace) + after + global:del_lock( + { + internal_emqx_bridge_v2_persistent_term_info_key(), + internal_emqx_bridge_v2_persistent_term_info_key() + }, + [node()] + ) + end, + ok. + +internal_register_bridge_type_with_lock(BridgeTypeInfo) -> + InfoMap0 = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), + %% The Bridge V1 type is also a bridge V2 type due to backwards compatibility + InfoMap1 = emqx_utils_maps:deep_force_put( + [ + bridge_v2_type_names, + maps:get(bridge_v1_type_name, BridgeTypeInfo) + ], + InfoMap0, + true + ), + InfoMap2 = emqx_utils_maps:deep_force_put( + [ + bridge_v2_type_names, + maps:get(bridge_v2_type_name, BridgeTypeInfo) + ], + InfoMap1, + true + ), + InfoMap3 = emqx_utils_maps:deep_force_put( + [ + bridge_v1_type_to_bridge_v2_type, + maps:get(bridge_v1_type_name, BridgeTypeInfo) + ], + InfoMap2, + maps:get(bridge_v2_type_name, BridgeTypeInfo) + ), + InfoMap4 = emqx_utils_maps:deep_force_put( + [ + bridge_v2_type_to_connector_type, + maps:get(bridge_v2_type_name, BridgeTypeInfo) + ], + InfoMap3, + maps:get(connector_type, BridgeTypeInfo) + ), + %% Backwards compatibility + InfoMap5 = emqx_utils_maps:deep_force_put( + [ + bridge_v2_type_to_connector_type, + maps:get(bridge_v1_type_name, BridgeTypeInfo) + ], + InfoMap4, + maps:get(connector_type, BridgeTypeInfo) + ), + InfoMap6 = emqx_utils_maps:deep_force_put( + [ + bridge_v2_type_to_schema_module, + maps:get(bridge_v2_type_name, BridgeTypeInfo) + ], + InfoMap5, + maps:get(schema_module, BridgeTypeInfo) + ), + InfoMap7 = emqx_utils_maps:deep_force_put( + [ + bridge_v2_type_to_schema_struct_field, + maps:get(bridge_v2_type_name, BridgeTypeInfo) + ], + InfoMap6, + maps:get(schema_struct_field, BridgeTypeInfo) + ), + + ok = persistent_term:put(internal_emqx_bridge_v2_persistent_term_info_key(), InfoMap7). + +internal_maybe_create_initial_bridge_v2_info_map() -> + case persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key(), undefined) of + undefined -> + ok = persistent_term:put( + internal_emqx_bridge_v2_persistent_term_info_key(), + #{ + bridge_v2_type_names => #{}, + bridge_v1_type_to_bridge_v2_type => #{}, + bridge_v2_type_to_connector_type => #{}, + bridge_v2_type_to_schema_module => #{}, + bridge_v2_type_to_schema_struct_field => #{} + } + ), + ok; + _ -> + ok + end. + +internal_emqx_bridge_v2_persistent_term_info_key() -> + ?FUNCTION_NAME. diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 7a74ab438..4989cbae2 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -18,6 +18,13 @@ -behaviour(emqx_config_handler). -behaviour(emqx_config_backup). +-compile([ + {nowarn_unused_function, [ + {internal_register_bridge_type_with_lock, 1}, + {emqx_bridge_v2_register_bridge_type, 1} + ]} +]). + -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). @@ -28,6 +35,15 @@ %% refactored into a new module/application with appropriate name. -define(ROOT_KEY, actions). +-on_load(on_load/0). + +%% Getting registered bridge schemas: + +-export([ + bridge_v2_type_to_schame_stuct_field/1, + registered_schema_modules/0 +]). + %% Loading and unloading config when EMQX starts and stops -export([ load/0, @@ -134,6 +150,55 @@ -type bridge_v2_type() :: binary() | atom(). -type bridge_v2_name() :: binary() | atom(). +%%==================================================================== + +%%==================================================================== + +-include_lib("emqx_bridge/include/emqx_bridge_v2_register.hrl"). + +on_load() -> + try + %% We must prevent overwriting so we take a lock when writing to persistent_term + global:set_lock( + { + internal_emqx_bridge_v2_persistent_term_info_key(), + internal_emqx_bridge_v2_persistent_term_info_key() + }, + [node()], + infinity + ), + internal_maybe_create_initial_bridge_v2_info_map() + catch + ErrorType:Reason:Stacktrace -> + %% Logger may not be started so print to stdout + io:format("~p~n", #{ + 'error_type' => ErrorType, + 'reason' => Reason, + 'stacktrace' => Stacktrace, + 'msg' => "Failed to register bridge V2 type" + }), + erlang:raise(ErrorType, Reason, Stacktrace) + after + global:del_lock( + { + internal_emqx_bridge_v2_persistent_term_info_key(), + internal_emqx_bridge_v2_persistent_term_info_key() + }, + [node()] + ) + end, + ok. + +bridge_v2_type_to_schame_stuct_field(BridgeV2Type) -> + InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), + Map = maps:get(bridge_v2_type_to_schema_struct_field, InfoMap), + maps:get(BridgeV2Type, Map). + +registered_schema_modules() -> + InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), + Schemas = maps:get(bridge_v2_type_to_schema_module, InfoMap), + maps:to_list(Schemas). + %%==================================================================== %% Loading and unloading config when EMQX starts and stops %%==================================================================== @@ -822,12 +887,20 @@ connector_type(Type) -> bridge_v2_type_to_connector_type(Type) when not is_atom(Type) -> bridge_v2_type_to_connector_type(binary_to_existing_atom(iolist_to_binary(Type))); -bridge_v2_type_to_connector_type(kafka) -> - %% backward compatible - kafka_producer; -bridge_v2_type_to_connector_type(kafka_producer) -> - kafka_producer; -bridge_v2_type_to_connector_type(azure_event_hub_producer) -> +bridge_v2_type_to_connector_type(Type) -> + BridgeV2InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), + BridgeV2TypeToConnectorTypeMap = maps:get(bridge_v2_type_to_connector_type, BridgeV2InfoMap), + case maps:get(Type, BridgeV2TypeToConnectorTypeMap, undefined) of + undefined -> bridge_v2_type_to_connector_type_old(Type); + ConnectorType -> ConnectorType + end. + +% bridge_v2_type_to_connector_type_old(kafka) -> +% %% backward compatible +% kafka_producer; +% bridge_v2_type_to_connector_type_old(kafka_producer) -> +% kafka_producer; +bridge_v2_type_to_connector_type_old(azure_event_hub_producer) -> azure_event_hub_producer. %%==================================================================== @@ -1050,25 +1123,41 @@ bridge_v1_is_valid(BridgeV1Type, BridgeName) -> bridge_v1_type_to_bridge_v2_type(Bin) when is_binary(Bin) -> ?MODULE:bridge_v1_type_to_bridge_v2_type(binary_to_existing_atom(Bin)); -bridge_v1_type_to_bridge_v2_type(kafka) -> - kafka_producer; -bridge_v1_type_to_bridge_v2_type(kafka_producer) -> - kafka_producer; -bridge_v1_type_to_bridge_v2_type(azure_event_hub_producer) -> +bridge_v1_type_to_bridge_v2_type(Type) -> + BridgeV2InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), + BridgeV1TypeToBridgeV2Type = maps:get(bridge_v1_type_to_bridge_v2_type, BridgeV2InfoMap), + case maps:get(Type, BridgeV1TypeToBridgeV2Type, undefined) of + undefined -> bridge_v1_type_to_bridge_v2_type_old(Type); + BridgeV2Type -> BridgeV2Type + end. + +% bridge_v1_type_to_bridge_v2_type_old(kafka) -> +% kafka_producer; +% bridge_v1_type_to_bridge_v2_type_old(kafka_producer) -> +% kafka_producer; +bridge_v1_type_to_bridge_v2_type_old(azure_event_hub_producer) -> azure_event_hub_producer. %% This function should return true for all inputs that are bridge V1 types for %% bridges that have been refactored to bridge V2s, and for all all bridge V2 %% types. For everything else the function should return false. -is_bridge_v2_type(Atom) when is_atom(Atom) -> - is_bridge_v2_type(atom_to_binary(Atom, utf8)); -is_bridge_v2_type(<<"kafka_producer">>) -> +is_bridge_v2_type(Bin) when is_binary(Bin) -> + is_bridge_v2_type(binary_to_existing_atom(Bin)); +is_bridge_v2_type(Type) -> + BridgeV2InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), + BridgeV2Types = maps:get(bridge_v2_type_names, BridgeV2InfoMap), + case maps:get(Type, BridgeV2Types, undefined) of + undefined -> is_bridge_v2_type_old(Type); + _ -> true + end. + +% is_bridge_v2_type_old(kafka_producer) -> +% true; +% is_bridge_v2_type_old(kafka) -> +% true; +is_bridge_v2_type_old(azure_event_hub_producer) -> true; -is_bridge_v2_type(<<"kafka">>) -> - true; -is_bridge_v2_type(<<"azure_event_hub_producer">>) -> - true; -is_bridge_v2_type(_) -> +is_bridge_v2_type_old(_) -> false. bridge_v1_list_and_transform() -> diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index 634337d99..b28d5ec01 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -99,7 +99,7 @@ get_response_body_schema() -> bridge_info_examples(Method) -> maps:merge( - #{}, + emqx_bridge_v2_schema:examples(Method), emqx_enterprise_bridge_examples(Method) ). diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl index 5cbc709a5..926cf296c 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl @@ -82,6 +82,11 @@ schema_modules() -> ]. examples(Method) -> + EnterpriseExamples = emqx_bridge_v2_enterprise:examples(Method), + RegisteredExamples = registered_examples(Method), + maps:merge(EnterpriseExamples, RegisteredExamples). + +registered_examples(Method) -> MergeFun = fun(Example, Examples) -> maps:merge(Examples, Example) diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl index 54448f07d..29dc71b69 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl @@ -26,42 +26,16 @@ examples(Method) -> lists:foldl(Fun, #{}, schema_modules()). schema_modules() -> - [ - emqx_bridge_kafka, - emqx_bridge_azure_event_hub - ]. + []. fields(actions) -> action_structs(). action_structs() -> - [ - {kafka_producer, - mk( - hoconsc:map(name, ref(emqx_bridge_kafka, kafka_producer_action)), - #{ - desc => <<"Kafka Producer Actions Config">>, - required => false - } - )}, - {azure_event_hub_producer, - mk( - hoconsc:map(name, ref(emqx_bridge_azure_event_hub, actions)), - #{ - desc => <<"Azure Event Hub Actions Config">>, - required => false - } - )} - ]. + []. -api_schemas(Method) -> - [ - api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_bridge_v2"), - api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_bridge_v2") - ]. - -api_ref(Module, Type, Method) -> - {Type, ref(Module, Method)}. +api_schemas(_Method) -> + []. -else. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index 1d059903a..5fe86ec51 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -27,7 +27,8 @@ -export([ get_response/0, put_request/0, - post_request/0 + post_request/0, + examples/1 ]). -export([types/0, types_sc/0]). @@ -85,7 +86,21 @@ post_request() -> api_schema(Method) -> EE = ?MODULE:enterprise_api_schemas(Method), - hoconsc:union(bridge_api_union(EE)). + APISchemas = registered_api_schemas(Method), + hoconsc:union(bridge_api_union(EE ++ APISchemas)). + +registered_api_schemas(Method) -> + %% We *must* do this to ensure the module is really loaded, especially when we use + %% `call_hocon' from `nodetool' to generate initial configurations. + _ = emqx_bridge_v2:module_info(), + RegistredSchmeas = emqx_bridge_v2:registered_schema_modules(), + [ + api_ref(SchemaModule, atom_to_binary(BridgeV2Type), Method ++ "_bridge_v2") + || {BridgeV2Type, SchemaModule} <- RegistredSchmeas + ]. + +api_ref(Module, Type, Method) -> + {Type, ref(Module, Method)}. bridge_api_union(Refs) -> Index = maps:from_list(Refs), @@ -133,7 +148,17 @@ roots() -> end. fields(actions) -> - [] ++ enterprise_fields_actions(). + %% We *must* do this to ensure the module is really loaded, especially when we use + %% `call_hocon' from `nodetool' to generate initial configurations. + _ = emqx_bridge_v2:module_info(), + enterprise_fields_actions() ++ + registered_schema_fields(). + +registered_schema_fields() -> + [ + Module:fields(emqx_bridge_v2:bridge_v2_type_to_schame_stuct_field(BridgeV2Type)) + || {BridgeV2Type, Module} <- emqx_bridge_v2:registered_schema_modules() + ]. desc(actions) -> ?DESC("desc_bridges_v2"); @@ -148,6 +173,19 @@ types() -> types_sc() -> hoconsc:enum(types()). +examples(Method) -> + MergeFun = + fun(Example, Examples) -> + maps:merge(Examples, Example) + end, + Fun = + fun(Module, Examples) -> + ConnectorExamples = erlang:apply(Module, bridge_v2_examples, [Method]), + lists:foldl(MergeFun, Examples, ConnectorExamples) + end, + SchemaModules = [Mod || {_, Mod} <- emqx_bridge_v2:registered_schema_modules()], + lists:foldl(Fun, #{}, SchemaModules). + -ifdef(TEST). -include_lib("hocon/include/hocon_types.hrl"). schema_homogeneous_test() -> diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 55ac36d1a..83ecf7b05 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -16,6 +16,8 @@ ]). -import(hoconsc, [mk/2, enum/1, ref/2]). +-on_load(register_bridge_v2/0). + -export([ bridge_v2_examples/1, conn_bridge_examples/1, @@ -526,7 +528,16 @@ fields(consumer_kafka_opts) -> fields(resource_opts) -> SupportedFields = [health_check_interval], CreationOpts = emqx_resource_schema:create_opts(_Overrides = []), - lists:filter(fun({Field, _}) -> lists:member(Field, SupportedFields) end, CreationOpts). + lists:filter(fun({Field, _}) -> lists:member(Field, SupportedFields) end, CreationOpts); +fields(bridge_v2_field) -> + {kafka_producer, + mk( + hoconsc:map(name, ref(emqx_bridge_kafka, kafka_producer_action)), + #{ + desc => <<"Kafka Producer Bridge V2 Config">>, + required => false + } + )}. desc("config_connector") -> ?DESC("desc_config"); @@ -714,3 +725,17 @@ kafka_ext_header_value_validator(Value) -> "placeholder like ${foo}, or a simple string." } end. + +-include_lib("emqx_bridge/include/emqx_bridge_v2_register.hrl"). + +register_bridge_v2() -> + emqx_bridge_v2_register_bridge_type(#{ + %% Should be provided by all bridges. Even if the bridge_v2_type_name is + %% the same as the bridge_v1_type_named. + 'bridge_v1_type_name' => kafka, + 'bridge_v2_type_name' => kafka_producer, + 'connector_type' => kafka_producer, + 'schema_module' => ?MODULE, + 'schema_struct_field' => bridge_v2_field + }), + ok. From 14e305e2a623b7949feabc2a17b6c47ba40ca04e Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 6 Nov 2023 07:38:02 +0100 Subject: [PATCH 05/21] fix(emqx_bridge_v2): xref warning --- apps/emqx_bridge/src/emqx_bridge_v2.erl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 4989cbae2..f40f40466 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -24,6 +24,8 @@ {emqx_bridge_v2_register_bridge_type, 1} ]} ]). +-ignore_xref(emqx_bridge_v2_register_bridge_type/1). +-ignore_xref(internal_register_bridge_type_with_lock/1). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). From 77aaff137a6482245dd8ba3122c16fda134e09a5 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 6 Nov 2023 07:47:11 +0100 Subject: [PATCH 06/21] test(emqx_bridge_v2_SUITE): fix incorrect assumption about return type This commit fixes an incorrect assumption about the return type of emqx_bridge_v2:lookup/2 in emqx_bridge_v2_SUITE:t_load_config_success/1. --- apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index 8cb0b5590..2766088a1 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -667,8 +667,8 @@ t_load_config_success(_Config) -> ), ?assertMatch( {ok, #{ - type := BridgeType, - name := BridgeName, + type := BridgeTypeBin, + name := BridgeNameBin, raw_config := #{<<"some_key">> := <<"new_value">>}, resource_data := #{} }}, From 3c778121a512826b8dbbd4d8b720352139adef9f Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 6 Nov 2023 08:01:57 +0100 Subject: [PATCH 07/21] fix: bridge V1 type lookup issue --- .../include/emqx_bridge_v2_register.hrl | 25 +++++++++++++------ 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl b/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl index 79e009fe9..362b671ad 100644 --- a/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl +++ b/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl @@ -78,41 +78,50 @@ internal_register_bridge_type_with_lock(BridgeTypeInfo) -> InfoMap2, maps:get(bridge_v2_type_name, BridgeTypeInfo) ), + %% Backwards compatibility InfoMap4 = emqx_utils_maps:deep_force_put( [ - bridge_v2_type_to_connector_type, + bridge_v1_type_to_bridge_v2_type, maps:get(bridge_v2_type_name, BridgeTypeInfo) ], InfoMap3, - maps:get(connector_type, BridgeTypeInfo) + maps:get(bridge_v2_type_name, BridgeTypeInfo) ), - %% Backwards compatibility InfoMap5 = emqx_utils_maps:deep_force_put( [ bridge_v2_type_to_connector_type, - maps:get(bridge_v1_type_name, BridgeTypeInfo) + maps:get(bridge_v2_type_name, BridgeTypeInfo) ], InfoMap4, maps:get(connector_type, BridgeTypeInfo) ), + %% Backwards compatibility InfoMap6 = emqx_utils_maps:deep_force_put( + [ + bridge_v2_type_to_connector_type, + maps:get(bridge_v1_type_name, BridgeTypeInfo) + ], + InfoMap5, + maps:get(connector_type, BridgeTypeInfo) + ), + InfoMap7 = emqx_utils_maps:deep_force_put( [ bridge_v2_type_to_schema_module, maps:get(bridge_v2_type_name, BridgeTypeInfo) ], - InfoMap5, + InfoMap6, maps:get(schema_module, BridgeTypeInfo) ), - InfoMap7 = emqx_utils_maps:deep_force_put( + InfoMap8 = emqx_utils_maps:deep_force_put( [ bridge_v2_type_to_schema_struct_field, maps:get(bridge_v2_type_name, BridgeTypeInfo) ], - InfoMap6, + InfoMap7, maps:get(schema_struct_field, BridgeTypeInfo) ), - ok = persistent_term:put(internal_emqx_bridge_v2_persistent_term_info_key(), InfoMap7). + ok = persistent_term:put(internal_emqx_bridge_v2_persistent_term_info_key(), InfoMap8). internal_maybe_create_initial_bridge_v2_info_map() -> case persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key(), undefined) of From a6aa81b5484b0f3e811b1e7dcc04f33333cc7fe9 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 6 Nov 2023 09:43:15 +0100 Subject: [PATCH 08/21] fix(emqx_bridge_v2): dialyzer found bad format parameter error --- apps/emqx_bridge/src/emqx_bridge_v2.erl | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index f40f40466..1a0160b46 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -173,12 +173,14 @@ on_load() -> catch ErrorType:Reason:Stacktrace -> %% Logger may not be started so print to stdout - io:format("~p~n", #{ - 'error_type' => ErrorType, - 'reason' => Reason, - 'stacktrace' => Stacktrace, - 'msg' => "Failed to register bridge V2 type" - }), + io:format("~p~n", [ + #{ + 'error_type' => ErrorType, + 'reason' => Reason, + 'stacktrace' => Stacktrace, + 'msg' => "Failed to register bridge V2 type" + } + ]), erlang:raise(ErrorType, Reason, Stacktrace) after global:del_lock( From d26a1b9afbc61ec25fe43f5419c48c2452c2c774 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 6 Nov 2023 12:03:11 +0100 Subject: [PATCH 09/21] fix(bridge_v1): no hard coded downgrade and upgrade type functions --- .../include/emqx_bridge_v2_register.hrl | 12 +++++++++-- apps/emqx_bridge/src/emqx_bridge_lib.erl | 20 +++++++----------- apps/emqx_bridge/src/emqx_bridge_v2.erl | 21 ++++++++++++++++++- .../test/emqx_rule_engine_api_SUITE.erl | 9 ++++++++ 4 files changed, 47 insertions(+), 15 deletions(-) diff --git a/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl b/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl index 362b671ad..186564bef 100644 --- a/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl +++ b/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl @@ -120,8 +120,15 @@ internal_register_bridge_type_with_lock(BridgeTypeInfo) -> InfoMap7, maps:get(schema_struct_field, BridgeTypeInfo) ), - - ok = persistent_term:put(internal_emqx_bridge_v2_persistent_term_info_key(), InfoMap8). + InfoMap9 = emqx_utils_maps:deep_force_put( + [ + bridge_v2_type_to_bridge_v1_type, + maps:get(bridge_v2_type_name, BridgeTypeInfo) + ], + InfoMap8, + maps:get(bridge_v1_type_name, BridgeTypeInfo) + ), + ok = persistent_term:put(internal_emqx_bridge_v2_persistent_term_info_key(), InfoMap9). internal_maybe_create_initial_bridge_v2_info_map() -> case persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key(), undefined) of @@ -131,6 +138,7 @@ internal_maybe_create_initial_bridge_v2_info_map() -> #{ bridge_v2_type_names => #{}, bridge_v1_type_to_bridge_v2_type => #{}, + bridge_v2_type_to_bridge_v1_type => #{}, bridge_v2_type_to_connector_type => #{}, bridge_v2_type_to_schema_module => #{}, bridge_v2_type_to_schema_struct_field => #{} diff --git a/apps/emqx_bridge/src/emqx_bridge_lib.erl b/apps/emqx_bridge/src/emqx_bridge_lib.erl index b11344ee1..e8ea422f6 100644 --- a/apps/emqx_bridge/src/emqx_bridge_lib.erl +++ b/apps/emqx_bridge/src/emqx_bridge_lib.erl @@ -53,20 +53,16 @@ maybe_withdraw_rule_action_loop([BridgeId | More], DeleteActions) -> end. %% @doc Kafka producer bridge renamed from 'kafka' to 'kafka_bridge' since 5.3.1. -upgrade_type(kafka) -> - kafka_producer; -upgrade_type(<<"kafka">>) -> - <<"kafka_producer">>; -upgrade_type(Other) -> - Other. +upgrade_type(Type) when is_atom(Type) -> + emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(Type); +upgrade_type(Type) when is_binary(Type) -> + atom_to_binary(emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(Type)). %% @doc Kafka producer bridge type renamed from 'kafka' to 'kafka_bridge' since 5.3.1 -downgrade_type(kafka_producer) -> - kafka; -downgrade_type(<<"kafka_producer">>) -> - <<"kafka">>; -downgrade_type(Other) -> - Other. +downgrade_type(Type) when is_atom(Type) -> + emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type); +downgrade_type(Type) when is_binary(Type) -> + atom_to_binary(emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type)). %% A rule might be referencing an old version bridge type name %% i.e. 'kafka' instead of 'kafka_producer' so we need to try both diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 1a0160b46..d4a621452 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -128,6 +128,8 @@ bridge_v1_split_config_and_create/3, bridge_v1_create_dry_run/2, bridge_v1_type_to_bridge_v2_type/1, + %% Exception from the naming convention: + bridge_v2_type_to_bridge_v1_type/1, bridge_v1_id_to_connector_resource_id/1, bridge_v1_enable_disable/3, bridge_v1_restart/2, @@ -1140,7 +1142,24 @@ bridge_v1_type_to_bridge_v2_type(Type) -> % bridge_v1_type_to_bridge_v2_type_old(kafka_producer) -> % kafka_producer; bridge_v1_type_to_bridge_v2_type_old(azure_event_hub_producer) -> - azure_event_hub_producer. + azure_event_hub_producer; +bridge_v1_type_to_bridge_v2_type_old(Type) -> + Type. + +bridge_v2_type_to_bridge_v1_type(Bin) when is_binary(Bin) -> + ?MODULE:bridge_v2_type_to_bridge_v1_type(binary_to_existing_atom(Bin)); +bridge_v2_type_to_bridge_v1_type(Type) -> + BridgeV2InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), + BridgeV2TypeToBridgeV1Type = maps:get(bridge_v2_type_to_bridge_v1_type, BridgeV2InfoMap), + case maps:get(Type, BridgeV2TypeToBridgeV1Type, undefined) of + undefined -> bridge_v2_type_to_bridge_v1_type_old(Type); + BridgeV1Type -> BridgeV1Type + end. + +bridge_v2_type_to_bridge_v1_type_old(azure_event_hub_producer) -> + azure_event_hub_producer; +bridge_v2_type_to_bridge_v1_type_old(Type) -> + Type. %% This function should return true for all inputs that are bridge V1 types for %% bridges that have been refactored to bridge V2s, and for all all bridge V2 diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl index a4e659ce6..c2c52b6a6 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl @@ -311,6 +311,15 @@ t_rule_engine(_) -> {400, _} = emqx_rule_engine_api:'/rule_engine'(put, #{body => #{<<"something">> => <<"weird">>}}). t_downgrade_bridge_type(_) -> + case emqx_release:edition() of + ee -> + do_test_downgrade_bridge_type(); + ce -> + %% downgrade is not supported in CE + ok + end. + +do_test_downgrade_bridge_type() -> #{id := RuleId} = create_rule((?SIMPLE_RULE(<<>>))#{<<"actions">> => [<<"kafka:name">>]}), ?assertMatch( %% returns a bridges_v2 ID From 7839f0cbc81e39f81b1102418414aec5a4485b40 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 6 Nov 2023 17:30:03 +0100 Subject: [PATCH 10/21] chore(bridge_v2): make dialyzer and bpapi happy --- apps/emqx_bridge/src/emqx_bridge_v2.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index d4a621452..66d8e1899 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -151,8 +151,8 @@ error := term() }. --type bridge_v2_type() :: binary() | atom(). --type bridge_v2_name() :: binary() | atom(). +-type bridge_v2_type() :: binary() | atom() | [byte()]. +-type bridge_v2_name() :: binary() | atom() | [byte()]. %%==================================================================== From ca2cdbc08df6ed130d6734118a321f5c5979d00c Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 7 Nov 2023 07:23:51 +0100 Subject: [PATCH 11/21] fix: bapi static check error --- apps/emqx_bridge/src/emqx_bridge_lib.erl | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_lib.erl b/apps/emqx_bridge/src/emqx_bridge_lib.erl index e8ea422f6..4be605745 100644 --- a/apps/emqx_bridge/src/emqx_bridge_lib.erl +++ b/apps/emqx_bridge/src/emqx_bridge_lib.erl @@ -56,13 +56,17 @@ maybe_withdraw_rule_action_loop([BridgeId | More], DeleteActions) -> upgrade_type(Type) when is_atom(Type) -> emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(Type); upgrade_type(Type) when is_binary(Type) -> - atom_to_binary(emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(Type)). + atom_to_binary(emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(Type)); +upgrade_type(Type) when is_list(Type) -> + atom_to_list(emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(list_to_binary(Type))). %% @doc Kafka producer bridge type renamed from 'kafka' to 'kafka_bridge' since 5.3.1 downgrade_type(Type) when is_atom(Type) -> emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type); downgrade_type(Type) when is_binary(Type) -> - atom_to_binary(emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type)). + atom_to_binary(emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type)); +downgrade_type(Type) when is_list(Type) -> + atom_to_list(emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(list_to_binary(Type))). %% A rule might be referencing an old version bridge type name %% i.e. 'kafka' instead of 'kafka_producer' so we need to try both From bdab421885996d786bb10bd2947ca43d43c91995 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 7 Nov 2023 07:48:09 +0100 Subject: [PATCH 12/21] fix(bridge_v2): name and copyright header --- .../include/emqx_bridge_v2_register.hrl | 17 +++++++++++++++++ apps/emqx_bridge/src/emqx_bridge_v2.erl | 4 ++-- .../src/schema/emqx_bridge_v2_schema.erl | 2 +- 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl b/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl index 186564bef..3f3d0a66b 100644 --- a/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl +++ b/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl @@ -1,3 +1,20 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + + %% This function is called to register a bridge V2. It should be called before %% the system boots in a function triggered by an -on_load() directive %% since it should be called before the system boots because the config diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 66d8e1899..6406f7df2 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -42,7 +42,7 @@ %% Getting registered bridge schemas: -export([ - bridge_v2_type_to_schame_stuct_field/1, + bridge_v2_type_to_schame_struct_field/1, registered_schema_modules/0 ]). @@ -195,7 +195,7 @@ on_load() -> end, ok. -bridge_v2_type_to_schame_stuct_field(BridgeV2Type) -> +bridge_v2_type_to_schame_struct_field(BridgeV2Type) -> InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), Map = maps:get(bridge_v2_type_to_schema_struct_field, InfoMap), maps:get(BridgeV2Type, Map). diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index 5fe86ec51..c380ac933 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -156,7 +156,7 @@ fields(actions) -> registered_schema_fields() -> [ - Module:fields(emqx_bridge_v2:bridge_v2_type_to_schame_stuct_field(BridgeV2Type)) + Module:fields(emqx_bridge_v2:bridge_v2_type_to_schame_struct_field(BridgeV2Type)) || {BridgeV2Type, Module} <- emqx_bridge_v2:registered_schema_modules() ]. From ab078647a5a86549be46874d3742129fb7186640 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 7 Nov 2023 08:36:31 +0100 Subject: [PATCH 13/21] chore: fix formatting problem --- apps/emqx_bridge/include/emqx_bridge_v2_register.hrl | 1 - 1 file changed, 1 deletion(-) diff --git a/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl b/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl index 3f3d0a66b..9c0360c56 100644 --- a/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl +++ b/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl @@ -14,7 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- - %% This function is called to register a bridge V2. It should be called before %% the system boots in a function triggered by an -on_load() directive %% since it should be called before the system boots because the config From 5e8e40701735838a84ef11987b624d7eb45b118e Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 9 Nov 2023 13:37:53 +0100 Subject: [PATCH 14/21] refactor: action schema retrival after PR feedback --- .../include/emqx_bridge_v2_register.hrl | 169 ------------- apps/emqx_bridge/src/emqx_action_info.erl | 225 ++++++++++++++++++ apps/emqx_bridge/src/emqx_bridge_v2.erl | 135 +---------- .../src/schema/emqx_bridge_v2_schema.erl | 8 +- .../src/emqx_bridge_kafka.app.src | 2 +- .../src/emqx_bridge_kafka.erl | 20 +- .../src/emqx_bridge_kafka_action_info.erl | 18 ++ apps/emqx_conf/src/emqx_conf.erl | 3 + 8 files changed, 258 insertions(+), 322 deletions(-) delete mode 100644 apps/emqx_bridge/include/emqx_bridge_v2_register.hrl create mode 100644 apps/emqx_bridge/src/emqx_action_info.erl create mode 100644 apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl diff --git a/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl b/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl deleted file mode 100644 index 9c0360c56..000000000 --- a/apps/emqx_bridge/include/emqx_bridge_v2_register.hrl +++ /dev/null @@ -1,169 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%% This function is called to register a bridge V2. It should be called before -%% the system boots in a function triggered by an -on_load() directive -%% since it should be called before the system boots because the config -%% system depends on that. -%% -%% It is placed in an hrl file instead of in emqx_bridge_v2.erl because emqx_bridge_v2 -%% might not be loaded when the bridge module is loaded. --spec emqx_bridge_v2_register_bridge_type(#{ - %% Should be provided by all bridges. Even if the bridge_v2_type_name is - %% the same as the bridge_v1_type_named. - 'bridge_v1_type_name' := atom(), - 'bridge_v2_type_name' := atom(), - 'connector_type' := atom(), - 'schema_module' := atom(), - 'schema_struct_field' := atom() | binary() -}) -> ok. -emqx_bridge_v2_register_bridge_type(BridgeTypeInfo) -> - try - %% We must prevent overwriting so we take a lock when writing to persistent_term - global:set_lock( - { - internal_emqx_bridge_v2_persistent_term_info_key(), - internal_emqx_bridge_v2_persistent_term_info_key() - }, - [node()], - infinity - ), - internal_maybe_create_initial_bridge_v2_info_map(), - internal_register_bridge_type_with_lock(BridgeTypeInfo) - catch - ErrorType:Reason:Stacktrace -> - %% Print the error on standard output as logger might not be - %% started yet - io:format("~p~n", [ - #{ - 'error_type' => ErrorType, - 'reason' => Reason, - 'stacktrace' => Stacktrace, - 'msg' => "Failed to register bridge V2 type" - } - ]), - erlang:raise(ErrorType, Reason, Stacktrace) - after - global:del_lock( - { - internal_emqx_bridge_v2_persistent_term_info_key(), - internal_emqx_bridge_v2_persistent_term_info_key() - }, - [node()] - ) - end, - ok. - -internal_register_bridge_type_with_lock(BridgeTypeInfo) -> - InfoMap0 = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), - %% The Bridge V1 type is also a bridge V2 type due to backwards compatibility - InfoMap1 = emqx_utils_maps:deep_force_put( - [ - bridge_v2_type_names, - maps:get(bridge_v1_type_name, BridgeTypeInfo) - ], - InfoMap0, - true - ), - InfoMap2 = emqx_utils_maps:deep_force_put( - [ - bridge_v2_type_names, - maps:get(bridge_v2_type_name, BridgeTypeInfo) - ], - InfoMap1, - true - ), - InfoMap3 = emqx_utils_maps:deep_force_put( - [ - bridge_v1_type_to_bridge_v2_type, - maps:get(bridge_v1_type_name, BridgeTypeInfo) - ], - InfoMap2, - maps:get(bridge_v2_type_name, BridgeTypeInfo) - ), - %% Backwards compatibility - InfoMap4 = emqx_utils_maps:deep_force_put( - [ - bridge_v1_type_to_bridge_v2_type, - maps:get(bridge_v2_type_name, BridgeTypeInfo) - ], - InfoMap3, - maps:get(bridge_v2_type_name, BridgeTypeInfo) - ), - InfoMap5 = emqx_utils_maps:deep_force_put( - [ - bridge_v2_type_to_connector_type, - maps:get(bridge_v2_type_name, BridgeTypeInfo) - ], - InfoMap4, - maps:get(connector_type, BridgeTypeInfo) - ), - %% Backwards compatibility - InfoMap6 = emqx_utils_maps:deep_force_put( - [ - bridge_v2_type_to_connector_type, - maps:get(bridge_v1_type_name, BridgeTypeInfo) - ], - InfoMap5, - maps:get(connector_type, BridgeTypeInfo) - ), - InfoMap7 = emqx_utils_maps:deep_force_put( - [ - bridge_v2_type_to_schema_module, - maps:get(bridge_v2_type_name, BridgeTypeInfo) - ], - InfoMap6, - maps:get(schema_module, BridgeTypeInfo) - ), - InfoMap8 = emqx_utils_maps:deep_force_put( - [ - bridge_v2_type_to_schema_struct_field, - maps:get(bridge_v2_type_name, BridgeTypeInfo) - ], - InfoMap7, - maps:get(schema_struct_field, BridgeTypeInfo) - ), - InfoMap9 = emqx_utils_maps:deep_force_put( - [ - bridge_v2_type_to_bridge_v1_type, - maps:get(bridge_v2_type_name, BridgeTypeInfo) - ], - InfoMap8, - maps:get(bridge_v1_type_name, BridgeTypeInfo) - ), - ok = persistent_term:put(internal_emqx_bridge_v2_persistent_term_info_key(), InfoMap9). - -internal_maybe_create_initial_bridge_v2_info_map() -> - case persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key(), undefined) of - undefined -> - ok = persistent_term:put( - internal_emqx_bridge_v2_persistent_term_info_key(), - #{ - bridge_v2_type_names => #{}, - bridge_v1_type_to_bridge_v2_type => #{}, - bridge_v2_type_to_bridge_v1_type => #{}, - bridge_v2_type_to_connector_type => #{}, - bridge_v2_type_to_schema_module => #{}, - bridge_v2_type_to_schema_struct_field => #{} - } - ), - ok; - _ -> - ok - end. - -internal_emqx_bridge_v2_persistent_term_info_key() -> - ?FUNCTION_NAME. diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl new file mode 100644 index 000000000..b4b6ed88e --- /dev/null +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -0,0 +1,225 @@ +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc The module which knows everything about actions. + +%% NOTE: it does not cover the V1 bridges. + +-module(emqx_action_info). + +-export([ + action_type_to_connector_type/1, + action_type_to_bridge_v1_type/1, + bridge_v1_type_to_action_type/1, + is_action_type/1, + registered_schema_modules/0 +]). + +-callback bridge_v1_type_name() -> atom(). +-callback action_type_name() -> atom(). +-callback connector_type_name() -> atom(). +-callback schema_module() -> atom(). + +-optional_callbacks([bridge_v1_type_name/0]). + +%% ==================================================================== +%% Hadcoded list of info modules for actions +%% TODO: Remove this list once we have made sure that all relevants +%% apps are loaded before this module is called. +%% ==================================================================== + +-if(?EMQX_RELEASE_EDITION == ee). +hard_coded_action_info_modules_ee() -> + [emqx_bridge_kafka_action_info]. +-else. +hard_coded_action_info_modules_ee() -> + []. +-endif. + +hard_coded_action_info_modules_common() -> + []. + +hard_coded_action_info_modules() -> + hard_coded_action_info_modules_common() ++ hard_coded_action_info_modules_ee(). + +%% ==================================================================== +%% API +%% ==================================================================== + +action_type_to_connector_type(Type) when not is_atom(Type) -> + action_type_to_connector_type(binary_to_existing_atom(iolist_to_binary(Type))); +action_type_to_connector_type(Type) -> + ActionInfoMap = info_map(), + ActionTypeToConnectorTypeMap = maps:get(action_type_to_connector_type, ActionInfoMap), + case maps:get(Type, ActionTypeToConnectorTypeMap, undefined) of + undefined -> action_type_to_connector_type_old(Type); + ConnectorType -> ConnectorType + end. + +% action_type_to_connector_type_old(kafka) -> +% %% backward compatible +% kafka_producer; +% action_type_to_connector_type_old(kafka_producer) -> +% kafka_producer; +action_type_to_connector_type_old(azure_event_hub_producer) -> + azure_event_hub_producer. + +bridge_v1_type_to_action_type(Bin) when is_binary(Bin) -> + bridge_v1_type_to_action_type(binary_to_existing_atom(Bin)); +bridge_v1_type_to_action_type(Type) -> + ActionInfoMap = info_map(), + BridgeV1TypeToActionType = maps:get(bridge_v1_type_to_action_type, ActionInfoMap), + case maps:get(Type, BridgeV1TypeToActionType, undefined) of + undefined -> bridge_v1_type_to_action_type_old(Type); + ActionType -> ActionType + end. + +% bridge_v1_type_to_action_type_old(kafka) -> +% kafka_producer; +% bridge_v1_type_to_action_type_old(kafka_producer) -> +% kafka_producer; +bridge_v1_type_to_action_type_old(azure_event_hub_producer) -> + azure_event_hub_producer; +bridge_v1_type_to_action_type_old(Type) -> + Type. + +action_type_to_bridge_v1_type(Bin) when is_binary(Bin) -> + action_type_to_bridge_v1_type(binary_to_existing_atom(Bin)); +action_type_to_bridge_v1_type(Type) -> + ActionInfoMap = info_map(), + ActionTypeToBridgeV1Type = maps:get(action_type_to_bridge_v1_type, ActionInfoMap), + case maps:get(Type, ActionTypeToBridgeV1Type, undefined) of + undefined -> action_type_to_bridge_v1_type_old(Type); + BridgeV1Type -> BridgeV1Type + end. + +action_type_to_bridge_v1_type_old(azure_event_hub_producer) -> + azure_event_hub_producer; +action_type_to_bridge_v1_type_old(Type) -> + Type. + +%% This function should return true for all inputs that are bridge V1 types for +%% bridges that have been refactored to bridge V2s, and for all all bridge V2 +%% types. For everything else the function should return false. +is_action_type(Bin) when is_binary(Bin) -> + is_action_type(binary_to_existing_atom(Bin)); +is_action_type(Type) -> + ActionInfoMap = info_map(), + ActionTypes = maps:get(action_type_names, ActionInfoMap), + case maps:get(Type, ActionTypes, undefined) of + undefined -> is_action_type_old(Type); + _ -> true + end. + +% is_action_type_old(kafka_producer) -> +% true; +% is_action_type_old(kafka) -> +% true; +is_action_type_old(azure_event_hub_producer) -> + true; +is_action_type_old(_) -> + false. + +registered_schema_modules() -> + InfoMap = info_map(), + Schemas = maps:get(action_type_to_schema_module, InfoMap), + maps:to_list(Schemas). + +%% ==================================================================== +%% Internal functions for building the info map and accessing it +%% ==================================================================== + +internal_emqx_action_persistent_term_info_key() -> + ?FUNCTION_NAME. + +info_map() -> + case persistent_term:get(internal_emqx_action_persistent_term_info_key(), not_found) of + not_found -> + build_cache(); + ActionInfoMap -> + ActionInfoMap + end. + +build_cache() -> + ActionInfoModules = action_info_modules(), + ActionInfoMap = + lists:foldl( + fun(Module, InfoMapSoFar) -> + ModuleInfoMap = get_info_map(Module), + emqx_utils_maps:deep_merge(InfoMapSoFar, ModuleInfoMap) + end, + initial_info_map(), + ActionInfoModules + ), + %% Update the persistent term with the new info map + persistent_term:put(internal_emqx_action_persistent_term_info_key(), ActionInfoMap), + ActionInfoMap. + +action_info_modules() -> + ActionInfoModules = [ + action_info_modules(App) + || {App, _, _} <- application:loaded_applications() + ], + lists:usort(lists:flatten(ActionInfoModules) ++ hard_coded_action_info_modules()). + +action_info_modules(App) -> + case application:get_env(App, emqx, action_info_module) of + {ok, Module} -> + [Module]; + _ -> + [] + end. + +initial_info_map() -> + #{ + action_type_names => #{}, + bridge_v1_type_to_action_type => #{}, + action_type_to_bridge_v1_type => #{}, + action_type_to_connector_type => #{}, + action_type_to_schema_module => #{} + }. + +get_info_map(Module) -> + %% Force the module to get loaded + _ = code:ensure_loaded(Module), + ActionType = Module:action_type_name(), + BridgeV1Type = + case erlang:function_exported(Module, bridge_v1_type_name, 0) of + true -> + Module:bridge_v1_type_name(); + false -> + Module:action_type_name() + end, + #{ + action_type_names => #{ + ActionType => true, + BridgeV1Type => true + }, + bridge_v1_type_to_action_type => #{ + BridgeV1Type => ActionType, + %% Alias the bridge V1 type to the action type + ActionType => ActionType + }, + action_type_to_bridge_v1_type => #{ + ActionType => BridgeV1Type + }, + action_type_to_connector_type => #{ + ActionType => Module:connector_type_name(), + %% Alias the bridge V1 type to the action type + BridgeV1Type => Module:connector_type_name() + }, + action_type_to_schema_module => #{ + ActionType => Module:schema_module() + } + }. diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 6406f7df2..77ef8bef3 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -18,15 +18,6 @@ -behaviour(emqx_config_handler). -behaviour(emqx_config_backup). --compile([ - {nowarn_unused_function, [ - {internal_register_bridge_type_with_lock, 1}, - {emqx_bridge_v2_register_bridge_type, 1} - ]} -]). --ignore_xref(emqx_bridge_v2_register_bridge_type/1). --ignore_xref(internal_register_bridge_type_with_lock/1). - -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). @@ -37,15 +28,6 @@ %% refactored into a new module/application with appropriate name. -define(ROOT_KEY, actions). --on_load(on_load/0). - -%% Getting registered bridge schemas: - --export([ - bridge_v2_type_to_schame_struct_field/1, - registered_schema_modules/0 -]). - %% Loading and unloading config when EMQX starts and stops -export([ load/0, @@ -158,53 +140,6 @@ %%==================================================================== --include_lib("emqx_bridge/include/emqx_bridge_v2_register.hrl"). - -on_load() -> - try - %% We must prevent overwriting so we take a lock when writing to persistent_term - global:set_lock( - { - internal_emqx_bridge_v2_persistent_term_info_key(), - internal_emqx_bridge_v2_persistent_term_info_key() - }, - [node()], - infinity - ), - internal_maybe_create_initial_bridge_v2_info_map() - catch - ErrorType:Reason:Stacktrace -> - %% Logger may not be started so print to stdout - io:format("~p~n", [ - #{ - 'error_type' => ErrorType, - 'reason' => Reason, - 'stacktrace' => Stacktrace, - 'msg' => "Failed to register bridge V2 type" - } - ]), - erlang:raise(ErrorType, Reason, Stacktrace) - after - global:del_lock( - { - internal_emqx_bridge_v2_persistent_term_info_key(), - internal_emqx_bridge_v2_persistent_term_info_key() - }, - [node()] - ) - end, - ok. - -bridge_v2_type_to_schame_struct_field(BridgeV2Type) -> - InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), - Map = maps:get(bridge_v2_type_to_schema_struct_field, InfoMap), - maps:get(BridgeV2Type, Map). - -registered_schema_modules() -> - InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), - Schemas = maps:get(bridge_v2_type_to_schema_module, InfoMap), - maps:to_list(Schemas). - %%==================================================================== %% Loading and unloading config when EMQX starts and stops %%==================================================================== @@ -891,23 +826,8 @@ connector_type(Type) -> %% remote call so it can be mocked ?MODULE:bridge_v2_type_to_connector_type(Type). -bridge_v2_type_to_connector_type(Type) when not is_atom(Type) -> - bridge_v2_type_to_connector_type(binary_to_existing_atom(iolist_to_binary(Type))); bridge_v2_type_to_connector_type(Type) -> - BridgeV2InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), - BridgeV2TypeToConnectorTypeMap = maps:get(bridge_v2_type_to_connector_type, BridgeV2InfoMap), - case maps:get(Type, BridgeV2TypeToConnectorTypeMap, undefined) of - undefined -> bridge_v2_type_to_connector_type_old(Type); - ConnectorType -> ConnectorType - end. - -% bridge_v2_type_to_connector_type_old(kafka) -> -% %% backward compatible -% kafka_producer; -% bridge_v2_type_to_connector_type_old(kafka_producer) -> -% kafka_producer; -bridge_v2_type_to_connector_type_old(azure_event_hub_producer) -> - azure_event_hub_producer. + emqx_action_info:action_type_to_connector_type(Type). %%==================================================================== %% Data backup API @@ -1127,61 +1047,14 @@ bridge_v1_is_valid(BridgeV1Type, BridgeName) -> end end. -bridge_v1_type_to_bridge_v2_type(Bin) when is_binary(Bin) -> - ?MODULE:bridge_v1_type_to_bridge_v2_type(binary_to_existing_atom(Bin)); bridge_v1_type_to_bridge_v2_type(Type) -> - BridgeV2InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), - BridgeV1TypeToBridgeV2Type = maps:get(bridge_v1_type_to_bridge_v2_type, BridgeV2InfoMap), - case maps:get(Type, BridgeV1TypeToBridgeV2Type, undefined) of - undefined -> bridge_v1_type_to_bridge_v2_type_old(Type); - BridgeV2Type -> BridgeV2Type - end. + emqx_action_info:bridge_v1_type_to_action_type(Type). -% bridge_v1_type_to_bridge_v2_type_old(kafka) -> -% kafka_producer; -% bridge_v1_type_to_bridge_v2_type_old(kafka_producer) -> -% kafka_producer; -bridge_v1_type_to_bridge_v2_type_old(azure_event_hub_producer) -> - azure_event_hub_producer; -bridge_v1_type_to_bridge_v2_type_old(Type) -> - Type. - -bridge_v2_type_to_bridge_v1_type(Bin) when is_binary(Bin) -> - ?MODULE:bridge_v2_type_to_bridge_v1_type(binary_to_existing_atom(Bin)); bridge_v2_type_to_bridge_v1_type(Type) -> - BridgeV2InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), - BridgeV2TypeToBridgeV1Type = maps:get(bridge_v2_type_to_bridge_v1_type, BridgeV2InfoMap), - case maps:get(Type, BridgeV2TypeToBridgeV1Type, undefined) of - undefined -> bridge_v2_type_to_bridge_v1_type_old(Type); - BridgeV1Type -> BridgeV1Type - end. + emqx_action_info:action_type_to_bridge_v1_type(Type). -bridge_v2_type_to_bridge_v1_type_old(azure_event_hub_producer) -> - azure_event_hub_producer; -bridge_v2_type_to_bridge_v1_type_old(Type) -> - Type. - -%% This function should return true for all inputs that are bridge V1 types for -%% bridges that have been refactored to bridge V2s, and for all all bridge V2 -%% types. For everything else the function should return false. -is_bridge_v2_type(Bin) when is_binary(Bin) -> - is_bridge_v2_type(binary_to_existing_atom(Bin)); is_bridge_v2_type(Type) -> - BridgeV2InfoMap = persistent_term:get(internal_emqx_bridge_v2_persistent_term_info_key()), - BridgeV2Types = maps:get(bridge_v2_type_names, BridgeV2InfoMap), - case maps:get(Type, BridgeV2Types, undefined) of - undefined -> is_bridge_v2_type_old(Type); - _ -> true - end. - -% is_bridge_v2_type_old(kafka_producer) -> -% true; -% is_bridge_v2_type_old(kafka) -> -% true; -is_bridge_v2_type_old(azure_event_hub_producer) -> - true; -is_bridge_v2_type_old(_) -> - false. + emqx_action_info:is_action_type(Type). bridge_v1_list_and_transform() -> Bridges = list_with_lookup_fun(fun bridge_v1_lookup_and_transform/2), diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index c380ac933..65250ad40 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -93,7 +93,7 @@ registered_api_schemas(Method) -> %% We *must* do this to ensure the module is really loaded, especially when we use %% `call_hocon' from `nodetool' to generate initial configurations. _ = emqx_bridge_v2:module_info(), - RegistredSchmeas = emqx_bridge_v2:registered_schema_modules(), + RegistredSchmeas = emqx_action_info:registered_schema_modules(), [ api_ref(SchemaModule, atom_to_binary(BridgeV2Type), Method ++ "_bridge_v2") || {BridgeV2Type, SchemaModule} <- RegistredSchmeas @@ -156,8 +156,8 @@ fields(actions) -> registered_schema_fields() -> [ - Module:fields(emqx_bridge_v2:bridge_v2_type_to_schame_struct_field(BridgeV2Type)) - || {BridgeV2Type, Module} <- emqx_bridge_v2:registered_schema_modules() + Module:fields(action) + || {_BridgeV2Type, Module} <- emqx_action_info:registered_schema_modules() ]. desc(actions) -> @@ -183,7 +183,7 @@ examples(Method) -> ConnectorExamples = erlang:apply(Module, bridge_v2_examples, [Method]), lists:foldl(MergeFun, Examples, ConnectorExamples) end, - SchemaModules = [Mod || {_, Mod} <- emqx_bridge_v2:registered_schema_modules()], + SchemaModules = [Mod || {_, Mod} <- emqx_action_info:registered_schema_modules()], lists:foldl(Fun, #{}, SchemaModules). -ifdef(TEST). diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src index 88fa6b7bd..ee468249f 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src @@ -12,7 +12,7 @@ brod, brod_gssapi ]}, - {env, []}, + {env, [{emqx_action_info_module, emqx_bridge_kafka_action_info}]}, {modules, []}, {links, []} diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 83ecf7b05..3f270d921 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -16,8 +16,6 @@ ]). -import(hoconsc, [mk/2, enum/1, ref/2]). --on_load(register_bridge_v2/0). - -export([ bridge_v2_examples/1, conn_bridge_examples/1, @@ -537,7 +535,9 @@ fields(bridge_v2_field) -> desc => <<"Kafka Producer Bridge V2 Config">>, required => false } - )}. + )}; +fields(action) -> + fields(bridge_v2_field). desc("config_connector") -> ?DESC("desc_config"); @@ -725,17 +725,3 @@ kafka_ext_header_value_validator(Value) -> "placeholder like ${foo}, or a simple string." } end. - --include_lib("emqx_bridge/include/emqx_bridge_v2_register.hrl"). - -register_bridge_v2() -> - emqx_bridge_v2_register_bridge_type(#{ - %% Should be provided by all bridges. Even if the bridge_v2_type_name is - %% the same as the bridge_v1_type_named. - 'bridge_v1_type_name' => kafka, - 'bridge_v2_type_name' => kafka_producer, - 'connector_type' => kafka_producer, - 'schema_module' => ?MODULE, - 'schema_struct_field' => bridge_v2_field - }), - ok. diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl new file mode 100644 index 000000000..82fe73978 --- /dev/null +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl @@ -0,0 +1,18 @@ +-module(emqx_bridge_kafka_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +bridge_v1_type_name() -> kafka. + +action_type_name() -> kafka_producer. + +connector_type_name() -> kafka_producer. + +schema_module() -> emqx_bridge_kafka. diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl index 78a39f5dd..4e00c1c57 100644 --- a/apps/emqx_conf/src/emqx_conf.erl +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -151,6 +151,9 @@ reset(Node, KeyPath, Opts) -> %% @doc Called from build script. %% TODO: move to a external escript after all refactoring is done dump_schema(Dir, SchemaModule) -> + %% TODO: Load all apps instead of only emqx_dashboard + %% as this will help schemas that searches for apps with + %% relevant schema definitions _ = application:load(emqx_dashboard), ok = emqx_dashboard_desc_cache:init(), lists:foreach( From 49fdfef8c36d7aecfd80d2e9768d05f8d0fd6a5b Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 10 Nov 2023 07:07:09 +0100 Subject: [PATCH 15/21] fix: azure event hub names are defined in one place --- apps/emqx_bridge/src/emqx_action_info.erl | 44 ++++--------------- .../src/emqx_bridge_azure_event_hub.erl | 9 ++++ ...mqx_bridge_azure_event_hub_action_info.erl | 18 ++++++++ 3 files changed, 35 insertions(+), 36 deletions(-) create mode 100644 apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub_action_info.erl diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index b4b6ed88e..0c247ac6f 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -41,7 +41,10 @@ -if(?EMQX_RELEASE_EDITION == ee). hard_coded_action_info_modules_ee() -> - [emqx_bridge_kafka_action_info]. + [ + emqx_bridge_kafka_action_info, + emqx_bridge_azure_event_hub_action_info + ]. -else. hard_coded_action_info_modules_ee() -> []. @@ -63,52 +66,30 @@ action_type_to_connector_type(Type) -> ActionInfoMap = info_map(), ActionTypeToConnectorTypeMap = maps:get(action_type_to_connector_type, ActionInfoMap), case maps:get(Type, ActionTypeToConnectorTypeMap, undefined) of - undefined -> action_type_to_connector_type_old(Type); + undefined -> Type; ConnectorType -> ConnectorType end. -% action_type_to_connector_type_old(kafka) -> -% %% backward compatible -% kafka_producer; -% action_type_to_connector_type_old(kafka_producer) -> -% kafka_producer; -action_type_to_connector_type_old(azure_event_hub_producer) -> - azure_event_hub_producer. - bridge_v1_type_to_action_type(Bin) when is_binary(Bin) -> bridge_v1_type_to_action_type(binary_to_existing_atom(Bin)); bridge_v1_type_to_action_type(Type) -> ActionInfoMap = info_map(), BridgeV1TypeToActionType = maps:get(bridge_v1_type_to_action_type, ActionInfoMap), case maps:get(Type, BridgeV1TypeToActionType, undefined) of - undefined -> bridge_v1_type_to_action_type_old(Type); + undefined -> Type; ActionType -> ActionType end. -% bridge_v1_type_to_action_type_old(kafka) -> -% kafka_producer; -% bridge_v1_type_to_action_type_old(kafka_producer) -> -% kafka_producer; -bridge_v1_type_to_action_type_old(azure_event_hub_producer) -> - azure_event_hub_producer; -bridge_v1_type_to_action_type_old(Type) -> - Type. - action_type_to_bridge_v1_type(Bin) when is_binary(Bin) -> action_type_to_bridge_v1_type(binary_to_existing_atom(Bin)); action_type_to_bridge_v1_type(Type) -> ActionInfoMap = info_map(), ActionTypeToBridgeV1Type = maps:get(action_type_to_bridge_v1_type, ActionInfoMap), case maps:get(Type, ActionTypeToBridgeV1Type, undefined) of - undefined -> action_type_to_bridge_v1_type_old(Type); + undefined -> Type; BridgeV1Type -> BridgeV1Type end. -action_type_to_bridge_v1_type_old(azure_event_hub_producer) -> - azure_event_hub_producer; -action_type_to_bridge_v1_type_old(Type) -> - Type. - %% This function should return true for all inputs that are bridge V1 types for %% bridges that have been refactored to bridge V2s, and for all all bridge V2 %% types. For everything else the function should return false. @@ -118,19 +99,10 @@ is_action_type(Type) -> ActionInfoMap = info_map(), ActionTypes = maps:get(action_type_names, ActionInfoMap), case maps:get(Type, ActionTypes, undefined) of - undefined -> is_action_type_old(Type); + undefined -> false; _ -> true end. -% is_action_type_old(kafka_producer) -> -% true; -% is_action_type_old(kafka) -> -% true; -is_action_type_old(azure_event_hub_producer) -> - true; -is_action_type_old(_) -> - false. - registered_schema_modules() -> InfoMap = info_map(), Schemas = maps:get(action_type_to_schema_module, InfoMap), diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl index 6ae550a7f..eb364bdff 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl @@ -114,6 +114,15 @@ fields(kafka_message) -> Fields0 = emqx_bridge_kafka:fields(kafka_message), Fields = proplists:delete(timestamp, Fields0), override_documentations(Fields); +fields(action) -> + {azure_event_hub_producer, + mk( + hoconsc:map(name, ref(emqx_bridge_azure_event_hub, actions)), + #{ + desc => <<"Azure Event Hub Actions Config">>, + required => false + } + )}; fields(actions) -> Fields = override( diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub_action_info.erl b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub_action_info.erl new file mode 100644 index 000000000..f7a09c6d1 --- /dev/null +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub_action_info.erl @@ -0,0 +1,18 @@ +-module(emqx_bridge_azure_event_hub_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +bridge_v1_type_name() -> azure_event_hub_producer. + +action_type_name() -> azure_event_hub_producer. + +connector_type_name() -> azure_event_hub_producer. + +schema_module() -> emqx_bridge_azure_event_hub. From e93b71d8d50798777dbdc5019f7021b21c6fd214 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 10 Nov 2023 15:43:32 +0100 Subject: [PATCH 16/21] fix: problems found by @thalesmg in code review Co-authored-by: Thales Macedo Garitezi --- apps/emqx_bridge/src/emqx_action_info.erl | 4 +++- apps/emqx_bridge/src/emqx_bridge_v2.erl | 2 +- apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl | 10 ++-------- apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl | 6 +++--- 4 files changed, 9 insertions(+), 13 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 0c247ac6f..8e8d51aff 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -1,3 +1,5 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -146,7 +148,7 @@ action_info_modules() -> lists:usort(lists:flatten(ActionInfoModules) ++ hard_coded_action_info_modules()). action_info_modules(App) -> - case application:get_env(App, emqx, action_info_module) of + case application:get_env(App, emqx_action_info_module) of {ok, Module} -> [Module]; _ -> diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 77ef8bef3..70e248e56 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -101,7 +101,7 @@ %% Compatibility Layer API %% All public functions for the compatibility layer should be prefixed with -%% birdge_v1_ +%% bridge_v1_ -export([ bridge_v1_lookup_and_transform/2, diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index 65250ad40..fdf167db8 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -90,13 +90,10 @@ api_schema(Method) -> hoconsc:union(bridge_api_union(EE ++ APISchemas)). registered_api_schemas(Method) -> - %% We *must* do this to ensure the module is really loaded, especially when we use - %% `call_hocon' from `nodetool' to generate initial configurations. - _ = emqx_bridge_v2:module_info(), - RegistredSchmeas = emqx_action_info:registered_schema_modules(), + RegisteredSchemas = emqx_action_info:registered_schema_modules(), [ api_ref(SchemaModule, atom_to_binary(BridgeV2Type), Method ++ "_bridge_v2") - || {BridgeV2Type, SchemaModule} <- RegistredSchmeas + || {BridgeV2Type, SchemaModule} <- RegisteredSchemas ]. api_ref(Module, Type, Method) -> @@ -148,9 +145,6 @@ roots() -> end. fields(actions) -> - %% We *must* do this to ensure the module is really loaded, especially when we use - %% `call_hocon' from `nodetool' to generate initial configurations. - _ = emqx_bridge_v2:module_info(), enterprise_fields_actions() ++ registered_schema_fields(). diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 3f270d921..0eb015cd3 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -527,17 +527,17 @@ fields(resource_opts) -> SupportedFields = [health_check_interval], CreationOpts = emqx_resource_schema:create_opts(_Overrides = []), lists:filter(fun({Field, _}) -> lists:member(Field, SupportedFields) end, CreationOpts); -fields(bridge_v2_field) -> +fields(action_field) -> {kafka_producer, mk( hoconsc:map(name, ref(emqx_bridge_kafka, kafka_producer_action)), #{ - desc => <<"Kafka Producer Bridge V2 Config">>, + desc => <<"Kafka Producer Action Config">>, required => false } )}; fields(action) -> - fields(bridge_v2_field). + fields(action_field). desc("config_connector") -> ?DESC("desc_config"); From ca3e5eab113ddabd8554d5af362acfdca2985913 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 10 Nov 2023 17:10:08 +0100 Subject: [PATCH 17/21] refactor: remove emqx_bridge_v2_enterprise.erl --- apps/emqx_bridge/src/emqx_bridge_v2_api.erl | 12 +----- .../src/schema/emqx_bridge_enterprise.erl | 4 +- .../src/schema/emqx_bridge_v2_enterprise.erl | 42 ------------------- .../src/schema/emqx_bridge_v2_schema.erl | 41 +----------------- 4 files changed, 5 insertions(+), 94 deletions(-) delete mode 100644 apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index b28d5ec01..d5fd09631 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -98,21 +98,11 @@ get_response_body_schema() -> ). bridge_info_examples(Method) -> - maps:merge( - emqx_bridge_v2_schema:examples(Method), - emqx_enterprise_bridge_examples(Method) - ). + emqx_bridge_v2_schema:examples(Method). bridge_info_array_example(Method) -> lists:map(fun(#{value := Config}) -> Config end, maps:values(bridge_info_examples(Method))). --if(?EMQX_RELEASE_EDITION == ee). -emqx_enterprise_bridge_examples(Method) -> - emqx_bridge_v2_enterprise:examples(Method). --else. -emqx_enterprise_bridge_examples(_Method) -> #{}. --endif. - param_path_id() -> {id, mk( diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl index 926cf296c..9456575d4 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl @@ -82,9 +82,9 @@ schema_modules() -> ]. examples(Method) -> - EnterpriseExamples = emqx_bridge_v2_enterprise:examples(Method), + ActionExamples = emqx_bridge_v2_schema:examples(Method), RegisteredExamples = registered_examples(Method), - maps:merge(EnterpriseExamples, RegisteredExamples). + maps:merge(ActionExamples, RegisteredExamples). registered_examples(Method) -> MergeFun = diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl deleted file mode 100644 index 29dc71b69..000000000 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl +++ /dev/null @@ -1,42 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- --module(emqx_bridge_v2_enterprise). - --if(?EMQX_RELEASE_EDITION == ee). - --import(hoconsc, [mk/2, enum/1, ref/2]). - --export([ - api_schemas/1, - examples/1, - fields/1 -]). - -examples(Method) -> - MergeFun = - fun(Example, Examples) -> - maps:merge(Examples, Example) - end, - Fun = - fun(Module, Examples) -> - ConnectorExamples = erlang:apply(Module, bridge_v2_examples, [Method]), - lists:foldl(MergeFun, Examples, ConnectorExamples) - end, - lists:foldl(Fun, #{}, schema_modules()). - -schema_modules() -> - []. - -fields(actions) -> - action_structs(). - -action_structs() -> - []. - -api_schemas(_Method) -> - []. - --else. - --endif. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index fdf167db8..5d05575a7 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -33,46 +33,11 @@ -export([types/0, types_sc/0]). --export([enterprise_api_schemas/1]). - -export_type([action_type/0]). %% Should we explicitly list them here so dialyzer may be more helpful? -type action_type() :: atom(). --if(?EMQX_RELEASE_EDITION == ee). --spec enterprise_api_schemas(Method) -> [{_Type :: binary(), ?R_REF(module(), Method)}] when - Method :: string(). -enterprise_api_schemas(Method) -> - %% We *must* do this to ensure the module is really loaded, especially when we use - %% `call_hocon' from `nodetool' to generate initial configurations. - _ = emqx_bridge_v2_enterprise:module_info(), - case erlang:function_exported(emqx_bridge_v2_enterprise, api_schemas, 1) of - true -> emqx_bridge_v2_enterprise:api_schemas(Method); - false -> [] - end. - -enterprise_fields_actions() -> - %% We *must* do this to ensure the module is really loaded, especially when we use - %% `call_hocon' from `nodetool' to generate initial configurations. - _ = emqx_bridge_v2_enterprise:module_info(), - case erlang:function_exported(emqx_bridge_v2_enterprise, fields, 1) of - true -> - emqx_bridge_v2_enterprise:fields(actions); - false -> - [] - end. - --else. - --spec enterprise_api_schemas(Method) -> [{_Type :: binary(), ?R_REF(module(), Method)}] when - Method :: string(). -enterprise_api_schemas(_Method) -> []. - -enterprise_fields_actions() -> []. - --endif. - %%====================================================================================== %% For HTTP APIs get_response() -> @@ -85,9 +50,8 @@ post_request() -> api_schema("post"). api_schema(Method) -> - EE = ?MODULE:enterprise_api_schemas(Method), APISchemas = registered_api_schemas(Method), - hoconsc:union(bridge_api_union(EE ++ APISchemas)). + hoconsc:union(bridge_api_union(APISchemas)). registered_api_schemas(Method) -> RegisteredSchemas = emqx_action_info:registered_schema_modules(), @@ -145,8 +109,7 @@ roots() -> end. fields(actions) -> - enterprise_fields_actions() ++ - registered_schema_fields(). + registered_schema_fields(). registered_schema_fields() -> [ From 093c8b0c6ebd87fd51c7f921d948315a64708152 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 10 Nov 2023 17:14:46 +0100 Subject: [PATCH 18/21] docs: add missing copyright headers --- .../src/emqx_bridge_azure_event_hub_action_info.erl | 4 ++++ apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub_action_info.erl b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub_action_info.erl index f7a09c6d1..8ebdb2435 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub_action_info.erl +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub_action_info.erl @@ -1,3 +1,7 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + -module(emqx_bridge_azure_event_hub_action_info). -behaviour(emqx_action_info). diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl index 82fe73978..50d4f0c63 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_action_info.erl @@ -1,3 +1,7 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + -module(emqx_bridge_kafka_action_info). -behaviour(emqx_action_info). From 3bea3496af5a3d2c9bff7a2f20da9792202e539f Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 10 Nov 2023 19:56:17 +0100 Subject: [PATCH 19/21] test: fix test case mock that broke due to removal --- apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl | 3 ++- .../test/emqx_bridge_v1_compatibility_layer_SUITE.erl | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index 5d05575a7..6386e74a1 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -28,7 +28,8 @@ get_response/0, put_request/0, post_request/0, - examples/1 + examples/1, + registered_api_schemas/1 ]). -export([types/0, types_sc/0]). diff --git a/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl index 8227e7993..f3b7fb685 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl @@ -111,7 +111,7 @@ setup_mocks() -> catch meck:new(emqx_bridge_v2_schema, MeckOpts), meck:expect( emqx_bridge_v2_schema, - enterprise_api_schemas, + registered_api_schemas, 1, fun(Method) -> [{bridge_type_bin(), hoconsc:ref(?MODULE, "api_" ++ Method)}] end ), From d682e6e23c2feaacf72d4646bd91b89ca564f0ca Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 13 Nov 2023 15:21:30 +0100 Subject: [PATCH 20/21] test: fix test mock by calling exported function --- apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index 6386e74a1..ede783e97 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -28,7 +28,13 @@ get_response/0, put_request/0, post_request/0, - examples/1, + examples/1 +]). + +%% Exported for mocking +%% TODO: refactor emqx_bridge_v1_compatibility_layer_SUITE so we don't need to +%% export this +-export([ registered_api_schemas/1 ]). @@ -51,7 +57,7 @@ post_request() -> api_schema("post"). api_schema(Method) -> - APISchemas = registered_api_schemas(Method), + APISchemas = ?MODULE:registered_api_schemas(Method), hoconsc:union(bridge_api_union(APISchemas)). registered_api_schemas(Method) -> From a49aea3b562579324a902bd2302a9e70219632a7 Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Tue, 14 Nov 2023 09:27:04 +0100 Subject: [PATCH 21/21] chore: bump app versions --- apps/emqx_bridge/src/emqx_bridge.app.src | 2 +- .../src/emqx_bridge_azure_event_hub.app.src | 2 +- apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src | 2 +- apps/emqx_conf/src/emqx_conf.app.src | 2 +- apps/emqx_resource/src/emqx_resource.app.src | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index c2387fe99..f829b12df 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "EMQX bridges"}, - {vsn, "0.1.29"}, + {vsn, "0.1.30"}, {registered, [emqx_bridge_sup]}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src index ece0495f9..40ea79334 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_azure_event_hub, [ {description, "EMQX Enterprise Azure Event Hub Bridge"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src index ee468249f..00b9d8968 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_kafka, [ {description, "EMQX Enterprise Kafka Bridge"}, - {vsn, "0.1.11"}, + {vsn, "0.1.12"}, {registered, [emqx_bridge_kafka_consumer_sup]}, {applications, [ kernel, diff --git a/apps/emqx_conf/src/emqx_conf.app.src b/apps/emqx_conf/src/emqx_conf.app.src index fda3e4759..3856a882c 100644 --- a/apps/emqx_conf/src/emqx_conf.app.src +++ b/apps/emqx_conf/src/emqx_conf.app.src @@ -1,6 +1,6 @@ {application, emqx_conf, [ {description, "EMQX configuration management"}, - {vsn, "0.1.30"}, + {vsn, "0.1.31"}, {registered, []}, {mod, {emqx_conf_app, []}}, {applications, [kernel, stdlib, emqx_ctl]}, diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index 8092fadc8..9edd03078 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_resource, [ {description, "Manager for all external resources"}, - {vsn, "0.1.24"}, + {vsn, "0.1.25"}, {registered, []}, {mod, {emqx_resource_app, []}}, {applications, [