diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 7e0851669..7a74ab438 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -50,6 +50,7 @@ ]). %% Operations + -export([ disable_enable/3, health_check/2, @@ -130,6 +131,9 @@ error := term() }. +-type bridge_v2_type() :: binary() | atom(). +-type bridge_v2_name() :: binary() | atom(). + %%==================================================================== %% Loading and unloading config when EMQX starts and stops %%==================================================================== @@ -180,7 +184,7 @@ unload_bridges() -> %% CRUD API %%==================================================================== --spec lookup(binary() | atom(), binary() | atom()) -> {ok, bridge_v2_info()} | {error, not_found}. +-spec lookup(bridge_v2_type(), bridge_v2_name()) -> {ok, bridge_v2_info()} | {error, not_found}. lookup(Type, Name) -> case emqx:get_raw_config([?ROOT_KEY, Type, Name], not_found) of not_found -> @@ -228,7 +232,7 @@ lookup(Type, Name) -> list() -> list_with_lookup_fun(fun lookup/2). --spec create(atom() | binary(), binary(), map()) -> +-spec create(bridge_v2_type(), bridge_v2_name(), map()) -> {ok, emqx_config:update_result()} | {error, any()}. create(BridgeType, BridgeName, RawConf) -> ?SLOG(debug, #{ @@ -247,7 +251,7 @@ create(BridgeType, BridgeName, RawConf) -> %% NOTE: This function can cause broken references from rules but it is only %% called directly from test cases. --spec remove(atom() | binary(), atom() | binary()) -> ok | {error, any()}. +-spec remove(bridge_v2_type(), bridge_v2_name()) -> ok | {error, any()}. remove(BridgeType, BridgeName) -> ?SLOG(debug, #{ brige_action => remove, @@ -265,7 +269,7 @@ remove(BridgeType, BridgeName) -> {error, Reason} -> {error, Reason} end. --spec check_deps_and_remove(atom() | binary(), atom() | binary(), boolean()) -> ok | {error, any()}. +-spec check_deps_and_remove(bridge_v2_type(), bridge_v2_name(), boolean()) -> ok | {error, any()}. check_deps_and_remove(BridgeType, BridgeName, AlsoDeleteActions) -> AlsoDelete = case AlsoDeleteActions of @@ -437,6 +441,8 @@ combine_connector_and_bridge_v2_config( %% Operations %%==================================================================== +-spec disable_enable(disable | enable, bridge_v2_type(), bridge_v2_name()) -> + {ok, any()} | {error, any()}. disable_enable(Action, BridgeType, BridgeName) when Action =:= disable; Action =:= enable -> @@ -514,6 +520,7 @@ connector_operation_helper_with_conf( end end. +-spec reset_metrics(bridge_v2_type(), bridge_v2_name()) -> ok | {error, not_found}. reset_metrics(Type, Name) -> reset_metrics_helper(Type, Name, lookup_conf(Type, Name)). @@ -521,7 +528,9 @@ reset_metrics_helper(_Type, _Name, #{enable := false}) -> ok; reset_metrics_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName}) -> BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName), - ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, BridgeV2Id). + ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, BridgeV2Id); +reset_metrics_helper(_, _, _) -> + {error, not_found}. get_query_mode(BridgeV2Type, Config) -> CreationOpts = emqx_resource:fetch_creation_opts(Config), @@ -529,6 +538,8 @@ get_query_mode(BridgeV2Type, Config) -> ResourceType = emqx_connector_resource:connector_to_resource_type(ConnectorType), emqx_resource:query_mode(ResourceType, Config, CreationOpts). +-spec send_message(bridge_v2_type(), bridge_v2_name(), Message :: term(), QueryOpts :: map()) -> + term() | {error, term()}. send_message(BridgeType, BridgeName, Message, QueryOpts0) -> case lookup_conf(BridgeType, BridgeName) of #{enable := true} = Config0 -> @@ -562,8 +573,7 @@ do_send_msg_with_enabled_config( emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts). -spec health_check(BridgeType :: term(), BridgeName :: term()) -> - #{status := term(), error := term()} | {error, Reason :: term()}. - + #{status := emqx_resource:resource_status(), error := term()} | {error, Reason :: term()}. health_check(BridgeType, BridgeName) -> case lookup_conf(BridgeType, BridgeName) of #{ @@ -582,6 +592,34 @@ health_check(BridgeType, BridgeName) -> Error end. +-spec create_dry_run(bridge_v2_type(), Config :: map()) -> ok | {error, term()}. +create_dry_run(Type, Conf0) -> + Conf1 = maps:without([<<"name">>], Conf0), + TypeBin = bin(Type), + RawConf = #{<<"actions">> => #{TypeBin => #{<<"temp_name">> => Conf1}}}, + %% Check config + try + _ = + hocon_tconf:check_plain( + emqx_bridge_v2_schema, + RawConf, + #{atom_key => true, required => false} + ), + #{<<"connector">> := ConnectorName} = Conf1, + %% Check that the connector exists and do the dry run if it exists + ConnectorType = connector_type(Type), + case emqx:get_raw_config([connectors, ConnectorType, ConnectorName], not_found) of + not_found -> + {error, iolist_to_binary(io_lib:format("Connector ~p not found", [ConnectorName]))}; + ConnectorRawConf -> + create_dry_run_helper(Type, ConnectorRawConf, Conf1) + end + catch + %% validation errors + throw:Reason1 -> + {error, Reason1} + end. + create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) -> BridgeName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]), ConnectorType = connector_type(BridgeType), @@ -613,33 +651,7 @@ create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) -> end, emqx_connector_resource:create_dry_run(ConnectorType, ConnectorRawConf, OnReadyCallback). -create_dry_run(Type, Conf0) -> - Conf1 = maps:without([<<"name">>], Conf0), - TypeBin = bin(Type), - RawConf = #{<<"actions">> => #{TypeBin => #{<<"temp_name">> => Conf1}}}, - %% Check config - try - _ = - hocon_tconf:check_plain( - emqx_bridge_v2_schema, - RawConf, - #{atom_key => true, required => false} - ), - #{<<"connector">> := ConnectorName} = Conf1, - %% Check that the connector exists and do the dry run if it exists - ConnectorType = connector_type(Type), - case emqx:get_raw_config([connectors, ConnectorType, ConnectorName], not_found) of - not_found -> - {error, iolist_to_binary(io_lib:format("Connector ~p not found", [ConnectorName]))}; - ConnectorRawConf -> - create_dry_run_helper(Type, ConnectorRawConf, Conf1) - end - catch - %% validation errors - throw:Reason1 -> - {error, Reason1} - end. - +-spec get_metrics(bridge_v2_type(), bridge_v2_name()) -> emqx_metrics_worker:metrics(). get_metrics(Type, Name) -> emqx_resource:get_metrics(id(Type, Name)). diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index f652e105b..8cb0b5590 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -647,10 +647,12 @@ t_load_config_success(_Config) -> {ok, _}, update_root_config(RootConf0) ), + BridgeTypeBin = bin(BridgeType), + BridgeNameBin = bin(BridgeName), ?assertMatch( {ok, #{ - type := BridgeType, - name := BridgeName, + type := BridgeTypeBin, + name := BridgeNameBin, raw_config := #{}, resource_data := #{} }}, @@ -860,3 +862,7 @@ wait_until(Fun, Timeout) when Timeout >= 0 -> end; wait_until(_, _) -> ct:fail("Wait until event did not happen"). + +bin(Bin) when is_binary(Bin) -> Bin; +bin(Str) when is_list(Str) -> list_to_binary(Str); +bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 60e94d7e3..f5bf65c0f 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -447,7 +447,7 @@ health_check(ResId) -> emqx_resource_manager:health_check(ResId). -spec channel_health_check(resource_id(), channel_id()) -> - #{status := channel_status(), error := term(), any() => any()}. + #{status := resource_status(), error := term()}. channel_health_check(ResId, ChannelId) -> emqx_resource_manager:channel_health_check(ResId, ChannelId). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index a030080b7..11391fb2b 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -309,7 +309,7 @@ health_check(ResId) -> safe_call(ResId, health_check, ?T_OPERATION). -spec channel_health_check(resource_id(), channel_id()) -> - #{status := channel_status(), error := term(), any() => any()}. + #{status := resource_status(), error := term()}. channel_health_check(ResId, ChannelId) -> %% Do normal health check first to trigger health checks for channels %% and update the cached health status for the channels