From 90a23d98feef80b72a467df55376f3bef73f51a5 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 26 Oct 2023 18:18:20 +0200 Subject: [PATCH] fix: channel handling when resource not connected --- .../emqx_bridge/test/emqx_bridge_v2_SUITE.erl | 62 ++++++++++++++++++- .../test/emqx_bridge_v2_test_connector.erl | 12 ++++ .../src/emqx_bridge_kafka_impl_producer.erl | 4 +- apps/emqx_resource/src/emqx_resource.erl | 5 +- .../src/emqx_resource_manager.erl | 62 ++++++++++++++----- 5 files changed, 124 insertions(+), 21 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index df8bfdf1a..cc0a4f18a 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -259,7 +259,7 @@ t_send_message(_) -> receive <<"my_msg">> -> ok - after 1000 -> + after 10000 -> ct:fail("Failed to receive message") end, unregister(registered_process_name()), @@ -302,6 +302,66 @@ t_send_message_unhealthy_channel(_) -> {ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), ok. +t_send_message_unhealthy_connector(_) -> + ResponseETS = ets:new(response_ets, [public]), + ets:insert(ResponseETS, {on_start_value, conf}), + ets:insert(ResponseETS, {on_get_status_value, connecting}), + OnStartFun = wrap_fun(fun(Conf) -> + case ets:lookup_element(ResponseETS, on_start_value, 2) of + conf -> + {ok, Conf}; + V -> + V + end + end), + OnGetStatusFun = wrap_fun(fun() -> + ets:lookup_element(ResponseETS, on_get_status_value, 2) + end), + ConConfig = emqx_utils_maps:deep_merge(con_config(), #{ + <<"on_start_fun">> => OnStartFun, + <<"on_get_status_fun">> => OnGetStatusFun, + <<"resource_opts">> => #{<<"start_timeout">> => 100} + }), + ConName = ?FUNCTION_NAME, + {ok, _} = emqx_connector:create(con_type(), ConName, ConConfig), + BridgeConf = (bridge_config())#{ + <<"connector">> => atom_to_binary(ConName) + }, + {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, BridgeConf), + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + %% Test that sending does not work when the connector is unhealthy (connecting) + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + register(registered_process_name(), self()), + _ = emqx_bridge_v2:send_message(bridge_type(), my_test_bridge, <<"my_msg">>, #{timeout => 100}), + receive + Any -> + ct:pal("Received message: ~p", [Any]), + ct:fail("Should not get message here") + after 10 -> + ok + end, + %% We should have one alarm + 1 = get_bridge_v2_alarm_cnt(), + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + %% Test that sending works again when the connector is healthy (connected) + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + ets:insert(ResponseETS, {on_get_status_value, connected}), + + _ = emqx_bridge_v2:send_message(bridge_type(), my_test_bridge, <<"my_msg">>, #{timeout => 1000}), + receive + <<"my_msg">> -> + ok + after 1000 -> + ct:fail("Failed to receive message") + end, + %% The alarm should be gone at this point + 0 = get_bridge_v2_alarm_cnt(), + unregister(registered_process_name()), + {ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), + {ok, _} = emqx_connector:remove(con_type(), ConName), + ets:delete(ResponseETS), + ok. + t_unhealthy_channel_alarm(_) -> Conf = (bridge_config())#{ <<"on_get_channel_status_fun">> => diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl index 0e727f720..a84d6b4b2 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl @@ -37,6 +37,12 @@ query_mode(_Config) -> callback_mode() -> always_sync. +on_start( + _InstId, + #{on_start_fun := FunRef} = Conf +) -> + Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef), + Fun(Conf); on_start(_InstId, _Config) -> {ok, #{}}. @@ -95,6 +101,12 @@ on_query_async( ) -> throw(not_implemented). +on_get_status( + _InstId, + #{on_get_status_fun := FunRef} +) -> + Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef), + Fun(); on_get_status( _InstId, _State diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index e43a872f4..9c9f05852 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -502,10 +502,10 @@ on_get_channel_status( connected catch _ErrorType:Reason -> - {connecting, Reason} + {error, Reason} end; {error, Error} -> - {connecting, Error} + {error, Error} end; {error, _Reason} -> connecting diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 603285f66..8c48ee8bd 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -200,8 +200,8 @@ | {resource_status(), resource_state(), term()}. -callback on_get_channel_status(resource_id(), channel_id(), resource_state()) -> - resource_status() - | {resource_status(), term()}. + channel_status() + | {error, term()}. -callback query_mode(Config :: term()) -> query_mode(). @@ -525,7 +525,6 @@ call_health_check(ResId, Mod, ResourceState) -> -spec call_channel_health_check(resource_id(), channel_id(), module(), resource_state()) -> channel_status() - | {channel_status(), term()} | {error, term()}. call_channel_health_check(ResId, ChannelId, Mod, ResourceState) -> ?SAFE_CALL(Mod:on_get_channel_status(ResId, ChannelId, ResourceState)). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index d0fc168ba..4b1c9e4a4 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -430,6 +430,14 @@ handle_event(enter, _OldState, connected = State, Data) -> {keep_state_and_data, health_check_actions(Data)}; handle_event(state_timeout, health_check, connected, Data) -> handle_connected_health_check(Data); +handle_event( + {call, From}, {add_channel, ChannelId, Config}, connected = _State, Data +) -> + handle_add_channel(From, Data, ChannelId, Config); +handle_event( + {call, From}, {remove_channel, ChannelId}, connected = _State, Data +) -> + handle_remove_channel(From, ChannelId, Data); %% State: DISCONNECTED handle_event(enter, _OldState, disconnected = State, Data) -> ok = log_state_consistency(State, Data), @@ -443,15 +451,15 @@ handle_event(state_timeout, auto_retry, disconnected, Data) -> handle_event(enter, _OldState, stopped = State, Data) -> ok = log_state_consistency(State, Data), {keep_state_and_data, []}; -%% The following events can be handled in any state +%% The following events can be handled in any other state handle_event( - {call, From}, {add_channel, ChannelId, Config}, _State, Data + {call, From}, {add_channel, ChannelId, _Config}, State, Data ) -> - handle_add_channel(From, Data, ChannelId, Config); + handle_not_connected_add_channel(From, ChannelId, State, Data); handle_event( {call, From}, {remove_channel, ChannelId}, _State, Data ) -> - handle_remove_channel(From, ChannelId, Data); + handle_not_connected_remove_channel(From, ChannelId, Data); handle_event( {call, From}, get_channels, _State, Data ) -> @@ -545,11 +553,13 @@ start_resource(Data, From) -> reason => Reason }), _ = maybe_alarm(disconnected, Data#data.id, Err, Data#data.error), + %% Add channels and raise alarms + NewData1 = channels_health_check(disconnected, add_channels(Data)), %% Keep track of the error reason why the connection did not work %% so that the Reason can be returned when the verification call is made. - UpdatedData = Data#data{status = disconnected, error = Err}, - Actions = maybe_reply(retry_actions(UpdatedData), From, Err), - {next_state, disconnected, update_state(UpdatedData, Data), Actions} + NewData2 = NewData1#data{status = disconnected, error = Err}, + Actions = maybe_reply(retry_actions(NewData2), From, Err), + {next_state, disconnected, update_state(NewData2, Data), Actions} end. add_channels(Data) -> @@ -612,10 +622,10 @@ stop_resource(#data{state = ResState, id = ResId} = Data) -> %% The callback mod should make sure the resource is stopped after on_stop/2 %% is returned. HasAllocatedResources = emqx_resource:has_allocated_resources(ResId), + %% Before stop is called we remove all the channels from the resource + NewData = remove_channels(Data), case ResState =/= undefined orelse HasAllocatedResources of true -> - %% Before stop is called we remove all the channels from the resource - NewData = remove_channels(Data), %% we clear the allocated resources after stop is successful emqx_resource:call_stop(NewData#data.id, NewData#data.mod, ResState); false -> @@ -623,7 +633,7 @@ stop_resource(#data{state = ResState, id = ResId} = Data) -> end, _ = maybe_clear_alarm(ResId), ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId), - Data#data{status = stopped}. + NewData#data{status = stopped}. remove_channels(Data) -> Channels = maps:keys(Data#data.added_channels), @@ -641,9 +651,7 @@ remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) -> maybe_clear_alarm(ChannelID), maps:remove(ChannelID, AddedChannelsMap) end, - case - emqx_resource:call_remove_channel(Data#data.id, Data#data.mod, Data#data.state, ChannelID) - of + case safe_call_remove_channel(Data#data.id, Data#data.mod, Data#data.state, ChannelID) of {ok, NewState} -> NewData = Data#data{ state = NewState, @@ -663,6 +671,11 @@ remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) -> remove_channels_in_list(Rest, NewData, KeepInChannelMap) end. +safe_call_remove_channel(_ResId, _Mod, undefined = State, _ChannelID) -> + {ok, State}; +safe_call_remove_channel(ResId, Mod, State, ChannelID) -> + emqx_resource:call_remove_channel(ResId, Mod, State, ChannelID). + make_test_id() -> RandId = iolist_to_binary(emqx_utils:gen_id(16)), <>. @@ -710,8 +723,20 @@ add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig) -> update_state(UpdatedData, Data) end. +handle_not_connected_add_channel(From, ChannelId, State, Data) -> + %% When state is not connected the channel will be added to the channels + %% map but nothing else will happen. + Channels = Data#data.added_channels, + NewChannels = maps:put(ChannelId, {error, resource_not_operational}, Channels), + NewData1 = Data#data{added_channels = NewChannels}, + %% Do channel health check to trigger alarm + NewData2 = channels_health_check(State, NewData1), + {keep_state, update_state(NewData2, Data), [{reply, From, ok}]}. + handle_remove_channel(From, ChannelId, Data) -> Channels = Data#data.added_channels, + %% Deactivate alarm + _ = maybe_clear_alarm(ChannelId), case maps:get(ChannelId, Channels, {error, not_added}) of {error, _} -> %% The channel is already not installed in the connector state. @@ -721,8 +746,6 @@ handle_remove_channel(From, ChannelId, Data) -> NewData = Data#data{ added_channels = NewAddedChannels }, - %% Deactivate alarm - _ = maybe_clear_alarm(ChannelId), {keep_state, NewData, [{reply, From, ok}]}; _ -> %% The channel is installed in the connector state @@ -754,6 +777,15 @@ handle_remove_channel_exists(From, ChannelId, Data) -> {keep_state_and_data, [{reply, From, Error}]} end. +handle_not_connected_remove_channel(From, ChannelId, Data) -> + %% When state is not connected the channel will be removed from the channels + %% map but nothing else will happen. + Channels = Data#data.added_channels, + NewChannels = maps:remove(ChannelId, Channels), + NewData = Data#data{added_channels = NewChannels}, + _ = maybe_clear_alarm(ChannelId), + {keep_state, update_state(NewData, Data), [{reply, From, ok}]}. + handle_manually_health_check(From, Data) -> with_health_check( Data,