diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index fbbab59ac..ad3fee858 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -203,7 +203,7 @@ create(Type, Name, Conf) -> {error, Reason} -> {error, Reason} end. -update(Type, Name, {_OldConf, Conf}) -> +update(Type, Name, {OldConf, Conf}) -> %% TODO: sometimes its not necessary to restart the bridge connection. %% %% - if the connection related configs like `servers` is updated, we should restart/start @@ -212,15 +212,22 @@ update(Type, Name, {_OldConf, Conf}) -> %% the `method` or `headers` of a HTTP bridge is changed, then the bridge can be updated %% without restarting the bridge. %% - ?SLOG(info, #{msg => "update bridge", type => Type, name => Name, - config => Conf}), - case recreate(Type, Name, Conf) of - {ok, _} -> maybe_disable_bridge(Type, Name, Conf); - {error, not_found} -> - ?SLOG(warning, #{ msg => "updating a non-exist bridge, create a new one" - , type => Type, name => Name, config => Conf}), - create(Type, Name, Conf); - {error, _} = Err -> Err + case if_only_to_toggole_enable(OldConf, Conf) of + false -> + ?SLOG(info, #{msg => "update bridge", type => Type, name => Name, + config => Conf}), + case recreate(Type, Name, Conf) of + {ok, _} -> maybe_disable_bridge(Type, Name, Conf); + {error, not_found} -> + ?SLOG(warning, #{ msg => "updating a non-exist bridge, create a new one" + , type => Type, name => Name, config => Conf}), + create(Type, Name, Conf); + {error, Reason} -> {update_bridge_failed, Reason} + end; + true -> + %% we don't need to recreate the bridge if this config change is only to + %% toggole the config 'bridge.{type}.{name}.enable' + ok end. recreate(Type, Name) -> @@ -344,6 +351,17 @@ maybe_disable_bridge(Type, Name, Conf) -> true -> ok end. +if_only_to_toggole_enable(OldConf, Conf) -> + #{added := Added, removed := Removed, changed := Updated} = + emqx_map_lib:diff_maps(OldConf, Conf), + case {Added, Removed, Updated} of + {Added, Removed, #{enable := _}= Updated} + when map_size(Added) =:= 0, + map_size(Removed) =:= 0, + map_size(Updated) =:= 1 -> true; + {_, _, _} -> false + end. + bin(Bin) when is_binary(Bin) -> Bin; bin(Str) when is_list(Str) -> list_to_binary(Str); bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 6f2d5c7ad..121d99f85 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -334,7 +334,7 @@ lookup_from_local_node(BridgeType, BridgeName) -> invalid -> {404, error_msg('BAD_ARG', <<"invalid operation">>)}; UpReq -> case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], - UpReq, #{override_to => cluster}) of + {UpReq, BridgeType, BridgeName}, #{override_to => cluster}) of {ok, _} -> {200}; {error, {pre_config_update, _, bridge_not_found}} -> {404, error_msg('NOT_FOUND', <<"bridge not found">>)}; diff --git a/apps/emqx_bridge/src/emqx_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl index 519368523..2c84e5630 100644 --- a/apps/emqx_bridge/src/emqx_bridge_app.erl +++ b/apps/emqx_bridge/src/emqx_bridge_app.erl @@ -40,10 +40,15 @@ stop(_State) -> ok. -define(IS_OPER(O), when Oper == start; Oper == stop; Oper == restart). -pre_config_update(_, Oper, undefined) ?IS_OPER(Oper) -> +pre_config_update(_, {Oper, _, _}, undefined) ?IS_OPER(Oper) -> {error, bridge_not_found}; -pre_config_update(_, Oper, OldConfig) ?IS_OPER(Oper) -> - {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}}; +pre_config_update(_, {Oper, Type, Name}, OldConfig) ?IS_OPER(Oper) -> + case perform_operation(Oper, Type, Name) of + ok -> + %% we also need to save the 'enable' to the config files + {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}}; + {error, _} = Err -> Err + end; pre_config_update(_, Conf, _OldConfig) -> {ok, Conf}. @@ -51,3 +56,7 @@ pre_config_update(_, Conf, _OldConfig) -> operation_to_enable(start) -> true; operation_to_enable(stop) -> false; operation_to_enable(restart) -> true. + +perform_operation(start, Type, Name) -> emqx_bridge:restart(Type, Name); +perform_operation(restart, Type, Name) -> emqx_bridge:restart(Type, Name); +perform_operation(stop, Type, Name) -> emqx_bridge:stop(Type, Name). diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 0249d51b1..2cdc9595c 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -169,14 +169,20 @@ on_start(InstId, #{base_url := #{scheme := Scheme, pool_name => PoolName, host => Host, port => Port, + connect_timeout => ConnectTimeout, base_path => BasePath, request => preprocess_request(maps:get(request, Config, undefined)) }, - case ehttpc_sup:start_pool(PoolName, PoolOpts) of - {ok, _} -> {ok, State}; - {error, {already_started, _}} -> {ok, State}; + case do_health_check(Host, Port, ConnectTimeout) of + ok -> + case ehttpc_sup:start_pool(PoolName, PoolOpts) of + {ok, _} -> {ok, State}; + {error, {already_started, _}} -> {ok, State}; + {error, Reason} -> + {error, Reason} + end; {error, Reason} -> - {error, Reason} + {error, {http_start_failed, Reason}} end. on_stop(InstId, #{pool_name := PoolName}) -> @@ -216,13 +222,17 @@ on_query(InstId, {KeyOrNum, Method, Request, Timeout}, AfterQuery, end, Result. -on_health_check(_InstId, #{host := Host, port := Port} = State) -> - case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), 3000) of - {ok, Sock} -> - gen_tcp:close(Sock), - {ok, State}; - {error, _Reason} -> - {error, test_query_failed, State} +on_health_check(_InstId, #{host := Host, port := Port, connect_timeout := Timeout} = State) -> + case do_health_check(Host, Port, Timeout) of + ok -> {ok, State}; + {error, Reason} -> + {error, {http_health_check_failed, Reason}, State} + end. + +do_health_check(Host, Port, Timeout) -> + case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), Timeout) of + {ok, Sock} -> gen_tcp:close(Sock), ok; + {error, Reason} -> {error, Reason} end. %%-------------------------------------------------------------------- diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index b062e83ae..fd4046505 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -188,7 +188,7 @@ query(InstId, Request) -> query(InstId, Request, AfterQuery) -> case get_instance(InstId) of {ok, #{status := stopped}} -> - error({InstId, stopped}); + error({resource_stopped, InstId}); {ok, #{mod := Mod, state := ResourceState, status := started}} -> %% the resource state is readonly to Module:on_query/4 %% and the `after_query()` functions should be thread safe diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index eaf6db0b2..8c5232706 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -173,18 +173,19 @@ do_create(InstId, ResourceType, Config) -> case lookup(InstId) of {ok, _} -> {ok, already_created}; _ -> + Res0 = #{id => InstId, mod => ResourceType, config => Config, + status => stopped, state => undefined}, case emqx_resource:call_start(InstId, ResourceType, Config) of {ok, ResourceState} -> - ets:insert(emqx_resource_instance, {InstId, - #{mod => ResourceType, config => Config, - state => ResourceState, status => stopped}}), - _ = do_health_check(InstId), ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId), + %% this is the first time we do health check, this will update the + %% status and then do ets:insert/2 + _ = do_health_check(Res0#{state => ResourceState}), {ok, force_lookup(InstId)}; {error, Reason} -> logger:error("start ~ts resource ~ts failed: ~p", [ResourceType, InstId, Reason]), - {error, Reason} + {ok, Res0} end end. @@ -243,22 +244,24 @@ do_stop(InstId) -> Error end. -do_health_check(InstId) -> +do_health_check(InstId) when is_binary(InstId) -> case lookup(InstId) of - {ok, #{mod := Mod, state := ResourceState0} = Data} -> - case emqx_resource:call_health_check(InstId, Mod, ResourceState0) of - {ok, ResourceState1} -> - ets:insert(emqx_resource_instance, - {InstId, Data#{status => started, state => ResourceState1}}), - ok; - {error, Reason, ResourceState1} -> - logger:error("health check for ~p failed: ~p", [InstId, Reason]), - ets:insert(emqx_resource_instance, - {InstId, Data#{status => stopped, state => ResourceState1}}), - {error, Reason} - end; - Error -> - Error + {ok, Data} -> do_health_check(Data); + Error -> Error + end; +do_health_check(#{state := undefined}) -> + {error, resource_not_initialized}; +do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) -> + case emqx_resource:call_health_check(InstId, Mod, ResourceState0) of + {ok, ResourceState1} -> + ets:insert(emqx_resource_instance, + {InstId, Data#{status => started, state => ResourceState1}}), + ok; + {error, Reason, ResourceState1} -> + logger:error("health check for ~p failed: ~p", [InstId, Reason]), + ets:insert(emqx_resource_instance, + {InstId, Data#{status => stopped, state => ResourceState1}}), + {error, Reason} end. %%------------------------------------------------------------------------------