From a5a060473c54c1b898d59102fe4a623370173d87 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 17 Oct 2023 07:23:42 +0200 Subject: [PATCH] feat: restructure emqx_bridge_v2 for better readability --- apps/emqx_bridge/src/emqx_bridge_app.erl | 6 - apps/emqx_bridge/src/emqx_bridge_v2.erl | 1087 +++++++++-------- .../emqx_bridge/test/emqx_bridge_v2_SUITE.erl | 14 +- apps/emqx_resource/src/emqx_resource.erl | 37 +- .../src/emqx_rule_engine_api.erl | 2 +- 5 files changed, 589 insertions(+), 557 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl index ad5dd52bb..7ff92a8f0 100644 --- a/apps/emqx_bridge/src/emqx_bridge_app.erl +++ b/apps/emqx_bridge/src/emqx_bridge_app.erl @@ -27,8 +27,6 @@ -define(TOP_LELVE_HDLR_PATH, (emqx_bridge:config_key_path())). -define(LEAF_NODE_HDLR_PATH, (emqx_bridge:config_key_path() ++ ['?', '?'])). --define(TOP_LELVE_HDLR_PATH_BRIDGE_V2, (emqx_bridge_v2:config_key_path())). --define(LEAF_NODE_HDLR_PATH_BRIDGE_V2, (emqx_bridge_v2:config_key_path() ++ ['?', '?'])). start(_StartType, _StartArgs) -> {ok, Sup} = emqx_bridge_sup:start_link(), @@ -38,16 +36,12 @@ start(_StartType, _StartArgs) -> ok = emqx_bridge:load_hook(), ok = emqx_config_handler:add_handler(?LEAF_NODE_HDLR_PATH, ?MODULE), ok = emqx_config_handler:add_handler(?TOP_LELVE_HDLR_PATH, emqx_bridge), - ok = emqx_config_handler:add_handler(?LEAF_NODE_HDLR_PATH_BRIDGE_V2, emqx_bridge_v2), - ok = emqx_config_handler:add_handler(?TOP_LELVE_HDLR_PATH_BRIDGE_V2, emqx_bridge_v2), ?tp(emqx_bridge_app_started, #{}), {ok, Sup}. stop(_State) -> emqx_conf:remove_handler(?LEAF_NODE_HDLR_PATH), emqx_conf:remove_handler(?TOP_LELVE_HDLR_PATH), - emqx_conf:remove_handler(emqx_bridge_v2:config_key_path()), - emqx_conf:remove_handler(?LEAF_NODE_HDLR_PATH_BRIDGE_V2), ok = emqx_bridge:unload(), ok = emqx_bridge_v2:unload(), ok. diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 5c340c3e8..5cae532d2 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -21,44 +21,22 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). --include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-define(ROOT_KEY, bridges_v2). + +%% Loading and unloading config when EMQX starts and stops -export([ load/0, - unload/0, - is_bridge_v2_type/1, - id/2, - id/3, - parse_id/1, - bridge_v2_type_to_connector_type/1, - bridge_v1_type_to_bridge_v2_type/1, - bridge_v1_id_to_connector_resource_id/1, - is_bridge_v2_id/1, - extract_connector_id_from_bridge_v2_id/1, - is_bridge_v2_installed_in_connector_state/2, - get_channels_for_connector/1 -]). - -%% Compatibility API - --export([ - lookup_and_transform_to_bridge_v1/2, - list_and_transform_to_bridge_v1/0, - bridge_v1_check_deps_and_remove/3, - split_bridge_v1_config_and_create/3, - bridge_v1_create_dry_run/2 + unload/0 ]). %% CRUD API -export([ list/0, - lookup/1, lookup/2, - get_metrics/2, - config_key_path/0, - disable_enable/3, create/3, remove/2, check_deps_and_remove/3 @@ -66,13 +44,33 @@ %% Operations -export([ + disable_enable/3, health_check/2, send_message/4, start/2, stop/2, restart/2, reset_metrics/2, - create_dry_run/2 + create_dry_run/2, + get_metrics/2 +]). + +%% On message publish hook (for local_topics) + +-export([on_message_publish/1]). + +%% Convenience functions for connector implementations + +-export([ + parse_id/1, + get_channels_for_connector/1 +]). + +%% Exported for tests +-export([ + bridge_v2_type_to_connector_type/1, + id/2, + id/3 ]). %% Config Update Handler API @@ -82,41 +80,137 @@ pre_config_update/3 ]). -%% On message publish hook +%% Compatibility API --export([on_message_publish/1]). +-export([ + is_bridge_v2_type/1, + lookup_and_transform_to_bridge_v1/2, + list_and_transform_to_bridge_v1/0, + bridge_v1_check_deps_and_remove/3, + split_bridge_v1_config_and_create/3, + bridge_v1_create_dry_run/2, + extract_connector_id_from_bridge_v2_id/1, + bridge_v1_type_to_bridge_v2_type/1, + bridge_v1_id_to_connector_resource_id/1 +]). --define(ROOT_KEY, bridges_v2). - -get_channels_for_connector(ConnectorId) -> - {ConnectorType, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId), - RootConf = maps:keys(emqx:get_config([?ROOT_KEY], #{})), - RelevantBridgeV2Types = [ - Type - || Type <- RootConf, - ?MODULE:bridge_v2_type_to_connector_type(Type) =:= ConnectorType - ], - lists:flatten([ - get_channels_for_connector(ConnectorName, BridgeV2Type) - || BridgeV2Type <- RelevantBridgeV2Types - ]). - -get_channels_for_connector(ConnectorName, BridgeV2Type) -> - BridgeV2s = emqx:get_config([?ROOT_KEY, BridgeV2Type], #{}), - [ - {id(BridgeV2Type, Name, ConnectorName), Conf} - || {Name, Conf} <- maps:to_list(BridgeV2s), - bin(ConnectorName) =:= maps:get(connector, Conf, no_name) - ]. +%%==================================================================== +%% Loading and unloading config when EMQX starts and stops +%%==================================================================== load() -> load_message_publish_hook(), + ok = emqx_config_handler:add_handler(config_key_path_leaf(), emqx_bridge_v2), + ok = emqx_config_handler:add_handler(config_key_path(), emqx_bridge_v2), ok. unload() -> unload_message_publish_hook(), + emqx_conf:remove_handler(config_key_path()), + emqx_conf:remove_handler(config_key_path_leaf()), ok. +%%==================================================================== +%% CRUD API +%%==================================================================== + +lookup(Type, Name) -> + case emqx:get_raw_config([?ROOT_KEY, Type, Name], not_found) of + not_found -> + {error, not_found}; + #{<<"connector">> := BridgeConnector} = RawConf -> + ConnectorId = emqx_connector_resource:resource_id( + ?MODULE:bridge_v2_type_to_connector_type(Type), BridgeConnector + ), + %% The connector should always exist + InstanceData = + case emqx_resource:get_instance(ConnectorId) of + {ok, _, Data} -> + Data + end, + {ok, #{ + type => Type, + name => Name, + raw_config => RawConf, + resource_data => InstanceData + }} + end. + +list() -> + list_with_lookup_fun(fun lookup/2). + +create(BridgeType, BridgeName, RawConf) -> + ?SLOG(debug, #{ + brige_action => create, + bridge_version => 2, + bridge_type => BridgeType, + bridge_name => BridgeName, + bridge_raw_config => emqx_utils:redact(RawConf) + }), + emqx_conf:update( + config_key_path() ++ [BridgeType, BridgeName], + RawConf, + #{override_to => cluster} + ). + +%% NOTE: This function can cause broken references but it is only called from +%% test cases. +remove(BridgeType, BridgeName) -> + ?SLOG(debug, #{ + brige_action => remove, + bridge_version => 2, + bridge_type => BridgeType, + bridge_name => BridgeName + }), + emqx_conf:remove( + config_key_path() ++ [BridgeType, BridgeName], + #{override_to => cluster} + ). + +check_deps_and_remove(BridgeType, BridgeName, RemoveDeps) -> + BridgeId = external_id(BridgeType, BridgeName), + %% NOTE: This violates the design: Rule depends on data-bridge but not vice versa. + case emqx_rule_engine:get_rule_ids_by_action(BridgeId) of + [] -> + remove(BridgeType, BridgeName); + RuleIds when RemoveDeps =:= false -> + {error, {rules_deps_on_this_bridge, RuleIds}}; + RuleIds when RemoveDeps =:= true -> + lists:foreach( + fun(R) -> + emqx_rule_engine:ensure_action_removed(R, BridgeId) + end, + RuleIds + ), + remove(BridgeType, BridgeName) + end. + +%%-------------------------------------------------------------------- +%% Helpers for CRUD API +%%-------------------------------------------------------------------- + +list_with_lookup_fun(LookupFun) -> + maps:fold( + fun(Type, NameAndConf, Bridges) -> + maps:fold( + fun(Name, _RawConf, Acc) -> + [ + begin + {ok, BridgeInfo} = + LookupFun(Type, Name), + BridgeInfo + end + | Acc + ] + end, + Bridges, + NameAndConf + ) + end, + [], + emqx:get_raw_config([?ROOT_KEY], #{}) + ). + install_bridge_v2( _BridgeType, _BridgeName, @@ -175,52 +269,16 @@ uninstall_bridge_v2( ), emqx_resource_manager:remove_channel(ConnectorId, BridgeV2Id). -get_query_mode(BridgeV2Type, Config) -> - CreationOpts = emqx_resource:fetch_creation_opts(Config), - ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type), - ResourceType = emqx_connector_resource:connector_to_resource_type(ConnectorType), - emqx_resource:query_mode(ResourceType, Config, CreationOpts). +%% Creates the external id for the bridge_v2 that is used by the rule actions +%% to refer to the bridge_v2 +external_id(BridgeType, BridgeName) -> + Name = bin(BridgeName), + Type = bin(BridgeType), + <>. -send_message(BridgeType, BridgeName, Message, QueryOpts0) -> - case lookup_raw_conf(BridgeType, BridgeName) of - #{enable := true} = Config -> - do_send_msg_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config); - #{enable := false} -> - {error, bridge_stopped}; - Error -> - Error - end. - -do_send_msg_with_enabled_config( - BridgeType, BridgeName, Message, QueryOpts0, Config -) -> - QueryMode = get_query_mode(BridgeType, Config), - QueryOpts = maps:merge( - emqx_bridge:query_opts(Config), - QueryOpts0#{ - query_mode => QueryMode - } - ), - BridgeV2Id = emqx_bridge_v2:id(BridgeType, BridgeName), - emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts). - -health_check(BridgeType, BridgeName) -> - case lookup_raw_conf(BridgeType, BridgeName) of - #{ - enable := true, - connector := ConnectorName - } -> - ConnectorId = emqx_connector_resource:resource_id( - ?MODULE:bridge_v2_type_to_connector_type(BridgeType), ConnectorName - ), - emqx_resource_manager:channel_health_check( - ConnectorId, id(BridgeType, BridgeName, ConnectorName) - ); - #{enable := false} -> - {error, bridge_stopped}; - Error -> - Error - end. +%%==================================================================== +%% Operations +%%==================================================================== disable_enable(Action, BridgeType, BridgeName) when Action =:= disable; Action =:= enable @@ -275,6 +333,220 @@ reset_metrics_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName}) -> BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName), ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, BridgeV2Id). +get_query_mode(BridgeV2Type, Config) -> + CreationOpts = emqx_resource:fetch_creation_opts(Config), + ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type), + ResourceType = emqx_connector_resource:connector_to_resource_type(ConnectorType), + emqx_resource:query_mode(ResourceType, Config, CreationOpts). + +send_message(BridgeType, BridgeName, Message, QueryOpts0) -> + case lookup_raw_conf(BridgeType, BridgeName) of + #{enable := true} = Config -> + do_send_msg_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config); + #{enable := false} -> + {error, bridge_stopped}; + Error -> + Error + end. + +do_send_msg_with_enabled_config( + BridgeType, BridgeName, Message, QueryOpts0, Config +) -> + QueryMode = get_query_mode(BridgeType, Config), + QueryOpts = maps:merge( + emqx_bridge:query_opts(Config), + QueryOpts0#{ + query_mode => QueryMode, + query_mode_cache_override => false + } + ), + BridgeV2Id = emqx_bridge_v2:id(BridgeType, BridgeName), + emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts). + +health_check(BridgeType, BridgeName) -> + case lookup_raw_conf(BridgeType, BridgeName) of + #{ + enable := true, + connector := ConnectorName + } -> + ConnectorId = emqx_connector_resource:resource_id( + ?MODULE:bridge_v2_type_to_connector_type(BridgeType), ConnectorName + ), + emqx_resource_manager:channel_health_check( + ConnectorId, id(BridgeType, BridgeName, ConnectorName) + ); + #{enable := false} -> + {error, bridge_stopped}; + Error -> + Error + end. + +create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) -> + BridgeName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]), + ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeType), + OnReadyCallback = + fun(ConnectorId) -> + {_, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId), + ChannelTestId = id(BridgeType, BridgeName, ConnectorName), + BridgeV2Conf = emqx_utils_maps:unsafe_atom_key_map(BridgeV2RawConf), + case emqx_resource_manager:add_channel(ConnectorId, ChannelTestId, BridgeV2Conf) of + {error, Reason} -> + {error, Reason}; + ok -> + HealthCheckResult = emqx_resource_manager:channel_health_check( + ConnectorId, ChannelTestId + ), + case HealthCheckResult of + {error, Reason} -> + {error, Reason}; + _ -> + ok + end + end + end, + emqx_connector_resource:create_dry_run(ConnectorType, ConnectorRawConf, OnReadyCallback). + +create_dry_run(Type, Conf0) -> + Conf1 = maps:without([<<"name">>], Conf0), + TypeBin = bin(Type), + TypeAtom = binary_to_existing_atom(TypeBin), + RawConf = #{<<"bridges_v2">> => #{TypeBin => #{<<"temp_name">> => Conf1}}}, + %% Check config + try + #{bridges_v2 := #{TypeAtom := #{temp_name := _Conf}}} = + hocon_tconf:check_plain( + emqx_bridge_v2_schema, + RawConf, + #{atom_key => true, required => false} + ) + catch + %% validation errors + throw:Reason1 -> + {error, Reason1} + end, + #{<<"connector">> := ConnectorName} = Conf1, + %% Check that the connector exists and do the dry run if it exists + ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(Type), + case emqx:get_raw_config([connectors, ConnectorType, ConnectorName], not_found) of + not_found -> + {error, iolist_to_binary(io_lib:format("Connector ~p not found", [ConnectorName]))}; + ConnectorRawConf -> + create_dry_run_helper(Type, ConnectorRawConf, Conf1) + end. + +get_metrics(Type, Name) -> + emqx_resource:get_metrics(id(Type, Name)). + +%%==================================================================== +%% On message publish hook (for local topics) +%%==================================================================== + +%% The following functions are more or less copied from emqx_bridge.erl + +reload_message_publish_hook(Bridges) -> + ok = unload_message_publish_hook(), + ok = load_message_publish_hook(Bridges). + +load_message_publish_hook() -> + Bridges = emqx:get_config([?ROOT_KEY], #{}), + load_message_publish_hook(Bridges). + +load_message_publish_hook(Bridges) -> + lists:foreach( + fun({Type, Bridge}) -> + lists:foreach( + fun({_Name, BridgeConf}) -> + do_load_message_publish_hook(Type, BridgeConf) + end, + maps:to_list(Bridge) + ) + end, + maps:to_list(Bridges) + ). + +do_load_message_publish_hook(_Type, #{local_topic := LocalTopic}) when is_binary(LocalTopic) -> + emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE); +do_load_message_publish_hook(_Type, _Conf) -> + ok. + +unload_message_publish_hook() -> + ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}). + +on_message_publish(Message = #message{topic = Topic, flags = Flags}) -> + case maps:get(sys, Flags, false) of + false -> + {Msg, _} = emqx_rule_events:eventmsg_publish(Message), + send_to_matched_egress_bridges(Topic, Msg); + true -> + ok + end, + {ok, Message}. + +send_to_matched_egress_bridges(Topic, Msg) -> + MatchedBridgeIds = get_matched_egress_bridges(Topic), + lists:foreach( + fun({Type, Name}) -> + try send_message(Type, Name, Msg, #{}) of + {error, Reason} -> + ?SLOG(error, #{ + msg => "send_message_to_bridge_failed", + bridge_type => Type, + bridge_name => Name, + error => Reason + }); + _ -> + ok + catch + Err:Reason:ST -> + ?SLOG(error, #{ + msg => "send_message_to_bridge_exception", + bridge_type => Type, + bridge_name => Name, + error => Err, + reason => Reason, + stacktrace => ST + }) + end + end, + MatchedBridgeIds + ). + +get_matched_egress_bridges(Topic) -> + Bridges = emqx:get_config([?ROOT_KEY], #{}), + maps:fold( + fun(BType, Conf, Acc0) -> + maps:fold( + fun(BName, BConf, Acc1) -> + get_matched_bridge_id(BType, BConf, Topic, BName, Acc1) + end, + Acc0, + Conf + ) + end, + [], + Bridges + ). + +get_matched_bridge_id(_BType, #{enable := false}, _Topic, _BName, Acc) -> + Acc; +get_matched_bridge_id(BType, Conf, Topic, BName, Acc) -> + case maps:get(local_topic, Conf, undefined) of + undefined -> + Acc; + Filter -> + do_get_matched_bridge_id(Topic, Filter, BType, BName, Acc) + end. + +do_get_matched_bridge_id(Topic, Filter, BType, BName, Acc) -> + case emqx_topic:match(Topic, Filter) of + true -> [{BType, BName} | Acc]; + false -> Acc + end. + +%%==================================================================== +%% Convenience functions for connector implementations +%%==================================================================== + parse_id(Id) -> case binary:split(Id, <<":">>, [global]) of [Type, Name] -> @@ -285,6 +557,31 @@ parse_id(Id) -> error({error, iolist_to_binary(io_lib:format("Invalid id: ~p", [Id]))}) end. +get_channels_for_connector(ConnectorId) -> + {ConnectorType, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId), + RootConf = maps:keys(emqx:get_config([?ROOT_KEY], #{})), + RelevantBridgeV2Types = [ + Type + || Type <- RootConf, + ?MODULE:bridge_v2_type_to_connector_type(Type) =:= ConnectorType + ], + lists:flatten([ + get_channels_for_connector(ConnectorName, BridgeV2Type) + || BridgeV2Type <- RelevantBridgeV2Types + ]). + +get_channels_for_connector(ConnectorName, BridgeV2Type) -> + BridgeV2s = emqx:get_config([?ROOT_KEY, BridgeV2Type], #{}), + [ + {id(BridgeV2Type, Name, ConnectorName), Conf} + || {Name, Conf} <- maps:to_list(BridgeV2s), + bin(ConnectorName) =:= maps:get(connector, Conf, no_name) + ]. + +%%==================================================================== +%% Exported for tests +%%==================================================================== + id(BridgeType, BridgeName) -> case lookup_raw_conf(BridgeType, BridgeName) of #{connector := ConnectorName} -> @@ -298,18 +595,163 @@ id(BridgeType, BridgeName, ConnectorName) -> <<"bridge_v2:", (bin(BridgeType))/binary, ":", (bin(BridgeName))/binary, ":connector:", (bin(ConnectorType))/binary, ":", (bin(ConnectorName))/binary>>. -%% Creates the external id for the bridge_v2 that is used by the rule actions -%% to refer to the bridge_v2 -external_id(BridgeType, BridgeName) -> - Name = bin(BridgeName), - Type = bin(BridgeType), - <>. - bridge_v2_type_to_connector_type(Bin) when is_binary(Bin) -> ?MODULE:bridge_v2_type_to_connector_type(binary_to_existing_atom(Bin)); bridge_v2_type_to_connector_type(kafka) -> kafka. +%%==================================================================== +%% Config Update Handler API +%%==================================================================== + +config_key_path() -> + [?ROOT_KEY]. + +config_key_path_leaf() -> + [?ROOT_KEY, '?', '?']. + +%% NOTE: We depend on the `emqx_bridge:pre_config_update/3` to restart/stop the +%% underlying resources. +pre_config_update(_, {_Oper, _, _}, undefined) -> + {error, bridge_not_found}; +pre_config_update(_, {Oper, _Type, _Name}, OldConfig) -> + %% to save the 'enable' to the config files + {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}}; +pre_config_update(_Path, Conf, _OldConfig) when is_map(Conf) -> + {ok, Conf}. + +operation_to_enable(disable) -> false; +operation_to_enable(enable) -> true. + +%% This top level handler will be triggered when the bridges_v2 path is updated +%% with calls to emqx_conf:update([bridges_v2], BridgesConf, #{}). +%% +%% A public API that can trigger this is: +%% bin/emqx ctl conf load data/configs/cluster.hocon +post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) -> + #{added := Added, removed := Removed, changed := Updated} = + diff_confs(NewConf, OldConf), + %% The config update will be failed if any task in `perform_bridge_changes` failed. + RemoveFun = fun uninstall_bridge_v2/3, + CreateFun = fun install_bridge_v2/3, + UpdateFun = fun(Type, Name, {OldBridgeConf, Conf}) -> + uninstall_bridge_v2(Type, Name, OldBridgeConf), + install_bridge_v2(Type, Name, Conf) + end, + Result = perform_bridge_changes([ + #{action => RemoveFun, data => Removed}, + #{ + action => CreateFun, + data => Added, + on_exception_fn => fun emqx_bridge_resource:remove/4 + }, + #{action => UpdateFun, data => Updated} + ]), + ok = unload_message_publish_hook(), + ok = load_message_publish_hook(NewConf), + ?tp(bridge_post_config_update_done, #{}), + Result; +post_config_update([?ROOT_KEY, BridgeType, BridgeName], '$remove', _, _OldConf, _AppEnvs) -> + Conf = emqx:get_config([?ROOT_KEY, BridgeType, BridgeName]), + ok = uninstall_bridge_v2(BridgeType, BridgeName, Conf), + Bridges = emqx_utils_maps:deep_remove([BridgeType, BridgeName], emqx:get_config([?ROOT_KEY])), + reload_message_publish_hook(Bridges), + ?tp(bridge_post_config_update_done, #{}), + ok; +post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, undefined, _AppEnvs) -> + ok = install_bridge_v2(BridgeType, BridgeName, NewConf), + Bridges = emqx_utils_maps:deep_put( + [BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf + ), + reload_message_publish_hook(Bridges), + ?tp(bridge_post_config_update_done, #{}), + ok; +post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, OldConf, _AppEnvs) -> + ok = uninstall_bridge_v2(BridgeType, BridgeName, OldConf), + ok = install_bridge_v2(BridgeType, BridgeName, NewConf), + Bridges = emqx_utils_maps:deep_put( + [BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf + ), + reload_message_publish_hook(Bridges), + ?tp(bridge_post_config_update_done, #{}), + ok. + +diff_confs(NewConfs, OldConfs) -> + emqx_utils_maps:diff_maps( + flatten_confs(NewConfs), + flatten_confs(OldConfs) + ). + +flatten_confs(Conf0) -> + maps:from_list( + lists:flatmap( + fun({Type, Conf}) -> + do_flatten_confs(Type, Conf) + end, + maps:to_list(Conf0) + ) + ). + +do_flatten_confs(Type, Conf0) -> + [{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)]. + +perform_bridge_changes(Tasks) -> + perform_bridge_changes(Tasks, ok). + +perform_bridge_changes([], Result) -> + Result; +perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], Result0) -> + OnException = maps:get(on_exception_fn, Task, fun(_Type, _Name, _Conf, _Opts) -> ok end), + Result = maps:fold( + fun + ({_Type, _Name}, _Conf, {error, Reason}) -> + {error, Reason}; + %% for update + ({Type, Name}, {OldConf, Conf}, _) -> + case Action(Type, Name, {OldConf, Conf}) of + {error, Reason} -> {error, Reason}; + Return -> Return + end; + ({Type, Name}, Conf, _) -> + try Action(Type, Name, Conf) of + {error, Reason} -> {error, Reason}; + Return -> Return + catch + Kind:Error:Stacktrace -> + ?SLOG(error, #{ + msg => "bridge_config_update_exception", + kind => Kind, + error => Error, + type => Type, + name => Name, + stacktrace => Stacktrace + }), + OnException(Type, Name, Conf), + erlang:raise(Kind, Error, Stacktrace) + end + end, + Result0, + MapConfs + ), + perform_bridge_changes(Tasks, Result). + +fill_defaults(Type, RawConf, TopLevelConf, SchemaModule) -> + PackedConf = pack_bridge_conf(Type, RawConf, TopLevelConf), + FullConf = emqx_config:fill_defaults(SchemaModule, PackedConf, #{}), + unpack_bridge_conf(Type, FullConf, TopLevelConf). + +pack_bridge_conf(Type, RawConf, TopLevelConf) -> + #{TopLevelConf => #{bin(Type) => #{<<"foo">> => RawConf}}}. + +unpack_bridge_conf(Type, PackedConf, TopLevelConf) -> + TypeBin = bin(Type), + #{TopLevelConf := Bridges} = PackedConf, + #{<<"foo">> := RawConf} = maps:get(TypeBin, Bridges), + RawConf. + +%%==================================================================== +%% Compatibility API +%%==================================================================== bridge_v1_type_to_bridge_v2_type(Bin) when is_binary(Bin) -> bridge_v1_type_to_bridge_v2_type(binary_to_existing_atom(Bin)); bridge_v1_type_to_bridge_v2_type(kafka) -> @@ -322,85 +764,9 @@ is_bridge_v2_type(<<"kafka">>) -> is_bridge_v2_type(_) -> false. -is_bridge_v2_id(<<"bridge_v2:", _/binary>>) -> true; -is_bridge_v2_id(_) -> false. - -extract_connector_id_from_bridge_v2_id(Id) -> - case binary:split(Id, <<":">>, [global]) of - [<<"bridge_v2">>, _Type, _Name, <<"connector">>, ConnectorType, ConnecorName] -> - <<"connector:", ConnectorType/binary, ":", ConnecorName/binary>>; - _X -> - error({error, iolist_to_binary(io_lib:format("Invalid bridge V2 ID: ~p", [Id]))}) - end. - -bin(Bin) when is_binary(Bin) -> Bin; -bin(Str) when is_list(Str) -> list_to_binary(Str); -bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). - -%% Basic CRUD Operations - -list() -> - list_with_lookup_fun(fun lookup/2). - list_and_transform_to_bridge_v1() -> list_with_lookup_fun(fun lookup_and_transform_to_bridge_v1/2). -list_with_lookup_fun(LookupFun) -> - maps:fold( - fun(Type, NameAndConf, Bridges) -> - maps:fold( - fun(Name, _RawConf, Acc) -> - [ - begin - {ok, BridgeInfo} = - LookupFun(Type, Name), - BridgeInfo - end - | Acc - ] - end, - Bridges, - NameAndConf - ) - end, - [], - emqx:get_raw_config([?ROOT_KEY], #{}) - ). - -lookup(Id) -> - {Type, Name} = parse_id(Id), - lookup(Type, Name). - -%% TODO should not call this -% to_atom(Bin) when is_binary(Bin) -> -% binary_to_atom(Bin); -% to_atom(Atom) when is_atom(Atom) -> -% Atom. - -lookup(Type, Name) -> - case emqx:get_raw_config([?ROOT_KEY, Type, Name], not_found) of - not_found -> - {error, not_found}; - #{<<"connector">> := BridgeConnector} = RawConf -> - ConnectorId = emqx_connector_resource:resource_id( - ?MODULE:bridge_v2_type_to_connector_type(Type), BridgeConnector - ), - InstanceData = - case emqx_resource:get_instance(ConnectorId) of - {error, not_found} -> - %% TODO should we throw an error here (this should not happen)? - {error, not_found}; - {ok, _, Data} -> - Data - end, - {ok, #{ - type => Type, - name => Name, - raw_config => RawConf, - resource_data => InstanceData - }} - end. - lookup_and_transform_to_bridge_v1(Type, Name) -> case lookup(Type, Name) of {ok, #{raw_config := #{<<"connector">> := ConnectorName}} = BridgeV2} -> @@ -445,26 +811,6 @@ lookup_raw_conf(Type, Name) -> Config end. -get_metrics(Type, Name) -> - emqx_resource:get_metrics(id(Type, Name)). - -config_key_path() -> - [?ROOT_KEY]. - -create(BridgeType, BridgeName, RawConf) -> - ?SLOG(debug, #{ - brige_action => create, - bridge_version => 2, - bridge_type => BridgeType, - bridge_name => BridgeName, - bridge_raw_config => emqx_utils:redact(RawConf) - }), - emqx_conf:update( - config_key_path() ++ [BridgeType, BridgeName], - RawConf, - #{override_to => cluster} - ). - split_bridge_v1_config_and_create(BridgeV1Type, BridgeName, RawConf) -> #{ connector_type := ConnectorType, @@ -596,91 +942,6 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) -> } = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf), create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf). -create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) -> - BridgeName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]), - ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeType), - OnReadyCallback = - fun(ConnectorId) -> - {_, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId), - ChannelTestId = id(BridgeType, BridgeName, ConnectorName), - BridgeV2Conf = emqx_utils_maps:unsafe_atom_key_map(BridgeV2RawConf), - case emqx_resource_manager:add_channel(ConnectorId, ChannelTestId, BridgeV2Conf) of - {error, Reason} -> - {error, Reason}; - ok -> - HealthCheckResult = emqx_resource_manager:channel_health_check( - ConnectorId, ChannelTestId - ), - case HealthCheckResult of - {error, Reason} -> - {error, Reason}; - _ -> - ok - end - end - end, - emqx_connector_resource:create_dry_run(ConnectorType, ConnectorRawConf, OnReadyCallback). - -create_dry_run(Type, Conf0) -> - Conf1 = maps:without([<<"name">>], Conf0), - TypeBin = bin(Type), - TypeAtom = binary_to_existing_atom(TypeBin), - RawConf = #{<<"bridges_v2">> => #{TypeBin => #{<<"temp_name">> => Conf1}}}, - %% Check config - try - #{bridges_v2 := #{TypeAtom := #{temp_name := _Conf}}} = - hocon_tconf:check_plain( - emqx_bridge_v2_schema, - RawConf, - #{atom_key => true, required => false} - ) - catch - %% validation errors - throw:Reason1 -> - {error, Reason1} - end, - #{<<"connector">> := ConnectorName} = Conf1, - %% Check that the connector exists and do the dry run if it exists - ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(Type), - case emqx:get_raw_config([connectors, ConnectorType, ConnectorName], not_found) of - not_found -> - {error, iolist_to_binary(io_lib:format("Connector ~p not found", [ConnectorName]))}; - ConnectorRawConf -> - create_dry_run_helper(Type, ConnectorRawConf, Conf1) - end. - -%% NOTE: This function can cause broken references but it is only called from -%% test cases. -remove(BridgeType, BridgeName) -> - ?SLOG(debug, #{ - brige_action => remove, - bridge_version => 2, - bridge_type => BridgeType, - bridge_name => BridgeName - }), - emqx_conf:remove( - config_key_path() ++ [BridgeType, BridgeName], - #{override_to => cluster} - ). - -check_deps_and_remove(BridgeType, BridgeName, RemoveDeps) -> - BridgeId = external_id(BridgeType, BridgeName), - %% NOTE: This violates the design: Rule depends on data-bridge but not vice versa. - case emqx_rule_engine:get_rule_ids_by_action(BridgeId) of - [] -> - remove(BridgeType, BridgeName); - RuleIds when RemoveDeps =:= false -> - {error, {rules_deps_on_this_bridge, RuleIds}}; - RuleIds when RemoveDeps =:= true -> - lists:foreach( - fun(R) -> - emqx_rule_engine:ensure_action_removed(R, BridgeId) - end, - RuleIds - ), - remove(BridgeType, BridgeName) - end. - bridge_v1_check_deps_and_remove(BridgeType, BridgeName, RemoveDeps) -> bridge_v1_check_deps_and_remove( BridgeType, @@ -721,151 +982,6 @@ connector_has_channels(BridgeV2Type, ConnectorName) -> true end. -%% NOTE: We depends on the `emqx_bridge:pre_config_update/3` to restart/stop the -%% underlying resources. -pre_config_update(_, {_Oper, _, _}, undefined) -> - {error, bridge_not_found}; -pre_config_update(_, {Oper, _Type, _Name}, OldConfig) -> - %% to save the 'enable' to the config files - {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}}; -pre_config_update(_Path, Conf, _OldConfig) when is_map(Conf) -> - {ok, Conf}. - -operation_to_enable(disable) -> false; -operation_to_enable(enable) -> true. - -%% This top level handler will be triggered when the bridges_v2 path is updated -%% with calls to emqx_conf:update([bridges_v2], BridgesConf, #{}). -%% -%% A public API that can trigger this is: -%% bin/emqx ctl conf load data/configs/cluster.hocon -post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) -> - #{added := Added, removed := Removed, changed := Updated} = - diff_confs(NewConf, OldConf), - %% The config update will be failed if any task in `perform_bridge_changes` failed. - RemoveFun = fun uninstall_bridge_v2/3, - CreateFun = fun install_bridge_v2/3, - UpdateFun = fun(Type, Name, {OldBridgeConf, Conf}) -> - uninstall_bridge_v2(Type, Name, OldBridgeConf), - install_bridge_v2(Type, Name, Conf) - end, - Result = perform_bridge_changes([ - #{action => RemoveFun, data => Removed}, - #{ - action => CreateFun, - data => Added, - on_exception_fn => fun emqx_bridge_resource:remove/4 - }, - #{action => UpdateFun, data => Updated} - ]), - ok = unload_message_publish_hook(), - ok = load_message_publish_hook(NewConf), - ?tp(bridge_post_config_update_done, #{}), - Result; -post_config_update([?ROOT_KEY, BridgeType, BridgeName], '$remove', _, _OldConf, _AppEnvs) -> - Conf = emqx:get_config([?ROOT_KEY, BridgeType, BridgeName]), - ok = uninstall_bridge_v2(BridgeType, BridgeName, Conf), - Bridges = emqx_utils_maps:deep_remove([BridgeType, BridgeName], emqx:get_config([?ROOT_KEY])), - reload_message_publish_hook(Bridges), - ?tp(bridge_post_config_update_done, #{}), - ok; -post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, undefined, _AppEnvs) -> - ok = install_bridge_v2(BridgeType, BridgeName, NewConf), - Bridges = emqx_utils_maps:deep_put( - [BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf - ), - reload_message_publish_hook(Bridges), - ?tp(bridge_post_config_update_done, #{}), - ok; -post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, OldConf, _AppEnvs) -> - ok = uninstall_bridge_v2(BridgeType, BridgeName, OldConf), - ok = install_bridge_v2(BridgeType, BridgeName, NewConf), - Bridges = emqx_utils_maps:deep_put( - [BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf - ), - reload_message_publish_hook(Bridges), - ?tp(bridge_post_config_update_done, #{}), - ok. - -diff_confs(NewConfs, OldConfs) -> - emqx_utils_maps:diff_maps( - flatten_confs(NewConfs), - flatten_confs(OldConfs) - ). - -flatten_confs(Conf0) -> - maps:from_list( - lists:flatmap( - fun({Type, Conf}) -> - do_flatten_confs(Type, Conf) - end, - maps:to_list(Conf0) - ) - ). - -do_flatten_confs(Type, Conf0) -> - [{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)]. - -perform_bridge_changes(Tasks) -> - perform_bridge_changes(Tasks, ok). - -perform_bridge_changes([], Result) -> - Result; -perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], Result0) -> - OnException = maps:get(on_exception_fn, Task, fun(_Type, _Name, _Conf, _Opts) -> ok end), - Result = maps:fold( - fun - ({_Type, _Name}, _Conf, {error, Reason}) -> - {error, Reason}; - %% for update - ({Type, Name}, {OldConf, Conf}, _) -> - case Action(Type, Name, {OldConf, Conf}) of - {error, Reason} -> {error, Reason}; - Return -> Return - end; - ({Type, Name}, Conf, _) -> - try Action(Type, Name, Conf) of - {error, Reason} -> {error, Reason}; - Return -> Return - catch - Kind:Error:Stacktrace -> - ?SLOG(error, #{ - msg => "bridge_config_update_exception", - kind => Kind, - error => Error, - type => Type, - name => Name, - stacktrace => Stacktrace - }), - OnException(Type, Name, Conf), - erlang:raise(Kind, Error, Stacktrace) - end - end, - Result0, - MapConfs - ), - perform_bridge_changes(Tasks, Result). - -is_bridge_v2_installed_in_connector_state(Tag, State) when is_map(State) -> - BridgeV2s = maps:get(installed_bridge_v2s, State, #{}), - maps:is_key(Tag, BridgeV2s); -is_bridge_v2_installed_in_connector_state(_Tag, _State) -> - false. - -fill_defaults(Type, RawConf, TopLevelConf, SchemaModule) -> - PackedConf = pack_bridge_conf(Type, RawConf, TopLevelConf), - FullConf = emqx_config:fill_defaults(SchemaModule, PackedConf, #{}), - unpack_bridge_conf(Type, FullConf, TopLevelConf). - -pack_bridge_conf(Type, RawConf, TopLevelConf) -> - #{TopLevelConf => #{bin(Type) => #{<<"foo">> => RawConf}}}. - -unpack_bridge_conf(Type, PackedConf, TopLevelConf) -> - TypeBin = bin(Type), - #{TopLevelConf := Bridges} = PackedConf, - #{<<"foo">> := RawConf} = maps:get(TypeBin, Bridges), - RawConf. - bridge_v1_id_to_connector_resource_id(BridgeId) -> case binary:split(BridgeId, <<":">>) of [Type, Name] -> @@ -881,104 +997,21 @@ bridge_v1_id_to_connector_resource_id(BridgeId) -> <<"connector:", ConnectorType/binary, ":", ConnectorName/binary>> end. -%% The following functions are copied from emqx_bridge.erl +%%==================================================================== +%% Misc helper functions +%%==================================================================== -reload_message_publish_hook(Bridges) -> - ok = unload_message_publish_hook(), - ok = load_message_publish_hook(Bridges). +bin(Bin) when is_binary(Bin) -> Bin; +bin(Str) when is_list(Str) -> list_to_binary(Str); +bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). -load_message_publish_hook() -> - Bridges = emqx:get_config([?ROOT_KEY], #{}), - load_message_publish_hook(Bridges). +is_bridge_v2_id(<<"bridge_v2:", _/binary>>) -> true; +is_bridge_v2_id(_) -> false. -load_message_publish_hook(Bridges) -> - lists:foreach( - fun({Type, Bridge}) -> - lists:foreach( - fun({_Name, BridgeConf}) -> - do_load_message_publish_hook(Type, BridgeConf) - end, - maps:to_list(Bridge) - ) - end, - maps:to_list(Bridges) - ). - -do_load_message_publish_hook(_Type, #{local_topic := LocalTopic}) when is_binary(LocalTopic) -> - emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE); -do_load_message_publish_hook(_Type, _Conf) -> - ok. - -unload_message_publish_hook() -> - ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}). - -on_message_publish(Message = #message{topic = Topic, flags = Flags}) -> - case maps:get(sys, Flags, false) of - false -> - {Msg, _} = emqx_rule_events:eventmsg_publish(Message), - send_to_matched_egress_bridges(Topic, Msg); - true -> - ok - end, - {ok, Message}. - -send_to_matched_egress_bridges(Topic, Msg) -> - MatchedBridgeIds = get_matched_egress_bridges(Topic), - lists:foreach( - fun({Type, Name}) -> - try send_message(Type, Name, Msg, #{}) of - {error, Reason} -> - ?SLOG(error, #{ - msg => "send_message_to_bridge_failed", - bridge_type => Type, - bridge_name => Name, - error => Reason - }); - _ -> - ok - catch - Err:Reason:ST -> - ?SLOG(error, #{ - msg => "send_message_to_bridge_exception", - bridge_type => Type, - bridge_name => Name, - error => Err, - reason => Reason, - stacktrace => ST - }) - end - end, - MatchedBridgeIds - ). - -get_matched_egress_bridges(Topic) -> - Bridges = emqx:get_config([?ROOT_KEY], #{}), - maps:fold( - fun(BType, Conf, Acc0) -> - maps:fold( - fun(BName, BConf, Acc1) -> - get_matched_bridge_id(BType, BConf, Topic, BName, Acc1) - end, - Acc0, - Conf - ) - end, - [], - Bridges - ). - -get_matched_bridge_id(_BType, #{enable := false}, _Topic, _BName, Acc) -> - Acc; -get_matched_bridge_id(BType, Conf, Topic, BName, Acc) -> - case maps:get(local_topic, Conf, undefined) of - undefined -> - Acc; - Filter -> - do_get_matched_bridge_id(Topic, Filter, BType, BName, Acc) - end. - -do_get_matched_bridge_id(Topic, Filter, BType, BName, Acc) -> - case emqx_topic:match(Topic, Filter) of - true -> [{BType, BName} | Acc]; - false -> Acc +extract_connector_id_from_bridge_v2_id(Id) -> + case binary:split(Id, <<":">>, [global]) of + [<<"bridge_v2">>, _Type, _Name, <<"connector">>, ConnectorType, ConnecorName] -> + <<"connector:", ConnectorType/binary, ":", ConnecorName/binary>>; + _X -> + error({error, iolist_to_binary(io_lib:format("Invalid bridge V2 ID: ~p", [Id]))}) end. diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index 73e03e7c9..b64d7e92e 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -112,7 +112,7 @@ init_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, Config) -> %% Remove the fake connector - ok = emqx_connector:remove(con_type(), con_name()), + {ok, _} = emqx_connector:remove(con_type(), con_name()), Config. t_create_remove(_) -> @@ -120,6 +120,18 @@ t_create_remove(_) -> {ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), ok. +t_list(_) -> + [] = emqx_bridge_v2:list(), + {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, bridge_config()), + 1 = length(emqx_bridge_v2:list()), + {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge2, bridge_config()), + 2 = length(emqx_bridge_v2:list()), + {ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), + 1 = length(emqx_bridge_v2:list()), + {ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge2), + 0 = length(emqx_bridge_v2:list()), + ok. + t_create_dry_run(_) -> ok = emqx_bridge_v2:create_dry_run(bridge_type(), bridge_config()). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 9af41adbc..eab64774d 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -205,35 +205,29 @@ -callback query_mode(Config :: term()) -> query_mode(). -%% This callback handles the installation of a specified Bridge V2 resource. +%% This callback handles the installation of a specified channel. %% -%% It's guaranteed that the provided Bridge V2 is not already installed when this -%% function is invoked. Upon successful installation, the function should return a -%% new state with the installed Bridge V2 encapsulated within the `installed_bridge_v2s` map. -%% -%% The Bridge V2 state must be stored in the `installed_bridge_v2s` map using the -%% Bridge V2 resource ID as the key, as the caching mechanism depends on this structure. -%% -%% If the Bridge V2 cannot be successfully installed, the callback shall -%% throw an exception. +%% If the channel cannot be successfully installed, the callback shall +%% throw an exception or return an error tuple. -callback on_add_channel( - ResId :: term(), ResourceState :: term(), BridgeV2Id :: binary(), ChannelConfig :: map() -) -> {ok, NewState :: #{installed_bridge_v2s := map()}}. + ResId :: term(), ResourceState :: term(), ChannelId :: binary(), ChannelConfig :: map() +) -> {ok, term()} | {error, term()}. -%% This callback handles the deinstallation of a specified Bridge V2 resource. +%% This callback handles the removal of a specified channel resource. %% -%% It's guaranteed that the provided Bridge V2 is installed when this +%% It's guaranteed that the provided channel is installed when this %% function is invoked. Upon successful deinstallation, the function should return -%% a new state where the Bridge V2 id key has been removed from the `installed_bridge_v2s` map. +%% a new state %% -%% If the Bridge V2 cannot be successfully deinstalled, the callback shall +%% If the channel cannot be successfully deinstalled, the callback should %% log an error. %% -%% Also see the documentation for `on_add_channel/4`. -callback on_remove_channel( - ResId :: term(), ResourceState :: term(), BridgeV2Id :: binary() + ResId :: term(), ResourceState :: term(), ChannelId :: binary() ) -> {ok, NewState :: term()}. +%% This callback shall return a list of channel configs that are currently active +%% for the resource with the given id. -callback on_get_channels( ResId :: term() ) -> {ok, [term()]}. @@ -362,7 +356,6 @@ query(ResId, Request) -> -spec query(resource_id(), Request :: term(), query_opts()) -> Result :: term(). query(ResId, Request, Opts) -> - %% We keep this A case get_query_mode_error(ResId, Opts) of {error, _} = ErrorTuple -> ErrorTuple; @@ -406,15 +399,15 @@ query(ResId, Request, Opts) -> end. get_query_mode_error(ResId, Opts) -> - case emqx_bridge_v2:is_bridge_v2_id(ResId) of - true -> + case maps:get(query_mode_cache_override, Opts, true) of + false -> case Opts of #{query_mode := QueryMode} -> {QueryMode, ok}; _ -> {async, unhealthy_target} end; - false -> + true -> case emqx_resource_manager:lookup_cached(ResId) of {ok, _Group, #{query_mode := QM, error := Error}} -> {QM, Error}; diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 2b277c723..246bd8f01 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -522,7 +522,7 @@ format_action(Actions) -> do_format_action({bridge, BridgeType, BridgeName, _ResId}) -> emqx_bridge_resource:bridge_id(BridgeType, BridgeName); do_format_action({bridge_v2, BridgeType, BridgeName}) -> - emqx_bridge_v2:id(BridgeType, BridgeName); + emqx_bridge_resource:bridge_id(BridgeType, BridgeName); do_format_action(#{mod := Mod, func := Func, args := Args}) -> #{ function => printable_function_name(Mod, Func),