diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 5ce60fe6c..d80050191 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -26,7 +26,10 @@ bridge_v1_type_to_action_type/1, bridge_v1_type_name/1, is_action_type/1, - registered_schema_modules/0, + is_source/1, + is_action/1, + registered_schema_modules_actions/0, + registered_schema_modules_sources/0, connector_action_config_to_bridge_v1_config/2, connector_action_config_to_bridge_v1_config/3, bridge_v1_config_to_connector_config/2, @@ -51,19 +54,26 @@ ConnectorConfig :: map(), ActionConfig :: map() ) -> map(). %% Define this if the automatic config upgrade is not enough for the connector. --callback bridge_v1_config_to_connector_config(BridgeV1Config :: map()) -> map(). +-callback bridge_v1_config_to_connector_config(BridgeV1Config :: map()) -> + map() | {ConnectorTypeName :: atom(), map()}. %% Define this if the automatic config upgrade is not enough for the bridge. %% If you want to make use of the automatic config upgrade, you can call %% emqx_action_info:transform_bridge_v1_config_to_action_config/4 in your %% implementation and do some adjustments on the result. -callback bridge_v1_config_to_action_config(BridgeV1Config :: map(), ConnectorName :: binary()) -> - map(). + map() | {source | action, ActionTypeName :: atom(), map()} | 'none'. +-callback is_source() -> + boolean(). +-callback is_action() -> + boolean(). -optional_callbacks([ bridge_v1_type_name/0, connector_action_config_to_bridge_v1_config/2, bridge_v1_config_to_connector_config/1, - bridge_v1_config_to_action_config/2 + bridge_v1_config_to_action_config/2, + is_source/0, + is_action/0 ]). %% ==================================================================== @@ -96,7 +106,10 @@ hard_coded_action_info_modules_ee() -> -endif. hard_coded_action_info_modules_common() -> - [emqx_bridge_http_action_info]. + [ + emqx_bridge_http_action_info, + emqx_bridge_mqtt_pubsub_action_info + ]. hard_coded_action_info_modules() -> hard_coded_action_info_modules_common() ++ hard_coded_action_info_modules_ee(). @@ -178,10 +191,33 @@ is_action_type(Type) -> _ -> true end. -registered_schema_modules() -> +%% Returns true if the action is an ingress action, false otherwise. +is_source(Bin) when is_binary(Bin) -> + is_source(binary_to_existing_atom(Bin)); +is_source(Type) -> + ActionInfoMap = info_map(), + IsSourceMap = maps:get(is_source, ActionInfoMap), + maps:get(Type, IsSourceMap, false). + +%% Returns true if the action is an egress action, false otherwise. +is_action(Bin) when is_binary(Bin) -> + is_action(binary_to_existing_atom(Bin)); +is_action(Type) -> + ActionInfoMap = info_map(), + IsActionMap = maps:get(is_action, ActionInfoMap), + maps:get(Type, IsActionMap, true). + +registered_schema_modules_actions() -> InfoMap = info_map(), Schemas = maps:get(action_type_to_schema_module, InfoMap), - maps:to_list(Schemas). + All = maps:to_list(Schemas), + [{Type, SchemaMod} || {Type, SchemaMod} <- All, is_action(Type)]. + +registered_schema_modules_sources() -> + InfoMap = info_map(), + Schemas = maps:get(action_type_to_schema_module, InfoMap), + All = maps:to_list(Schemas), + [{Type, SchemaMod} || {Type, SchemaMod} <- All, is_source(Type)]. connector_action_config_to_bridge_v1_config(ActionOrBridgeType, ConnectorConfig, ActionConfig) -> Module = get_action_info_module(ActionOrBridgeType), @@ -293,7 +329,9 @@ initial_info_map() -> action_type_to_bridge_v1_type => #{}, action_type_to_connector_type => #{}, action_type_to_schema_module => #{}, - action_type_to_info_module => #{} + action_type_to_info_module => #{}, + is_source => #{}, + is_action => #{} }. get_info_map(Module) -> @@ -312,6 +350,20 @@ get_info_map(Module) -> false -> {ActionType, [ActionType]} end, + IsIngress = + case erlang:function_exported(Module, is_source, 0) of + true -> + Module:is_source(); + false -> + false + end, + IsEgress = + case erlang:function_exported(Module, is_action, 0) of + true -> + Module:is_action(); + false -> + true + end, #{ action_type_names => lists:foldl( @@ -351,5 +403,11 @@ get_info_map(Module) -> end, #{ActionType => Module}, BridgeV1Types - ) + ), + is_source => #{ + ActionType => IsIngress + }, + is_action => #{ + ActionType => IsEgress + } }. diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index d26a44a1d..c7d9a2d27 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -353,7 +353,13 @@ get_metrics(Type, Name) -> case emqx_bridge_v2:bridge_v1_is_valid(Type, Name) of true -> BridgeV2Type = emqx_bridge_v2:bridge_v2_type_to_connector_type(Type), - emqx_bridge_v2:get_metrics(BridgeV2Type, Name); + try + ConfRootKey = emqx_bridge_v2:get_conf_root_key_if_only_one(Type, Name), + emqx_bridge_v2:get_metrics(ConfRootKey, BridgeV2Type, Name) + catch + error:Reason -> + {error, Reason} + end; false -> {error, not_bridge_v1_compatible} end; diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index e1cd03ac2..3168ae590 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -548,9 +548,17 @@ schema("/bridges_probe") -> Id, case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of true -> - BridgeV2Type = emqx_bridge_v2:bridge_v2_type_to_connector_type(BridgeType), - ok = emqx_bridge_v2:reset_metrics(BridgeV2Type, BridgeName), - ?NO_CONTENT; + try + ConfRootKey = emqx_bridge_v2:get_conf_root_key_if_only_one( + BridgeType, BridgeName + ), + BridgeV2Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeType), + ok = emqx_bridge_v2:reset_metrics(ConfRootKey, BridgeV2Type, BridgeName), + ?NO_CONTENT + catch + error:Reason -> + ?BAD_REQUEST(Reason) + end; false -> ok = emqx_bridge_resource:reset_metrics( emqx_bridge_resource:resource_id(BridgeType, BridgeName) diff --git a/apps/emqx_bridge/src/emqx_bridge_lib.erl b/apps/emqx_bridge/src/emqx_bridge_lib.erl index 9386a38d3..7f74dfb2d 100644 --- a/apps/emqx_bridge/src/emqx_bridge_lib.erl +++ b/apps/emqx_bridge/src/emqx_bridge_lib.erl @@ -82,7 +82,8 @@ external_ids(Type, Name) -> get_conf(BridgeType, BridgeName) -> case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of true -> - emqx_conf:get_raw([actions, BridgeType, BridgeName]); + ConfRootName = emqx_bridge_v2:get_conf_root_key_if_only_one(BridgeType, BridgeName), + emqx_conf:get_raw([ConfRootName, BridgeType, BridgeName]); false -> undefined end. diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 723808919..66d4dc674 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -26,7 +26,8 @@ %% Note: this is strange right now, because it lives in `emqx_bridge_v2', but it shall be %% refactored into a new module/application with appropriate name. --define(ROOT_KEY, actions). +-define(ROOT_KEY_ACTIONS, actions). +-define(ROOT_KEY_SOURCES, sources). %% Loading and unloading config when EMQX starts and stops -export([ @@ -38,7 +39,9 @@ -export([ list/0, + list/1, lookup/2, + lookup/3, create/3, %% The remove/2 function is only for internal use as it may create %% rules with broken dependencies @@ -53,13 +56,16 @@ -export([ disable_enable/3, + disable_enable/4, health_check/2, send_message/4, query/4, start/2, reset_metrics/2, + reset_metrics/3, create_dry_run/2, - get_metrics/2 + get_metrics/2, + get_metrics/3 ]). %% On message publish hook (for local_topics) @@ -122,7 +128,8 @@ bridge_v1_stop/2, bridge_v1_start/2, %% For test cases only - bridge_v1_remove/2 + bridge_v1_remove/2, + get_conf_root_key_if_only_one/2 ]). %%==================================================================== @@ -151,19 +158,22 @@ %%==================================================================== load() -> - load_bridges(), + load_bridges(?ROOT_KEY_ACTIONS), + load_bridges(?ROOT_KEY_SOURCES), load_message_publish_hook(), ok = emqx_config_handler:add_handler(config_key_path_leaf(), emqx_bridge_v2), ok = emqx_config_handler:add_handler(config_key_path(), emqx_bridge_v2), + ok = emqx_config_handler:add_handler(config_key_path_leaf_sources(), emqx_bridge_v2), + ok = emqx_config_handler:add_handler(config_key_path_sources(), emqx_bridge_v2), ok. -load_bridges() -> - Bridges = emqx:get_config([?ROOT_KEY], #{}), +load_bridges(RootName) -> + Bridges = emqx:get_config([RootName], #{}), lists:foreach( fun({Type, Bridge}) -> lists:foreach( fun({Name, BridgeConf}) -> - install_bridge_v2(Type, Name, BridgeConf) + install_bridge_v2(RootName, Type, Name, BridgeConf) end, maps:to_list(Bridge) ) @@ -172,19 +182,20 @@ load_bridges() -> ). unload() -> - unload_bridges(), + unload_bridges(?ROOT_KEY_ACTIONS), + unload_bridges(?ROOT_KEY_SOURCES), unload_message_publish_hook(), emqx_conf:remove_handler(config_key_path()), emqx_conf:remove_handler(config_key_path_leaf()), ok. -unload_bridges() -> - Bridges = emqx:get_config([?ROOT_KEY], #{}), +unload_bridges(ConfRooKey) -> + Bridges = emqx:get_config([ConfRooKey], #{}), lists:foreach( fun({Type, Bridge}) -> lists:foreach( fun({Name, BridgeConf}) -> - uninstall_bridge_v2(Type, Name, BridgeConf) + uninstall_bridge_v2(ConfRooKey, Type, Name, BridgeConf) end, maps:to_list(Bridge) ) @@ -198,7 +209,12 @@ unload_bridges() -> -spec lookup(bridge_v2_type(), bridge_v2_name()) -> {ok, bridge_v2_info()} | {error, not_found}. lookup(Type, Name) -> - case emqx:get_raw_config([?ROOT_KEY, Type, Name], not_found) of + lookup(?ROOT_KEY_ACTIONS, Type, Name). + +-spec lookup(sources | actions, bridge_v2_type(), bridge_v2_name()) -> + {ok, bridge_v2_info()} | {error, not_found}. +lookup(ConfRootName, Type, Name) -> + case emqx:get_raw_config([ConfRootName, Type, Name], not_found) of not_found -> {error, not_found}; #{<<"connector">> := BridgeConnector} = RawConf -> @@ -218,7 +234,7 @@ lookup(Type, Name) -> %% Find the Bridge V2 status from the ConnectorData ConnectorStatus = maps:get(status, ConnectorData, undefined), Channels = maps:get(added_channels, ConnectorData, #{}), - BridgeV2Id = id(Type, Name, BridgeConnector), + BridgeV2Id = id_with_root_name(ConfRootName, Type, Name, BridgeConnector), ChannelStatus = maps:get(BridgeV2Id, Channels, undefined), {DisplayBridgeV2Status, ErrorMsg} = case {ChannelStatus, ConnectorStatus} of @@ -245,20 +261,30 @@ lookup(Type, Name) -> -spec list() -> [bridge_v2_info()] | {error, term()}. list() -> - list_with_lookup_fun(fun lookup/2). + list_with_lookup_fun(?ROOT_KEY_ACTIONS, fun lookup/2). + +list(ConfRootKey) -> + LookupFun = fun(Type, Name) -> + lookup(ConfRootKey, Type, Name) + end, + list_with_lookup_fun(ConfRootKey, LookupFun). -spec create(bridge_v2_type(), bridge_v2_name(), map()) -> {ok, emqx_config:update_result()} | {error, any()}. create(BridgeType, BridgeName, RawConf) -> + create(?ROOT_KEY_ACTIONS, BridgeType, BridgeName, RawConf). + +create(ConfRootKey, BridgeType, BridgeName, RawConf) -> ?SLOG(debug, #{ bridge_action => create, bridge_version => 2, bridge_type => BridgeType, bridge_name => BridgeName, - bridge_raw_config => emqx_utils:redact(RawConf) + bridge_raw_config => emqx_utils:redact(RawConf), + root_key_path => ConfRootKey }), emqx_conf:update( - config_key_path() ++ [BridgeType, BridgeName], + [ConfRootKey, BridgeType, BridgeName], RawConf, #{override_to => cluster} ). @@ -267,6 +293,9 @@ create(BridgeType, BridgeName, RawConf) -> remove(BridgeType, BridgeName) -> %% NOTE: This function can cause broken references from rules but it is only %% called directly from test cases. + remove(?ROOT_KEY_ACTIONS, BridgeType, BridgeName). + +remove(ConfRootKey, BridgeType, BridgeName) -> ?SLOG(debug, #{ bridge_action => remove, bridge_version => 2, @@ -275,7 +304,7 @@ remove(BridgeType, BridgeName) -> }), case emqx_conf:remove( - config_key_path() ++ [BridgeType, BridgeName], + [ConfRootKey, BridgeType, BridgeName], #{override_to => cluster} ) of @@ -307,7 +336,7 @@ check_deps_and_remove(BridgeType, BridgeName, AlsoDeleteActions) -> %% Helpers for CRUD API %%-------------------------------------------------------------------- -list_with_lookup_fun(LookupFun) -> +list_with_lookup_fun(ConfRootName, LookupFun) -> maps:fold( fun(Type, NameAndConf, Bridges) -> maps:fold( @@ -330,21 +359,24 @@ list_with_lookup_fun(LookupFun) -> ) end, [], - emqx:get_raw_config([?ROOT_KEY], #{}) + emqx:get_raw_config([ConfRootName], #{}) ). install_bridge_v2( + _RootName, _BridgeType, _BridgeName, #{enable := false} ) -> ok; install_bridge_v2( + RootName, BridgeV2Type, BridgeName, Config ) -> install_bridge_v2_helper( + RootName, BridgeV2Type, BridgeName, combine_connector_and_bridge_v2_config( @@ -355,6 +387,7 @@ install_bridge_v2( ). install_bridge_v2_helper( + _RootName, _BridgeV2Type, _BridgeName, {error, Reason} = Error @@ -362,11 +395,12 @@ install_bridge_v2_helper( ?SLOG(warning, Reason), Error; install_bridge_v2_helper( + RootName, BridgeV2Type, BridgeName, #{connector := ConnectorName} = Config ) -> - BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName), + BridgeV2Id = id_with_root_name(RootName, BridgeV2Type, BridgeName, ConnectorName), CreationOpts = emqx_resource:fetch_creation_opts(Config), %% Create metrics for Bridge V2 ok = emqx_resource:create_metrics(BridgeV2Id), @@ -388,18 +422,45 @@ install_bridge_v2_helper( ConnectorId = emqx_connector_resource:resource_id( connector_type(BridgeV2Type), ConnectorName ), - ConfigWithTypeAndName = Config#{ - bridge_type => bin(BridgeV2Type), - bridge_name => bin(BridgeName) - }, emqx_resource_manager:add_channel( ConnectorId, BridgeV2Id, - ConfigWithTypeAndName + augment_channel_config( + RootName, + BridgeV2Type, + BridgeName, + Config + ) ), ok. +augment_channel_config( + ConfigRoot, + BridgeV2Type, + BridgeName, + Config +) -> + AugmentedConf = Config#{ + config_root => ConfigRoot, + bridge_type => bin(BridgeV2Type), + bridge_name => bin(BridgeName) + }, + case emqx_action_info:is_source(BridgeV2Type) of + true -> + BId = emqx_bridge_resource:bridge_id(BridgeV2Type, BridgeName), + BridgeHookpoint = emqx_bridge_resource:bridge_hookpoint(BId), + SourceHookpoint = source_hookpoint(BId), + HookPoints = [BridgeHookpoint, SourceHookpoint], + AugmentedConf#{hookpoints => HookPoints}; + false -> + AugmentedConf + end. + +source_hookpoint(BridgeId) -> + <<"$sources/", (bin(BridgeId))/binary>>. + uninstall_bridge_v2( + _ConfRootKey, _BridgeType, _BridgeName, #{enable := false} @@ -407,11 +468,12 @@ uninstall_bridge_v2( %% Already not installed ok; uninstall_bridge_v2( + ConfRootKey, BridgeV2Type, BridgeName, #{connector := ConnectorName} = Config ) -> - BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName), + BridgeV2Id = id_with_root_name(ConfRootKey, BridgeV2Type, BridgeName, ConnectorName), CreationOpts = emqx_resource:fetch_creation_opts(Config), ok = emqx_resource_buffer_worker_sup:stop_workers(BridgeV2Id, CreationOpts), ok = emqx_resource:clear_metrics(BridgeV2Id), @@ -460,8 +522,11 @@ combine_connector_and_bridge_v2_config( -spec disable_enable(disable | enable, bridge_v2_type(), bridge_v2_name()) -> {ok, any()} | {error, any()}. disable_enable(Action, BridgeType, BridgeName) when ?ENABLE_OR_DISABLE(Action) -> + disable_enable(?ROOT_KEY_ACTIONS, Action, BridgeType, BridgeName). + +disable_enable(ConfRootKey, Action, BridgeType, BridgeName) when ?ENABLE_OR_DISABLE(Action) -> emqx_conf:update( - config_key_path() ++ [BridgeType, BridgeName], + [ConfRootKey, BridgeType, BridgeName], Action, #{override_to => cluster} ). @@ -477,10 +542,12 @@ start(BridgeV2Type, Name) -> ConnectorOpFun = fun(ConnectorType, ConnectorName) -> emqx_connector_resource:start(ConnectorType, ConnectorName) end, - connector_operation_helper(BridgeV2Type, Name, ConnectorOpFun, true). + ConfRootKey = ?ROOT_KEY_ACTIONS, + connector_operation_helper(ConfRootKey, BridgeV2Type, Name, ConnectorOpFun, true). -connector_operation_helper(BridgeV2Type, Name, ConnectorOpFun, DoHealthCheck) -> +connector_operation_helper(ConfRootKey, BridgeV2Type, Name, ConnectorOpFun, DoHealthCheck) -> connector_operation_helper_with_conf( + ConfRootKey, BridgeV2Type, Name, lookup_conf(BridgeV2Type, Name), @@ -489,14 +556,16 @@ connector_operation_helper(BridgeV2Type, Name, ConnectorOpFun, DoHealthCheck) -> ). connector_operation_helper_with_conf( + _ConfRootKey, _BridgeV2Type, _Name, - {error, bridge_not_found} = Error, + {error, _} = Error, _ConnectorOpFun, _DoHealthCheck ) -> Error; connector_operation_helper_with_conf( + _ConfRootKey, _BridgeV2Type, _Name, #{enable := false}, @@ -505,6 +574,7 @@ connector_operation_helper_with_conf( ) -> ok; connector_operation_helper_with_conf( + ConfRootKey, BridgeV2Type, Name, #{connector := ConnectorName}, @@ -519,7 +589,7 @@ connector_operation_helper_with_conf( {true, {error, Reason}} -> {error, Reason}; {true, ok} -> - case health_check(BridgeV2Type, Name) of + case health_check(ConfRootKey, BridgeV2Type, Name) of #{status := connected} -> ok; {error, Reason} -> @@ -536,14 +606,17 @@ connector_operation_helper_with_conf( -spec reset_metrics(bridge_v2_type(), bridge_v2_name()) -> ok | {error, not_found}. reset_metrics(Type, Name) -> - reset_metrics_helper(Type, Name, lookup_conf(Type, Name)). + reset_metrics(?ROOT_KEY_ACTIONS, Type, Name). -reset_metrics_helper(_Type, _Name, #{enable := false}) -> +reset_metrics(ConfRootKey, Type, Name) -> + reset_metrics_helper(ConfRootKey, Type, Name, lookup_conf(ConfRootKey, Type, Name)). + +reset_metrics_helper(_ConfRootKey, _Type, _Name, #{enable := false}) -> ok; -reset_metrics_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName}) -> - BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName), +reset_metrics_helper(ConfRootKey, BridgeV2Type, BridgeName, #{connector := ConnectorName}) -> + BridgeV2Id = id_with_root_name(ConfRootKey, BridgeV2Type, BridgeName, ConnectorName), ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, BridgeV2Id); -reset_metrics_helper(_, _, _) -> +reset_metrics_helper(_, _, _, _) -> {error, not_found}. get_query_mode(BridgeV2Type, Config) -> @@ -599,7 +672,10 @@ send_message(BridgeType, BridgeName, Message, QueryOpts0) -> -spec health_check(BridgeType :: term(), BridgeName :: term()) -> #{status := emqx_resource:resource_status(), error := term()} | {error, Reason :: term()}. health_check(BridgeType, BridgeName) -> - case lookup_conf(BridgeType, BridgeName) of + health_check(?ROOT_KEY_ACTIONS, BridgeType, BridgeName). + +health_check(ConfRootKey, BridgeType, BridgeName) -> + case lookup_conf(ConfRootKey, BridgeType, BridgeName) of #{ enable := true, connector := ConnectorName @@ -608,7 +684,7 @@ health_check(BridgeType, BridgeName) -> connector_type(BridgeType), ConnectorName ), emqx_resource_manager:channel_health_check( - ConnectorId, id(BridgeType, BridgeName, ConnectorName) + ConnectorId, id_with_root_name(ConfRootKey, BridgeType, BridgeName, ConnectorName) ); #{enable := false} -> {error, bridge_stopped}; @@ -652,13 +728,13 @@ create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) -> {_, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId), ChannelTestId = id(BridgeType, BridgeName, ConnectorName), Conf = emqx_utils_maps:unsafe_atom_key_map(BridgeV2RawConf), - ConfWithTypeAndName = Conf#{ - bridge_type => bin(BridgeType), - bridge_name => bin(BridgeName) - }, - case - emqx_resource_manager:add_channel(ConnectorId, ChannelTestId, ConfWithTypeAndName) - of + AugmentedConf = augment_channel_config( + ?ROOT_KEY_ACTIONS, + BridgeType, + BridgeName, + Conf + ), + case emqx_resource_manager:add_channel(ConnectorId, ChannelTestId, AugmentedConf) of {error, Reason} -> {error, Reason}; ok -> @@ -677,7 +753,10 @@ create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) -> -spec get_metrics(bridge_v2_type(), bridge_v2_name()) -> emqx_metrics_worker:metrics(). get_metrics(Type, Name) -> - emqx_resource:get_metrics(id(Type, Name)). + get_metrics(?ROOT_KEY_ACTIONS, Type, Name). + +get_metrics(ConfRootKey, Type, Name) -> + emqx_resource:get_metrics(id_with_root_name(ConfRootKey, Type, Name)). %%==================================================================== %% On message publish hook (for local topics) @@ -690,7 +769,7 @@ reload_message_publish_hook(Bridges) -> ok = load_message_publish_hook(Bridges). load_message_publish_hook() -> - Bridges = emqx:get_config([?ROOT_KEY], #{}), + Bridges = emqx:get_config([?ROOT_KEY_ACTIONS], #{}), load_message_publish_hook(Bridges). load_message_publish_hook(Bridges) -> @@ -754,7 +833,7 @@ send_to_matched_egress_bridges(Topic, Msg) -> ). get_matched_egress_bridges(Topic) -> - Bridges = emqx:get_config([?ROOT_KEY], #{}), + Bridges = emqx:get_config([?ROOT_KEY_ACTIONS], #{}), maps:fold( fun(BType, Conf, Acc0) -> maps:fold( @@ -800,16 +879,21 @@ parse_id(Id) -> end. get_channels_for_connector(ConnectorId) -> + Actions = get_channels_for_connector(?ROOT_KEY_ACTIONS, ConnectorId), + Sources = get_channels_for_connector(?ROOT_KEY_SOURCES, ConnectorId), + Actions ++ Sources. + +get_channels_for_connector(SourcesOrActions, ConnectorId) -> try emqx_connector_resource:parse_connector_id(ConnectorId) of {ConnectorType, ConnectorName} -> - RootConf = maps:keys(emqx:get_config([?ROOT_KEY], #{})), + RootConf = maps:keys(emqx:get_config([SourcesOrActions], #{})), RelevantBridgeV2Types = [ Type || Type <- RootConf, connector_type(Type) =:= ConnectorType ], lists:flatten([ - get_channels_for_connector(ConnectorName, BridgeV2Type) + get_channels_for_connector(SourcesOrActions, ConnectorName, BridgeV2Type) || BridgeV2Type <- RelevantBridgeV2Types ]) catch @@ -819,33 +903,55 @@ get_channels_for_connector(ConnectorId) -> [] end. -get_channels_for_connector(ConnectorName, BridgeV2Type) -> - BridgeV2s = emqx:get_config([?ROOT_KEY, BridgeV2Type], #{}), +get_channels_for_connector(SourcesOrActions, ConnectorName, BridgeV2Type) -> + BridgeV2s = emqx:get_config([SourcesOrActions, BridgeV2Type], #{}), [ - {id(BridgeV2Type, Name, ConnectorName), Conf#{ - bridge_name => bin(Name), - bridge_type => bin(BridgeV2Type) - }} + { + id_with_root_name(SourcesOrActions, BridgeV2Type, Name, ConnectorName), + augment_channel_config(SourcesOrActions, BridgeV2Type, Name, Conf) + } || {Name, Conf} <- maps:to_list(BridgeV2s), bin(ConnectorName) =:= maps:get(connector, Conf, no_name) ]. %%==================================================================== -%% Exported for tests +%% ID related functions %%==================================================================== id(BridgeType, BridgeName) -> - case lookup_conf(BridgeType, BridgeName) of - #{connector := ConnectorName} -> - id(BridgeType, BridgeName, ConnectorName); - {error, Reason} -> - throw(Reason) - end. + id_with_root_name(?ROOT_KEY_ACTIONS, BridgeType, BridgeName). id(BridgeType, BridgeName, ConnectorName) -> + id_with_root_name(?ROOT_KEY_ACTIONS, BridgeType, BridgeName, ConnectorName). + +id_with_root_name(RootName, BridgeType, BridgeName) -> + case lookup_conf(RootName, BridgeType, BridgeName) of + #{connector := ConnectorName} -> + id_with_root_name(RootName, BridgeType, BridgeName, ConnectorName); + {error, Reason} -> + throw( + {action_source_not_found, #{ + reason => Reason, + root_name => RootName, + type => BridgeType, + name => BridgeName + }} + ) + end. + +id_with_root_name(RootName, BridgeType, BridgeName, ConnectorName) -> ConnectorType = bin(connector_type(BridgeType)), - <<"action:", (bin(BridgeType))/binary, ":", (bin(BridgeName))/binary, ":connector:", - (bin(ConnectorType))/binary, ":", (bin(ConnectorName))/binary>>. + << + (bin(RootName))/binary, + ":", + (bin(BridgeType))/binary, + ":", + (bin(BridgeName))/binary, + ":connector:", + (bin(ConnectorType))/binary, + ":", + (bin(ConnectorName))/binary + >>. connector_type(Type) -> %% remote call so it can be mocked @@ -860,76 +966,65 @@ bridge_v2_type_to_connector_type(Type) -> import_config(RawConf) -> %% actions structure - emqx_bridge:import_config(RawConf, <<"actions">>, ?ROOT_KEY, config_key_path()). + emqx_bridge:import_config(RawConf, <<"actions">>, ?ROOT_KEY_ACTIONS, config_key_path()). %%==================================================================== %% Config Update Handler API %%==================================================================== -config_key_path() -> [?ROOT_KEY]. +config_key_path() -> + [?ROOT_KEY_ACTIONS]. -config_key_path_leaf() -> [?ROOT_KEY, '?', '?']. +config_key_path_leaf() -> + [?ROOT_KEY_ACTIONS, '?', '?']. + +config_key_path_sources() -> + [?ROOT_KEY_SOURCES]. + +config_key_path_leaf_sources() -> + [?ROOT_KEY_SOURCES, '?', '?']. %% enable or disable action -pre_config_update([?ROOT_KEY, _Type, _Name], Oper, undefined) when ?ENABLE_OR_DISABLE(Oper) -> +pre_config_update([ConfRootKey, _Type, _Name], Oper, undefined) when + ?ENABLE_OR_DISABLE(Oper) andalso + (ConfRootKey =:= ?ROOT_KEY_ACTIONS orelse ConfRootKey =:= ?ROOT_KEY_SOURCES) +-> {error, bridge_not_found}; -pre_config_update([?ROOT_KEY, _Type, _Name], Oper, OldAction) when ?ENABLE_OR_DISABLE(Oper) -> +pre_config_update([ConfRootKey, _Type, _Name], Oper, OldAction) when + ?ENABLE_OR_DISABLE(Oper) andalso + (ConfRootKey =:= ?ROOT_KEY_ACTIONS orelse ConfRootKey =:= ?ROOT_KEY_SOURCES) +-> {ok, OldAction#{<<"enable">> => operation_to_enable(Oper)}}; %% Updates a single action from a specific HTTP API. %% If the connector is not found, the update operation fails. -pre_config_update([?ROOT_KEY, Type, Name], Conf = #{}, _OldConf) -> - action_convert_from_connector(Type, Name, Conf); +pre_config_update([ConfRootKey, Type, Name], Conf = #{}, _OldConf) when + ConfRootKey =:= ?ROOT_KEY_ACTIONS orelse ConfRootKey =:= ?ROOT_KEY_SOURCES +-> + convert_from_connector(ConfRootKey, Type, Name, Conf); %% Batch updates actions when importing a configuration or executing a CLI command. %% Update succeeded even if the connector is not found, alarm in post_config_update -pre_config_update([?ROOT_KEY], Conf = #{}, _OldConfs) -> - {ok, actions_convert_from_connectors(Conf)}. +pre_config_update([ConfRootKey], Conf = #{}, _OldConfs) when + ConfRootKey =:= ?ROOT_KEY_ACTIONS orelse ConfRootKey =:= ?ROOT_KEY_SOURCES +-> + {ok, convert_from_connectors(ConfRootKey, Conf)}. -%% Don't crash event the bridge is not found -post_config_update([?ROOT_KEY, Type, Name], '$remove', _, _OldConf, _AppEnvs) -> - AllBridges = emqx:get_config([?ROOT_KEY]), - case emqx_utils_maps:deep_get([Type, Name], AllBridges, undefined) of - undefined -> - ok; - Action -> - ok = uninstall_bridge_v2(Type, Name, Action), - Bridges = emqx_utils_maps:deep_remove([Type, Name], AllBridges), - reload_message_publish_hook(Bridges) - end, - ?tp(bridge_post_config_update_done, #{}), - ok; -%% Create a single bridge failed if the connector is not found(already check in pre_config_update) -post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, undefined, _AppEnvs) -> - ok = install_bridge_v2(BridgeType, BridgeName, NewConf), - Bridges = emqx_utils_maps:deep_put( - [BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf - ), - reload_message_publish_hook(Bridges), - ?tp(bridge_post_config_update_done, #{}), - ok; -%% update bridges failed if the connector is not found(already check in pre_config_update) -post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, OldConf, _AppEnvs) -> - ok = uninstall_bridge_v2(BridgeType, BridgeName, OldConf), - ok = install_bridge_v2(BridgeType, BridgeName, NewConf), - Bridges = emqx_utils_maps:deep_put( - [BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf - ), - reload_message_publish_hook(Bridges), - ?tp(bridge_post_config_update_done, #{}), - ok; %% This top level handler will be triggered when the actions path is updated %% with calls to emqx_conf:update([actions], BridgesConf, #{}). -%% such as import_config/1 -%% Notice ** do succeeded even if the connector is not found ** -%% Install a non-exist connector will alarm & log(warn) in install_bridge_v2. -post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) -> +post_config_update([ConfRootKey], _Req, NewConf, OldConf, _AppEnv) when + ConfRootKey =:= ?ROOT_KEY_ACTIONS; ConfRootKey =:= ?ROOT_KEY_SOURCES +-> #{added := Added, removed := Removed, changed := Updated} = diff_confs(NewConf, OldConf), %% The config update will be failed if any task in `perform_bridge_changes` failed. - RemoveFun = fun uninstall_bridge_v2/3, - CreateFun = fun install_bridge_v2/3, + RemoveFun = fun(Type, Name, Conf) -> + uninstall_bridge_v2(ConfRootKey, Type, Name, Conf) + end, + CreateFun = fun(Type, Name, Conf) -> + install_bridge_v2(ConfRootKey, Type, Name, Conf) + end, UpdateFun = fun(Type, Name, {OldBridgeConf, Conf}) -> - uninstall_bridge_v2(Type, Name, OldBridgeConf), - install_bridge_v2(Type, Name, Conf) + uninstall_bridge_v2(ConfRootKey, Type, Name, OldBridgeConf), + install_bridge_v2(ConfRootKey, Type, Name, Conf) end, Result = perform_bridge_changes([ #{action => RemoveFun, data => Removed}, @@ -942,7 +1037,45 @@ post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) -> ]), reload_message_publish_hook(NewConf), ?tp(bridge_post_config_update_done, #{}), - Result. + Result; +%% Don't crash even when the bridge is not found +post_config_update([ConfRootKey, Type, Name], '$remove', _, _OldConf, _AppEnvs) when + ConfRootKey =:= ?ROOT_KEY_ACTIONS; ConfRootKey =:= ?ROOT_KEY_SOURCES +-> + AllBridges = emqx:get_config([ConfRootKey]), + case emqx_utils_maps:deep_get([Type, Name], AllBridges, undefined) of + undefined -> + ok; + Action -> + ok = uninstall_bridge_v2(ConfRootKey, Type, Name, Action), + Bridges = emqx_utils_maps:deep_remove([Type, Name], AllBridges), + reload_message_publish_hook(Bridges) + end, + ?tp(bridge_post_config_update_done, #{}), + ok; +%% Create a single bridge fails if the connector is not found (already checked in pre_config_update) +post_config_update([ConfRootKey, BridgeType, BridgeName], _Req, NewConf, undefined, _AppEnvs) when + ConfRootKey =:= ?ROOT_KEY_ACTIONS; ConfRootKey =:= ?ROOT_KEY_SOURCES +-> + ok = install_bridge_v2(ConfRootKey, BridgeType, BridgeName, NewConf), + Bridges = emqx_utils_maps:deep_put( + [BridgeType, BridgeName], emqx:get_config([ConfRootKey]), NewConf + ), + reload_message_publish_hook(Bridges), + ?tp(bridge_post_config_update_done, #{}), + ok; +%% update bridges fails if the connector is not found (already checked in pre_config_update) +post_config_update([ConfRootKey, BridgeType, BridgeName], _Req, NewConf, OldConf, _AppEnvs) when + ConfRootKey =:= ?ROOT_KEY_ACTIONS; ConfRootKey =:= ?ROOT_KEY_SOURCES +-> + ok = uninstall_bridge_v2(ConfRootKey, BridgeType, BridgeName, OldConf), + ok = install_bridge_v2(ConfRootKey, BridgeType, BridgeName, NewConf), + Bridges = emqx_utils_maps:deep_put( + [BridgeType, BridgeName], emqx:get_config([ConfRootKey]), NewConf + ), + reload_message_publish_hook(Bridges), + ?tp(bridge_post_config_update_done, #{}), + ok. diff_confs(NewConfs, OldConfs) -> emqx_utils_maps:diff_maps( @@ -1051,12 +1184,33 @@ is_bridge_v2_type(Type) -> emqx_action_info:is_action_type(Type). bridge_v1_list_and_transform() -> - Bridges = list_with_lookup_fun(fun bridge_v1_lookup_and_transform/2), - [B || B <- Bridges, B =/= not_bridge_v1_compatible_error()]. + BridgesFromActions0 = list_with_lookup_fun( + ?ROOT_KEY_ACTIONS, + fun bridge_v1_lookup_and_transform/2 + ), + BridgesFromActions1 = [ + B + || B <- BridgesFromActions0, + B =/= not_bridge_v1_compatible_error() + ], + FromActionsNames = maps:from_keys([Name || #{name := Name} <- BridgesFromActions1], true), + BridgesFromSources0 = list_with_lookup_fun( + ?ROOT_KEY_SOURCES, + fun bridge_v1_lookup_and_transform/2 + ), + BridgesFromSources1 = [ + B + || #{name := SourceBridgeName} = B <- BridgesFromSources0, + B =/= not_bridge_v1_compatible_error(), + %% Action is only shown in case of name conflict + not maps:is_key(SourceBridgeName, FromActionsNames) + ], + BridgesFromActions1 ++ BridgesFromSources1. bridge_v1_lookup_and_transform(ActionType, Name) -> - case lookup(ActionType, Name) of - {ok, #{raw_config := #{<<"connector">> := ConnectorName} = RawConfig} = ActionConfig} -> + case lookup_actions_or_sources(ActionType, Name) of + {ok, ConfRootName, + #{raw_config := #{<<"connector">> := ConnectorName} = RawConfig} = ActionConfig} -> BridgeV1Type = ?MODULE:bridge_v2_type_to_bridge_v1_type(ActionType, RawConfig), HasBridgeV1Equivalent = has_bridge_v1_equivalent(ActionType), case HasBridgeV1Equivalent andalso ?MODULE:bridge_v1_is_valid(BridgeV1Type, Name) of @@ -1065,6 +1219,7 @@ bridge_v1_lookup_and_transform(ActionType, Name) -> case emqx_connector:lookup(ConnectorType, ConnectorName) of {ok, Connector} -> bridge_v1_lookup_and_transform_helper( + ConfRootName, BridgeV1Type, Name, ActionType, @@ -1082,6 +1237,19 @@ bridge_v1_lookup_and_transform(ActionType, Name) -> Error end. +lookup_actions_or_sources(ActionType, Name) -> + case lookup(?ROOT_KEY_ACTIONS, ActionType, Name) of + {error, not_found} -> + case lookup(?ROOT_KEY_SOURCES, ActionType, Name) of + {ok, SourceInfo} -> + {ok, ?ROOT_KEY_SOURCES, SourceInfo}; + Error -> + Error + end; + {ok, ActionInfo} -> + {ok, ?ROOT_KEY_ACTIONS, ActionInfo} + end. + not_bridge_v1_compatible_error() -> {error, not_bridge_v1_compatible}. @@ -1094,18 +1262,18 @@ has_bridge_v1_equivalent(ActionType) -> connector_raw_config(Connector, ConnectorType) -> get_raw_with_defaults(Connector, ConnectorType, <<"connectors">>, emqx_connector_schema). -action_raw_config(Action, ActionType) -> - get_raw_with_defaults(Action, ActionType, <<"actions">>, emqx_bridge_v2_schema). +action_raw_config(ConfRootName, Action, ActionType) -> + get_raw_with_defaults(Action, ActionType, bin(ConfRootName), emqx_bridge_v2_schema). get_raw_with_defaults(Config, Type, TopLevelConf, SchemaModule) -> RawConfig = maps:get(raw_config, Config), fill_defaults(Type, RawConfig, TopLevelConf, SchemaModule). bridge_v1_lookup_and_transform_helper( - BridgeV1Type, BridgeName, ActionType, Action, ConnectorType, Connector + ConfRootName, BridgeV1Type, BridgeName, ActionType, Action, ConnectorType, Connector ) -> ConnectorRawConfig = connector_raw_config(Connector, ConnectorType), - ActionRawConfig = action_raw_config(Action, ActionType), + ActionRawConfig = action_raw_config(ConfRootName, Action, ActionType), BridgeV1Config = emqx_action_info:connector_action_config_to_bridge_v1_config( BridgeV1Type, ConnectorRawConfig, ActionRawConfig ), @@ -1136,7 +1304,52 @@ bridge_v1_lookup_and_transform_helper( end. lookup_conf(Type, Name) -> - case emqx:get_config([?ROOT_KEY, Type, Name], not_found) of + lookup_conf(?ROOT_KEY_ACTIONS, Type, Name). + +lookup_conf_if_one_of_sources_actions(Type, Name) -> + LookUpConfActions = lookup_conf(?ROOT_KEY_ACTIONS, Type, Name), + LookUpConfSources = lookup_conf(?ROOT_KEY_SOURCES, Type, Name), + case {LookUpConfActions, LookUpConfSources} of + {{error, bridge_not_found}, {error, bridge_not_found}} -> + {error, bridge_not_found}; + {{error, bridge_not_found}, Conf} -> + Conf; + {Conf, {error, bridge_not_found}} -> + Conf; + {_Conf1, _Conf2} -> + {error, name_conflict_sources_actions} + end. + +is_only_source(BridgeType, BridgeName) -> + LookUpConfActions = lookup_conf(?ROOT_KEY_ACTIONS, BridgeType, BridgeName), + LookUpConfSources = lookup_conf(?ROOT_KEY_SOURCES, BridgeType, BridgeName), + case {LookUpConfActions, LookUpConfSources} of + {{error, bridge_not_found}, {error, bridge_not_found}} -> + false; + {{error, bridge_not_found}, _Conf} -> + true; + {_Conf, {error, bridge_not_found}} -> + false; + {_Conf1, _Conf2} -> + false + end. + +get_conf_root_key_if_only_one(BridgeType, BridgeName) -> + LookUpConfActions = lookup_conf(?ROOT_KEY_ACTIONS, BridgeType, BridgeName), + LookUpConfSources = lookup_conf(?ROOT_KEY_SOURCES, BridgeType, BridgeName), + case {LookUpConfActions, LookUpConfSources} of + {{error, bridge_not_found}, {error, bridge_not_found}} -> + error({action_or_soruces_not_found, BridgeType, BridgeName}); + {{error, bridge_not_found}, _Conf} -> + ?ROOT_KEY_SOURCES; + {_Conf, {error, bridge_not_found}} -> + ?ROOT_KEY_ACTIONS; + {_Conf1, _Conf2} -> + error({name_clash_action_soruces, BridgeType, BridgeName}) + end. + +lookup_conf(RootName, Type, Name) -> + case emqx:get_config([RootName, Type, Name], not_found) of not_found -> {error, bridge_not_found}; Config -> @@ -1146,7 +1359,7 @@ lookup_conf(Type, Name) -> bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf) -> BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), %% Check if the bridge v2 exists - case lookup_conf(BridgeV2Type, BridgeName) of + case lookup_conf_if_one_of_sources_actions(BridgeV2Type, BridgeName) of {error, _} -> %% If the bridge v2 does not exist, it is a valid bridge v1 PreviousRawConf = undefined, @@ -1158,8 +1371,9 @@ bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf) -> true -> %% Using remove + create as update, hence do not delete deps. RemoveDeps = [], + ConfRootKey = get_conf_root_key_if_only_one(BridgeV2Type, BridgeName), PreviousRawConf = emqx:get_raw_config( - [?ROOT_KEY, BridgeV2Type, BridgeName], undefined + [ConfRootKey, BridgeV2Type, BridgeName], undefined ), %% To avoid losing configurations. We have to make sure that no crash occurs %% during deletion and creation of configurations. @@ -1185,17 +1399,18 @@ split_bridge_v1_config_and_create_helper( connector_conf := NewConnectorRawConf, bridge_v2_type := BridgeType, bridge_v2_name := BridgeName, - bridge_v2_conf := NewBridgeV2RawConf + bridge_v2_conf := NewBridgeV2RawConf, + conf_root_key := ConfRootName } = split_and_validate_bridge_v1_config( BridgeV1Type, BridgeName, RawConf, PreviousRawConf ), - _ = PreCreateFun(), do_connector_and_bridge_create( + ConfRootName, ConnectorType, NewConnectorName, NewConnectorRawConf, @@ -1210,6 +1425,7 @@ split_bridge_v1_config_and_create_helper( end. do_connector_and_bridge_create( + ConfRootName, ConnectorType, NewConnectorName, NewConnectorRawConf, @@ -1220,7 +1436,7 @@ do_connector_and_bridge_create( ) -> case emqx_connector:create(ConnectorType, NewConnectorName, NewConnectorRawConf) of {ok, _} -> - case create(BridgeType, BridgeName, NewBridgeV2RawConf) of + case create(ConfRootName, BridgeType, BridgeName, NewBridgeV2RawConf) of {ok, _} = Result -> Result; {error, Reason1} -> @@ -1257,10 +1473,15 @@ split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf, PreviousR } } }, + ConfRootKeyPrevRawConf = + case PreviousRawConf =/= undefined of + true -> get_conf_root_key_if_only_one(BridgeV2Type, BridgeName); + false -> not_used + end, FakeGlobalConfig = emqx_utils_maps:put_if( FakeGlobalConfig0, - bin(?ROOT_KEY), + bin(ConfRootKeyPrevRawConf), #{bin(BridgeV2Type) => #{bin(BridgeName) => PreviousRawConf}}, PreviousRawConf =/= undefined ), @@ -1269,10 +1490,11 @@ split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf, PreviousR Output = emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2( FakeGlobalConfig ), + ConfRootKey = get_conf_root_key(Output), NewBridgeV2RawConf = emqx_utils_maps:deep_get( [ - bin(?ROOT_KEY), + ConfRootKey, bin(BridgeV2Type), bin(BridgeName) ], @@ -1280,7 +1502,7 @@ split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf, PreviousR ), ConnectorName = emqx_utils_maps:deep_get( [ - bin(?ROOT_KEY), + ConfRootKey, bin(BridgeV2Type), bin(BridgeName), <<"connector">> @@ -1303,7 +1525,7 @@ split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf, PreviousR bin(ConnectorName) => NewConnectorRawConf } }, - <<"actions">> => #{ + ConfRootKey => #{ bin(BridgeV2Type) => #{ bin(BridgeName) => NewBridgeV2RawConf } @@ -1323,7 +1545,8 @@ split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf, PreviousR connector_conf => NewConnectorRawConf, bridge_v2_type => BridgeV2Type, bridge_v2_name => BridgeName, - bridge_v2_conf => NewBridgeV2RawConf + bridge_v2_conf => NewBridgeV2RawConf, + conf_root_key => ConfRootKey } catch %% validation errors @@ -1331,6 +1554,13 @@ split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf, PreviousR {error, Reason1} end. +get_conf_root_key(#{<<"actions">> := _}) -> + <<"actions">>; +get_conf_root_key(#{<<"sources">> := _}) -> + <<"sources">>; +get_conf_root_key(_NoMatch) -> + error({incompatible_bridge_v1, no_action_or_source}). + bridge_v1_create_dry_run(BridgeType, RawConfig0) -> RawConf = maps:without([<<"name">>], RawConfig0), TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]), @@ -1356,7 +1586,7 @@ bridge_v1_remove(BridgeV1Type, BridgeName) -> bridge_v1_remove( ActionType, BridgeName, - lookup_conf(ActionType, BridgeName) + lookup_conf_if_one_of_sources_actions(ActionType, BridgeName) ). bridge_v1_remove( @@ -1364,7 +1594,8 @@ bridge_v1_remove( Name, #{connector := ConnectorName} ) -> - case remove(ActionType, Name) of + ConfRootKey = get_conf_root_key_if_only_one(ActionType, Name), + case remove(ConfRootKey, ActionType, Name) of ok -> ConnectorType = connector_type(ActionType), emqx_connector:remove(ConnectorType, ConnectorName); @@ -1384,7 +1615,7 @@ bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps) -> BridgeV2Type, BridgeName, RemoveDeps, - lookup_conf(BridgeV2Type, BridgeName) + lookup_conf_if_one_of_sources_actions(BridgeV2Type, BridgeName) ). %% Bridge v1 delegated-removal in 3 steps: @@ -1398,9 +1629,10 @@ bridge_v1_check_deps_and_remove( #{connector := ConnectorName} ) -> RemoveConnector = lists:member(connector, RemoveDeps), - case emqx_bridge_lib:maybe_withdraw_rule_action(BridgeType, BridgeName, RemoveDeps) of + case maybe_withdraw_rule_action(BridgeType, BridgeName, RemoveDeps) of ok -> - case remove(BridgeType, BridgeName) of + ConfRootKey = get_conf_root_key_if_only_one(BridgeType, BridgeName), + case remove(ConfRootKey, BridgeType, BridgeName) of ok when RemoveConnector -> maybe_delete_channels(BridgeType, BridgeName, ConnectorName); ok -> @@ -1415,6 +1647,14 @@ bridge_v1_check_deps_and_remove(_BridgeType, _BridgeName, _RemoveDeps, Error) -> %% TODO: the connector is gone, for whatever reason, maybe call remove/2 anyway? Error. +maybe_withdraw_rule_action(BridgeType, BridgeName, RemoveDeps) -> + case is_only_source(BridgeType, BridgeName) of + true -> + ok; + false -> + emqx_bridge_lib:maybe_withdraw_rule_action(BridgeType, BridgeName, RemoveDeps) + end. + maybe_delete_channels(BridgeType, BridgeName, ConnectorName) -> case connector_has_channels(BridgeType, ConnectorName) of true -> @@ -1467,23 +1707,25 @@ bridge_v1_enable_disable(Action, BridgeType, BridgeName) -> Action, BridgeType, BridgeName, - lookup_conf(BridgeType, BridgeName) + lookup_conf_if_one_of_sources_actions(BridgeType, BridgeName) ); false -> {error, not_bridge_v1_compatible} end. -bridge_v1_enable_disable_helper(_Op, _BridgeType, _BridgeName, {error, bridge_not_found}) -> - {error, bridge_not_found}; +bridge_v1_enable_disable_helper(_Op, _BridgeType, _BridgeName, {error, Reason}) -> + {error, Reason}; bridge_v1_enable_disable_helper(enable, BridgeType, BridgeName, #{connector := ConnectorName}) -> BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeType), ConnectorType = connector_type(BridgeV2Type), {ok, _} = emqx_connector:disable_enable(enable, ConnectorType, ConnectorName), - emqx_bridge_v2:disable_enable(enable, BridgeV2Type, BridgeName); + ConfRootKey = get_conf_root_key_if_only_one(BridgeType, BridgeName), + emqx_bridge_v2:disable_enable(ConfRootKey, enable, BridgeV2Type, BridgeName); bridge_v1_enable_disable_helper(disable, BridgeType, BridgeName, #{connector := ConnectorName}) -> BridgeV2Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeType), ConnectorType = connector_type(BridgeV2Type), - {ok, _} = emqx_bridge_v2:disable_enable(disable, BridgeV2Type, BridgeName), + ConfRootKey = get_conf_root_key_if_only_one(BridgeType, BridgeName), + {ok, _} = emqx_bridge_v2:disable_enable(ConfRootKey, disable, BridgeV2Type, BridgeName), emqx_connector:disable_enable(disable, ConnectorType, ConnectorName). bridge_v1_restart(BridgeV1Type, Name) -> @@ -1508,10 +1750,12 @@ bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, DoHealthCheck) -> BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), case emqx_bridge_v2:bridge_v1_is_valid(BridgeV1Type, Name) of true -> + ConfRootKey = get_conf_root_key_if_only_one(BridgeV2Type, Name), connector_operation_helper_with_conf( + ConfRootKey, BridgeV2Type, Name, - lookup_conf(BridgeV2Type, Name), + lookup_conf_if_one_of_sources_actions(BridgeV2Type, Name), ConnectorOpFun, DoHealthCheck ); @@ -1559,12 +1803,12 @@ referenced_connectors_exist(BridgeType, ConnectorNameBin, BridgeName) -> ok end. -actions_convert_from_connectors(Conf) -> +convert_from_connectors(ConfRootKey, Conf) -> maps:map( fun(ActionType, Actions) -> maps:map( fun(ActionName, Action) -> - case action_convert_from_connector(ActionType, ActionName, Action) of + case convert_from_connector(ConfRootKey, ActionType, ActionName, Action) of {ok, NewAction} -> NewAction; {error, _} -> Action end @@ -1575,7 +1819,7 @@ actions_convert_from_connectors(Conf) -> Conf ). -action_convert_from_connector(Type, Name, Action = #{<<"connector">> := ConnectorName}) -> +convert_from_connector(ConfRootKey, Type, Name, Action = #{<<"connector">> := ConnectorName}) -> case get_connector_info(ConnectorName, Type) of {ok, Connector} -> Action1 = emqx_action_info:action_convert_from_connector(Type, Connector, Action), @@ -1585,7 +1829,8 @@ action_convert_from_connector(Type, Name, Action = #{<<"connector">> := Connecto bridge_name => Name, reason => <<"connector_not_found_or_wrong_type">>, bridge_type => Type, - connector_name => ConnectorName + connector_name => ConnectorName, + conf_root_key => ConfRootKey }} end. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index e144f332d..28017f814 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -79,7 +79,7 @@ api_schema(Method) -> hoconsc:union(bridge_api_union(APISchemas)). registered_api_schemas(Method) -> - RegisteredSchemas = emqx_action_info:registered_schema_modules(), + RegisteredSchemas = emqx_action_info:registered_schema_modules_actions(), [ api_ref(SchemaModule, atom_to_binary(BridgeV2Type), Method ++ "_bridge_v2") || {BridgeV2Type, SchemaModule} <- RegisteredSchemas @@ -189,29 +189,43 @@ tags() -> -dialyzer({nowarn_function, roots/0}). roots() -> - case fields(actions) of - [] -> - [ - {actions, - ?HOCON(hoconsc:map(name, typerefl:map()), #{importance => ?IMPORTANCE_LOW})} - ]; - _ -> - [{actions, ?HOCON(?R_REF(actions), #{importance => ?IMPORTANCE_LOW})}] - end. + ActionsRoot = + case fields(actions) of + [] -> + [ + {actions, + ?HOCON(hoconsc:map(name, typerefl:map()), #{importance => ?IMPORTANCE_LOW})} + ]; + _ -> + [{actions, ?HOCON(?R_REF(actions), #{importance => ?IMPORTANCE_LOW})}] + end, + SourcesRoot = + [{sources, ?HOCON(?R_REF(sources), #{importance => ?IMPORTANCE_LOW})}], + ActionsRoot ++ SourcesRoot. fields(actions) -> - registered_schema_fields(); + registered_schema_fields_actions(); +fields(sources) -> + registered_schema_fields_sources(); fields(resource_opts) -> resource_opts_fields(_Overrides = []). -registered_schema_fields() -> +registered_schema_fields_actions() -> [ Module:fields(action) - || {_BridgeV2Type, Module} <- emqx_action_info:registered_schema_modules() + || {_BridgeV2Type, Module} <- emqx_action_info:registered_schema_modules_actions() + ]. + +registered_schema_fields_sources() -> + [ + Module:fields(source) + || {_BridgeV2Type, Module} <- emqx_action_info:registered_schema_modules_sources() ]. desc(actions) -> ?DESC("desc_bridges_v2"); +desc(sources) -> + ?DESC("desc_sources"); desc(resource_opts) -> ?DESC(emqx_resource_schema, "resource_opts"); desc(_) -> @@ -264,7 +278,7 @@ examples(Method) -> ConnectorExamples = erlang:apply(Module, bridge_v2_examples, [Method]), lists:foldl(MergeFun, Examples, ConnectorExamples) end, - SchemaModules = [Mod || {_, Mod} <- emqx_action_info:registered_schema_modules()], + SchemaModules = [Mod || {_, Mod} <- emqx_action_info:registered_schema_modules_actions()], lists:foldl(Fun, #{}, SchemaModules). top_level_common_action_keys() -> diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index cc2296d3c..1bee2c92e 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -20,8 +20,13 @@ -include_lib("emqx_resource/include/emqx_resource.hrl"). -behaviour(emqx_resource). +-behaviour(ecpool_worker). + +%% ecpool +-export([connect/1]). -export([on_message_received/3]). +-export([handle_disconnect/1]). %% callbacks of behaviour emqx_resource -export([ @@ -30,11 +35,25 @@ on_stop/2, on_query/3, on_query_async/4, - on_get_status/2 + on_get_status/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channel_status/3, + on_get_channels/1 ]). -export([on_async_result/2]). +-type name() :: term(). + +-type option() :: + {name, name()} + | {ingress, map()} + %% see `emqtt:option()` + | {client_opts, map()}. + +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + -define(HEALTH_CHECK_TIMEOUT, 1000). -define(INGRESS, "I"). -define(EGRESS, "E"). @@ -42,142 +61,205 @@ %% =================================================================== %% When use this bridge as a data source, ?MODULE:on_message_received will be called %% if the bridge received msgs from the remote broker. -on_message_received(Msg, HookPoint, ResId) -> + +on_message_received(Msg, HookPoints, ResId) -> emqx_resource_metrics:received_inc(ResId), - emqx_hooks:run(HookPoint, [Msg]). + lists:foreach( + fun(HookPoint) -> + emqx_hooks:run(HookPoint, [Msg]) + end, + HookPoints + ), + ok. %% =================================================================== callback_mode() -> async_if_possible. -on_start(ResourceId, Conf) -> +on_start(ResourceId, #{server := Server} = Conf) -> ?SLOG(info, #{ msg => "starting_mqtt_connector", connector => ResourceId, config => emqx_utils:redact(Conf) }), - case start_ingress(ResourceId, Conf) of + TopicToHandlerIndex = emqx_topic_index:new(), + StartConf = Conf#{topic_to_handler_index => TopicToHandlerIndex}, + case start_mqtt_clients(ResourceId, StartConf) of {ok, Result1} -> - case start_egress(ResourceId, Conf) of - {ok, Result2} -> - {ok, maps:merge(Result1, Result2)}; - {error, Reason} -> - _ = stop_ingress(Result1), - {error, Reason} - end; + {ok, Result1#{ + installed_channels => #{}, + clean_start => maps:get(clean_start, Conf), + topic_to_handler_index => TopicToHandlerIndex, + server => Server + }}; {error, Reason} -> {error, Reason} end. -start_ingress(ResourceId, Conf) -> - ClientOpts = mk_client_opts(ResourceId, ?INGRESS, Conf), - case mk_ingress_config(ResourceId, Conf) of - Ingress = #{} -> - start_ingress(ResourceId, Ingress, ClientOpts); - undefined -> - {ok, #{}} - end. - -start_ingress(ResourceId, Ingress, ClientOpts) -> - PoolName = <>, - PoolSize = choose_ingress_pool_size(ResourceId, Ingress), - Options = [ - {name, PoolName}, - {pool_size, PoolSize}, - {ingress, Ingress}, - {client_opts, ClientOpts} - ], - ok = emqx_resource:allocate_resource(ResourceId, ingress_pool_name, PoolName), - case emqx_resource_pool:start(PoolName, emqx_bridge_mqtt_ingress, Options) of - ok -> - {ok, #{ingress_pool_name => PoolName}}; - {error, {start_pool_failed, _, Reason}} -> - {error, Reason} - end. - -choose_ingress_pool_size(<>, _) -> - 1; -choose_ingress_pool_size( - ResourceId, - #{remote := #{topic := RemoteTopic}, pool_size := PoolSize} +on_add_channel( + _InstId, + #{ + installed_channels := InstalledChannels, + clean_start := CleanStart + } = OldState, + ChannelId, + #{config_root := actions} = ChannelConfig ) -> - case emqx_topic:parse(RemoteTopic) of - {#share{} = _Filter, _SubOpts} -> - % NOTE: this is shared subscription, many workers may subscribe - PoolSize; - {_Filter, #{}} when PoolSize > 1 -> - % NOTE: this is regular subscription, only one worker should subscribe + %% Publisher channel + %% make a warning if clean_start is set to false + case CleanStart of + false -> + ?tp( + mqtt_clean_start_egress_action_warning, + #{ + channel_id => ChannelId, + resource_id => _InstId + } + ), ?SLOG(warning, #{ - msg => "mqtt_bridge_ingress_pool_size_ignored", - connector => ResourceId, - reason => - "Remote topic filter is not a shared subscription, " - "ingress pool will start with a single worker", - config_pool_size => PoolSize, - pool_size => 1 - }), - 1; - {_Filter, #{}} when PoolSize == 1 -> - 1 - end. + msg => "mqtt_publisher_clean_start_false", + reason => "clean_start is set to false when using MQTT publisher action, " ++ + "which may cause unexpected behavior. " ++ + "For example, if the client ID is already subscribed to topics, " ++ + "we might receive messages that are unhanded.", + channel => ChannelId, + config => emqx_utils:redact(ChannelConfig) + }); + true -> + ok + end, + ChannelState0 = maps:get(parameters, ChannelConfig), + ChannelState = emqx_bridge_mqtt_egress:config(ChannelState0), + NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels), + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState}; +on_add_channel( + _ResourceId, + #{ + installed_channels := InstalledChannels, + pool_name := PoolName, + topic_to_handler_index := TopicToHandlerIndex, + server := Server + } = OldState, + ChannelId, + #{hookpoints := HookPoints} = ChannelConfig +) -> + %% Add ingress channel + ChannelState0 = maps:get(parameters, ChannelConfig), + ChannelState1 = ChannelState0#{ + hookpoints => HookPoints, + server => Server, + config_root => sources + }, + ChannelState2 = mk_ingress_config(ChannelId, ChannelState1, TopicToHandlerIndex), + ok = emqx_bridge_mqtt_ingress:subscribe_channel(PoolName, ChannelState2), + NewInstalledChannels = maps:put(ChannelId, ChannelState2, InstalledChannels), + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState}. -start_egress(ResourceId, Conf) -> - % NOTE - % We are ignoring the user configuration here because there's currently no reliable way - % to ensure proper session recovery according to the MQTT spec. - ClientOpts = maps:put(clean_start, true, mk_client_opts(ResourceId, ?EGRESS, Conf)), - case mk_egress_config(Conf) of - Egress = #{} -> - start_egress(ResourceId, Egress, ClientOpts); - undefined -> - {ok, #{}} - end. +on_remove_channel( + _InstId, + #{ + installed_channels := InstalledChannels, + pool_name := PoolName, + topic_to_handler_index := TopicToHandlerIndex + } = OldState, + ChannelId +) -> + ChannelState = maps:get(ChannelId, InstalledChannels), + case ChannelState of + #{ + config_root := sources + } -> + emqx_bridge_mqtt_ingress:unsubscribe_channel( + PoolName, ChannelState, ChannelId, TopicToHandlerIndex + ), + ok; + _ -> + ok + end, + NewInstalledChannels = maps:remove(ChannelId, InstalledChannels), + %% Update state + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState}. -start_egress(ResourceId, Egress, ClientOpts) -> - PoolName = <>, - PoolSize = maps:get(pool_size, Egress), +on_get_channel_status( + _ResId, + ChannelId, + #{ + installed_channels := Channels + } = _State +) when is_map_key(ChannelId, Channels) -> + %% The channel should be ok as long as the MQTT client is ok + connected. + +on_get_channels(ResId) -> + emqx_bridge_v2:get_channels_for_connector(ResId). + +start_mqtt_clients(ResourceId, Conf) -> + ClientOpts = mk_client_opts(ResourceId, Conf), + start_mqtt_clients(ResourceId, Conf, ClientOpts). + +start_mqtt_clients(ResourceId, StartConf, ClientOpts) -> + PoolName = <>, + #{ + pool_size := PoolSize + } = StartConf, Options = [ {name, PoolName}, {pool_size, PoolSize}, {client_opts, ClientOpts} ], - ok = emqx_resource:allocate_resource(ResourceId, egress_pool_name, PoolName), - case emqx_resource_pool:start(PoolName, emqx_bridge_mqtt_egress, Options) of + ok = emqx_resource:allocate_resource(ResourceId, pool_name, PoolName), + case emqx_resource_pool:start(PoolName, ?MODULE, Options) of ok -> - {ok, #{ - egress_pool_name => PoolName, - egress_config => emqx_bridge_mqtt_egress:config(Egress) - }}; + {ok, #{pool_name => PoolName}}; {error, {start_pool_failed, _, Reason}} -> {error, Reason} end. -on_stop(ResourceId, _State) -> +on_stop(ResourceId, State) -> ?SLOG(info, #{ msg => "stopping_mqtt_connector", connector => ResourceId }), + %% on_stop can be called with State = undefined + StateMap = + case State of + Map when is_map(State) -> + Map; + _ -> + #{} + end, + case maps:get(topic_to_handler_index, StateMap, undefined) of + undefined -> + ok; + TopicToHandlerIndex -> + emqx_topic_index:delete(TopicToHandlerIndex) + end, Allocated = emqx_resource:get_allocated_resources(ResourceId), - ok = stop_ingress(Allocated), - ok = stop_egress(Allocated). + ok = stop_helper(Allocated). -stop_ingress(#{ingress_pool_name := PoolName}) -> - emqx_resource_pool:stop(PoolName); -stop_ingress(#{}) -> - ok. - -stop_egress(#{egress_pool_name := PoolName}) -> - emqx_resource_pool:stop(PoolName); -stop_egress(#{}) -> - ok. +stop_helper(#{pool_name := PoolName}) -> + emqx_resource_pool:stop(PoolName). on_query( ResourceId, - {send_message, Msg}, - #{egress_pool_name := PoolName, egress_config := Config} + {ChannelId, Msg}, + #{pool_name := PoolName} = State ) -> - ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => ResourceId}), - handle_send_result(with_egress_client(PoolName, send, [Msg, Config])); -on_query(ResourceId, {send_message, Msg}, #{}) -> + ?TRACE( + "QUERY", + "send_msg_to_remote_node", + #{ + message => Msg, + connector => ResourceId, + channel_id => ChannelId + } + ), + Channels = maps:get(installed_channels, State), + ChannelConfig = maps:get(ChannelId, Channels), + handle_send_result(with_egress_client(PoolName, send, [Msg, ChannelConfig])); +on_query(ResourceId, {_ChannelId, Msg}, #{}) -> ?SLOG(error, #{ msg => "forwarding_unavailable", connector => ResourceId, @@ -187,13 +269,15 @@ on_query(ResourceId, {send_message, Msg}, #{}) -> on_query_async( ResourceId, - {send_message, Msg}, + {ChannelId, Msg}, CallbackIn, - #{egress_pool_name := PoolName, egress_config := Config} + #{pool_name := PoolName} = State ) -> ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => ResourceId}), Callback = {fun on_async_result/2, [CallbackIn]}, - Result = with_egress_client(PoolName, send_async, [Msg, Callback, Config]), + Channels = maps:get(installed_channels, State), + ChannelConfig = maps:get(ChannelId, Channels), + Result = with_egress_client(PoolName, send_async, [Msg, Callback, ChannelConfig]), case Result of ok -> ok; @@ -202,7 +286,7 @@ on_query_async( {error, Reason} -> {error, classify_error(Reason)} end; -on_query_async(ResourceId, {send_message, Msg}, _Callback, #{}) -> +on_query_async(ResourceId, {_ChannelId, Msg}, _Callback, #{}) -> ?SLOG(error, #{ msg => "forwarding_unavailable", connector => ResourceId, @@ -251,7 +335,7 @@ classify_error(Reason) -> {unrecoverable_error, Reason}. on_get_status(_ResourceId, State) -> - Pools = maps:to_list(maps:with([ingress_pool_name, egress_pool_name], State)), + Pools = maps:to_list(maps:with([pool_name], State)), Workers = [{Pool, Worker} || {Pool, PN} <- Pools, {_Name, Worker} <- ecpool:workers(PN)], try emqx_utils:pmap(fun get_status/1, Workers, ?HEALTH_CHECK_TIMEOUT) of Statuses -> @@ -261,12 +345,10 @@ on_get_status(_ResourceId, State) -> connecting end. -get_status({Pool, Worker}) -> +get_status({_Pool, Worker}) -> case ecpool_worker:client(Worker) of - {ok, Client} when Pool == ingress_pool_name -> + {ok, Client} -> emqx_bridge_mqtt_ingress:status(Client); - {ok, Client} when Pool == egress_pool_name -> - emqx_bridge_mqtt_egress:status(Client); {error, _} -> disconnected end. @@ -284,30 +366,19 @@ combine_status(Statuses) -> end. mk_ingress_config( - ResourceId, - #{ - ingress := Ingress = #{remote := _}, - server := Server, - hookpoint := HookPoint - } + ChannelId, + IngressChannelConfig, + TopicToHandlerIndex ) -> - Ingress#{ - server => Server, - on_message_received => {?MODULE, on_message_received, [HookPoint, ResourceId]} - }; -mk_ingress_config(ResourceId, #{ingress := #{remote := _}} = Conf) -> - error({no_hookpoint_provided, ResourceId, Conf}); -mk_ingress_config(_ResourceId, #{}) -> - undefined. - -mk_egress_config(#{egress := Egress = #{remote := _}}) -> - Egress; -mk_egress_config(#{}) -> - undefined. + HookPoints = maps:get(hookpoints, IngressChannelConfig, []), + NewConf = IngressChannelConfig#{ + on_message_received => {?MODULE, on_message_received, [HookPoints, ChannelId]}, + ingress_list => [IngressChannelConfig] + }, + emqx_bridge_mqtt_ingress:config(NewConf, ChannelId, TopicToHandlerIndex). mk_client_opts( ResourceId, - ClientScope, Config = #{ server := Server, keepalive := KeepAlive, @@ -327,14 +398,15 @@ mk_client_opts( % A load balancing server (such as haproxy) is often set up before the emqx broker server. % When the load balancing server enables mqtt connection packet inspection, % non-standard mqtt connection packets might be filtered out by LB. - bridge_mode + bridge_mode, + topic_to_handler_index ], Config ), Name = parse_id_to_name(ResourceId), mk_client_opt_password(Options#{ hosts => [HostPort], - clientid => clientid(Name, ClientScope, Config), + clientid => clientid(Name, Config), connect_timeout => 30, keepalive => ms_to_s(KeepAlive), force_ping => true, @@ -357,9 +429,75 @@ mk_client_opt_password(Options) -> ms_to_s(Ms) -> erlang:ceil(Ms / 1000). -clientid(Name, ClientScope, _Conf = #{clientid_prefix := Prefix}) when +clientid(Name, _Conf = #{clientid_prefix := Prefix}) when is_binary(Prefix) andalso Prefix =/= <<>> -> - emqx_bridge_mqtt_lib:clientid_base([Prefix, $:, Name, ClientScope]); -clientid(Name, ClientScope, _Conf) -> - emqx_bridge_mqtt_lib:clientid_base([Name, ClientScope]). + emqx_bridge_mqtt_lib:clientid_base([Prefix, $:, Name]); +clientid(Name, _Conf) -> + emqx_bridge_mqtt_lib:clientid_base([Name]). + +%% @doc Start an ingress bridge worker. +-spec connect([option() | {ecpool_worker_id, pos_integer()}]) -> + {ok, pid()} | {error, _Reason}. +connect(Options) -> + WorkerId = proplists:get_value(ecpool_worker_id, Options), + ?SLOG(debug, #{ + msg => "ingress_client_starting", + options => emqx_utils:redact(Options) + }), + Name = proplists:get_value(name, Options), + WorkerId = proplists:get_value(ecpool_worker_id, Options), + WorkerId = proplists:get_value(ecpool_worker_id, Options), + ClientOpts = proplists:get_value(client_opts, Options), + case emqtt:start_link(mk_client_opts(Name, WorkerId, ClientOpts)) of + {ok, Pid} -> + connect(Pid, Name); + {error, Reason} = Error -> + ?SLOG(error, #{ + msg => "client_start_failed", + config => emqx_utils:redact(ClientOpts), + reason => Reason + }), + Error + end. + +mk_client_opts( + Name, + WorkerId, + ClientOpts = #{ + clientid := ClientId, + topic_to_handler_index := TopicToHandlerIndex + } +) -> + ClientOpts#{ + clientid := mk_clientid(WorkerId, ClientId), + msg_handler => mk_client_event_handler(Name, TopicToHandlerIndex) + }. + +mk_clientid(WorkerId, ClientId) -> + iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]). + +mk_client_event_handler(Name, TopicToHandlerIndex) -> + #{ + publish => {fun emqx_bridge_mqtt_ingress:handle_publish/3, [Name, TopicToHandlerIndex]}, + disconnected => {fun ?MODULE:handle_disconnect/1, []} + }. + +-spec connect(pid(), name()) -> + {ok, pid()} | {error, _Reason}. +connect(Pid, Name) -> + case emqtt:connect(Pid) of + {ok, _Props} -> + {ok, Pid}; + {error, Reason} = Error -> + ?SLOG(warning, #{ + msg => "ingress_client_connect_failed", + reason => Reason, + name => Name + }), + _ = catch emqtt:stop(Pid), + Error + end. + +handle_disconnect(_Reason) -> + ok. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl index 32f9e9295..e863d2a2e 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl @@ -1,4 +1,4 @@ -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------- %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); @@ -30,6 +30,10 @@ parse_server/1 ]). +-export([ + connector_examples/1 +]). + -import(emqx_schema, [mk_duration/2]). -import(hoconsc, [mk/2, ref/2]). @@ -61,6 +65,39 @@ fields("config") -> } )} ]; +fields("config_connector") -> + [ + {enable, + mk( + boolean(), + #{ + desc => <<"Enable or disable this connector">>, + default => true + } + )}, + {description, emqx_schema:description_schema()}, + {resource_opts, + mk( + hoconsc:ref(creation_opts), + #{ + required => false, + desc => ?DESC(emqx_resource_schema, "creation_opts") + } + )}, + {pool_size, fun egress_pool_size/1} + % {ingress, + % mk( + % hoconsc:array( + % hoconsc:ref(connector_ingress) + % ), + % #{ + % required => {false, recursively}, + % desc => ?DESC("ingress_desc") + % } + % )} + ] ++ fields("server_configs"); +fields(creation_opts) -> + emqx_connector_schema:resource_opts_fields(); fields("server_configs") -> [ {mode, @@ -131,6 +168,7 @@ fields("server_configs") -> fields("ingress") -> [ {pool_size, fun ingress_pool_size/1}, + %% array {remote, mk( ref(?MODULE, "ingress_remote"), @@ -144,6 +182,22 @@ fields("ingress") -> } )} ]; +fields(connector_ingress) -> + [ + {remote, + mk( + ref(?MODULE, "ingress_remote"), + #{desc => ?DESC("ingress_remote")} + )}, + {local, + mk( + ref(?MODULE, "ingress_local"), + #{ + desc => ?DESC("ingress_local"), + importance => ?IMPORTANCE_HIDDEN + } + )} + ]; fields("ingress_remote") -> [ {topic, @@ -269,7 +323,15 @@ fields("egress_remote") -> desc => ?DESC("payload") } )} - ]. + ]; +fields("get_connector") -> + fields("config_connector"); +fields("post_connector") -> + fields("config_connector"); +fields("put_connector") -> + fields("config_connector"); +fields(What) -> + error({emqx_bridge_mqtt_connector_schema, missing_field_handler, What}). ingress_pool_size(desc) -> ?DESC("ingress_pool_size"); @@ -304,3 +366,6 @@ qos() -> parse_server(Str) -> #{hostname := Host, port := Port} = emqx_schema:parse_server(Str, ?MQTT_HOST_OPTS), {Host, Port}. + +connector_examples(_Method) -> + [#{}]. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl index 2573cad8b..38bdd9665 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl @@ -20,33 +20,16 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). --behaviour(ecpool_worker). - -%% ecpool --export([connect/1]). - -export([ config/1, send/3, send_async/4 ]). -%% management APIs --export([ - status/1, - info/1 -]). - --type name() :: term(). -type message() :: emqx_types:message() | map(). -type callback() :: {function(), [_Arg]} | {module(), atom(), [_Arg]}. -type remote_message() :: #mqtt_msg{}. --type option() :: - {name, name()} - %% see `emqtt:option()` - | {client_opts, map()}. - -type egress() :: #{ local => #{ topic => emqx_types:topic() @@ -54,51 +37,6 @@ remote := emqx_bridge_mqtt_msg:msgvars() }. -%% @doc Start an ingress bridge worker. --spec connect([option() | {ecpool_worker_id, pos_integer()}]) -> - {ok, pid()} | {error, _Reason}. -connect(Options) -> - ?SLOG(debug, #{ - msg => "egress_client_starting", - options => emqx_utils:redact(Options) - }), - Name = proplists:get_value(name, Options), - WorkerId = proplists:get_value(ecpool_worker_id, Options), - ClientOpts = proplists:get_value(client_opts, Options), - case emqtt:start_link(mk_client_opts(WorkerId, ClientOpts)) of - {ok, Pid} -> - connect(Pid, Name); - {error, Reason} = Error -> - ?SLOG(error, #{ - msg => "egress_client_start_failed", - config => emqx_utils:redact(ClientOpts), - reason => Reason - }), - Error - end. - -mk_client_opts(WorkerId, ClientOpts = #{clientid := ClientId}) -> - ClientOpts#{clientid := mk_clientid(WorkerId, ClientId)}. - -mk_clientid(WorkerId, ClientId) -> - emqx_bridge_mqtt_lib:bytes23(ClientId, WorkerId). - -connect(Pid, Name) -> - case emqtt:connect(Pid) of - {ok, _Props} -> - {ok, Pid}; - {error, Reason} = Error -> - ?SLOG(warning, #{ - msg => "egress_client_connect_failed", - reason => Reason, - name => Name - }), - _ = catch emqtt:stop(Pid), - Error - end. - -%% - -spec config(map()) -> egress(). config(#{remote := RC = #{}} = Conf) -> @@ -137,25 +75,3 @@ to_remote_msg(Msg = #{}, Remote) -> props = emqx_utils:pub_props_to_packet(PubProps), payload = Payload }. - -%% - --spec info(pid()) -> - [{atom(), term()}]. -info(Pid) -> - emqtt:info(Pid). - --spec status(pid()) -> - emqx_resource:resource_status(). -status(Pid) -> - try - case proplists:get_value(socket, info(Pid)) of - Socket when Socket /= undefined -> - connected; - undefined -> - connecting - end - catch - exit:{noproc, _} -> - disconnected - end. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl index a051ffbd8..d59318a84 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl @@ -17,129 +17,188 @@ -module(emqx_bridge_mqtt_ingress). -include_lib("emqx/include/logger.hrl"). - --behaviour(ecpool_worker). - -%% ecpool --export([connect/1]). +-include_lib("emqx/include/emqx_mqtt.hrl"). %% management APIs -export([ status/1, - info/1 + info/1, + subscribe_channel/2, + unsubscribe_channel/4, + config/3 ]). --export([handle_publish/5]). --export([handle_disconnect/1]). +-export([handle_publish/3]). --type name() :: term(). +subscribe_channel(PoolName, ChannelConfig) -> + Workers = ecpool:workers(PoolName), + PoolSize = length(Workers), + Results = [ + subscribe_channel(Pid, Name, ChannelConfig, Idx, PoolSize) + || {{Name, Idx}, Pid} <- Workers + ], + case proplists:get_value(error, Results, ok) of + ok -> + ok; + Error -> + Error + end. --type option() :: - {name, name()} - | {ingress, map()} - %% see `emqtt:option()` - | {client_opts, map()}. +subscribe_channel(WorkerPid, Name, Ingress, WorkerIdx, PoolSize) -> + case ecpool_worker:client(WorkerPid) of + {ok, Client} -> + subscribe_channel_helper(Client, Name, Ingress, WorkerIdx, PoolSize); + {error, Reason} -> + error({client_not_found, Reason}) + end. --type ingress() :: #{ - server := string(), - remote := #{ - topic := emqx_types:topic(), - qos => emqx_types:qos() - }, - local := emqx_bridge_mqtt_msg:msgvars(), - on_message_received := {module(), atom(), [term()]} -}. - -%% @doc Start an ingress bridge worker. --spec connect([option() | {ecpool_worker_id, pos_integer()}]) -> - {ok, pid()} | {error, _Reason}. -connect(Options) -> - ?SLOG(debug, #{ - msg => "ingress_client_starting", - options => emqx_utils:redact(Options) - }), - Name = proplists:get_value(name, Options), - WorkerId = proplists:get_value(ecpool_worker_id, Options), - Ingress = config(proplists:get_value(ingress, Options), Name), - ClientOpts = proplists:get_value(client_opts, Options), - case emqtt:start_link(mk_client_opts(Name, WorkerId, Ingress, ClientOpts)) of - {ok, Pid} -> - connect(Pid, Name, Ingress); +subscribe_channel_helper(Client, Name, Ingress, WorkerIdx, PoolSize) -> + IngressList = maps:get(ingress_list, Ingress, []), + SubscribeResults = subscribe_remote_topics( + Client, IngressList, WorkerIdx, PoolSize, Name + ), + %% Find error if any using proplists:get_value/2 + case proplists:get_value(error, SubscribeResults, ok) of + ok -> + ok; {error, Reason} = Error -> ?SLOG(error, #{ - msg => "client_start_failed", - config => emqx_utils:redact(ClientOpts), + msg => "ingress_client_subscribe_failed", + ingress => Ingress, + name => Name, reason => Reason }), Error end. -mk_client_opts(Name, WorkerId, Ingress, ClientOpts = #{clientid := ClientId}) -> - ClientOpts#{ - clientid := mk_clientid(WorkerId, ClientId), - msg_handler => mk_client_event_handler(Name, Ingress) - }. +subscribe_remote_topics(Pid, IngressList, WorkerIdx, PoolSize, Name) -> + [subscribe_remote_topic(Pid, Ingress, WorkerIdx, PoolSize, Name) || Ingress <- IngressList]. -mk_clientid(WorkerId, ClientId) -> - emqx_bridge_mqtt_lib:bytes23(ClientId, WorkerId). - -mk_client_event_handler(Name, Ingress = #{}) -> - IngressVars = maps:with([server], Ingress), - OnMessage = maps:get(on_message_received, Ingress, undefined), - LocalPublish = - case Ingress of - #{local := Local = #{topic := _}} -> - Local; - #{} -> - undefined - end, - #{ - publish => {fun ?MODULE:handle_publish/5, [Name, OnMessage, LocalPublish, IngressVars]}, - disconnected => {fun ?MODULE:handle_disconnect/1, []} - }. - --spec connect(pid(), name(), ingress()) -> - {ok, pid()} | {error, _Reason}. -connect(Pid, Name, Ingress) -> - case emqtt:connect(Pid) of - {ok, _Props} -> - case subscribe_remote_topic(Pid, Ingress) of - {ok, _, _RCs} -> - {ok, Pid}; - {error, Reason} = Error -> - ?SLOG(error, #{ - msg => "ingress_client_subscribe_failed", - ingress => Ingress, - name => Name, - reason => Reason - }), - _ = catch emqtt:stop(Pid), - Error - end; - {error, Reason} = Error -> - ?SLOG(warning, #{ - msg => "ingress_client_connect_failed", - reason => Reason, - name => Name - }), - _ = catch emqtt:stop(Pid), - Error +subscribe_remote_topic( + Pid, #{remote := #{topic := RemoteTopic, qos := QoS}} = _Remote, WorkerIdx, PoolSize, Name +) -> + case should_subscribe(RemoteTopic, WorkerIdx, PoolSize, Name, _LogWarn = true) of + true -> + emqtt:subscribe(Pid, RemoteTopic, QoS); + false -> + ok end. -subscribe_remote_topic(Pid, #{remote := #{topic := RemoteTopic, qos := QoS}}) -> - emqtt:subscribe(Pid, RemoteTopic, QoS). +should_subscribe(RemoteTopic, WorkerIdx, PoolSize, Name, LogWarn) -> + IsFirstWorker = WorkerIdx == 1, + case emqx_topic:parse(RemoteTopic) of + {#share{} = _Filter, _SubOpts} -> + % NOTE: this is shared subscription, many workers may subscribe + true; + {_Filter, #{}} when PoolSize > 1, IsFirstWorker, LogWarn -> + % NOTE: this is regular subscription, only one worker should subscribe + ?SLOG(warning, #{ + msg => "mqtt_pool_size_ignored", + connector => Name, + reason => + "Remote topic filter is not a shared subscription, " + "only a single connection will be used from the connection pool", + config_pool_size => PoolSize, + pool_size => PoolSize + }), + IsFirstWorker; + {_Filter, #{}} -> + % NOTE: this is regular subscription, only one worker should subscribe + IsFirstWorker + end. -%% +unsubscribe_channel(PoolName, ChannelConfig, ChannelId, TopicToHandlerIndex) -> + Workers = ecpool:workers(PoolName), + PoolSize = length(Workers), + _ = [ + unsubscribe_channel(Pid, Name, ChannelConfig, Idx, PoolSize, ChannelId, TopicToHandlerIndex) + || {{Name, Idx}, Pid} <- Workers + ], + ok. --spec config(map(), name()) -> - ingress(). -config(#{remote := RC, local := LC} = Conf, BridgeName) -> - Conf#{ +unsubscribe_channel(WorkerPid, Name, Ingress, WorkerIdx, PoolSize, ChannelId, TopicToHandlerIndex) -> + case ecpool_worker:client(WorkerPid) of + {ok, Client} -> + unsubscribe_channel_helper( + Client, Name, Ingress, WorkerIdx, PoolSize, ChannelId, TopicToHandlerIndex + ); + {error, Reason} -> + error({client_not_found, Reason}) + end. + +unsubscribe_channel_helper( + Client, Name, Ingress, WorkerIdx, PoolSize, ChannelId, TopicToHandlerIndex +) -> + IngressList = maps:get(ingress_list, Ingress, []), + unsubscribe_remote_topics( + Client, IngressList, WorkerIdx, PoolSize, Name, ChannelId, TopicToHandlerIndex + ). + +unsubscribe_remote_topics( + Pid, IngressList, WorkerIdx, PoolSize, Name, ChannelId, TopicToHandlerIndex +) -> + [ + unsubscribe_remote_topic( + Pid, Ingress, WorkerIdx, PoolSize, Name, ChannelId, TopicToHandlerIndex + ) + || Ingress <- IngressList + ]. + +unsubscribe_remote_topic( + Pid, + #{remote := #{topic := RemoteTopic}} = _Remote, + WorkerIdx, + PoolSize, + Name, + ChannelId, + TopicToHandlerIndex +) -> + emqx_topic_index:delete(RemoteTopic, ChannelId, TopicToHandlerIndex), + case should_subscribe(RemoteTopic, WorkerIdx, PoolSize, Name, _NoWarn = false) of + true -> + case emqtt:unsubscribe(Pid, RemoteTopic) of + {ok, _Properties, _ReasonCodes} -> + ok; + {error, Reason} -> + ?SLOG(warning, #{ + msg => "unsubscribe_mqtt_topic_failed", + channel_id => Name, + reason => Reason + }), + ok + end; + false -> + ok + end. + +config(#{ingress_list := IngressList} = Conf, Name, TopicToHandlerIndex) -> + NewIngressList = [ + fix_remote_config(Ingress, Name, TopicToHandlerIndex, Conf) + || Ingress <- IngressList + ], + Conf#{ingress_list => NewIngressList}. + +fix_remote_config(#{remote := RC, local := LC}, BridgeName, TopicToHandlerIndex, Conf) -> + FixedConf = Conf#{ remote => parse_remote(RC, BridgeName), local => emqx_bridge_mqtt_msg:parse(LC) - }. + }, + insert_to_topic_to_handler_index(FixedConf, TopicToHandlerIndex, BridgeName), + FixedConf. -parse_remote(#{qos := QoSIn} = Conf, BridgeName) -> +insert_to_topic_to_handler_index( + #{remote := #{topic := Topic}} = Conf, TopicToHandlerIndex, BridgeName +) -> + TopicPattern = + case emqx_topic:parse(Topic) of + {#share{group = _Group, topic = TP}, _} -> + TP; + _ -> + Topic + end, + emqx_topic_index:insert(TopicPattern, BridgeName, Conf, TopicToHandlerIndex). + +parse_remote(#{qos := QoSIn} = Remote, BridgeName) -> QoS = downgrade_ingress_qos(QoSIn), case QoS of QoSIn -> @@ -152,7 +211,7 @@ parse_remote(#{qos := QoSIn} = Conf, BridgeName) -> name => BridgeName }) end, - Conf#{qos => QoS}. + Remote#{qos => QoS}. downgrade_ingress_qos(2) -> 1; @@ -183,17 +242,39 @@ status(Pid) -> %% -handle_publish(#{properties := Props} = MsgIn, Name, OnMessage, LocalPublish, IngressVars) -> - Msg = import_msg(MsgIn, IngressVars), +handle_publish( + #{properties := Props, topic := Topic} = MsgIn, + Name, + TopicToHandlerIndex +) -> ?SLOG(debug, #{ msg => "ingress_publish_local", - message => Msg, + message => MsgIn, name => Name }), - maybe_on_message_received(Msg, OnMessage), - maybe_publish_local(Msg, LocalPublish, Props). + Matches = emqx_topic_index:matches(Topic, TopicToHandlerIndex, []), + lists:foreach( + fun(Match) -> + handle_match(TopicToHandlerIndex, Match, MsgIn, Name, Props) + end, + Matches + ), + ok. -handle_disconnect(_Reason) -> +handle_match( + TopicToHandlerIndex, + Match, + MsgIn, + _Name, + Props +) -> + [ChannelConfig] = emqx_topic_index:get_record(Match, TopicToHandlerIndex), + #{on_message_received := OnMessage} = ChannelConfig, + Msg = import_msg(MsgIn, ChannelConfig), + + maybe_on_message_received(Msg, OnMessage), + LocalPublish = maps:get(local, ChannelConfig, undefined), + _ = maybe_publish_local(Msg, LocalPublish, Props), ok. maybe_on_message_received(Msg, {Mod, Func, Args}) -> diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_action_info.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_action_info.erl new file mode 100644 index 000000000..6bcdc611b --- /dev/null +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_action_info.erl @@ -0,0 +1,221 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_mqtt_pubsub_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0, + bridge_v1_config_to_connector_config/1, + bridge_v1_config_to_action_config/2, + connector_action_config_to_bridge_v1_config/2, + is_source/0 +]). + +bridge_v1_type_name() -> mqtt. + +action_type_name() -> mqtt. + +connector_type_name() -> mqtt. + +schema_module() -> emqx_bridge_mqtt_pubsub_schema. + +is_source() -> true. + +bridge_v1_config_to_connector_config(Config) -> + %% Transform the egress part to mqtt_publisher connector config + SimplifiedConfig = check_and_simplify_bridge_v1_config(Config), + ConnectorConfigMap = make_connector_config_from_bridge_v1_config(SimplifiedConfig), + {mqtt, ConnectorConfigMap}. + +make_connector_config_from_bridge_v1_config(Config) -> + ConnectorConfigSchema = emqx_bridge_mqtt_connector_schema:fields("config_connector"), + ConnectorTopFields = [ + erlang:atom_to_binary(FieldName, utf8) + || {FieldName, _} <- ConnectorConfigSchema + ], + ConnectorConfigMap = maps:with(ConnectorTopFields, Config), + ResourceOptsSchema = emqx_bridge_mqtt_connector_schema:fields(creation_opts), + ResourceOptsTopFields = [ + erlang:atom_to_binary(FieldName, utf8) + || {FieldName, _} <- ResourceOptsSchema + ], + ResourceOptsMap = maps:get(<<"resource_opts">>, ConnectorConfigMap, #{}), + ResourceOptsMap2 = maps:with(ResourceOptsTopFields, ResourceOptsMap), + ConnectorConfigMap2 = maps:put(<<"resource_opts">>, ResourceOptsMap2, ConnectorConfigMap), + IngressMap0 = maps:get(<<"ingress">>, Config, #{}), + EgressMap = maps:get(<<"egress">>, Config, #{}), + % %% Move pool_size to the top level + PoolSizeIngress = maps:get(<<"pool_size">>, IngressMap0, undefined), + PoolSize = + case PoolSizeIngress of + undefined -> + DefaultPoolSize = emqx_connector_schema_lib:pool_size(default), + maps:get(<<"pool_size">>, EgressMap, DefaultPoolSize); + _ -> + PoolSizeIngress + end, + % IngressMap1 = maps:remove(<<"pool_size">>, IngressMap0), + %% Remove ingress part from the config + ConnectorConfigMap3 = maps:remove(<<"ingress">>, ConnectorConfigMap2), + %% Remove egress part from the config + ConnectorConfigMap4 = maps:remove(<<"egress">>, ConnectorConfigMap3), + ConnectorConfigMap5 = maps:put(<<"pool_size">>, PoolSize, ConnectorConfigMap4), + % ConnectorConfigMap4 = + % case IngressMap1 =:= #{} of + % true -> + % ConnectorConfigMap3; + % false -> + % maps:put(<<"ingress">>, [IngressMap1], ConnectorConfigMap3) + % end, + ConnectorConfigMap5. + +bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> + SimplifiedConfig = check_and_simplify_bridge_v1_config(BridgeV1Config), + bridge_v1_config_to_action_config_helper( + SimplifiedConfig, ConnectorName + ). + +bridge_v1_config_to_action_config_helper( + #{ + <<"egress">> := EgressMap0 + } = Config, + ConnectorName +) -> + %% Transform the egress part to mqtt_publisher connector config + SchemaFields = emqx_bridge_mqtt_pubsub_schema:fields("mqtt_publisher_action"), + ResourceOptsSchemaFields = emqx_bridge_mqtt_pubsub_schema:fields("resource_opts"), + ConfigMap1 = general_action_conf_map_from_bridge_v1_config( + Config, ConnectorName, SchemaFields, ResourceOptsSchemaFields + ), + LocalTopicMap = maps:get(<<"local">>, EgressMap0, #{}), + LocalTopic = maps:get(<<"topic">>, LocalTopicMap, undefined), + EgressMap1 = maps:remove(<<"local">>, EgressMap0), + %% Add parameters field (Egress map) to the action config + ConfigMap2 = maps:put(<<"parameters">>, EgressMap1, ConfigMap1), + ConfigMap3 = + case LocalTopic of + undefined -> + ConfigMap2; + _ -> + maps:put(<<"local_topic">>, LocalTopic, ConfigMap2) + end, + {action, mqtt, ConfigMap3}; +bridge_v1_config_to_action_config_helper( + #{ + <<"ingress">> := IngressMap + } = Config, + ConnectorName +) -> + %% Transform the egress part to mqtt_publisher connector config + SchemaFields = emqx_bridge_mqtt_pubsub_schema:fields("mqtt_subscriber_source"), + ResourceOptsSchemaFields = emqx_bridge_mqtt_pubsub_schema:fields("resource_opts"), + ConfigMap1 = general_action_conf_map_from_bridge_v1_config( + Config, ConnectorName, SchemaFields, ResourceOptsSchemaFields + ), + IngressMap1 = maps:remove(<<"pool_size">>, IngressMap), + %% Add parameters field (Egress map) to the action config + ConfigMap2 = maps:put(<<"parameters">>, IngressMap1, ConfigMap1), + {source, mqtt, ConfigMap2}; +bridge_v1_config_to_action_config_helper( + _Config, + _ConnectorName +) -> + error({incompatible_bridge_v1, no_matching_action_or_source}). + +general_action_conf_map_from_bridge_v1_config( + Config, ConnectorName, SchemaFields, ResourceOptsSchemaFields +) -> + ShemaFieldsNames = [ + erlang:atom_to_binary(FieldName, utf8) + || {FieldName, _} <- SchemaFields + ], + ActionConfig0 = maps:with(ShemaFieldsNames, Config), + ResourceOptsSchemaFieldsNames = [ + erlang:atom_to_binary(FieldName, utf8) + || {FieldName, _} <- ResourceOptsSchemaFields + ], + ResourceOptsMap = maps:get(<<"resource_opts">>, ActionConfig0, #{}), + ResourceOptsMap2 = maps:with(ResourceOptsSchemaFieldsNames, ResourceOptsMap), + %% Only put resource_opts if the original config has it + ActionConfig1 = + case maps:is_key(<<"resource_opts">>, ActionConfig0) of + true -> + maps:put(<<"resource_opts">>, ResourceOptsMap2, ActionConfig0); + false -> + ActionConfig0 + end, + ActionConfig2 = maps:put(<<"connector">>, ConnectorName, ActionConfig1), + ActionConfig2. + +check_and_simplify_bridge_v1_config( + #{ + <<"egress">> := EgressMap + } = Config +) when map_size(EgressMap) =:= 0 -> + check_and_simplify_bridge_v1_config(maps:remove(<<"egress">>, Config)); +check_and_simplify_bridge_v1_config( + #{ + <<"ingress">> := IngressMap + } = Config +) when map_size(IngressMap) =:= 0 -> + check_and_simplify_bridge_v1_config(maps:remove(<<"ingress">>, Config)); +check_and_simplify_bridge_v1_config(#{ + <<"egress">> := _EGressMap, + <<"ingress">> := _InGressMap +}) -> + %% We should crash beacuse we don't support upgrading when ingress and egress exist at the same time + error( + {unsupported_config, + <<"Upgrade not supported when ingress and egress exist in the same MQTT bridge. Please divide the egress and ingress part to separate bridges in the configuration.">>} + ); +check_and_simplify_bridge_v1_config(SimplifiedConfig) -> + SimplifiedConfig. + +connector_action_config_to_bridge_v1_config( + ConnectorConfig, ActionConfig +) -> + Params = maps:get(<<"parameters">>, ActionConfig, #{}), + ResourceOptsConnector = maps:get(<<"resource_opts">>, ConnectorConfig, #{}), + ResourceOptsAction = maps:get(<<"resource_opts">>, ActionConfig, #{}), + ResourceOpts = maps:merge(ResourceOptsConnector, ResourceOptsAction), + %% Check the direction of the action + Direction = + case maps:get(<<"remote">>, Params) of + #{<<"retain">> := _} -> + %% Only source has retain + <<"publisher">>; + _ -> + <<"subscriber">> + end, + Parms2 = maps:remove(<<"direction">>, Params), + DefaultPoolSize = emqx_connector_schema_lib:pool_size(default), + PoolSize = maps:get(<<"pool_size">>, ConnectorConfig, DefaultPoolSize), + Parms3 = maps:put(<<"pool_size">>, PoolSize, Parms2), + ConnectorConfig2 = maps:remove(<<"pool_size">>, ConnectorConfig), + LocalTopic = maps:get(<<"local_topic">>, ActionConfig, undefined), + BridgeV1Conf0 = + case {Direction, LocalTopic} of + {<<"publisher">>, undefined} -> + #{<<"egress">> => Parms3}; + {<<"publisher">>, LocalT} -> + #{ + <<"egress">> => Parms3, + <<"local">> => + #{ + <<"topic">> => LocalT + } + }; + {<<"subscriber">>, _} -> + #{<<"ingress">> => Parms3} + end, + BridgeV1Conf1 = maps:merge(BridgeV1Conf0, ConnectorConfig2), + BridgeV1Conf2 = BridgeV1Conf1#{ + <<"resource_opts">> => ResourceOpts + }, + BridgeV1Conf2. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl new file mode 100644 index 000000000..2aba6e8ea --- /dev/null +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl @@ -0,0 +1,129 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_bridge_mqtt_pubsub_schema). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-import(hoconsc, [mk/2, ref/2]). + +-export([roots/0, fields/1, desc/1, namespace/0]). + +-export([ + bridge_v2_examples/1, + conn_bridge_examples/1 +]). + +%%====================================================================================== +%% Hocon Schema Definitions +namespace() -> "bridge_mqtt_publisher". + +roots() -> []. + +fields(action) -> + {mqtt, + mk( + hoconsc:map(name, ref(?MODULE, "mqtt_publisher_action")), + #{ + desc => <<"MQTT Publisher Action Config">>, + required => false + } + )}; +fields("mqtt_publisher_action") -> + emqx_bridge_v2_schema:make_producer_action_schema( + hoconsc:mk( + hoconsc:ref(?MODULE, action_parameters), + #{ + required => true, + desc => ?DESC("action_parameters") + } + ) + ); +fields(action_parameters) -> + Fields0 = emqx_bridge_mqtt_connector_schema:fields("egress"), + Fields1 = proplists:delete(pool_size, Fields0), + Fields2 = proplists:delete(local, Fields1), + Fields2; +fields(source) -> + {mqtt, + mk( + hoconsc:map(name, ref(?MODULE, "mqtt_subscriber_source")), + #{ + desc => <<"MQTT Subscriber Source Config">>, + required => false + } + )}; +fields("mqtt_subscriber_source") -> + emqx_bridge_v2_schema:make_consumer_action_schema( + hoconsc:mk( + hoconsc:ref(?MODULE, ingress_parameters), + #{ + required => true, + desc => ?DESC("source_parameters") + } + ) + ); +fields(ingress_parameters) -> + Fields0 = emqx_bridge_mqtt_connector_schema:fields("ingress"), + Fields1 = proplists:delete(pool_size, Fields0), + Fields1; +fields("resource_opts") -> + UnsupportedOpts = [enable_batch, batch_size, batch_time], + lists:filter( + fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end, + emqx_resource_schema:fields("creation_opts") + ); +fields("get_connector") -> + emqx_bridge_mqtt_connector_schema:fields("config_connector"); +fields("get_bridge_v2") -> + fields("mqtt_publisher_action"); +fields("post_bridge_v2") -> + fields("mqtt_publisher_action"); +fields("put_bridge_v2") -> + fields("mqtt_publisher_action"); +fields(What) -> + error({emqx_bridge_mqtt_pubsub_schema, missing_field_handler, What}). +%% v2: api schema +%% The parameter equls to +%% `get_bridge_v2`, `post_bridge_v2`, `put_bridge_v2` from emqx_bridge_v2_schema:api_schema/1 +%% `get_connector`, `post_connector`, `put_connector` from emqx_connector_schema:api_schema/1 +%%-------------------------------------------------------------------- +%% v1/v2 + +desc("config") -> + ?DESC("desc_config"); +desc("resource_opts") -> + ?DESC(emqx_resource_schema, "creation_opts"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for WebHook using `", string:to_upper(Method), "` method."]; +desc("config_connector") -> + ?DESC("desc_config"); +desc("http_action") -> + ?DESC("desc_config"); +desc("parameters_opts") -> + ?DESC("config_parameters_opts"); +desc(_) -> + undefined. + +bridge_v2_examples(_Method) -> + [ + #{} + ]. + +conn_bridge_examples(_Method) -> + [ + #{} + ]. diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl index 6d1ff0915..bd3fb68de 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl @@ -238,7 +238,8 @@ t_conf_bridge_authn_passfile(Config) -> post, uri(["bridges"]), ?SERVER_CONF(<<>>, <<"file://im/pretty/sure/theres/no/such/file">>)#{ - <<"name">> => <<"t_conf_bridge_authn_no_passfile">> + <<"name">> => <<"t_conf_bridge_authn_no_passfile">>, + <<"ingress">> => ?INGRESS_CONF#{<<"pool_size">> => 1} } ), ?assertMatch({match, _}, re:run(Reason, <<"failed_to_read_secret_file">>)). @@ -397,32 +398,25 @@ t_mqtt_conn_bridge_ingress_shared_subscription(_) -> {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), ok. -t_mqtt_egress_bridge_ignores_clean_start(_) -> +t_mqtt_egress_bridge_warns_clean_start(_) -> BridgeName = atom_to_binary(?FUNCTION_NAME), - BridgeID = create_bridge( - ?SERVER_CONF#{ - <<"name">> => BridgeName, - <<"egress">> => ?EGRESS_CONF, - <<"clean_start">> => false - } - ), + Action = fun() -> + BridgeID = create_bridge( + ?SERVER_CONF#{ + <<"name">> => BridgeName, + <<"egress">> => ?EGRESS_CONF, + <<"clean_start">> => false + } + ), - ResourceID = emqx_bridge_resource:resource_id(BridgeID), - {ok, _Group, #{state := #{egress_pool_name := EgressPoolName}}} = - emqx_resource_manager:lookup_cached(ResourceID), - ClientInfo = ecpool:pick_and_do( - EgressPoolName, - {emqx_bridge_mqtt_egress, info, []}, - no_handover + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []) + end, + ?wait_async_action( + Action(), + #{?snk_kind := mqtt_clean_start_egress_action_warning}, + 10000 ), - ?assertMatch( - #{clean_start := true}, - maps:from_list(ClientInfo) - ), - - %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), - ok. t_mqtt_conn_bridge_ingress_downgrades_qos_2(_) -> diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index a58a1ef3d..f85109080 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -51,11 +51,11 @@ -export([parse_url/1]). --callback connector_config(ParsedConfig) -> +-callback connector_config(ParsedConfig, Context) -> ParsedConfig when - ParsedConfig :: #{atom() => any()}. --optional_callbacks([connector_config/1]). + ParsedConfig :: #{atom() => any()}, Context :: #{atom() => any()}. +-optional_callbacks([connector_config/2]). -if(?EMQX_RELEASE_EDITION == ee). connector_to_resource_type(ConnectorType) -> @@ -81,6 +81,10 @@ connector_impl_module(_ConnectorType) -> connector_to_resource_type_ce(http) -> emqx_bridge_http_connector; +connector_to_resource_type_ce(mqtt) -> + emqx_bridge_mqtt_connector; +% connector_to_resource_type_ce(mqtt_subscriber) -> +% emqx_bridge_mqtt_subscriber_connector; connector_to_resource_type_ce(ConnectorType) -> error({no_bridge_v2, ConnectorType}). @@ -276,6 +280,12 @@ remove(Type, Name, _Conf, _Opts) -> emqx_resource:remove_local(resource_id(Type, Name)). %% convert connector configs to what the connector modules want +parse_confs( + <<"mqtt">> = Type, + Name, + Conf +) -> + insert_hookpoints(Type, Name, Conf); parse_confs( <<"http">>, _Name, @@ -307,6 +317,13 @@ parse_confs( parse_confs(ConnectorType, Name, Config) -> connector_config(ConnectorType, Name, Config). +insert_hookpoints(Type, Name, Conf) -> + BId = emqx_bridge_resource:bridge_id(Type, Name), + BridgeHookpoint = emqx_bridge_resource:bridge_hookpoint(BId), + ConnectorHookpoint = connector_hookpoint(BId), + HookPoints = [BridgeHookpoint, ConnectorHookpoint], + Conf#{hookpoints => HookPoints}. + connector_config(ConnectorType, Name, Config) -> Mod = connector_impl_module(ConnectorType), case erlang:function_exported(Mod, connector_config, 2) of diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index b043ebacd..74b92c165 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -90,7 +90,9 @@ api_schemas(Method) -> [ %% We need to map the `type' field of a request (binary) to a %% connector schema module. - api_ref(emqx_bridge_http_schema, <<"http">>, Method ++ "_connector") + api_ref(emqx_bridge_http_schema, <<"http">>, Method ++ "_connector"), + % api_ref(emqx_bridge_mqtt_connector_schema, <<"mqtt_subscriber">>, Method ++ "_connector"), + api_ref(emqx_bridge_mqtt_connector_schema, <<"mqtt">>, Method ++ "_connector") ]. api_ref(Module, Type, Method) -> @@ -110,10 +112,11 @@ examples(Method) -> -if(?EMQX_RELEASE_EDITION == ee). schema_modules() -> - [emqx_bridge_http_schema] ++ emqx_connector_ee_schema:schema_modules(). + [emqx_bridge_http_schema, emqx_bridge_mqtt_connector_schema] ++ + emqx_connector_ee_schema:schema_modules(). -else. schema_modules() -> - [emqx_bridge_http_schema]. + [emqx_bridge_http_schema, emqx_bridge_mqtt_connector_schema]. -endif. %% @doc Return old bridge(v1) and/or connector(v2) type @@ -136,6 +139,8 @@ connector_type_to_bridge_types(influxdb) -> [influxdb, influxdb_api_v1, influxdb_api_v2]; connector_type_to_bridge_types(mysql) -> [mysql]; +connector_type_to_bridge_types(mqtt) -> + [mqtt]; connector_type_to_bridge_types(pgsql) -> [pgsql]; connector_type_to_bridge_types(redis) -> @@ -151,7 +156,8 @@ connector_type_to_bridge_types(iotdb) -> connector_type_to_bridge_types(elasticsearch) -> [elasticsearch]. -actions_config_name() -> <<"actions">>. +actions_config_name(action) -> <<"actions">>; +actions_config_name(source) -> <<"sources">>. has_connector_field(BridgeConf, ConnectorFields) -> lists:any( @@ -185,40 +191,58 @@ bridge_configs_to_transform( end. split_bridge_to_connector_and_action( - {ConnectorsMap, {BridgeType, BridgeName, BridgeV1Conf, ConnectorFields, PreviousRawConfig}} + { + {ConnectorsMap, OrgConnectorType}, + {BridgeType, BridgeName, BridgeV1Conf, ConnectorFields, PreviousRawConfig} + } ) -> - ConnectorMap = + {ConnectorMap, ConnectorType} = case emqx_action_info:has_custom_bridge_v1_config_to_connector_config(BridgeType) of true -> - emqx_action_info:bridge_v1_config_to_connector_config( - BridgeType, BridgeV1Conf - ); + case + emqx_action_info:bridge_v1_config_to_connector_config( + BridgeType, BridgeV1Conf + ) + of + {ConType, ConMap} -> + {ConMap, ConType}; + ConMap -> + {ConMap, OrgConnectorType} + end; false -> %% We do an automatic transformation to get the connector config %% if the callback is not defined. %% Get connector fields from bridge config - lists:foldl( - fun({ConnectorFieldName, _Spec}, ToTransformSoFar) -> - ConnectorFieldNameBin = to_bin(ConnectorFieldName), - case maps:is_key(ConnectorFieldNameBin, BridgeV1Conf) of - true -> - PrevFieldConfig = - maybe_project_to_connector_resource_opts( + NewCConMap = + lists:foldl( + fun({ConnectorFieldName, _Spec}, ToTransformSoFar) -> + ConnectorFieldNameBin = to_bin(ConnectorFieldName), + case maps:is_key(ConnectorFieldNameBin, BridgeV1Conf) of + true -> + PrevFieldConfig = + maybe_project_to_connector_resource_opts( + ConnectorFieldNameBin, + maps:get(ConnectorFieldNameBin, BridgeV1Conf) + ), + NewToTransform0 = maps:put( ConnectorFieldNameBin, - maps:get(ConnectorFieldNameBin, BridgeV1Conf) + PrevFieldConfig, + ToTransformSoFar ), - maps:put( - ConnectorFieldNameBin, - PrevFieldConfig, + NewToTransform1 = maps:put( + to_bin(ConnectorFieldName), + maps:get(to_bin(ConnectorFieldName), BridgeV1Conf), + NewToTransform0 + ), + NewToTransform1; + false -> ToTransformSoFar - ); - false -> - ToTransformSoFar - end - end, - #{}, - ConnectorFields - ) + end + end, + #{}, + ConnectorFields + ), + {NewCConMap, OrgConnectorType} end, %% Generate a connector name, if needed. Avoid doing so if there was a previous config. ConnectorName = @@ -226,18 +250,29 @@ split_bridge_to_connector_and_action( #{<<"connector">> := ConnectorName0} -> ConnectorName0; _ -> generate_connector_name(ConnectorsMap, BridgeName, 0) end, - ActionMap = + OrgActionType = emqx_action_info:bridge_v1_type_to_action_type(BridgeType), + {ActionMap, ActionType, ActionOrSource} = case emqx_action_info:has_custom_bridge_v1_config_to_action_config(BridgeType) of true -> - emqx_action_info:bridge_v1_config_to_action_config( - BridgeType, BridgeV1Conf, ConnectorName - ); + case + emqx_action_info:bridge_v1_config_to_action_config( + BridgeType, BridgeV1Conf, ConnectorName + ) + of + {ActionOrSource0, ActionType0, ActionMap0} -> + {ActionMap0, ActionType0, ActionOrSource0}; + ActionMap0 -> + {ActionMap0, OrgActionType, action} + end; false -> - transform_bridge_v1_config_to_action_config( - BridgeV1Conf, ConnectorName, ConnectorFields - ) + ActionMap0 = + transform_bridge_v1_config_to_action_config( + BridgeV1Conf, ConnectorName, ConnectorFields + ), + {ActionMap0, OrgActionType} end, - {BridgeType, BridgeName, ActionMap, ConnectorName, ConnectorMap}. + {BridgeType, BridgeName, ActionMap, ActionType, ActionOrSource, ConnectorName, ConnectorMap, + ConnectorType}. maybe_project_to_connector_resource_opts(<<"resource_opts">>, OldResourceOpts) -> project_to_connector_resource_opts(OldResourceOpts); @@ -307,9 +342,9 @@ generate_connector_name(ConnectorsMap, BridgeName, Attempt) -> ConnectorNameList = case Attempt of 0 -> - io_lib:format("connector_~s", [BridgeName]); + io_lib:format("~s", [BridgeName]); _ -> - io_lib:format("connector_~s_~p", [BridgeName, Attempt + 1]) + io_lib:format("~s_~p", [BridgeName, Attempt + 1]) end, ConnectorName = iolist_to_binary(ConnectorNameList), case maps:is_key(ConnectorName, ConnectorsMap) of @@ -340,7 +375,10 @@ transform_old_style_bridges_to_connector_and_actions_of_type( ), ConnectorsWithTypeMap = maps:get(to_bin(ConnectorType), ConnectorsConfMap, #{}), BridgeConfigsToTransformWithConnectorConf = lists:zip( - lists:duplicate(length(BridgeConfigsToTransform), ConnectorsWithTypeMap), + lists:duplicate( + length(BridgeConfigsToTransform), + {ConnectorsWithTypeMap, ConnectorType} + ), BridgeConfigsToTransform ), ActionConnectorTuples = lists:map( @@ -349,10 +387,14 @@ transform_old_style_bridges_to_connector_and_actions_of_type( ), %% Add connectors and actions and remove bridges lists:foldl( - fun({BridgeType, BridgeName, ActionMap, ConnectorName, ConnectorMap}, RawConfigSoFar) -> + fun( + {BridgeType, BridgeName, ActionMap, NewActionType, ActionOrSource, ConnectorName, + ConnectorMap, NewConnectorType}, + RawConfigSoFar + ) -> %% Add connector RawConfigSoFar1 = emqx_utils_maps:deep_put( - [<<"connectors">>, to_bin(ConnectorType), ConnectorName], + [<<"connectors">>, to_bin(NewConnectorType), ConnectorName], RawConfigSoFar, ConnectorMap ), @@ -362,12 +404,21 @@ transform_old_style_bridges_to_connector_and_actions_of_type( RawConfigSoFar1 ), %% Add action - ActionType = emqx_action_info:bridge_v1_type_to_action_type(to_bin(BridgeType)), - RawConfigSoFar3 = emqx_utils_maps:deep_put( - [actions_config_name(), to_bin(ActionType), BridgeName], - RawConfigSoFar2, - ActionMap - ), + RawConfigSoFar3 = + case ActionMap of + none -> + RawConfigSoFar2; + _ -> + emqx_utils_maps:deep_put( + [ + actions_config_name(ActionOrSource), + to_bin(NewActionType), + BridgeName + ], + RawConfigSoFar2, + ActionMap + ) + end, RawConfigSoFar3 end, RawConfig, @@ -454,7 +505,23 @@ fields(connectors) -> desc => <<"HTTP Connector Config">>, required => false } + )}, + {mqtt, + mk( + hoconsc:map(name, ref(emqx_bridge_mqtt_connector_schema, "config_connector")), + #{ + desc => <<"MQTT Publisher Connector Config">>, + required => false + } )} + % {mqtt_subscriber, + % mk( + % hoconsc:map(name, ref(emqx_bridge_mqtt_connector_schema, "config_connector")), + % #{ + % desc => <<"MQTT Subscriber Connector Config">>, + % required => false + % } + % )} ] ++ enterprise_fields_connectors(); fields("node_status") -> [