Merge pull request #8095 from terry-xiaoyu/resource_start_timeout
Make emqx_resource:create/5 return fast when starting a unavailable resource
This commit is contained in:
commit
9de3272635
|
@ -49,6 +49,7 @@
|
||||||
|
|
||||||
%% Internal error
|
%% Internal error
|
||||||
-define(INTERNAL_ERROR, 'INTERNAL_ERROR').
|
-define(INTERNAL_ERROR, 'INTERNAL_ERROR').
|
||||||
|
-define(SERVICE_UNAVAILABLE, 'SERVICE_UNAVAILABLE').
|
||||||
-define(SOURCE_ERROR, 'SOURCE_ERROR').
|
-define(SOURCE_ERROR, 'SOURCE_ERROR').
|
||||||
-define(UPDATE_FAILED, 'UPDATE_FAILED').
|
-define(UPDATE_FAILED, 'UPDATE_FAILED').
|
||||||
-define(REST_FAILED, 'REST_FAILED').
|
-define(REST_FAILED, 'REST_FAILED').
|
||||||
|
@ -81,6 +82,7 @@
|
||||||
{'TOPIC_NOT_FOUND', <<"Topic not found">>},
|
{'TOPIC_NOT_FOUND', <<"Topic not found">>},
|
||||||
{'USER_NOT_FOUND', <<"User not found">>},
|
{'USER_NOT_FOUND', <<"User not found">>},
|
||||||
{'INTERNAL_ERROR', <<"Server inter error">>},
|
{'INTERNAL_ERROR', <<"Server inter error">>},
|
||||||
|
{'SERVICE_UNAVAILABLE', <<"Service unavailable">>},
|
||||||
{'SOURCE_ERROR', <<"Source error">>},
|
{'SOURCE_ERROR', <<"Source error">>},
|
||||||
{'UPDATE_FAILED', <<"Update failed">>},
|
{'UPDATE_FAILED', <<"Update failed">>},
|
||||||
{'REST_FAILED', <<"Reset source or config failed">>},
|
{'REST_FAILED', <<"Reset source or config failed">>},
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
-behaviour(emqx_config_handler).
|
-behaviour(emqx_config_handler).
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
-export([post_config_update/5]).
|
-export([post_config_update/5]).
|
||||||
|
|
||||||
|
@ -46,6 +47,30 @@
|
||||||
%% exported for `emqx_telemetry'
|
%% exported for `emqx_telemetry'
|
||||||
-export([get_basic_usage_info/0]).
|
-export([get_basic_usage_info/0]).
|
||||||
|
|
||||||
|
load() ->
|
||||||
|
%% set wait_for_resource_ready => 0 to start resources async
|
||||||
|
Opts = #{auto_retry_interval => 60000, wait_for_resource_ready => 0},
|
||||||
|
Bridges = emqx:get_config([bridges], #{}),
|
||||||
|
lists:foreach(
|
||||||
|
fun({Type, NamedConf}) ->
|
||||||
|
lists:foreach(
|
||||||
|
fun({Name, Conf}) ->
|
||||||
|
_Res = emqx_bridge_resource:create(Type, Name, Conf, Opts),
|
||||||
|
?tp(
|
||||||
|
emqx_bridge_loaded,
|
||||||
|
#{
|
||||||
|
type => Type,
|
||||||
|
name => Name,
|
||||||
|
res => _Res
|
||||||
|
}
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
maps:to_list(NamedConf)
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
maps:to_list(Bridges)
|
||||||
|
).
|
||||||
|
|
||||||
load_hook() ->
|
load_hook() ->
|
||||||
Bridges = emqx:get_config([bridges], #{}),
|
Bridges = emqx:get_config([bridges], #{}),
|
||||||
load_hook(Bridges).
|
load_hook(Bridges).
|
||||||
|
@ -138,10 +163,6 @@ post_config_update(_, _Req, NewConf, OldConf, _AppEnv) ->
|
||||||
ok = load_hook(NewConf),
|
ok = load_hook(NewConf),
|
||||||
Result.
|
Result.
|
||||||
|
|
||||||
load() ->
|
|
||||||
Bridges = emqx:get_config([bridges], #{}),
|
|
||||||
emqx_bridge_monitor:ensure_all_started(Bridges).
|
|
||||||
|
|
||||||
list() ->
|
list() ->
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun({Type, NameAndConf}, Bridges) ->
|
fun({Type, NameAndConf}, Bridges) ->
|
||||||
|
|
|
@ -321,7 +321,8 @@ schema("/bridges/:id") ->
|
||||||
parameters => [param_path_id()],
|
parameters => [param_path_id()],
|
||||||
responses => #{
|
responses => #{
|
||||||
204 => <<"Bridge deleted">>,
|
204 => <<"Bridge deleted">>,
|
||||||
400 => error_schema(['INVALID_ID'], "Update bridge failed")
|
400 => error_schema(['INVALID_ID'], "Update bridge failed"),
|
||||||
|
503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -352,6 +353,7 @@ schema("/bridges/:id/operation/:operation") ->
|
||||||
],
|
],
|
||||||
responses => #{
|
responses => #{
|
||||||
200 => <<"Operation success">>,
|
200 => <<"Operation success">>,
|
||||||
|
503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable"),
|
||||||
400 => error_schema('INVALID_ID', "Bad bridge ID")
|
400 => error_schema('INVALID_ID', "Bad bridge ID")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -371,7 +373,8 @@ schema("/nodes/:node/bridges/:id/operation/:operation") ->
|
||||||
responses => #{
|
responses => #{
|
||||||
200 => <<"Operation success">>,
|
200 => <<"Operation success">>,
|
||||||
400 => error_schema('INVALID_ID', "Bad bridge ID"),
|
400 => error_schema('INVALID_ID', "Bad bridge ID"),
|
||||||
403 => error_schema('FORBIDDEN_REQUEST', "forbidden operation")
|
403 => error_schema('FORBIDDEN_REQUEST', "forbidden operation"),
|
||||||
|
503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}.
|
}.
|
||||||
|
@ -417,6 +420,7 @@ schema("/nodes/:node/bridges/:id/operation/:operation") ->
|
||||||
Id,
|
Id,
|
||||||
case emqx_bridge:remove(BridgeType, BridgeName) of
|
case emqx_bridge:remove(BridgeType, BridgeName) of
|
||||||
{ok, _} -> {204};
|
{ok, _} -> {204};
|
||||||
|
{error, timeout} -> {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
|
||||||
{error, Reason} -> {500, error_msg('INTERNAL_ERROR', Reason)}
|
{error, Reason} -> {500, error_msg('INTERNAL_ERROR', Reason)}
|
||||||
end
|
end
|
||||||
).
|
).
|
||||||
|
@ -466,6 +470,10 @@ lookup_from_local_node(BridgeType, BridgeName) ->
|
||||||
{200};
|
{200};
|
||||||
{error, {pre_config_update, _, bridge_not_found}} ->
|
{error, {pre_config_update, _, bridge_not_found}} ->
|
||||||
{404, error_msg('NOT_FOUND', <<"bridge not found">>)};
|
{404, error_msg('NOT_FOUND', <<"bridge not found">>)};
|
||||||
|
{error, {_, _, timeout}} ->
|
||||||
|
{503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
|
||||||
|
{error, timeout} ->
|
||||||
|
{503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{500, error_msg('INTERNAL_ERROR', Reason)}
|
{500, error_msg('INTERNAL_ERROR', Reason)}
|
||||||
end;
|
end;
|
||||||
|
@ -489,11 +497,18 @@ lookup_from_local_node(BridgeType, BridgeName) ->
|
||||||
ConfMap = emqx:get_config([bridges, BridgeType, BridgeName]),
|
ConfMap = emqx:get_config([bridges, BridgeType, BridgeName]),
|
||||||
case maps:get(enable, ConfMap, false) of
|
case maps:get(enable, ConfMap, false) of
|
||||||
false ->
|
false ->
|
||||||
{403, error_msg('FORBIDDEN_REQUEST', <<"forbidden operation">>)};
|
{403,
|
||||||
|
error_msg(
|
||||||
|
'FORBIDDEN_REQUEST', <<"forbidden operation: bridge disabled">>
|
||||||
|
)};
|
||||||
true ->
|
true ->
|
||||||
case emqx_bridge_proto_v1:OperFunc(TargetNode, BridgeType, BridgeName) of
|
case emqx_bridge_proto_v1:OperFunc(TargetNode, BridgeType, BridgeName) of
|
||||||
ok -> {200};
|
ok ->
|
||||||
{error, Reason} -> {500, error_msg('INTERNAL_ERROR', Reason)}
|
{200};
|
||||||
|
{error, timeout} ->
|
||||||
|
{503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
|
||||||
|
{error, Reason} ->
|
||||||
|
{500, error_msg('INTERNAL_ERROR', Reason)}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -518,6 +533,8 @@ operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) ->
|
||||||
case is_ok(emqx_bridge_proto_v1:RpcFunc(Nodes, BridgeType, BridgeName)) of
|
case is_ok(emqx_bridge_proto_v1:RpcFunc(Nodes, BridgeType, BridgeName)) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
{200};
|
{200};
|
||||||
|
{error, [timeout | _]} ->
|
||||||
|
{503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
|
||||||
{error, ErrL} ->
|
{error, ErrL} ->
|
||||||
{500, error_msg('INTERNAL_ERROR', ErrL)}
|
{500, error_msg('INTERNAL_ERROR', ErrL)}
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -1,90 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% This process monitors all the data bridges, and try to restart a bridge
|
|
||||||
%% when one of it stopped.
|
|
||||||
-module(emqx_bridge_monitor).
|
|
||||||
|
|
||||||
-behaviour(gen_server).
|
|
||||||
|
|
||||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
||||||
|
|
||||||
%% API functions
|
|
||||||
-export([
|
|
||||||
start_link/0,
|
|
||||||
ensure_all_started/1
|
|
||||||
]).
|
|
||||||
|
|
||||||
%% gen_server callbacks
|
|
||||||
-export([
|
|
||||||
init/1,
|
|
||||||
handle_call/3,
|
|
||||||
handle_cast/2,
|
|
||||||
handle_info/2,
|
|
||||||
terminate/2,
|
|
||||||
code_change/3
|
|
||||||
]).
|
|
||||||
|
|
||||||
-record(state, {}).
|
|
||||||
|
|
||||||
start_link() ->
|
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
|
||||||
|
|
||||||
ensure_all_started(Configs) ->
|
|
||||||
gen_server:cast(?MODULE, {start_and_monitor, Configs}).
|
|
||||||
|
|
||||||
init([]) ->
|
|
||||||
{ok, #state{}}.
|
|
||||||
|
|
||||||
handle_call(_Request, _From, State) ->
|
|
||||||
Reply = ok,
|
|
||||||
{reply, Reply, State}.
|
|
||||||
|
|
||||||
handle_cast({start_and_monitor, Configs}, State) ->
|
|
||||||
ok = load_bridges(Configs),
|
|
||||||
{noreply, State};
|
|
||||||
handle_cast(_Msg, State) ->
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
handle_info(_Info, State) ->
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
terminate(_Reason, _State) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
|
||||||
{ok, State}.
|
|
||||||
|
|
||||||
%%============================================================================
|
|
||||||
load_bridges(Configs) ->
|
|
||||||
lists:foreach(
|
|
||||||
fun({Type, NamedConf}) ->
|
|
||||||
lists:foreach(
|
|
||||||
fun({Name, Conf}) ->
|
|
||||||
_Res = emqx_bridge_resource:create(Type, Name, Conf),
|
|
||||||
?tp(
|
|
||||||
emqx_bridge_monitor_loaded_bridge,
|
|
||||||
#{
|
|
||||||
type => Type,
|
|
||||||
name => Name,
|
|
||||||
res => _Res
|
|
||||||
}
|
|
||||||
)
|
|
||||||
end,
|
|
||||||
maps:to_list(NamedConf)
|
|
||||||
)
|
|
||||||
end,
|
|
||||||
maps:to_list(Configs)
|
|
||||||
).
|
|
|
@ -28,6 +28,7 @@
|
||||||
-export([
|
-export([
|
||||||
create/2,
|
create/2,
|
||||||
create/3,
|
create/3,
|
||||||
|
create/4,
|
||||||
recreate/2,
|
recreate/2,
|
||||||
recreate/3,
|
recreate/3,
|
||||||
create_dry_run/2,
|
create_dry_run/2,
|
||||||
|
@ -79,6 +80,9 @@ create(BridgeId, Conf) ->
|
||||||
create(BridgeType, BridgeName, Conf).
|
create(BridgeType, BridgeName, Conf).
|
||||||
|
|
||||||
create(Type, Name, Conf) ->
|
create(Type, Name, Conf) ->
|
||||||
|
create(Type, Name, Conf, #{auto_retry_interval => 60000}).
|
||||||
|
|
||||||
|
create(Type, Name, Conf, Opts) ->
|
||||||
?SLOG(info, #{
|
?SLOG(info, #{
|
||||||
msg => "create bridge",
|
msg => "create bridge",
|
||||||
type => Type,
|
type => Type,
|
||||||
|
@ -90,7 +94,7 @@ create(Type, Name, Conf) ->
|
||||||
<<"emqx_bridge">>,
|
<<"emqx_bridge">>,
|
||||||
bridge_to_resource_type(Type),
|
bridge_to_resource_type(Type),
|
||||||
parse_confs(Type, Name, Conf),
|
parse_confs(Type, Name, Conf),
|
||||||
#{auto_retry_interval => 60000}
|
Opts
|
||||||
),
|
),
|
||||||
maybe_disable_bridge(Type, Name, Conf).
|
maybe_disable_bridge(Type, Name, Conf).
|
||||||
|
|
||||||
|
@ -132,10 +136,14 @@ update(Type, Name, {OldConf, Conf}) ->
|
||||||
true ->
|
true ->
|
||||||
%% we don't need to recreate the bridge if this config change is only to
|
%% we don't need to recreate the bridge if this config change is only to
|
||||||
%% toggole the config 'bridge.{type}.{name}.enable'
|
%% toggole the config 'bridge.{type}.{name}.enable'
|
||||||
|
_ =
|
||||||
case maps:get(enable, Conf, true) of
|
case maps:get(enable, Conf, true) of
|
||||||
true -> restart(Type, Name);
|
true ->
|
||||||
false -> stop(Type, Name)
|
restart(Type, Name);
|
||||||
end
|
false ->
|
||||||
|
stop(Type, Name)
|
||||||
|
end,
|
||||||
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
recreate(Type, Name) ->
|
recreate(Type, Name) ->
|
||||||
|
|
|
@ -32,15 +32,7 @@ init([]) ->
|
||||||
intensity => 10,
|
intensity => 10,
|
||||||
period => 10
|
period => 10
|
||||||
},
|
},
|
||||||
ChildSpecs = [
|
ChildSpecs = [],
|
||||||
#{
|
|
||||||
id => emqx_bridge_monitor,
|
|
||||||
start => {emqx_bridge_monitor, start_link, []},
|
|
||||||
restart => permanent,
|
|
||||||
type => worker,
|
|
||||||
modules => [emqx_bridge_monitor]
|
|
||||||
}
|
|
||||||
],
|
|
||||||
{ok, {SupFlags, ChildSpecs}}.
|
{ok, {SupFlags, ChildSpecs}}.
|
||||||
|
|
||||||
%% internal functions
|
%% internal functions
|
||||||
|
|
|
@ -147,7 +147,7 @@ setup_fake_telemetry_data() ->
|
||||||
ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, Conf, Opts),
|
ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, Conf, Opts),
|
||||||
|
|
||||||
ok = snabbkaffe:start_trace(),
|
ok = snabbkaffe:start_trace(),
|
||||||
Predicate = fun(#{?snk_kind := K}) -> K =:= emqx_bridge_monitor_loaded_bridge end,
|
Predicate = fun(#{?snk_kind := K}) -> K =:= emqx_bridge_loaded end,
|
||||||
NEvents = 3,
|
NEvents = 3,
|
||||||
BackInTime = 0,
|
BackInTime = 0,
|
||||||
Timeout = 11_000,
|
Timeout = 11_000,
|
||||||
|
|
|
@ -378,7 +378,10 @@ t_enable_disable_bridges(_) ->
|
||||||
{ok, 200, <<>>} = request(post, operation_path(cluster, disable, BridgeID), <<"">>),
|
{ok, 200, <<>>} = request(post, operation_path(cluster, disable, BridgeID), <<"">>),
|
||||||
|
|
||||||
{ok, 403, Res} = request(post, operation_path(node, restart, BridgeID), <<"">>),
|
{ok, 403, Res} = request(post, operation_path(node, restart, BridgeID), <<"">>),
|
||||||
?assertEqual(<<"{\"code\":\"FORBIDDEN_REQUEST\",\"message\":\"forbidden operation\"}">>, Res),
|
?assertEqual(
|
||||||
|
<<"{\"code\":\"FORBIDDEN_REQUEST\",\"message\":\"forbidden operation: bridge disabled\"}">>,
|
||||||
|
Res
|
||||||
|
),
|
||||||
|
|
||||||
%% enable a stopped bridge
|
%% enable a stopped bridge
|
||||||
{ok, 200, <<>>} = request(post, operation_path(cluster, enable, BridgeID), <<"">>),
|
{ok, 200, <<>>} = request(post, operation_path(cluster, enable, BridgeID), <<"">>),
|
||||||
|
|
|
@ -573,6 +573,7 @@ obfuscate(Map) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
is_sensitive(password) -> true;
|
is_sensitive(password) -> true;
|
||||||
|
is_sensitive(ssl_opts) -> true;
|
||||||
is_sensitive(_) -> false.
|
is_sensitive(_) -> false.
|
||||||
|
|
||||||
str(A) when is_atom(A) ->
|
str(A) when is_atom(A) ->
|
||||||
|
|
|
@ -33,7 +33,7 @@
|
||||||
-type create_opts() :: #{
|
-type create_opts() :: #{
|
||||||
health_check_interval => integer(),
|
health_check_interval => integer(),
|
||||||
health_check_timeout => integer(),
|
health_check_timeout => integer(),
|
||||||
waiting_connect_complete => integer(),
|
wait_for_resource_ready => integer(),
|
||||||
auto_retry_interval => integer()
|
auto_retry_interval => integer()
|
||||||
}.
|
}.
|
||||||
-type after_query() ::
|
-type after_query() ::
|
||||||
|
|
|
@ -260,8 +260,10 @@ query(InstId, Request, AfterQuery) ->
|
||||||
emqx_metrics_worker:inc(resource_metrics, InstId, exception),
|
emqx_metrics_worker:inc(resource_metrics, InstId, exception),
|
||||||
erlang:raise(Err, Reason, ST)
|
erlang:raise(Err, Reason, ST)
|
||||||
end;
|
end;
|
||||||
|
{ok, _Group, _Data} ->
|
||||||
|
query_error(not_found, <<"resource not connected">>);
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
query_error(not_found, <<"resource not found or not connected">>)
|
query_error(not_found, <<"resource not found">>)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec restart(instance_id()) -> ok | {error, Reason :: term()}.
|
-spec restart(instance_id()) -> ok | {error, Reason :: term()}.
|
||||||
|
|
|
@ -23,19 +23,24 @@
|
||||||
% API
|
% API
|
||||||
-export([
|
-export([
|
||||||
ensure_resource/5,
|
ensure_resource/5,
|
||||||
create_dry_run/2,
|
create/5,
|
||||||
ets_lookup/1,
|
|
||||||
get_metrics/1,
|
|
||||||
health_check/1,
|
|
||||||
list_all/0,
|
|
||||||
list_group/1,
|
|
||||||
lookup/1,
|
|
||||||
recreate/4,
|
recreate/4,
|
||||||
remove/1,
|
remove/1,
|
||||||
reset_metrics/1,
|
create_dry_run/2,
|
||||||
restart/2,
|
restart/2,
|
||||||
set_resource_status_connecting/1,
|
start/2,
|
||||||
stop/1
|
stop/1,
|
||||||
|
health_check/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([
|
||||||
|
lookup/1,
|
||||||
|
list_all/0,
|
||||||
|
list_group/1,
|
||||||
|
ets_lookup/1,
|
||||||
|
get_metrics/1,
|
||||||
|
reset_metrics/1,
|
||||||
|
set_resource_status_connecting/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
% Server
|
% Server
|
||||||
|
@ -51,6 +56,8 @@
|
||||||
-define(HEALTHCHECK_INTERVAL, 15000).
|
-define(HEALTHCHECK_INTERVAL, 15000).
|
||||||
-define(ETS_TABLE, emqx_resource_manager).
|
-define(ETS_TABLE, emqx_resource_manager).
|
||||||
-define(WAIT_FOR_RESOURCE_DELAY, 100).
|
-define(WAIT_FOR_RESOURCE_DELAY, 100).
|
||||||
|
-define(T_OPERATION, 5000).
|
||||||
|
-define(T_LOOKUP, 1000).
|
||||||
|
|
||||||
-define(IS_STATUS(ST), ST =:= connecting; ST =:= connected; ST =:= disconnected).
|
-define(IS_STATUS(ST), ST =:= connecting; ST =:= connected; ST =:= disconnected).
|
||||||
|
|
||||||
|
@ -74,11 +81,24 @@ ensure_resource(InstId, Group, ResourceType, Config, Opts) ->
|
||||||
{ok, _Group, Data} ->
|
{ok, _Group, Data} ->
|
||||||
{ok, Data};
|
{ok, Data};
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
do_start(InstId, Group, ResourceType, Config, Opts),
|
create(InstId, Group, ResourceType, Config, Opts),
|
||||||
{ok, _Group, Data} = lookup(InstId),
|
{ok, _Group, Data} = lookup(InstId),
|
||||||
{ok, Data}
|
{ok, Data}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% @doc Create a resource_manager and wait until it is running
|
||||||
|
create(InstId, Group, ResourceType, Config, Opts) ->
|
||||||
|
% The state machine will make the actual call to the callback/resource module after init
|
||||||
|
ok = emqx_resource_manager_sup:ensure_child(InstId, Group, ResourceType, Config, Opts),
|
||||||
|
ok = emqx_metrics_worker:create_metrics(
|
||||||
|
resource_metrics,
|
||||||
|
InstId,
|
||||||
|
[matched, success, failed, exception],
|
||||||
|
[matched]
|
||||||
|
),
|
||||||
|
wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)),
|
||||||
|
ok.
|
||||||
|
|
||||||
%% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance.
|
%% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance.
|
||||||
%%
|
%%
|
||||||
%% Triggers the `emqx_resource_manager_sup` supervisor to actually create
|
%% Triggers the `emqx_resource_manager_sup` supervisor to actually create
|
||||||
|
@ -90,9 +110,9 @@ create_dry_run(ResourceType, Config) ->
|
||||||
ok = emqx_resource_manager_sup:ensure_child(InstId, <<"dry_run">>, ResourceType, Config, #{}),
|
ok = emqx_resource_manager_sup:ensure_child(InstId, <<"dry_run">>, ResourceType, Config, #{}),
|
||||||
case wait_for_resource_ready(InstId, 5000) of
|
case wait_for_resource_ready(InstId, 5000) of
|
||||||
ok ->
|
ok ->
|
||||||
_ = stop(InstId);
|
_ = remove(InstId);
|
||||||
timeout ->
|
timeout ->
|
||||||
_ = stop(InstId),
|
_ = remove(InstId),
|
||||||
{error, timeout}
|
{error, timeout}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -118,27 +138,36 @@ remove(InstId) when is_binary(InstId) ->
|
||||||
%% @doc Stops a running resource_manager and optionally clears the metrics for the resource
|
%% @doc Stops a running resource_manager and optionally clears the metrics for the resource
|
||||||
-spec remove(instance_id(), boolean()) -> ok | {error, Reason :: term()}.
|
-spec remove(instance_id(), boolean()) -> ok | {error, Reason :: term()}.
|
||||||
remove(InstId, ClearMetrics) when is_binary(InstId) ->
|
remove(InstId, ClearMetrics) when is_binary(InstId) ->
|
||||||
safe_call(InstId, {remove, ClearMetrics}).
|
safe_call(InstId, {remove, ClearMetrics}, ?T_OPERATION).
|
||||||
|
|
||||||
%% @doc Stops and then starts an instance that was already running
|
%% @doc Stops and then starts an instance that was already running
|
||||||
-spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}.
|
-spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}.
|
||||||
restart(InstId, Opts) when is_binary(InstId) ->
|
restart(InstId, Opts) when is_binary(InstId) ->
|
||||||
case lookup(InstId) of
|
case safe_call(InstId, restart, ?T_OPERATION) of
|
||||||
{ok, Group, #{mod := ResourceType, config := Config} = _Data} ->
|
ok ->
|
||||||
_ = remove(InstId, false),
|
wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)),
|
||||||
do_start(InstId, Group, ResourceType, Config, Opts);
|
ok;
|
||||||
Error ->
|
{error, _Reason} = Error ->
|
||||||
Error
|
Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @doc Stop the resource manager process
|
%% @doc Stop the resource
|
||||||
|
-spec start(instance_id(), create_opts()) -> ok | {error, Reason :: term()}.
|
||||||
|
start(InstId, Opts) ->
|
||||||
|
case safe_call(InstId, start, ?T_OPERATION) of
|
||||||
|
ok ->
|
||||||
|
wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)),
|
||||||
|
ok;
|
||||||
|
{error, _Reason} = Error ->
|
||||||
|
Error
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% @doc Start the resource
|
||||||
-spec stop(instance_id()) -> ok | {error, Reason :: term()}.
|
-spec stop(instance_id()) -> ok | {error, Reason :: term()}.
|
||||||
stop(InstId) ->
|
stop(InstId) ->
|
||||||
case safe_call(InstId, stop) of
|
case safe_call(InstId, stop, ?T_OPERATION) of
|
||||||
ok ->
|
ok ->
|
||||||
ok;
|
ok;
|
||||||
{error, not_found} ->
|
|
||||||
ok;
|
|
||||||
{error, _Reason} = Error ->
|
{error, _Reason} = Error ->
|
||||||
Error
|
Error
|
||||||
end.
|
end.
|
||||||
|
@ -146,12 +175,15 @@ stop(InstId) ->
|
||||||
%% @doc Test helper
|
%% @doc Test helper
|
||||||
-spec set_resource_status_connecting(instance_id()) -> ok.
|
-spec set_resource_status_connecting(instance_id()) -> ok.
|
||||||
set_resource_status_connecting(InstId) ->
|
set_resource_status_connecting(InstId) ->
|
||||||
safe_call(InstId, set_resource_status_connecting).
|
safe_call(InstId, set_resource_status_connecting, infinity).
|
||||||
|
|
||||||
%% @doc Lookup the group and data of a resource
|
%% @doc Lookup the group and data of a resource
|
||||||
-spec lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}.
|
-spec lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}.
|
||||||
lookup(InstId) ->
|
lookup(InstId) ->
|
||||||
safe_call(InstId, lookup).
|
case safe_call(InstId, lookup, ?T_LOOKUP) of
|
||||||
|
{error, timeout} -> ets_lookup(InstId);
|
||||||
|
Result -> Result
|
||||||
|
end.
|
||||||
|
|
||||||
%% @doc Lookup the group and data of a resource
|
%% @doc Lookup the group and data of a resource
|
||||||
-spec ets_lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}.
|
-spec ets_lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}.
|
||||||
|
@ -192,7 +224,7 @@ list_group(Group) ->
|
||||||
|
|
||||||
-spec health_check(instance_id()) -> {ok, resource_status()} | {error, term()}.
|
-spec health_check(instance_id()) -> {ok, resource_status()} | {error, term()}.
|
||||||
health_check(InstId) ->
|
health_check(InstId) ->
|
||||||
safe_call(InstId, health_check).
|
safe_call(InstId, health_check, ?T_OPERATION).
|
||||||
|
|
||||||
%% Server start/stop callbacks
|
%% Server start/stop callbacks
|
||||||
|
|
||||||
|
@ -204,16 +236,20 @@ start_link(InstId, Group, ResourceType, Config, Opts) ->
|
||||||
mod = ResourceType,
|
mod = ResourceType,
|
||||||
config = Config,
|
config = Config,
|
||||||
opts = Opts,
|
opts = Opts,
|
||||||
status = undefined,
|
status = connecting,
|
||||||
state = undefined,
|
state = undefined,
|
||||||
error = undefined
|
error = undefined
|
||||||
},
|
},
|
||||||
gen_statem:start_link({local, proc_name(InstId)}, ?MODULE, Data, []).
|
gen_statem:start_link({local, proc_name(InstId)}, ?MODULE, Data, []).
|
||||||
|
|
||||||
init(Data) ->
|
init(Data) ->
|
||||||
|
process_flag(trap_exit, true),
|
||||||
|
%% init the cache so that lookup/1 will always return something
|
||||||
|
ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}),
|
||||||
{ok, connecting, Data, {next_event, internal, try_connect}}.
|
{ok, connecting, Data, {next_event, internal, try_connect}}.
|
||||||
|
|
||||||
terminate(_Reason, _State, Data) ->
|
terminate(_Reason, _State, Data) ->
|
||||||
|
_ = maybe_clear_alarm(Data#data.id),
|
||||||
ets:delete(?ETS_TABLE, Data#data.id),
|
ets:delete(?ETS_TABLE, Data#data.id),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
@ -226,31 +262,46 @@ callback_mode() -> [handle_event_function, state_enter].
|
||||||
% Called during testing to force a specific state
|
% Called during testing to force a specific state
|
||||||
handle_event({call, From}, set_resource_status_connecting, _State, Data) ->
|
handle_event({call, From}, set_resource_status_connecting, _State, Data) ->
|
||||||
{next_state, connecting, Data#data{status = connecting}, [{reply, From, ok}]};
|
{next_state, connecting, Data#data{status = connecting}, [{reply, From, ok}]};
|
||||||
|
% Called when the resource is to be restarted
|
||||||
|
handle_event({call, From}, restart, _State, Data) ->
|
||||||
|
_ = stop_resource(Data),
|
||||||
|
start_resource(Data, From);
|
||||||
|
% Called when the resource is to be started
|
||||||
|
handle_event({call, From}, start, stopped, Data) ->
|
||||||
|
start_resource(Data, From);
|
||||||
|
handle_event({call, From}, start, _State, _Data) ->
|
||||||
|
{keep_state_and_data, [{reply, From, ok}]};
|
||||||
% Called when the resource is to be stopped
|
% Called when the resource is to be stopped
|
||||||
handle_event({call, From}, stop, _State, #data{status = disconnected} = Data) ->
|
handle_event({call, From}, stop, stopped, _Data) ->
|
||||||
{next_state, stopped, Data, [{reply, From, ok}]};
|
{keep_state_and_data, [{reply, From, ok}]};
|
||||||
handle_event({call, From}, stop, _State, Data) ->
|
handle_event({call, From}, stop, _State, Data) ->
|
||||||
Result = do_stop(Data),
|
Result = stop_resource(Data),
|
||||||
UpdatedData = Data#data{status = disconnected},
|
UpdatedData = Data#data{status = disconnected},
|
||||||
{next_state, stopped, UpdatedData, [{reply, From, Result}]};
|
{next_state, stopped, UpdatedData, [{reply, From, Result}]};
|
||||||
% Called when a resource is to be stopped and removed.
|
% Called when a resource is to be stopped and removed.
|
||||||
handle_event({call, From}, {remove, ClearMetrics}, _State, Data) ->
|
handle_event({call, From}, {remove, ClearMetrics}, _State, Data) ->
|
||||||
handle_remove_event(From, ClearMetrics, Data);
|
handle_remove_event(From, ClearMetrics, Data);
|
||||||
% Called when the state of the resource is being looked up.
|
% Called when the state-data of the resource is being looked up.
|
||||||
handle_event({call, From}, lookup, _State, #data{group = Group} = Data) ->
|
handle_event({call, From}, lookup, _State, #data{group = Group} = Data) ->
|
||||||
Reply = {ok, Group, data_record_to_external_map_with_metrics(Data)},
|
Reply = {ok, Group, data_record_to_external_map_with_metrics(Data)},
|
||||||
{keep_state_and_data, [{reply, From, Reply}]};
|
{keep_state_and_data, [{reply, From, Reply}]};
|
||||||
% Connecting state enter
|
% Called when doing a manually health check.
|
||||||
handle_event(internal, try_connect, connecting, Data) ->
|
handle_event({call, From}, health_check, stopped, _Data) ->
|
||||||
handle_connection_attempt(Data);
|
Actions = [{reply, From, {error, resource_is_stopped}}],
|
||||||
|
{keep_state_and_data, Actions};
|
||||||
|
handle_event({call, From}, health_check, _State, Data) ->
|
||||||
|
handle_manually_health_check(From, Data);
|
||||||
|
% State: CONNECTING
|
||||||
handle_event(enter, _OldState, connecting, Data) ->
|
handle_event(enter, _OldState, connecting, Data) ->
|
||||||
ets:delete(?ETS_TABLE, Data#data.id),
|
ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}),
|
||||||
Actions = [{state_timeout, 0, health_check}],
|
Actions = [{state_timeout, 0, health_check}],
|
||||||
{next_state, connecting, Data, Actions};
|
{keep_state_and_data, Actions};
|
||||||
% Connecting state health_check timeouts.
|
handle_event(internal, try_connect, connecting, Data) ->
|
||||||
|
start_resource(Data, undefined);
|
||||||
handle_event(state_timeout, health_check, connecting, Data) ->
|
handle_event(state_timeout, health_check, connecting, Data) ->
|
||||||
connecting_health_check(Data);
|
handle_connecting_health_check(Data);
|
||||||
%% The connected state is entered after a successful start of the callback mod
|
%% State: CONNECTED
|
||||||
|
%% The connected state is entered after a successful on_start/2 of the callback mod
|
||||||
%% and successful health_checks
|
%% and successful health_checks
|
||||||
handle_event(enter, _OldState, connected, Data) ->
|
handle_event(enter, _OldState, connected, Data) ->
|
||||||
ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}),
|
ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}),
|
||||||
|
@ -258,21 +309,19 @@ handle_event(enter, _OldState, connected, Data) ->
|
||||||
Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}],
|
Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}],
|
||||||
{next_state, connected, Data, Actions};
|
{next_state, connected, Data, Actions};
|
||||||
handle_event(state_timeout, health_check, connected, Data) ->
|
handle_event(state_timeout, health_check, connected, Data) ->
|
||||||
perform_connected_health_check(Data);
|
handle_connected_health_check(Data);
|
||||||
|
%% State: DISCONNECTED
|
||||||
handle_event(enter, _OldState, disconnected, Data) ->
|
handle_event(enter, _OldState, disconnected, Data) ->
|
||||||
|
ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}),
|
||||||
handle_disconnected_state_enter(Data);
|
handle_disconnected_state_enter(Data);
|
||||||
handle_event(state_timeout, auto_retry, disconnected, Data) ->
|
handle_event(state_timeout, auto_retry, disconnected, Data) ->
|
||||||
handle_connection_attempt(Data);
|
start_resource(Data, undefined);
|
||||||
|
%% State: STOPPED
|
||||||
|
%% The stopped state is entered after the resource has been explicitly stopped
|
||||||
handle_event(enter, _OldState, stopped, Data) ->
|
handle_event(enter, _OldState, stopped, Data) ->
|
||||||
UpdatedData = Data#data{status = disconnected},
|
UpdatedData = Data#data{status = disconnected},
|
||||||
ets:delete(?ETS_TABLE, Data#data.id),
|
ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, UpdatedData}),
|
||||||
{next_state, stopped, UpdatedData};
|
{next_state, stopped, UpdatedData};
|
||||||
% Resource has been explicitly stopped, so return that as the error reason.
|
|
||||||
handle_event({call, From}, _, stopped, _Data) ->
|
|
||||||
Actions = [{reply, From, {error, resource_is_stopped}}],
|
|
||||||
{keep_state_and_data, Actions};
|
|
||||||
handle_event({call, From}, health_check, _State, Data) ->
|
|
||||||
handle_health_check_request(From, Data);
|
|
||||||
% Ignore all other events
|
% Ignore all other events
|
||||||
handle_event(EventType, EventData, State, Data) ->
|
handle_event(EventType, EventData, State, Data) ->
|
||||||
?SLOG(
|
?SLOG(
|
||||||
|
@ -285,77 +334,70 @@ handle_event(EventType, EventData, State, Data) ->
|
||||||
data => Data
|
data => Data
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
{next_state, State, Data}.
|
keep_state_and_data.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% internal functions
|
%% internal functions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
handle_disconnected_state_enter(Data) ->
|
handle_disconnected_state_enter(Data) ->
|
||||||
UpdatedData = Data#data{status = disconnected},
|
|
||||||
ets:delete(?ETS_TABLE, Data#data.id),
|
|
||||||
case maps:get(auto_retry_interval, Data#data.opts, undefined) of
|
case maps:get(auto_retry_interval, Data#data.opts, undefined) of
|
||||||
undefined ->
|
undefined ->
|
||||||
{next_state, disconnected, UpdatedData};
|
{next_state, disconnected, Data};
|
||||||
RetryInterval ->
|
RetryInterval ->
|
||||||
Actions = [{state_timeout, RetryInterval, auto_retry}],
|
Actions = [{state_timeout, RetryInterval, auto_retry}],
|
||||||
{next_state, disconnected, UpdatedData, Actions}
|
{next_state, disconnected, Data, Actions}
|
||||||
end.
|
|
||||||
|
|
||||||
handle_connection_attempt(Data) ->
|
|
||||||
case emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config) of
|
|
||||||
{ok, ResourceState} ->
|
|
||||||
UpdatedData = Data#data{state = ResourceState, status = connecting},
|
|
||||||
%% Perform an initial health_check immediately before transitioning into a connected state
|
|
||||||
Actions = [{state_timeout, 0, health_check}],
|
|
||||||
{next_state, connecting, UpdatedData, Actions};
|
|
||||||
{error, Reason} ->
|
|
||||||
%% Keep track of the error reason why the connection did not work
|
|
||||||
%% so that the Reason can be returned when the verification call is made.
|
|
||||||
UpdatedData = Data#data{status = disconnected, error = Reason},
|
|
||||||
{next_state, disconnected, UpdatedData}
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
handle_remove_event(From, ClearMetrics, Data) ->
|
handle_remove_event(From, ClearMetrics, Data) ->
|
||||||
do_stop(Data),
|
stop_resource(Data),
|
||||||
ets:delete(?ETS_TABLE, Data#data.id),
|
|
||||||
case ClearMetrics of
|
case ClearMetrics of
|
||||||
true -> ok = emqx_metrics_worker:clear_metrics(resource_metrics, Data#data.id);
|
true -> ok = emqx_metrics_worker:clear_metrics(resource_metrics, Data#data.id);
|
||||||
false -> ok
|
false -> ok
|
||||||
end,
|
end,
|
||||||
{stop_and_reply, normal, [{reply, From, ok}]}.
|
{stop_and_reply, normal, [{reply, From, ok}]}.
|
||||||
|
|
||||||
do_start(InstId, Group, ResourceType, Config, Opts) ->
|
start_resource(Data, From) ->
|
||||||
% The state machine will make the actual call to the callback/resource module after init
|
%% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache
|
||||||
ok = emqx_resource_manager_sup:ensure_child(InstId, Group, ResourceType, Config, Opts),
|
ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}),
|
||||||
ok = emqx_metrics_worker:create_metrics(
|
case emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config) of
|
||||||
resource_metrics,
|
{ok, ResourceState} ->
|
||||||
InstId,
|
UpdatedData = Data#data{state = ResourceState, status = connecting},
|
||||||
[matched, success, failed, exception],
|
%% Perform an initial health_check immediately before transitioning into a connected state
|
||||||
[matched]
|
Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok),
|
||||||
),
|
{next_state, connecting, UpdatedData, Actions};
|
||||||
wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)),
|
{error, Reason} = Err ->
|
||||||
ok.
|
_ = maybe_alarm(disconnected, Data#data.id),
|
||||||
|
%% Keep track of the error reason why the connection did not work
|
||||||
|
%% so that the Reason can be returned when the verification call is made.
|
||||||
|
UpdatedData = Data#data{status = disconnected, error = Reason},
|
||||||
|
Actions = maybe_reply([], From, Err),
|
||||||
|
{next_state, disconnected, UpdatedData, Actions}
|
||||||
|
end.
|
||||||
|
|
||||||
do_stop(#data{state = undefined} = _Data) ->
|
stop_resource(#data{state = undefined, id = ResId} = _Data) ->
|
||||||
|
_ = maybe_clear_alarm(ResId),
|
||||||
ok;
|
ok;
|
||||||
do_stop(Data) ->
|
stop_resource(Data) ->
|
||||||
Result = emqx_resource:call_stop(Data#data.id, Data#data.mod, Data#data.state),
|
%% We don't care the return value of the Mod:on_stop/2.
|
||||||
ets:delete(?ETS_TABLE, Data#data.id),
|
%% The callback mod should make sure the resource is stopped after on_stop/2
|
||||||
Result.
|
%% is returned.
|
||||||
|
_ = emqx_resource:call_stop(Data#data.id, Data#data.mod, Data#data.state),
|
||||||
|
_ = maybe_clear_alarm(Data#data.id),
|
||||||
|
ok.
|
||||||
|
|
||||||
proc_name(Id) ->
|
proc_name(Id) ->
|
||||||
Module = atom_to_binary(?MODULE),
|
Module = atom_to_binary(?MODULE),
|
||||||
Connector = <<"_">>,
|
Connector = <<"_">>,
|
||||||
binary_to_atom(<<Module/binary, Connector/binary, Id/binary>>).
|
binary_to_atom(<<Module/binary, Connector/binary, Id/binary>>).
|
||||||
|
|
||||||
handle_health_check_request(From, Data) ->
|
handle_manually_health_check(From, Data) ->
|
||||||
with_health_check(Data, fun(Status, UpdatedData) ->
|
with_health_check(Data, fun(Status, UpdatedData) ->
|
||||||
Actions = [{reply, From, {ok, Status}}],
|
Actions = [{reply, From, {ok, Status}}],
|
||||||
{next_state, Status, UpdatedData, Actions}
|
{next_state, Status, UpdatedData, Actions}
|
||||||
end).
|
end).
|
||||||
|
|
||||||
connecting_health_check(Data) ->
|
handle_connecting_health_check(Data) ->
|
||||||
with_health_check(
|
with_health_check(
|
||||||
Data,
|
Data,
|
||||||
fun
|
fun
|
||||||
|
@ -369,7 +411,7 @@ connecting_health_check(Data) ->
|
||||||
end
|
end
|
||||||
).
|
).
|
||||||
|
|
||||||
perform_connected_health_check(Data) ->
|
handle_connected_health_check(Data) ->
|
||||||
with_health_check(
|
with_health_check(
|
||||||
Data,
|
Data,
|
||||||
fun
|
fun
|
||||||
|
@ -386,21 +428,28 @@ with_health_check(Data, Func) ->
|
||||||
ResId = Data#data.id,
|
ResId = Data#data.id,
|
||||||
HCRes = emqx_resource:call_health_check(ResId, Data#data.mod, Data#data.state),
|
HCRes = emqx_resource:call_health_check(ResId, Data#data.mod, Data#data.state),
|
||||||
{Status, NewState, Err} = parse_health_check_result(HCRes, Data#data.state),
|
{Status, NewState, Err} = parse_health_check_result(HCRes, Data#data.state),
|
||||||
_ =
|
_ = maybe_alarm(Status, ResId),
|
||||||
case Status of
|
UpdatedData = Data#data{
|
||||||
connected ->
|
state = NewState, status = Status, error = Err
|
||||||
|
},
|
||||||
|
ets:insert(?ETS_TABLE, {ResId, UpdatedData#data.group, UpdatedData}),
|
||||||
|
Func(Status, UpdatedData).
|
||||||
|
|
||||||
|
maybe_alarm(connected, _ResId) ->
|
||||||
ok;
|
ok;
|
||||||
_ ->
|
maybe_alarm(_Status, <<?TEST_ID_PREFIX, _/binary>>) ->
|
||||||
|
ok;
|
||||||
|
maybe_alarm(_Status, ResId) ->
|
||||||
emqx_alarm:activate(
|
emqx_alarm:activate(
|
||||||
ResId,
|
ResId,
|
||||||
#{resource_id => ResId, reason => resource_down},
|
#{resource_id => ResId, reason => resource_down},
|
||||||
<<"resource down: ", ResId/binary>>
|
<<"resource down: ", ResId/binary>>
|
||||||
)
|
).
|
||||||
end,
|
|
||||||
UpdatedData = Data#data{
|
maybe_clear_alarm(<<?TEST_ID_PREFIX, _/binary>>) ->
|
||||||
state = NewState, status = Status, error = Err
|
ok;
|
||||||
},
|
maybe_clear_alarm(ResId) ->
|
||||||
Func(Status, UpdatedData).
|
emqx_alarm:deactivate(ResId).
|
||||||
|
|
||||||
parse_health_check_result(Status, OldState) when ?IS_STATUS(Status) ->
|
parse_health_check_result(Status, OldState) when ?IS_STATUS(Status) ->
|
||||||
{Status, OldState, undefined};
|
{Status, OldState, undefined};
|
||||||
|
@ -409,6 +458,11 @@ parse_health_check_result({Status, NewState}, _OldState) when ?IS_STATUS(Status)
|
||||||
parse_health_check_result({Status, NewState, Error}, _OldState) when ?IS_STATUS(Status) ->
|
parse_health_check_result({Status, NewState, Error}, _OldState) when ?IS_STATUS(Status) ->
|
||||||
{Status, NewState, Error}.
|
{Status, NewState, Error}.
|
||||||
|
|
||||||
|
maybe_reply(Actions, undefined, _Reply) ->
|
||||||
|
Actions;
|
||||||
|
maybe_reply(Actions, From, Reply) ->
|
||||||
|
[{reply, From, Reply} | Actions].
|
||||||
|
|
||||||
data_record_to_external_map_with_metrics(Data) ->
|
data_record_to_external_map_with_metrics(Data) ->
|
||||||
#{
|
#{
|
||||||
id => Data#data.id,
|
id => Data#data.id,
|
||||||
|
@ -438,10 +492,12 @@ do_wait_for_resource_ready(InstId, Retry) ->
|
||||||
do_wait_for_resource_ready(InstId, Retry - 1)
|
do_wait_for_resource_ready(InstId, Retry - 1)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
safe_call(InstId, Message) ->
|
safe_call(InstId, Message, Timeout) ->
|
||||||
try
|
try
|
||||||
gen_statem:call(proc_name(InstId), Message)
|
gen_statem:call(proc_name(InstId), Message, {clean_timeout, Timeout})
|
||||||
catch
|
catch
|
||||||
exit:_ ->
|
exit:{R, _} when R == noproc; R == normal; R == shutdown ->
|
||||||
{error, not_found}
|
{error, not_found};
|
||||||
|
exit:{timeout, _} ->
|
||||||
|
{error, timeout}
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
-include("emqx_resource.hrl").
|
-include("emqx_resource.hrl").
|
||||||
|
-include_lib("stdlib/include/ms_transform.hrl").
|
||||||
|
|
||||||
-define(TEST_RESOURCE, emqx_test_resource).
|
-define(TEST_RESOURCE, emqx_test_resource).
|
||||||
-define(ID, <<"id">>).
|
-define(ID, <<"id">>).
|
||||||
|
@ -183,7 +184,6 @@ t_healthy(_) ->
|
||||||
emqx_resource:set_resource_status_connecting(?ID),
|
emqx_resource:set_resource_status_connecting(?ID),
|
||||||
|
|
||||||
{ok, connected} = emqx_resource:health_check(?ID),
|
{ok, connected} = emqx_resource:health_check(?ID),
|
||||||
|
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
[#{status := connected}],
|
[#{status := connected}],
|
||||||
emqx_resource:list_instances_verbose()
|
emqx_resource:list_instances_verbose()
|
||||||
|
@ -194,7 +194,7 @@ t_healthy(_) ->
|
||||||
?assertEqual({ok, connecting}, emqx_resource:health_check(?ID)),
|
?assertEqual({ok, connecting}, emqx_resource:health_check(?ID)),
|
||||||
|
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
[],
|
[#{status := connecting}],
|
||||||
emqx_resource:list_instances_verbose()
|
emqx_resource:list_instances_verbose()
|
||||||
),
|
),
|
||||||
|
|
||||||
|
@ -236,7 +236,6 @@ t_stop_start(_) ->
|
||||||
),
|
),
|
||||||
|
|
||||||
ok = emqx_resource:restart(?ID),
|
ok = emqx_resource:restart(?ID),
|
||||||
|
|
||||||
timer:sleep(300),
|
timer:sleep(300),
|
||||||
|
|
||||||
#{pid := Pid1} = emqx_resource:query(?ID, get_state),
|
#{pid := Pid1} = emqx_resource:query(?ID, get_state),
|
||||||
|
@ -334,11 +333,11 @@ t_create_dry_run_local_failed(_) ->
|
||||||
),
|
),
|
||||||
?assertEqual(error, Res2),
|
?assertEqual(error, Res2),
|
||||||
|
|
||||||
{Res3, _} = emqx_resource:create_dry_run_local(
|
Res3 = emqx_resource:create_dry_run_local(
|
||||||
?TEST_RESOURCE,
|
?TEST_RESOURCE,
|
||||||
#{name => test_resource, stop_error => true}
|
#{name => test_resource, stop_error => true}
|
||||||
),
|
),
|
||||||
?assertEqual(error, Res3).
|
?assertEqual(ok, Res3).
|
||||||
|
|
||||||
t_test_func(_) ->
|
t_test_func(_) ->
|
||||||
?assertEqual(ok, erlang:apply(emqx_resource_validator:not_empty("not_empty"), [<<"someval">>])),
|
?assertEqual(ok, erlang:apply(emqx_resource_validator:not_empty("not_empty"), [<<"someval">>])),
|
||||||
|
|
Loading…
Reference in New Issue