diff --git a/apps/emqx/include/http_api.hrl b/apps/emqx/include/http_api.hrl index 08c17f274..7cd16d338 100644 --- a/apps/emqx/include/http_api.hrl +++ b/apps/emqx/include/http_api.hrl @@ -49,6 +49,7 @@ %% Internal error -define(INTERNAL_ERROR, 'INTERNAL_ERROR'). +-define(SERVICE_UNAVAILABLE, 'SERVICE_UNAVAILABLE'). -define(SOURCE_ERROR, 'SOURCE_ERROR'). -define(UPDATE_FAILED, 'UPDATE_FAILED'). -define(REST_FAILED, 'REST_FAILED'). @@ -81,6 +82,7 @@ {'TOPIC_NOT_FOUND', <<"Topic not found">>}, {'USER_NOT_FOUND', <<"User not found">>}, {'INTERNAL_ERROR', <<"Server inter error">>}, + {'SERVICE_UNAVAILABLE', <<"Service unavailable">>}, {'SOURCE_ERROR', <<"Source error">>}, {'UPDATE_FAILED', <<"Update failed">>}, {'REST_FAILED', <<"Reset source or config failed">>}, diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 0fac31625..61a697948 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -17,6 +17,7 @@ -behaviour(emqx_config_handler). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -export([post_config_update/5]). @@ -46,6 +47,30 @@ %% exported for `emqx_telemetry' -export([get_basic_usage_info/0]). +load() -> + %% set wait_for_resource_ready => 0 to start resources async + Opts = #{auto_retry_interval => 60000, wait_for_resource_ready => 0}, + Bridges = emqx:get_config([bridges], #{}), + lists:foreach( + fun({Type, NamedConf}) -> + lists:foreach( + fun({Name, Conf}) -> + _Res = emqx_bridge_resource:create(Type, Name, Conf, Opts), + ?tp( + emqx_bridge_loaded, + #{ + type => Type, + name => Name, + res => _Res + } + ) + end, + maps:to_list(NamedConf) + ) + end, + maps:to_list(Bridges) + ). + load_hook() -> Bridges = emqx:get_config([bridges], #{}), load_hook(Bridges). @@ -138,10 +163,6 @@ post_config_update(_, _Req, NewConf, OldConf, _AppEnv) -> ok = load_hook(NewConf), Result. -load() -> - Bridges = emqx:get_config([bridges], #{}), - emqx_bridge_monitor:ensure_all_started(Bridges). - list() -> lists:foldl( fun({Type, NameAndConf}, Bridges) -> diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 8cc45c911..bdbbb787f 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -321,7 +321,8 @@ schema("/bridges/:id") -> parameters => [param_path_id()], responses => #{ 204 => <<"Bridge deleted">>, - 400 => error_schema(['INVALID_ID'], "Update bridge failed") + 400 => error_schema(['INVALID_ID'], "Update bridge failed"), + 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") } } }; @@ -352,6 +353,7 @@ schema("/bridges/:id/operation/:operation") -> ], responses => #{ 200 => <<"Operation success">>, + 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable"), 400 => error_schema('INVALID_ID', "Bad bridge ID") } } @@ -371,7 +373,8 @@ schema("/nodes/:node/bridges/:id/operation/:operation") -> responses => #{ 200 => <<"Operation success">>, 400 => error_schema('INVALID_ID', "Bad bridge ID"), - 403 => error_schema('FORBIDDEN_REQUEST', "forbidden operation") + 403 => error_schema('FORBIDDEN_REQUEST', "forbidden operation"), + 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") } } }. @@ -417,6 +420,7 @@ schema("/nodes/:node/bridges/:id/operation/:operation") -> Id, case emqx_bridge:remove(BridgeType, BridgeName) of {ok, _} -> {204}; + {error, timeout} -> {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; {error, Reason} -> {500, error_msg('INTERNAL_ERROR', Reason)} end ). @@ -466,6 +470,10 @@ lookup_from_local_node(BridgeType, BridgeName) -> {200}; {error, {pre_config_update, _, bridge_not_found}} -> {404, error_msg('NOT_FOUND', <<"bridge not found">>)}; + {error, {_, _, timeout}} -> + {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; + {error, timeout} -> + {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; {error, Reason} -> {500, error_msg('INTERNAL_ERROR', Reason)} end; @@ -489,11 +497,18 @@ lookup_from_local_node(BridgeType, BridgeName) -> ConfMap = emqx:get_config([bridges, BridgeType, BridgeName]), case maps:get(enable, ConfMap, false) of false -> - {403, error_msg('FORBIDDEN_REQUEST', <<"forbidden operation">>)}; + {403, + error_msg( + 'FORBIDDEN_REQUEST', <<"forbidden operation: bridge disabled">> + )}; true -> case emqx_bridge_proto_v1:OperFunc(TargetNode, BridgeType, BridgeName) of - ok -> {200}; - {error, Reason} -> {500, error_msg('INTERNAL_ERROR', Reason)} + ok -> + {200}; + {error, timeout} -> + {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; + {error, Reason} -> + {500, error_msg('INTERNAL_ERROR', Reason)} end end end @@ -518,6 +533,8 @@ operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) -> case is_ok(emqx_bridge_proto_v1:RpcFunc(Nodes, BridgeType, BridgeName)) of {ok, _} -> {200}; + {error, [timeout | _]} -> + {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; {error, ErrL} -> {500, error_msg('INTERNAL_ERROR', ErrL)} end. diff --git a/apps/emqx_bridge/src/emqx_bridge_monitor.erl b/apps/emqx_bridge/src/emqx_bridge_monitor.erl deleted file mode 100644 index 70c89f352..000000000 --- a/apps/emqx_bridge/src/emqx_bridge_monitor.erl +++ /dev/null @@ -1,90 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- -%% This process monitors all the data bridges, and try to restart a bridge -%% when one of it stopped. --module(emqx_bridge_monitor). - --behaviour(gen_server). - --include_lib("snabbkaffe/include/snabbkaffe.hrl"). - -%% API functions --export([ - start_link/0, - ensure_all_started/1 -]). - -%% gen_server callbacks --export([ - init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3 -]). - --record(state, {}). - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -ensure_all_started(Configs) -> - gen_server:cast(?MODULE, {start_and_monitor, Configs}). - -init([]) -> - {ok, #state{}}. - -handle_call(_Request, _From, State) -> - Reply = ok, - {reply, Reply, State}. - -handle_cast({start_and_monitor, Configs}, State) -> - ok = load_bridges(Configs), - {noreply, State}; -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%============================================================================ -load_bridges(Configs) -> - lists:foreach( - fun({Type, NamedConf}) -> - lists:foreach( - fun({Name, Conf}) -> - _Res = emqx_bridge_resource:create(Type, Name, Conf), - ?tp( - emqx_bridge_monitor_loaded_bridge, - #{ - type => Type, - name => Name, - res => _Res - } - ) - end, - maps:to_list(NamedConf) - ) - end, - maps:to_list(Configs) - ). diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 5af164b33..186f99557 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -28,6 +28,7 @@ -export([ create/2, create/3, + create/4, recreate/2, recreate/3, create_dry_run/2, @@ -79,6 +80,9 @@ create(BridgeId, Conf) -> create(BridgeType, BridgeName, Conf). create(Type, Name, Conf) -> + create(Type, Name, Conf, #{auto_retry_interval => 60000}). + +create(Type, Name, Conf, Opts) -> ?SLOG(info, #{ msg => "create bridge", type => Type, @@ -90,7 +94,7 @@ create(Type, Name, Conf) -> <<"emqx_bridge">>, bridge_to_resource_type(Type), parse_confs(Type, Name, Conf), - #{auto_retry_interval => 60000} + Opts ), maybe_disable_bridge(Type, Name, Conf). @@ -132,10 +136,14 @@ update(Type, Name, {OldConf, Conf}) -> true -> %% we don't need to recreate the bridge if this config change is only to %% toggole the config 'bridge.{type}.{name}.enable' - case maps:get(enable, Conf, true) of - true -> restart(Type, Name); - false -> stop(Type, Name) - end + _ = + case maps:get(enable, Conf, true) of + true -> + restart(Type, Name); + false -> + stop(Type, Name) + end, + ok end. recreate(Type, Name) -> diff --git a/apps/emqx_bridge/src/emqx_bridge_sup.erl b/apps/emqx_bridge/src/emqx_bridge_sup.erl index cce3a066b..ba8181f83 100644 --- a/apps/emqx_bridge/src/emqx_bridge_sup.erl +++ b/apps/emqx_bridge/src/emqx_bridge_sup.erl @@ -32,15 +32,7 @@ init([]) -> intensity => 10, period => 10 }, - ChildSpecs = [ - #{ - id => emqx_bridge_monitor, - start => {emqx_bridge_monitor, start_link, []}, - restart => permanent, - type => worker, - modules => [emqx_bridge_monitor] - } - ], + ChildSpecs = [], {ok, {SupFlags, ChildSpecs}}. %% internal functions diff --git a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl index 894322c61..d8266f83a 100644 --- a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl @@ -147,7 +147,7 @@ setup_fake_telemetry_data() -> ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, Conf, Opts), ok = snabbkaffe:start_trace(), - Predicate = fun(#{?snk_kind := K}) -> K =:= emqx_bridge_monitor_loaded_bridge end, + Predicate = fun(#{?snk_kind := K}) -> K =:= emqx_bridge_loaded end, NEvents = 3, BackInTime = 0, Timeout = 11_000, diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index c5d8d924c..c048a13fe 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -378,7 +378,10 @@ t_enable_disable_bridges(_) -> {ok, 200, <<>>} = request(post, operation_path(cluster, disable, BridgeID), <<"">>), {ok, 403, Res} = request(post, operation_path(node, restart, BridgeID), <<"">>), - ?assertEqual(<<"{\"code\":\"FORBIDDEN_REQUEST\",\"message\":\"forbidden operation\"}">>, Res), + ?assertEqual( + <<"{\"code\":\"FORBIDDEN_REQUEST\",\"message\":\"forbidden operation: bridge disabled\"}">>, + Res + ), %% enable a stopped bridge {ok, 200, <<>>} = request(post, operation_path(cluster, enable, BridgeID), <<"">>), diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 1ff4e7f6a..db795a4cf 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -573,6 +573,7 @@ obfuscate(Map) -> ). is_sensitive(password) -> true; +is_sensitive(ssl_opts) -> true; is_sensitive(_) -> false. str(A) when is_atom(A) -> diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 2b57bc580..d1d82a401 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -33,7 +33,7 @@ -type create_opts() :: #{ health_check_interval => integer(), health_check_timeout => integer(), - waiting_connect_complete => integer(), + wait_for_resource_ready => integer(), auto_retry_interval => integer() }. -type after_query() :: diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index f5b03e76b..5e3242135 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -260,8 +260,10 @@ query(InstId, Request, AfterQuery) -> emqx_metrics_worker:inc(resource_metrics, InstId, exception), erlang:raise(Err, Reason, ST) end; + {ok, _Group, _Data} -> + query_error(not_found, <<"resource not connected">>); {error, not_found} -> - query_error(not_found, <<"resource not found or not connected">>) + query_error(not_found, <<"resource not found">>) end. -spec restart(instance_id()) -> ok | {error, Reason :: term()}. diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index d48c910fa..bcc4422b2 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -23,19 +23,24 @@ % API -export([ ensure_resource/5, - create_dry_run/2, - ets_lookup/1, - get_metrics/1, - health_check/1, - list_all/0, - list_group/1, - lookup/1, + create/5, recreate/4, remove/1, - reset_metrics/1, + create_dry_run/2, restart/2, - set_resource_status_connecting/1, - stop/1 + start/2, + stop/1, + health_check/1 +]). + +-export([ + lookup/1, + list_all/0, + list_group/1, + ets_lookup/1, + get_metrics/1, + reset_metrics/1, + set_resource_status_connecting/1 ]). % Server @@ -51,6 +56,8 @@ -define(HEALTHCHECK_INTERVAL, 15000). -define(ETS_TABLE, emqx_resource_manager). -define(WAIT_FOR_RESOURCE_DELAY, 100). +-define(T_OPERATION, 5000). +-define(T_LOOKUP, 1000). -define(IS_STATUS(ST), ST =:= connecting; ST =:= connected; ST =:= disconnected). @@ -74,11 +81,24 @@ ensure_resource(InstId, Group, ResourceType, Config, Opts) -> {ok, _Group, Data} -> {ok, Data}; {error, not_found} -> - do_start(InstId, Group, ResourceType, Config, Opts), + create(InstId, Group, ResourceType, Config, Opts), {ok, _Group, Data} = lookup(InstId), {ok, Data} end. +%% @doc Create a resource_manager and wait until it is running +create(InstId, Group, ResourceType, Config, Opts) -> + % The state machine will make the actual call to the callback/resource module after init + ok = emqx_resource_manager_sup:ensure_child(InstId, Group, ResourceType, Config, Opts), + ok = emqx_metrics_worker:create_metrics( + resource_metrics, + InstId, + [matched, success, failed, exception], + [matched] + ), + wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)), + ok. + %% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance. %% %% Triggers the `emqx_resource_manager_sup` supervisor to actually create @@ -90,9 +110,9 @@ create_dry_run(ResourceType, Config) -> ok = emqx_resource_manager_sup:ensure_child(InstId, <<"dry_run">>, ResourceType, Config, #{}), case wait_for_resource_ready(InstId, 5000) of ok -> - _ = stop(InstId); + _ = remove(InstId); timeout -> - _ = stop(InstId), + _ = remove(InstId), {error, timeout} end. @@ -118,27 +138,36 @@ remove(InstId) when is_binary(InstId) -> %% @doc Stops a running resource_manager and optionally clears the metrics for the resource -spec remove(instance_id(), boolean()) -> ok | {error, Reason :: term()}. remove(InstId, ClearMetrics) when is_binary(InstId) -> - safe_call(InstId, {remove, ClearMetrics}). + safe_call(InstId, {remove, ClearMetrics}, ?T_OPERATION). %% @doc Stops and then starts an instance that was already running -spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}. restart(InstId, Opts) when is_binary(InstId) -> - case lookup(InstId) of - {ok, Group, #{mod := ResourceType, config := Config} = _Data} -> - _ = remove(InstId, false), - do_start(InstId, Group, ResourceType, Config, Opts); - Error -> + case safe_call(InstId, restart, ?T_OPERATION) of + ok -> + wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)), + ok; + {error, _Reason} = Error -> Error end. -%% @doc Stop the resource manager process +%% @doc Stop the resource +-spec start(instance_id(), create_opts()) -> ok | {error, Reason :: term()}. +start(InstId, Opts) -> + case safe_call(InstId, start, ?T_OPERATION) of + ok -> + wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)), + ok; + {error, _Reason} = Error -> + Error + end. + +%% @doc Start the resource -spec stop(instance_id()) -> ok | {error, Reason :: term()}. stop(InstId) -> - case safe_call(InstId, stop) of + case safe_call(InstId, stop, ?T_OPERATION) of ok -> ok; - {error, not_found} -> - ok; {error, _Reason} = Error -> Error end. @@ -146,12 +175,15 @@ stop(InstId) -> %% @doc Test helper -spec set_resource_status_connecting(instance_id()) -> ok. set_resource_status_connecting(InstId) -> - safe_call(InstId, set_resource_status_connecting). + safe_call(InstId, set_resource_status_connecting, infinity). %% @doc Lookup the group and data of a resource -spec lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. lookup(InstId) -> - safe_call(InstId, lookup). + case safe_call(InstId, lookup, ?T_LOOKUP) of + {error, timeout} -> ets_lookup(InstId); + Result -> Result + end. %% @doc Lookup the group and data of a resource -spec ets_lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. @@ -192,7 +224,7 @@ list_group(Group) -> -spec health_check(instance_id()) -> {ok, resource_status()} | {error, term()}. health_check(InstId) -> - safe_call(InstId, health_check). + safe_call(InstId, health_check, ?T_OPERATION). %% Server start/stop callbacks @@ -204,16 +236,20 @@ start_link(InstId, Group, ResourceType, Config, Opts) -> mod = ResourceType, config = Config, opts = Opts, - status = undefined, + status = connecting, state = undefined, error = undefined }, gen_statem:start_link({local, proc_name(InstId)}, ?MODULE, Data, []). init(Data) -> + process_flag(trap_exit, true), + %% init the cache so that lookup/1 will always return something + ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), {ok, connecting, Data, {next_event, internal, try_connect}}. terminate(_Reason, _State, Data) -> + _ = maybe_clear_alarm(Data#data.id), ets:delete(?ETS_TABLE, Data#data.id), ok. @@ -226,31 +262,46 @@ callback_mode() -> [handle_event_function, state_enter]. % Called during testing to force a specific state handle_event({call, From}, set_resource_status_connecting, _State, Data) -> {next_state, connecting, Data#data{status = connecting}, [{reply, From, ok}]}; +% Called when the resource is to be restarted +handle_event({call, From}, restart, _State, Data) -> + _ = stop_resource(Data), + start_resource(Data, From); +% Called when the resource is to be started +handle_event({call, From}, start, stopped, Data) -> + start_resource(Data, From); +handle_event({call, From}, start, _State, _Data) -> + {keep_state_and_data, [{reply, From, ok}]}; % Called when the resource is to be stopped -handle_event({call, From}, stop, _State, #data{status = disconnected} = Data) -> - {next_state, stopped, Data, [{reply, From, ok}]}; +handle_event({call, From}, stop, stopped, _Data) -> + {keep_state_and_data, [{reply, From, ok}]}; handle_event({call, From}, stop, _State, Data) -> - Result = do_stop(Data), + Result = stop_resource(Data), UpdatedData = Data#data{status = disconnected}, {next_state, stopped, UpdatedData, [{reply, From, Result}]}; % Called when a resource is to be stopped and removed. handle_event({call, From}, {remove, ClearMetrics}, _State, Data) -> handle_remove_event(From, ClearMetrics, Data); -% Called when the state of the resource is being looked up. +% Called when the state-data of the resource is being looked up. handle_event({call, From}, lookup, _State, #data{group = Group} = Data) -> Reply = {ok, Group, data_record_to_external_map_with_metrics(Data)}, {keep_state_and_data, [{reply, From, Reply}]}; -% Connecting state enter -handle_event(internal, try_connect, connecting, Data) -> - handle_connection_attempt(Data); +% Called when doing a manually health check. +handle_event({call, From}, health_check, stopped, _Data) -> + Actions = [{reply, From, {error, resource_is_stopped}}], + {keep_state_and_data, Actions}; +handle_event({call, From}, health_check, _State, Data) -> + handle_manually_health_check(From, Data); +% State: CONNECTING handle_event(enter, _OldState, connecting, Data) -> - ets:delete(?ETS_TABLE, Data#data.id), + ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), Actions = [{state_timeout, 0, health_check}], - {next_state, connecting, Data, Actions}; -% Connecting state health_check timeouts. + {keep_state_and_data, Actions}; +handle_event(internal, try_connect, connecting, Data) -> + start_resource(Data, undefined); handle_event(state_timeout, health_check, connecting, Data) -> - connecting_health_check(Data); -%% The connected state is entered after a successful start of the callback mod + handle_connecting_health_check(Data); +%% State: CONNECTED +%% The connected state is entered after a successful on_start/2 of the callback mod %% and successful health_checks handle_event(enter, _OldState, connected, Data) -> ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), @@ -258,21 +309,19 @@ handle_event(enter, _OldState, connected, Data) -> Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}], {next_state, connected, Data, Actions}; handle_event(state_timeout, health_check, connected, Data) -> - perform_connected_health_check(Data); + handle_connected_health_check(Data); +%% State: DISCONNECTED handle_event(enter, _OldState, disconnected, Data) -> + ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), handle_disconnected_state_enter(Data); handle_event(state_timeout, auto_retry, disconnected, Data) -> - handle_connection_attempt(Data); + start_resource(Data, undefined); +%% State: STOPPED +%% The stopped state is entered after the resource has been explicitly stopped handle_event(enter, _OldState, stopped, Data) -> UpdatedData = Data#data{status = disconnected}, - ets:delete(?ETS_TABLE, Data#data.id), + ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, UpdatedData}), {next_state, stopped, UpdatedData}; -% Resource has been explicitly stopped, so return that as the error reason. -handle_event({call, From}, _, stopped, _Data) -> - Actions = [{reply, From, {error, resource_is_stopped}}], - {keep_state_and_data, Actions}; -handle_event({call, From}, health_check, _State, Data) -> - handle_health_check_request(From, Data); % Ignore all other events handle_event(EventType, EventData, State, Data) -> ?SLOG( @@ -285,77 +334,70 @@ handle_event(EventType, EventData, State, Data) -> data => Data } ), - {next_state, State, Data}. + keep_state_and_data. %%------------------------------------------------------------------------------ %% internal functions %%------------------------------------------------------------------------------ handle_disconnected_state_enter(Data) -> - UpdatedData = Data#data{status = disconnected}, - ets:delete(?ETS_TABLE, Data#data.id), case maps:get(auto_retry_interval, Data#data.opts, undefined) of undefined -> - {next_state, disconnected, UpdatedData}; + {next_state, disconnected, Data}; RetryInterval -> Actions = [{state_timeout, RetryInterval, auto_retry}], - {next_state, disconnected, UpdatedData, Actions} - end. - -handle_connection_attempt(Data) -> - case emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config) of - {ok, ResourceState} -> - UpdatedData = Data#data{state = ResourceState, status = connecting}, - %% Perform an initial health_check immediately before transitioning into a connected state - Actions = [{state_timeout, 0, health_check}], - {next_state, connecting, UpdatedData, Actions}; - {error, Reason} -> - %% Keep track of the error reason why the connection did not work - %% so that the Reason can be returned when the verification call is made. - UpdatedData = Data#data{status = disconnected, error = Reason}, - {next_state, disconnected, UpdatedData} + {next_state, disconnected, Data, Actions} end. handle_remove_event(From, ClearMetrics, Data) -> - do_stop(Data), - ets:delete(?ETS_TABLE, Data#data.id), + stop_resource(Data), case ClearMetrics of true -> ok = emqx_metrics_worker:clear_metrics(resource_metrics, Data#data.id); false -> ok end, {stop_and_reply, normal, [{reply, From, ok}]}. -do_start(InstId, Group, ResourceType, Config, Opts) -> - % The state machine will make the actual call to the callback/resource module after init - ok = emqx_resource_manager_sup:ensure_child(InstId, Group, ResourceType, Config, Opts), - ok = emqx_metrics_worker:create_metrics( - resource_metrics, - InstId, - [matched, success, failed, exception], - [matched] - ), - wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)), - ok. +start_resource(Data, From) -> + %% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache + ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), + case emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config) of + {ok, ResourceState} -> + UpdatedData = Data#data{state = ResourceState, status = connecting}, + %% Perform an initial health_check immediately before transitioning into a connected state + Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok), + {next_state, connecting, UpdatedData, Actions}; + {error, Reason} = Err -> + _ = maybe_alarm(disconnected, Data#data.id), + %% Keep track of the error reason why the connection did not work + %% so that the Reason can be returned when the verification call is made. + UpdatedData = Data#data{status = disconnected, error = Reason}, + Actions = maybe_reply([], From, Err), + {next_state, disconnected, UpdatedData, Actions} + end. -do_stop(#data{state = undefined} = _Data) -> +stop_resource(#data{state = undefined, id = ResId} = _Data) -> + _ = maybe_clear_alarm(ResId), ok; -do_stop(Data) -> - Result = emqx_resource:call_stop(Data#data.id, Data#data.mod, Data#data.state), - ets:delete(?ETS_TABLE, Data#data.id), - Result. +stop_resource(Data) -> + %% We don't care the return value of the Mod:on_stop/2. + %% The callback mod should make sure the resource is stopped after on_stop/2 + %% is returned. + _ = emqx_resource:call_stop(Data#data.id, Data#data.mod, Data#data.state), + _ = maybe_clear_alarm(Data#data.id), + ok. proc_name(Id) -> Module = atom_to_binary(?MODULE), Connector = <<"_">>, binary_to_atom(<>). -handle_health_check_request(From, Data) -> +handle_manually_health_check(From, Data) -> with_health_check(Data, fun(Status, UpdatedData) -> Actions = [{reply, From, {ok, Status}}], {next_state, Status, UpdatedData, Actions} end). -connecting_health_check(Data) -> +handle_connecting_health_check(Data) -> with_health_check( Data, fun @@ -369,7 +411,7 @@ connecting_health_check(Data) -> end ). -perform_connected_health_check(Data) -> +handle_connected_health_check(Data) -> with_health_check( Data, fun @@ -386,22 +428,29 @@ with_health_check(Data, Func) -> ResId = Data#data.id, HCRes = emqx_resource:call_health_check(ResId, Data#data.mod, Data#data.state), {Status, NewState, Err} = parse_health_check_result(HCRes, Data#data.state), - _ = - case Status of - connected -> - ok; - _ -> - emqx_alarm:activate( - ResId, - #{resource_id => ResId, reason => resource_down}, - <<"resource down: ", ResId/binary>> - ) - end, + _ = maybe_alarm(Status, ResId), UpdatedData = Data#data{ state = NewState, status = Status, error = Err }, + ets:insert(?ETS_TABLE, {ResId, UpdatedData#data.group, UpdatedData}), Func(Status, UpdatedData). +maybe_alarm(connected, _ResId) -> + ok; +maybe_alarm(_Status, <>) -> + ok; +maybe_alarm(_Status, ResId) -> + emqx_alarm:activate( + ResId, + #{resource_id => ResId, reason => resource_down}, + <<"resource down: ", ResId/binary>> + ). + +maybe_clear_alarm(<>) -> + ok; +maybe_clear_alarm(ResId) -> + emqx_alarm:deactivate(ResId). + parse_health_check_result(Status, OldState) when ?IS_STATUS(Status) -> {Status, OldState, undefined}; parse_health_check_result({Status, NewState}, _OldState) when ?IS_STATUS(Status) -> @@ -409,6 +458,11 @@ parse_health_check_result({Status, NewState}, _OldState) when ?IS_STATUS(Status) parse_health_check_result({Status, NewState, Error}, _OldState) when ?IS_STATUS(Status) -> {Status, NewState, Error}. +maybe_reply(Actions, undefined, _Reply) -> + Actions; +maybe_reply(Actions, From, Reply) -> + [{reply, From, Reply} | Actions]. + data_record_to_external_map_with_metrics(Data) -> #{ id => Data#data.id, @@ -438,10 +492,12 @@ do_wait_for_resource_ready(InstId, Retry) -> do_wait_for_resource_ready(InstId, Retry - 1) end. -safe_call(InstId, Message) -> +safe_call(InstId, Message, Timeout) -> try - gen_statem:call(proc_name(InstId), Message) + gen_statem:call(proc_name(InstId), Message, {clean_timeout, Timeout}) catch - exit:_ -> - {error, not_found} + exit:{R, _} when R == noproc; R == normal; R == shutdown -> + {error, not_found}; + exit:{timeout, _} -> + {error, timeout} end. diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 77d0e57c3..2519bc7d3 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -21,6 +21,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include("emqx_resource.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). -define(TEST_RESOURCE, emqx_test_resource). -define(ID, <<"id">>). @@ -183,7 +184,6 @@ t_healthy(_) -> emqx_resource:set_resource_status_connecting(?ID), {ok, connected} = emqx_resource:health_check(?ID), - ?assertMatch( [#{status := connected}], emqx_resource:list_instances_verbose() @@ -194,7 +194,7 @@ t_healthy(_) -> ?assertEqual({ok, connecting}, emqx_resource:health_check(?ID)), ?assertMatch( - [], + [#{status := connecting}], emqx_resource:list_instances_verbose() ), @@ -236,7 +236,6 @@ t_stop_start(_) -> ), ok = emqx_resource:restart(?ID), - timer:sleep(300), #{pid := Pid1} = emqx_resource:query(?ID, get_state), @@ -334,11 +333,11 @@ t_create_dry_run_local_failed(_) -> ), ?assertEqual(error, Res2), - {Res3, _} = emqx_resource:create_dry_run_local( + Res3 = emqx_resource:create_dry_run_local( ?TEST_RESOURCE, #{name => test_resource, stop_error => true} ), - ?assertEqual(error, Res3). + ?assertEqual(ok, Res3). t_test_func(_) -> ?assertEqual(ok, erlang:apply(emqx_resource_validator:not_empty("not_empty"), [<<"someval">>])),