docs(bridge_V2): type specs for operations
This commit is contained in:
parent
99031f0dae
commit
cd5b1f9b96
|
@ -50,6 +50,7 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% Operations
|
%% Operations
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
disable_enable/3,
|
disable_enable/3,
|
||||||
health_check/2,
|
health_check/2,
|
||||||
|
@ -130,6 +131,9 @@
|
||||||
error := term()
|
error := term()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
-type bridge_v2_type() :: binary() | atom().
|
||||||
|
-type bridge_v2_name() :: binary() | atom().
|
||||||
|
|
||||||
%%====================================================================
|
%%====================================================================
|
||||||
%% Loading and unloading config when EMQX starts and stops
|
%% Loading and unloading config when EMQX starts and stops
|
||||||
%%====================================================================
|
%%====================================================================
|
||||||
|
@ -180,7 +184,7 @@ unload_bridges() ->
|
||||||
%% CRUD API
|
%% CRUD API
|
||||||
%%====================================================================
|
%%====================================================================
|
||||||
|
|
||||||
-spec lookup(binary() | atom(), binary() | atom()) -> {ok, bridge_v2_info()} | {error, not_found}.
|
-spec lookup(bridge_v2_type(), bridge_v2_name()) -> {ok, bridge_v2_info()} | {error, not_found}.
|
||||||
lookup(Type, Name) ->
|
lookup(Type, Name) ->
|
||||||
case emqx:get_raw_config([?ROOT_KEY, Type, Name], not_found) of
|
case emqx:get_raw_config([?ROOT_KEY, Type, Name], not_found) of
|
||||||
not_found ->
|
not_found ->
|
||||||
|
@ -228,7 +232,7 @@ lookup(Type, Name) ->
|
||||||
list() ->
|
list() ->
|
||||||
list_with_lookup_fun(fun lookup/2).
|
list_with_lookup_fun(fun lookup/2).
|
||||||
|
|
||||||
-spec create(atom() | binary(), binary(), map()) ->
|
-spec create(bridge_v2_type(), bridge_v2_name(), map()) ->
|
||||||
{ok, emqx_config:update_result()} | {error, any()}.
|
{ok, emqx_config:update_result()} | {error, any()}.
|
||||||
create(BridgeType, BridgeName, RawConf) ->
|
create(BridgeType, BridgeName, RawConf) ->
|
||||||
?SLOG(debug, #{
|
?SLOG(debug, #{
|
||||||
|
@ -247,7 +251,7 @@ create(BridgeType, BridgeName, RawConf) ->
|
||||||
%% NOTE: This function can cause broken references from rules but it is only
|
%% NOTE: This function can cause broken references from rules but it is only
|
||||||
%% called directly from test cases.
|
%% called directly from test cases.
|
||||||
|
|
||||||
-spec remove(atom() | binary(), atom() | binary()) -> ok | {error, any()}.
|
-spec remove(bridge_v2_type(), bridge_v2_name()) -> ok | {error, any()}.
|
||||||
remove(BridgeType, BridgeName) ->
|
remove(BridgeType, BridgeName) ->
|
||||||
?SLOG(debug, #{
|
?SLOG(debug, #{
|
||||||
brige_action => remove,
|
brige_action => remove,
|
||||||
|
@ -265,7 +269,7 @@ remove(BridgeType, BridgeName) ->
|
||||||
{error, Reason} -> {error, Reason}
|
{error, Reason} -> {error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec check_deps_and_remove(atom() | binary(), atom() | binary(), boolean()) -> ok | {error, any()}.
|
-spec check_deps_and_remove(bridge_v2_type(), bridge_v2_name(), boolean()) -> ok | {error, any()}.
|
||||||
check_deps_and_remove(BridgeType, BridgeName, AlsoDeleteActions) ->
|
check_deps_and_remove(BridgeType, BridgeName, AlsoDeleteActions) ->
|
||||||
AlsoDelete =
|
AlsoDelete =
|
||||||
case AlsoDeleteActions of
|
case AlsoDeleteActions of
|
||||||
|
@ -437,6 +441,8 @@ combine_connector_and_bridge_v2_config(
|
||||||
%% Operations
|
%% Operations
|
||||||
%%====================================================================
|
%%====================================================================
|
||||||
|
|
||||||
|
-spec disable_enable(disable | enable, bridge_v2_type(), bridge_v2_name()) ->
|
||||||
|
{ok, any()} | {error, any()}.
|
||||||
disable_enable(Action, BridgeType, BridgeName) when
|
disable_enable(Action, BridgeType, BridgeName) when
|
||||||
Action =:= disable; Action =:= enable
|
Action =:= disable; Action =:= enable
|
||||||
->
|
->
|
||||||
|
@ -514,6 +520,7 @@ connector_operation_helper_with_conf(
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec reset_metrics(bridge_v2_type(), bridge_v2_name()) -> ok | {error, not_found}.
|
||||||
reset_metrics(Type, Name) ->
|
reset_metrics(Type, Name) ->
|
||||||
reset_metrics_helper(Type, Name, lookup_conf(Type, Name)).
|
reset_metrics_helper(Type, Name, lookup_conf(Type, Name)).
|
||||||
|
|
||||||
|
@ -521,7 +528,9 @@ reset_metrics_helper(_Type, _Name, #{enable := false}) ->
|
||||||
ok;
|
ok;
|
||||||
reset_metrics_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName}) ->
|
reset_metrics_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName}) ->
|
||||||
BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName),
|
BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName),
|
||||||
ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, BridgeV2Id).
|
ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, BridgeV2Id);
|
||||||
|
reset_metrics_helper(_, _, _) ->
|
||||||
|
{error, not_found}.
|
||||||
|
|
||||||
get_query_mode(BridgeV2Type, Config) ->
|
get_query_mode(BridgeV2Type, Config) ->
|
||||||
CreationOpts = emqx_resource:fetch_creation_opts(Config),
|
CreationOpts = emqx_resource:fetch_creation_opts(Config),
|
||||||
|
@ -529,6 +538,8 @@ get_query_mode(BridgeV2Type, Config) ->
|
||||||
ResourceType = emqx_connector_resource:connector_to_resource_type(ConnectorType),
|
ResourceType = emqx_connector_resource:connector_to_resource_type(ConnectorType),
|
||||||
emqx_resource:query_mode(ResourceType, Config, CreationOpts).
|
emqx_resource:query_mode(ResourceType, Config, CreationOpts).
|
||||||
|
|
||||||
|
-spec send_message(bridge_v2_type(), bridge_v2_name(), Message :: term(), QueryOpts :: map()) ->
|
||||||
|
term() | {error, term()}.
|
||||||
send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
|
send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
|
||||||
case lookup_conf(BridgeType, BridgeName) of
|
case lookup_conf(BridgeType, BridgeName) of
|
||||||
#{enable := true} = Config0 ->
|
#{enable := true} = Config0 ->
|
||||||
|
@ -562,8 +573,7 @@ do_send_msg_with_enabled_config(
|
||||||
emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts).
|
emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts).
|
||||||
|
|
||||||
-spec health_check(BridgeType :: term(), BridgeName :: term()) ->
|
-spec health_check(BridgeType :: term(), BridgeName :: term()) ->
|
||||||
#{status := term(), error := term()} | {error, Reason :: term()}.
|
#{status := emqx_resource:resource_status(), error := term()} | {error, Reason :: term()}.
|
||||||
|
|
||||||
health_check(BridgeType, BridgeName) ->
|
health_check(BridgeType, BridgeName) ->
|
||||||
case lookup_conf(BridgeType, BridgeName) of
|
case lookup_conf(BridgeType, BridgeName) of
|
||||||
#{
|
#{
|
||||||
|
@ -582,6 +592,34 @@ health_check(BridgeType, BridgeName) ->
|
||||||
Error
|
Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec create_dry_run(bridge_v2_type(), Config :: map()) -> ok | {error, term()}.
|
||||||
|
create_dry_run(Type, Conf0) ->
|
||||||
|
Conf1 = maps:without([<<"name">>], Conf0),
|
||||||
|
TypeBin = bin(Type),
|
||||||
|
RawConf = #{<<"actions">> => #{TypeBin => #{<<"temp_name">> => Conf1}}},
|
||||||
|
%% Check config
|
||||||
|
try
|
||||||
|
_ =
|
||||||
|
hocon_tconf:check_plain(
|
||||||
|
emqx_bridge_v2_schema,
|
||||||
|
RawConf,
|
||||||
|
#{atom_key => true, required => false}
|
||||||
|
),
|
||||||
|
#{<<"connector">> := ConnectorName} = Conf1,
|
||||||
|
%% Check that the connector exists and do the dry run if it exists
|
||||||
|
ConnectorType = connector_type(Type),
|
||||||
|
case emqx:get_raw_config([connectors, ConnectorType, ConnectorName], not_found) of
|
||||||
|
not_found ->
|
||||||
|
{error, iolist_to_binary(io_lib:format("Connector ~p not found", [ConnectorName]))};
|
||||||
|
ConnectorRawConf ->
|
||||||
|
create_dry_run_helper(Type, ConnectorRawConf, Conf1)
|
||||||
|
end
|
||||||
|
catch
|
||||||
|
%% validation errors
|
||||||
|
throw:Reason1 ->
|
||||||
|
{error, Reason1}
|
||||||
|
end.
|
||||||
|
|
||||||
create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) ->
|
create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) ->
|
||||||
BridgeName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
|
BridgeName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
|
||||||
ConnectorType = connector_type(BridgeType),
|
ConnectorType = connector_type(BridgeType),
|
||||||
|
@ -613,33 +651,7 @@ create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) ->
|
||||||
end,
|
end,
|
||||||
emqx_connector_resource:create_dry_run(ConnectorType, ConnectorRawConf, OnReadyCallback).
|
emqx_connector_resource:create_dry_run(ConnectorType, ConnectorRawConf, OnReadyCallback).
|
||||||
|
|
||||||
create_dry_run(Type, Conf0) ->
|
-spec get_metrics(bridge_v2_type(), bridge_v2_name()) -> emqx_metrics_worker:metrics().
|
||||||
Conf1 = maps:without([<<"name">>], Conf0),
|
|
||||||
TypeBin = bin(Type),
|
|
||||||
RawConf = #{<<"actions">> => #{TypeBin => #{<<"temp_name">> => Conf1}}},
|
|
||||||
%% Check config
|
|
||||||
try
|
|
||||||
_ =
|
|
||||||
hocon_tconf:check_plain(
|
|
||||||
emqx_bridge_v2_schema,
|
|
||||||
RawConf,
|
|
||||||
#{atom_key => true, required => false}
|
|
||||||
),
|
|
||||||
#{<<"connector">> := ConnectorName} = Conf1,
|
|
||||||
%% Check that the connector exists and do the dry run if it exists
|
|
||||||
ConnectorType = connector_type(Type),
|
|
||||||
case emqx:get_raw_config([connectors, ConnectorType, ConnectorName], not_found) of
|
|
||||||
not_found ->
|
|
||||||
{error, iolist_to_binary(io_lib:format("Connector ~p not found", [ConnectorName]))};
|
|
||||||
ConnectorRawConf ->
|
|
||||||
create_dry_run_helper(Type, ConnectorRawConf, Conf1)
|
|
||||||
end
|
|
||||||
catch
|
|
||||||
%% validation errors
|
|
||||||
throw:Reason1 ->
|
|
||||||
{error, Reason1}
|
|
||||||
end.
|
|
||||||
|
|
||||||
get_metrics(Type, Name) ->
|
get_metrics(Type, Name) ->
|
||||||
emqx_resource:get_metrics(id(Type, Name)).
|
emqx_resource:get_metrics(id(Type, Name)).
|
||||||
|
|
||||||
|
|
|
@ -647,10 +647,12 @@ t_load_config_success(_Config) ->
|
||||||
{ok, _},
|
{ok, _},
|
||||||
update_root_config(RootConf0)
|
update_root_config(RootConf0)
|
||||||
),
|
),
|
||||||
|
BridgeTypeBin = bin(BridgeType),
|
||||||
|
BridgeNameBin = bin(BridgeName),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, #{
|
{ok, #{
|
||||||
type := BridgeType,
|
type := BridgeTypeBin,
|
||||||
name := BridgeName,
|
name := BridgeNameBin,
|
||||||
raw_config := #{},
|
raw_config := #{},
|
||||||
resource_data := #{}
|
resource_data := #{}
|
||||||
}},
|
}},
|
||||||
|
@ -860,3 +862,7 @@ wait_until(Fun, Timeout) when Timeout >= 0 ->
|
||||||
end;
|
end;
|
||||||
wait_until(_, _) ->
|
wait_until(_, _) ->
|
||||||
ct:fail("Wait until event did not happen").
|
ct:fail("Wait until event did not happen").
|
||||||
|
|
||||||
|
bin(Bin) when is_binary(Bin) -> Bin;
|
||||||
|
bin(Str) when is_list(Str) -> list_to_binary(Str);
|
||||||
|
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
|
||||||
|
|
|
@ -447,7 +447,7 @@ health_check(ResId) ->
|
||||||
emqx_resource_manager:health_check(ResId).
|
emqx_resource_manager:health_check(ResId).
|
||||||
|
|
||||||
-spec channel_health_check(resource_id(), channel_id()) ->
|
-spec channel_health_check(resource_id(), channel_id()) ->
|
||||||
#{status := channel_status(), error := term(), any() => any()}.
|
#{status := resource_status(), error := term()}.
|
||||||
channel_health_check(ResId, ChannelId) ->
|
channel_health_check(ResId, ChannelId) ->
|
||||||
emqx_resource_manager:channel_health_check(ResId, ChannelId).
|
emqx_resource_manager:channel_health_check(ResId, ChannelId).
|
||||||
|
|
||||||
|
|
|
@ -309,7 +309,7 @@ health_check(ResId) ->
|
||||||
safe_call(ResId, health_check, ?T_OPERATION).
|
safe_call(ResId, health_check, ?T_OPERATION).
|
||||||
|
|
||||||
-spec channel_health_check(resource_id(), channel_id()) ->
|
-spec channel_health_check(resource_id(), channel_id()) ->
|
||||||
#{status := channel_status(), error := term(), any() => any()}.
|
#{status := resource_status(), error := term()}.
|
||||||
channel_health_check(ResId, ChannelId) ->
|
channel_health_check(ResId, ChannelId) ->
|
||||||
%% Do normal health check first to trigger health checks for channels
|
%% Do normal health check first to trigger health checks for channels
|
||||||
%% and update the cached health status for the channels
|
%% and update the cached health status for the channels
|
||||||
|
|
Loading…
Reference in New Issue