fix: channel handling when resource not connected

This commit is contained in:
Kjell Winblad 2023-10-26 18:18:20 +02:00 committed by Zaiming (Stone) Shi
parent e2aca352b6
commit 90a23d98fe
5 changed files with 124 additions and 21 deletions

View File

@ -259,7 +259,7 @@ t_send_message(_) ->
receive receive
<<"my_msg">> -> <<"my_msg">> ->
ok ok
after 1000 -> after 10000 ->
ct:fail("Failed to receive message") ct:fail("Failed to receive message")
end, end,
unregister(registered_process_name()), unregister(registered_process_name()),
@ -302,6 +302,66 @@ t_send_message_unhealthy_channel(_) ->
{ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), {ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
ok. ok.
t_send_message_unhealthy_connector(_) ->
ResponseETS = ets:new(response_ets, [public]),
ets:insert(ResponseETS, {on_start_value, conf}),
ets:insert(ResponseETS, {on_get_status_value, connecting}),
OnStartFun = wrap_fun(fun(Conf) ->
case ets:lookup_element(ResponseETS, on_start_value, 2) of
conf ->
{ok, Conf};
V ->
V
end
end),
OnGetStatusFun = wrap_fun(fun() ->
ets:lookup_element(ResponseETS, on_get_status_value, 2)
end),
ConConfig = emqx_utils_maps:deep_merge(con_config(), #{
<<"on_start_fun">> => OnStartFun,
<<"on_get_status_fun">> => OnGetStatusFun,
<<"resource_opts">> => #{<<"start_timeout">> => 100}
}),
ConName = ?FUNCTION_NAME,
{ok, _} = emqx_connector:create(con_type(), ConName, ConConfig),
BridgeConf = (bridge_config())#{
<<"connector">> => atom_to_binary(ConName)
},
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, BridgeConf),
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Test that sending does not work when the connector is unhealthy (connecting)
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
register(registered_process_name(), self()),
_ = emqx_bridge_v2:send_message(bridge_type(), my_test_bridge, <<"my_msg">>, #{timeout => 100}),
receive
Any ->
ct:pal("Received message: ~p", [Any]),
ct:fail("Should not get message here")
after 10 ->
ok
end,
%% We should have one alarm
1 = get_bridge_v2_alarm_cnt(),
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Test that sending works again when the connector is healthy (connected)
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
ets:insert(ResponseETS, {on_get_status_value, connected}),
_ = emqx_bridge_v2:send_message(bridge_type(), my_test_bridge, <<"my_msg">>, #{timeout => 1000}),
receive
<<"my_msg">> ->
ok
after 1000 ->
ct:fail("Failed to receive message")
end,
%% The alarm should be gone at this point
0 = get_bridge_v2_alarm_cnt(),
unregister(registered_process_name()),
{ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
{ok, _} = emqx_connector:remove(con_type(), ConName),
ets:delete(ResponseETS),
ok.
t_unhealthy_channel_alarm(_) -> t_unhealthy_channel_alarm(_) ->
Conf = (bridge_config())#{ Conf = (bridge_config())#{
<<"on_get_channel_status_fun">> => <<"on_get_channel_status_fun">> =>

View File

@ -37,6 +37,12 @@ query_mode(_Config) ->
callback_mode() -> callback_mode() ->
always_sync. always_sync.
on_start(
_InstId,
#{on_start_fun := FunRef} = Conf
) ->
Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef),
Fun(Conf);
on_start(_InstId, _Config) -> on_start(_InstId, _Config) ->
{ok, #{}}. {ok, #{}}.
@ -95,6 +101,12 @@ on_query_async(
) -> ) ->
throw(not_implemented). throw(not_implemented).
on_get_status(
_InstId,
#{on_get_status_fun := FunRef}
) ->
Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef),
Fun();
on_get_status( on_get_status(
_InstId, _InstId,
_State _State

View File

@ -502,10 +502,10 @@ on_get_channel_status(
connected connected
catch catch
_ErrorType:Reason -> _ErrorType:Reason ->
{connecting, Reason} {error, Reason}
end; end;
{error, Error} -> {error, Error} ->
{connecting, Error} {error, Error}
end; end;
{error, _Reason} -> {error, _Reason} ->
connecting connecting

View File

@ -200,8 +200,8 @@
| {resource_status(), resource_state(), term()}. | {resource_status(), resource_state(), term()}.
-callback on_get_channel_status(resource_id(), channel_id(), resource_state()) -> -callback on_get_channel_status(resource_id(), channel_id(), resource_state()) ->
resource_status() channel_status()
| {resource_status(), term()}. | {error, term()}.
-callback query_mode(Config :: term()) -> query_mode(). -callback query_mode(Config :: term()) -> query_mode().
@ -525,7 +525,6 @@ call_health_check(ResId, Mod, ResourceState) ->
-spec call_channel_health_check(resource_id(), channel_id(), module(), resource_state()) -> -spec call_channel_health_check(resource_id(), channel_id(), module(), resource_state()) ->
channel_status() channel_status()
| {channel_status(), term()}
| {error, term()}. | {error, term()}.
call_channel_health_check(ResId, ChannelId, Mod, ResourceState) -> call_channel_health_check(ResId, ChannelId, Mod, ResourceState) ->
?SAFE_CALL(Mod:on_get_channel_status(ResId, ChannelId, ResourceState)). ?SAFE_CALL(Mod:on_get_channel_status(ResId, ChannelId, ResourceState)).

View File

@ -430,6 +430,14 @@ handle_event(enter, _OldState, connected = State, Data) ->
{keep_state_and_data, health_check_actions(Data)}; {keep_state_and_data, health_check_actions(Data)};
handle_event(state_timeout, health_check, connected, Data) -> handle_event(state_timeout, health_check, connected, Data) ->
handle_connected_health_check(Data); handle_connected_health_check(Data);
handle_event(
{call, From}, {add_channel, ChannelId, Config}, connected = _State, Data
) ->
handle_add_channel(From, Data, ChannelId, Config);
handle_event(
{call, From}, {remove_channel, ChannelId}, connected = _State, Data
) ->
handle_remove_channel(From, ChannelId, Data);
%% State: DISCONNECTED %% State: DISCONNECTED
handle_event(enter, _OldState, disconnected = State, Data) -> handle_event(enter, _OldState, disconnected = State, Data) ->
ok = log_state_consistency(State, Data), ok = log_state_consistency(State, Data),
@ -443,15 +451,15 @@ handle_event(state_timeout, auto_retry, disconnected, Data) ->
handle_event(enter, _OldState, stopped = State, Data) -> handle_event(enter, _OldState, stopped = State, Data) ->
ok = log_state_consistency(State, Data), ok = log_state_consistency(State, Data),
{keep_state_and_data, []}; {keep_state_and_data, []};
%% The following events can be handled in any state %% The following events can be handled in any other state
handle_event( handle_event(
{call, From}, {add_channel, ChannelId, Config}, _State, Data {call, From}, {add_channel, ChannelId, _Config}, State, Data
) -> ) ->
handle_add_channel(From, Data, ChannelId, Config); handle_not_connected_add_channel(From, ChannelId, State, Data);
handle_event( handle_event(
{call, From}, {remove_channel, ChannelId}, _State, Data {call, From}, {remove_channel, ChannelId}, _State, Data
) -> ) ->
handle_remove_channel(From, ChannelId, Data); handle_not_connected_remove_channel(From, ChannelId, Data);
handle_event( handle_event(
{call, From}, get_channels, _State, Data {call, From}, get_channels, _State, Data
) -> ) ->
@ -545,11 +553,13 @@ start_resource(Data, From) ->
reason => Reason reason => Reason
}), }),
_ = maybe_alarm(disconnected, Data#data.id, Err, Data#data.error), _ = maybe_alarm(disconnected, Data#data.id, Err, Data#data.error),
%% Add channels and raise alarms
NewData1 = channels_health_check(disconnected, add_channels(Data)),
%% Keep track of the error reason why the connection did not work %% Keep track of the error reason why the connection did not work
%% so that the Reason can be returned when the verification call is made. %% so that the Reason can be returned when the verification call is made.
UpdatedData = Data#data{status = disconnected, error = Err}, NewData2 = NewData1#data{status = disconnected, error = Err},
Actions = maybe_reply(retry_actions(UpdatedData), From, Err), Actions = maybe_reply(retry_actions(NewData2), From, Err),
{next_state, disconnected, update_state(UpdatedData, Data), Actions} {next_state, disconnected, update_state(NewData2, Data), Actions}
end. end.
add_channels(Data) -> add_channels(Data) ->
@ -612,10 +622,10 @@ stop_resource(#data{state = ResState, id = ResId} = Data) ->
%% The callback mod should make sure the resource is stopped after on_stop/2 %% The callback mod should make sure the resource is stopped after on_stop/2
%% is returned. %% is returned.
HasAllocatedResources = emqx_resource:has_allocated_resources(ResId), HasAllocatedResources = emqx_resource:has_allocated_resources(ResId),
%% Before stop is called we remove all the channels from the resource
NewData = remove_channels(Data),
case ResState =/= undefined orelse HasAllocatedResources of case ResState =/= undefined orelse HasAllocatedResources of
true -> true ->
%% Before stop is called we remove all the channels from the resource
NewData = remove_channels(Data),
%% we clear the allocated resources after stop is successful %% we clear the allocated resources after stop is successful
emqx_resource:call_stop(NewData#data.id, NewData#data.mod, ResState); emqx_resource:call_stop(NewData#data.id, NewData#data.mod, ResState);
false -> false ->
@ -623,7 +633,7 @@ stop_resource(#data{state = ResState, id = ResId} = Data) ->
end, end,
_ = maybe_clear_alarm(ResId), _ = maybe_clear_alarm(ResId),
ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId), ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId),
Data#data{status = stopped}. NewData#data{status = stopped}.
remove_channels(Data) -> remove_channels(Data) ->
Channels = maps:keys(Data#data.added_channels), Channels = maps:keys(Data#data.added_channels),
@ -641,9 +651,7 @@ remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) ->
maybe_clear_alarm(ChannelID), maybe_clear_alarm(ChannelID),
maps:remove(ChannelID, AddedChannelsMap) maps:remove(ChannelID, AddedChannelsMap)
end, end,
case case safe_call_remove_channel(Data#data.id, Data#data.mod, Data#data.state, ChannelID) of
emqx_resource:call_remove_channel(Data#data.id, Data#data.mod, Data#data.state, ChannelID)
of
{ok, NewState} -> {ok, NewState} ->
NewData = Data#data{ NewData = Data#data{
state = NewState, state = NewState,
@ -663,6 +671,11 @@ remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) ->
remove_channels_in_list(Rest, NewData, KeepInChannelMap) remove_channels_in_list(Rest, NewData, KeepInChannelMap)
end. end.
safe_call_remove_channel(_ResId, _Mod, undefined = State, _ChannelID) ->
{ok, State};
safe_call_remove_channel(ResId, Mod, State, ChannelID) ->
emqx_resource:call_remove_channel(ResId, Mod, State, ChannelID).
make_test_id() -> make_test_id() ->
RandId = iolist_to_binary(emqx_utils:gen_id(16)), RandId = iolist_to_binary(emqx_utils:gen_id(16)),
<<?TEST_ID_PREFIX, RandId/binary>>. <<?TEST_ID_PREFIX, RandId/binary>>.
@ -710,8 +723,20 @@ add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig) ->
update_state(UpdatedData, Data) update_state(UpdatedData, Data)
end. end.
handle_not_connected_add_channel(From, ChannelId, State, Data) ->
%% When state is not connected the channel will be added to the channels
%% map but nothing else will happen.
Channels = Data#data.added_channels,
NewChannels = maps:put(ChannelId, {error, resource_not_operational}, Channels),
NewData1 = Data#data{added_channels = NewChannels},
%% Do channel health check to trigger alarm
NewData2 = channels_health_check(State, NewData1),
{keep_state, update_state(NewData2, Data), [{reply, From, ok}]}.
handle_remove_channel(From, ChannelId, Data) -> handle_remove_channel(From, ChannelId, Data) ->
Channels = Data#data.added_channels, Channels = Data#data.added_channels,
%% Deactivate alarm
_ = maybe_clear_alarm(ChannelId),
case maps:get(ChannelId, Channels, {error, not_added}) of case maps:get(ChannelId, Channels, {error, not_added}) of
{error, _} -> {error, _} ->
%% The channel is already not installed in the connector state. %% The channel is already not installed in the connector state.
@ -721,8 +746,6 @@ handle_remove_channel(From, ChannelId, Data) ->
NewData = Data#data{ NewData = Data#data{
added_channels = NewAddedChannels added_channels = NewAddedChannels
}, },
%% Deactivate alarm
_ = maybe_clear_alarm(ChannelId),
{keep_state, NewData, [{reply, From, ok}]}; {keep_state, NewData, [{reply, From, ok}]};
_ -> _ ->
%% The channel is installed in the connector state %% The channel is installed in the connector state
@ -754,6 +777,15 @@ handle_remove_channel_exists(From, ChannelId, Data) ->
{keep_state_and_data, [{reply, From, Error}]} {keep_state_and_data, [{reply, From, Error}]}
end. end.
handle_not_connected_remove_channel(From, ChannelId, Data) ->
%% When state is not connected the channel will be removed from the channels
%% map but nothing else will happen.
Channels = Data#data.added_channels,
NewChannels = maps:remove(ChannelId, Channels),
NewData = Data#data{added_channels = NewChannels},
_ = maybe_clear_alarm(ChannelId),
{keep_state, update_state(NewData, Data), [{reply, From, ok}]}.
handle_manually_health_check(From, Data) -> handle_manually_health_check(From, Data) ->
with_health_check( with_health_check(
Data, Data,