diff --git a/apps/emqx/src/emqx_config_handler.erl b/apps/emqx/src/emqx_config_handler.erl index 30eec029d..be42db57e 100644 --- a/apps/emqx/src/emqx_config_handler.erl +++ b/apps/emqx/src/emqx_config_handler.erl @@ -205,7 +205,7 @@ handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) -> key_path => ConfKeyPath, stacktrace => ST }), - {error, config_update_crashed} + {error, {config_update_crashed, Reason}} end. do_handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) -> diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index 33943ddbd..cda953a76 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -150,7 +150,7 @@ start_apps(Apps, Handler) when is_function(Handler) -> %% Load all application code to beam vm first %% Because, minirest, ekka etc.. application will scan these modules lists:foreach(fun load/1, [emqx | Apps]), - ekka:start(), + ok = start_ekka(), ok = emqx_ratelimiter_SUITE:base_conf(), lists:foreach(fun(App) -> start_app(App, Handler) end, [emqx | Apps]). @@ -484,3 +484,12 @@ is_tcp_server_available(Host, Port, Timeout) -> {error, _} -> false end. + +start_ekka() -> + try mnesia_hook:module_info() of + _ -> ekka:start() + catch _:_ -> + %% Falling back to using Mnesia DB backend. + application:set_env(mria, db_backend, mnesia), + ekka:start() + end. diff --git a/apps/emqx/test/emqx_config_handler_SUITE.erl b/apps/emqx/test/emqx_config_handler_SUITE.erl index 6861a0be1..72e3ed62f 100644 --- a/apps/emqx/test/emqx_config_handler_SUITE.erl +++ b/apps/emqx/test/emqx_config_handler_SUITE.erl @@ -223,7 +223,7 @@ t_callback_crash(_Config) -> Opts = #{rawconf_with_defaults => true}, ok = emqx_config_handler:add_handler(CrashPath, ?MODULE), Old = emqx:get_raw_config(CrashPath), - ?assertEqual({error, config_update_crashed}, emqx:update_config(CrashPath, <<"89%">>, Opts)), + ?assertMatch({error, {config_update_crashed, _}}, emqx:update_config(CrashPath, <<"89%">>, Opts)), New = emqx:get_raw_config(CrashPath), ?assertEqual(Old, New), ok = emqx_config_handler:remove_handler(CrashPath), diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 60484b018..d59cdf239 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -207,6 +207,7 @@ basic_config(#{ replayq => ReplayQ, %% connection opts server => Server, + connect_timeout => 30, %% 30s reconnect_interval => ReconnIntv, proto_ver => ProtoVer, bridge_mode => true, diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index fcad0ef7a..1a9c55ced 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -52,7 +52,8 @@ start(Config) -> Mountpoint = maps:get(receive_mountpoint, Config, undefined), Subscriptions = maps:get(subscriptions, Config, undefined), Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Subscriptions), - Handlers = make_hdlr(Parent, Vars, #{server => ip_port_to_server(Host, Port)}), + ServerStr = ip_port_to_server_str(Host, Port), + Handlers = make_hdlr(Parent, Vars, #{server => ServerStr}), Config1 = Config#{ msg_handler => Handlers, host => Host, @@ -70,16 +71,19 @@ start(Config) -> catch throw : Reason -> ok = stop(#{client_pid => Pid}), - {error, Reason} + {error, error_reason(Reason, ServerStr)} end; {error, Reason} -> ok = stop(#{client_pid => Pid}), - {error, Reason} + {error, error_reason(Reason, ServerStr)} end; {error, Reason} -> - {error, Reason} + {error, error_reason(Reason, ServerStr)} end. +error_reason(Reason, ServerStr) -> + #{reason => Reason, server => ServerStr}. + stop(#{client_pid := Pid}) -> safe_stop(Pid, fun() -> emqtt:stop(Pid) end, 1000), ok. @@ -238,7 +242,7 @@ printable_maps(Headers) -> (K, V0, AccIn) -> AccIn#{K => V0} end, #{}, Headers). -ip_port_to_server(Host, Port) -> +ip_port_to_server_str(Host, Port) -> HostStr = case inet:ntoa(Host) of {error, einval} -> Host; IPStr -> IPStr diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index d90fae5fd..9291af624 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -366,9 +366,10 @@ t_mqtt_conn_update(_) -> BridgeIDEgress = emqx_bridge:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS), wait_for_resource_ready(BridgeIDEgress, 5), - %% then we try to update 'server' of the connector, to an unavailable IP address - %% the update should fail because of 'unreachable' or 'connrefused' - {ok, 500, _ErrorMsg} = request(put, uri(["connectors", ConnctorID]), + %% Then we try to update 'server' of the connector, to an unavailable IP address + %% The update OK, we recreate the resource even if the resource is current connected, + %% and the target resource we're going to update is unavailable. + {ok, 200, _} = request(put, uri(["connectors", ConnctorID]), ?MQTT_CONNECTOR2(<<"127.0.0.1:2603">>)), %% we fix the 'server' parameter to a normal one, it should work {ok, 200, _} = request(put, uri(["connectors", ConnctorID]), diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 7bafb57bb..f6bdf55d4 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -166,7 +166,8 @@ create_dry_run(ResourceType, Config) -> -spec create_dry_run_local(resource_type(), resource_config()) -> ok | {error, Reason :: term()}. create_dry_run_local(ResourceType, Config) -> - call_instance(<>, {create_dry_run, ResourceType, Config}). + RandId = iolist_to_binary(emqx_misc:gen_id(16)), + call_instance(RandId, {create_dry_run, ResourceType, Config}). -spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) -> {ok, resource_data()} | {error, Reason :: term()}. diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index af266f763..66f86e010 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -166,16 +166,9 @@ code_change(_OldVsn, State, _Extra) -> do_recreate(InstId, ResourceType, NewConfig, Opts) -> case lookup(InstId) of - {ok, Group, #{mod := ResourceType, status := connected} = Data} -> - %% If this resource is in use (status='connected'), we should make sure - %% the new config is OK before removing the old one. - case do_create_dry_run(ResourceType, NewConfig) of - ok -> - do_remove(Group, Data, false), - do_create(InstId, Group, ResourceType, NewConfig, Opts); - Error -> - Error - end; + %% We recreate the resource no matter if it is connected and in use! + %% As we can not know if the resource is "really disconnected" or we mark the status + %% to "disconnected" because the emqx_resource_instance process is not responding. {ok, Group, #{mod := ResourceType, status := _} = Data} -> do_remove(Group, Data, false), do_create(InstId, Group, ResourceType, NewConfig, Opts);