feat(resource): non-blocking channel health checks
Fixes https://emqx.atlassian.net/browse/EMQX-12015 Continuation of https://github.com/emqx/emqx/pull/12812
This commit is contained in:
parent
069cd4fbb4
commit
60cad74286
|
@ -858,6 +858,12 @@ do_start_stop_bridges(Type, Config) ->
|
||||||
<<"status_reason">> := <<"connack_timeout">>
|
<<"status_reason">> := <<"connack_timeout">>
|
||||||
} ->
|
} ->
|
||||||
ok;
|
ok;
|
||||||
|
#{
|
||||||
|
<<"node_status">> := [_, _ | _],
|
||||||
|
<<"status">> := <<"disconnected">>,
|
||||||
|
<<"status_reason">> := <<"connack_timeout">>
|
||||||
|
} ->
|
||||||
|
ok;
|
||||||
#{
|
#{
|
||||||
<<"node_status">> := [_],
|
<<"node_status">> := [_],
|
||||||
<<"status">> := <<"connecting">>
|
<<"status">> := <<"connecting">>
|
||||||
|
|
|
@ -61,7 +61,7 @@
|
||||||
-export([init/1, callback_mode/0, handle_event/4, terminate/3]).
|
-export([init/1, callback_mode/0, handle_event/4, terminate/3]).
|
||||||
|
|
||||||
%% Internal exports.
|
%% Internal exports.
|
||||||
-export([worker_resource_health_check/1]).
|
-export([worker_resource_health_check/1, worker_channel_health_check/2]).
|
||||||
|
|
||||||
% State record
|
% State record
|
||||||
-record(data, {
|
-record(data, {
|
||||||
|
@ -78,12 +78,24 @@
|
||||||
pid,
|
pid,
|
||||||
added_channels = #{},
|
added_channels = #{},
|
||||||
%% Reference to process performing resource health check.
|
%% Reference to process performing resource health check.
|
||||||
hc_workers = #{resource => #{}, channel => #{}} :: #{
|
hc_workers = #{
|
||||||
resource | channel := #{{pid(), reference()} => true}
|
resource => #{},
|
||||||
|
channel => #{
|
||||||
|
pending => [],
|
||||||
|
previous_status => #{}
|
||||||
|
}
|
||||||
|
} :: #{
|
||||||
|
resource := #{{pid(), reference()} => true},
|
||||||
|
channel := #{
|
||||||
|
{pid(), reference()} => channel_id(),
|
||||||
|
pending := [channel_id()],
|
||||||
|
previous_status := #{channel_id() => channel_status_map()}
|
||||||
|
}
|
||||||
},
|
},
|
||||||
%% Callers waiting on health check
|
%% Callers waiting on health check
|
||||||
hc_pending_callers = #{resource => [], channel => []} :: #{
|
hc_pending_callers = #{resource => [], channel => #{}} :: #{
|
||||||
resource | channel := [gen_server:from()]
|
resource := [gen_server:from()],
|
||||||
|
channel := #{channel_id() => [gen_server:from()]}
|
||||||
},
|
},
|
||||||
extra
|
extra
|
||||||
}).
|
}).
|
||||||
|
@ -107,6 +119,12 @@
|
||||||
-define(state_disconnected, disconnected).
|
-define(state_disconnected, disconnected).
|
||||||
-define(state_stopped, stopped).
|
-define(state_stopped, stopped).
|
||||||
|
|
||||||
|
-type state() ::
|
||||||
|
?state_stopped
|
||||||
|
| ?state_disconnected
|
||||||
|
| ?state_connecting
|
||||||
|
| ?state_connected.
|
||||||
|
|
||||||
-define(IS_STATUS(ST),
|
-define(IS_STATUS(ST),
|
||||||
ST =:= ?status_connecting; ST =:= ?status_connected; ST =:= ?status_disconnected
|
ST =:= ?status_connecting; ST =:= ?status_connected; ST =:= ?status_disconnected
|
||||||
).
|
).
|
||||||
|
@ -339,6 +357,7 @@ add_channel(ResId, ChannelId, Config) ->
|
||||||
Result = safe_call(ResId, {add_channel, ChannelId, Config}, ?T_OPERATION),
|
Result = safe_call(ResId, {add_channel, ChannelId, Config}, ?T_OPERATION),
|
||||||
%% Wait for health_check to finish
|
%% Wait for health_check to finish
|
||||||
_ = health_check(ResId),
|
_ = health_check(ResId),
|
||||||
|
_ = channel_health_check(ResId, ChannelId),
|
||||||
Result.
|
Result.
|
||||||
|
|
||||||
remove_channel(ResId, ChannelId) ->
|
remove_channel(ResId, ChannelId) ->
|
||||||
|
@ -538,11 +557,20 @@ handle_event(
|
||||||
info,
|
info,
|
||||||
{'DOWN', Ref, process, Pid, Res},
|
{'DOWN', Ref, process, Pid, Res},
|
||||||
State0,
|
State0,
|
||||||
Data0 = #data{hc_workers = #{resource := HCWorkers}}
|
Data0 = #data{hc_workers = #{resource := RHCWorkers}}
|
||||||
) when
|
) when
|
||||||
is_map_key({Pid, Ref}, HCWorkers)
|
is_map_key({Pid, Ref}, RHCWorkers)
|
||||||
->
|
->
|
||||||
handle_resource_health_check_worker_down(State0, Data0, {Pid, Ref}, Res);
|
handle_resource_health_check_worker_down(State0, Data0, {Pid, Ref}, Res);
|
||||||
|
handle_event(
|
||||||
|
info,
|
||||||
|
{'DOWN', Ref, process, Pid, Res},
|
||||||
|
_State,
|
||||||
|
Data0 = #data{hc_workers = #{channel := CHCWorkers}}
|
||||||
|
) when
|
||||||
|
is_map_key({Pid, Ref}, CHCWorkers)
|
||||||
|
->
|
||||||
|
handle_channel_health_check_worker_down(Data0, {Pid, Ref}, Res);
|
||||||
% Ignore all other events
|
% Ignore all other events
|
||||||
handle_event(EventType, EventData, State, Data) ->
|
handle_event(EventType, EventData, State, Data) ->
|
||||||
?SLOG(
|
?SLOG(
|
||||||
|
@ -558,7 +586,7 @@ handle_event(EventType, EventData, State, Data) ->
|
||||||
keep_state_and_data.
|
keep_state_and_data.
|
||||||
|
|
||||||
log_status_consistency(Status, #data{status = Status} = Data) ->
|
log_status_consistency(Status, #data{status = Status} = Data) ->
|
||||||
log_cache_consistency(read_cache(Data#data.id), Data);
|
log_cache_consistency(read_cache(Data#data.id), remove_runtime_data(Data));
|
||||||
log_status_consistency(Status, Data) ->
|
log_status_consistency(Status, Data) ->
|
||||||
?tp(warning, "inconsistent_status", #{
|
?tp(warning, "inconsistent_status", #{
|
||||||
status => Status,
|
status => Status,
|
||||||
|
@ -869,7 +897,7 @@ handle_manual_resource_health_check(From, Data0) ->
|
||||||
Data = Data0#data{hc_pending_callers = Pending},
|
Data = Data0#data{hc_pending_callers = Pending},
|
||||||
start_resource_health_check(Data).
|
start_resource_health_check(Data).
|
||||||
|
|
||||||
reply_pending_health_check_callers(Status, resource, Data0 = #data{hc_pending_callers = Pending0}) ->
|
reply_pending_resource_health_check_callers(Status, Data0 = #data{hc_pending_callers = Pending0}) ->
|
||||||
#{resource := RPending} = Pending0,
|
#{resource := RPending} = Pending0,
|
||||||
Actions = [{reply, From, {ok, Status}} || From <- RPending],
|
Actions = [{reply, From, {ok, Status}} || From <- RPending],
|
||||||
Data = Data0#data{hc_pending_callers = Pending0#{resource := []}},
|
Data = Data0#data{hc_pending_callers = Pending0#{resource := []}},
|
||||||
|
@ -888,13 +916,13 @@ start_resource_health_check(#data{hc_workers = #{resource := HCWorkers}}) when
|
||||||
keep_state_and_data;
|
keep_state_and_data;
|
||||||
start_resource_health_check(#data{} = Data0) ->
|
start_resource_health_check(#data{} = Data0) ->
|
||||||
#data{hc_workers = HCWorkers0 = #{resource := RHCWorkers0}} = Data0,
|
#data{hc_workers = HCWorkers0 = #{resource := RHCWorkers0}} = Data0,
|
||||||
WorkerRef = {_Pid, _Ref} = spawn_health_check_worker(Data0),
|
WorkerRef = {_Pid, _Ref} = spawn_resource_health_check_worker(Data0),
|
||||||
HCWorkers = HCWorkers0#{resource := RHCWorkers0#{WorkerRef => true}},
|
HCWorkers = HCWorkers0#{resource := RHCWorkers0#{WorkerRef => true}},
|
||||||
Data = Data0#data{hc_workers = HCWorkers},
|
Data = Data0#data{hc_workers = HCWorkers},
|
||||||
{keep_state, Data}.
|
{keep_state, Data}.
|
||||||
|
|
||||||
-spec spawn_health_check_worker(data()) -> {pid(), reference()}.
|
-spec spawn_resource_health_check_worker(data()) -> {pid(), reference()}.
|
||||||
spawn_health_check_worker(#data{} = Data) ->
|
spawn_resource_health_check_worker(#data{} = Data) ->
|
||||||
spawn_monitor(?MODULE, worker_resource_health_check, [Data]).
|
spawn_monitor(?MODULE, worker_resource_health_check, [Data]).
|
||||||
|
|
||||||
%% separated so it can be spec'ed and placate dialyzer tantrums...
|
%% separated so it can be spec'ed and placate dialyzer tantrums...
|
||||||
|
@ -939,7 +967,7 @@ continue_with_health_check(#data{} = Data0, CurrentState, HCRes) ->
|
||||||
continue_resource_health_check_connected(NewStatus, Data0) ->
|
continue_resource_health_check_connected(NewStatus, Data0) ->
|
||||||
case NewStatus of
|
case NewStatus of
|
||||||
?status_connected ->
|
?status_connected ->
|
||||||
{Replies, Data1} = reply_pending_health_check_callers(NewStatus, resource, Data0),
|
{Replies, Data1} = reply_pending_resource_health_check_callers(NewStatus, Data0),
|
||||||
Data2 = channels_health_check(?status_connected, Data1),
|
Data2 = channels_health_check(?status_connected, Data1),
|
||||||
Data = update_state(Data2, Data0),
|
Data = update_state(Data2, Data0),
|
||||||
Actions = Replies ++ health_check_actions(Data),
|
Actions = Replies ++ health_check_actions(Data),
|
||||||
|
@ -954,13 +982,13 @@ continue_resource_health_check_connected(NewStatus, Data0) ->
|
||||||
%% subset of resource manager state... But there should be a conversion
|
%% subset of resource manager state... But there should be a conversion
|
||||||
%% between the two here, as resource manager also has `stopped', which is
|
%% between the two here, as resource manager also has `stopped', which is
|
||||||
%% not a valid status at the time of writing.
|
%% not a valid status at the time of writing.
|
||||||
{Replies, Data} = reply_pending_health_check_callers(NewStatus, resource, Data0),
|
{Replies, Data} = reply_pending_resource_health_check_callers(NewStatus, Data0),
|
||||||
{next_state, NewStatus, channels_health_check(NewStatus, Data), Replies}
|
{next_state, NewStatus, channels_health_check(NewStatus, Data), Replies}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% Continuation to be used when the current resource state is not `?state_connected'.
|
%% Continuation to be used when the current resource state is not `?state_connected'.
|
||||||
continue_resource_health_check_not_connected(NewStatus, Data0) ->
|
continue_resource_health_check_not_connected(NewStatus, Data0) ->
|
||||||
{Replies, Data} = reply_pending_health_check_callers(NewStatus, resource, Data0),
|
{Replies, Data} = reply_pending_resource_health_check_callers(NewStatus, Data0),
|
||||||
case NewStatus of
|
case NewStatus of
|
||||||
?status_connected ->
|
?status_connected ->
|
||||||
{next_state, ?state_connected, channels_health_check(?status_connected, Data), Replies};
|
{next_state, ?state_connected, channels_health_check(?status_connected, Data), Replies};
|
||||||
|
@ -975,6 +1003,30 @@ continue_resource_health_check_not_connected(NewStatus, Data0) ->
|
||||||
|
|
||||||
handle_manual_channel_health_check(From, #data{state = undefined}, _ChannelId) ->
|
handle_manual_channel_health_check(From, #data{state = undefined}, _ChannelId) ->
|
||||||
{keep_state_and_data, [{reply, From, channel_status({error, resource_disconnected})}]};
|
{keep_state_and_data, [{reply, From, channel_status({error, resource_disconnected})}]};
|
||||||
|
handle_manual_channel_health_check(
|
||||||
|
From,
|
||||||
|
#data{
|
||||||
|
added_channels = Channels,
|
||||||
|
hc_pending_callers = #{channel := CPending0} = Pending0,
|
||||||
|
hc_workers = #{channel := #{previous_status := PreviousStatus}}
|
||||||
|
} = Data0,
|
||||||
|
ChannelId
|
||||||
|
) when
|
||||||
|
is_map_key(ChannelId, Channels),
|
||||||
|
is_map_key(ChannelId, PreviousStatus)
|
||||||
|
->
|
||||||
|
%% Ongoing health check.
|
||||||
|
CPending = maps:update_with(
|
||||||
|
ChannelId,
|
||||||
|
fun(OtherCallers) ->
|
||||||
|
[From | OtherCallers]
|
||||||
|
end,
|
||||||
|
[From],
|
||||||
|
CPending0
|
||||||
|
),
|
||||||
|
Pending = Pending0#{channel := CPending},
|
||||||
|
Data = Data0#data{hc_pending_callers = Pending},
|
||||||
|
{keep_state, Data};
|
||||||
handle_manual_channel_health_check(
|
handle_manual_channel_health_check(
|
||||||
From,
|
From,
|
||||||
#data{added_channels = Channels} = _Data,
|
#data{added_channels = Channels} = _Data,
|
||||||
|
@ -982,6 +1034,7 @@ handle_manual_channel_health_check(
|
||||||
) when
|
) when
|
||||||
is_map_key(ChannelId, Channels)
|
is_map_key(ChannelId, Channels)
|
||||||
->
|
->
|
||||||
|
%% No ongoing health check: reply with current status.
|
||||||
{keep_state_and_data, [{reply, From, maps:get(ChannelId, Channels)}]};
|
{keep_state_and_data, [{reply, From, maps:get(ChannelId, Channels)}]};
|
||||||
handle_manual_channel_health_check(
|
handle_manual_channel_health_check(
|
||||||
From,
|
From,
|
||||||
|
@ -990,10 +1043,6 @@ handle_manual_channel_health_check(
|
||||||
) ->
|
) ->
|
||||||
{keep_state_and_data, [{reply, From, channel_status({error, channel_not_found})}]}.
|
{keep_state_and_data, [{reply, From, channel_status({error, channel_not_found})}]}.
|
||||||
|
|
||||||
get_channel_status_channel_added(#data{id = ResId, mod = Mod, state = State}, ChannelId) ->
|
|
||||||
RawStatus = emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State),
|
|
||||||
channel_status(RawStatus).
|
|
||||||
|
|
||||||
-spec channels_health_check(resource_status(), data()) -> data().
|
-spec channels_health_check(resource_status(), data()) -> data().
|
||||||
channels_health_check(?status_connected = _ConnectorStatus, Data0) ->
|
channels_health_check(?status_connected = _ConnectorStatus, Data0) ->
|
||||||
Channels = maps:to_list(Data0#data.added_channels),
|
Channels = maps:to_list(Data0#data.added_channels),
|
||||||
|
@ -1009,8 +1058,7 @@ channels_health_check(?status_connected = _ConnectorStatus, Data0) ->
|
||||||
get_config_for_channels(Data0, ChannelsNotAdded),
|
get_config_for_channels(Data0, ChannelsNotAdded),
|
||||||
Data1 = add_channels_in_list(ChannelsNotAddedWithConfigs, Data0),
|
Data1 = add_channels_in_list(ChannelsNotAddedWithConfigs, Data0),
|
||||||
%% Now that we have done the adding, we can get the status of all channels
|
%% Now that we have done the adding, we can get the status of all channels
|
||||||
Data2 = channel_status_for_all_channels(Data1),
|
trigger_health_check_for_added_channels(Data1);
|
||||||
update_state(Data2, Data0);
|
|
||||||
channels_health_check(?status_connecting = _ConnectorStatus, Data0) ->
|
channels_health_check(?status_connecting = _ConnectorStatus, Data0) ->
|
||||||
%% Whenever the resource is connecting:
|
%% Whenever the resource is connecting:
|
||||||
%% 1. Change the status of all added channels to connecting
|
%% 1. Change the status of all added channels to connecting
|
||||||
|
@ -1105,41 +1153,117 @@ resource_not_connected_channel_error_msg(ResourceStatus, ChannelId, Data1) ->
|
||||||
)
|
)
|
||||||
).
|
).
|
||||||
|
|
||||||
channel_status_for_all_channels(Data) ->
|
%% Currently, we only call resource channel health checks when the underlying resource is
|
||||||
Channels = maps:to_list(Data#data.added_channels),
|
%% `?status_connected'.
|
||||||
AddedChannelsWithOldAndNewStatus = [
|
-spec trigger_health_check_for_added_channels(data()) -> data().
|
||||||
{ChannelId, OldStatus, get_channel_status_channel_added(Data, ChannelId)}
|
trigger_health_check_for_added_channels(Data0 = #data{hc_workers = HCWorkers0}) ->
|
||||||
|| {ChannelId, OldStatus} <- Channels,
|
#{channel := CHCWorkers0} = HCWorkers0,
|
||||||
|
PreviousStatus = maps:from_list([
|
||||||
|
{ChannelId, OldStatus}
|
||||||
|
|| {ChannelId, OldStatus} <- maps:to_list(Data0#data.added_channels),
|
||||||
channel_status_is_channel_added(OldStatus)
|
channel_status_is_channel_added(OldStatus)
|
||||||
],
|
]),
|
||||||
|
ChannelsToCheck = maps:keys(PreviousStatus),
|
||||||
|
case ChannelsToCheck of
|
||||||
|
[] ->
|
||||||
|
%% Nothing to do.
|
||||||
|
Data0;
|
||||||
|
[ChannelId | Rest] ->
|
||||||
|
%% Shooting one check at a time. We could increase concurrency in the future.
|
||||||
|
CHCWorkers = CHCWorkers0#{pending := Rest, previous_status := PreviousStatus},
|
||||||
|
Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}},
|
||||||
|
start_channel_health_check(Data1, ChannelId)
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec continue_channel_health_check_connected(data()) -> data().
|
||||||
|
continue_channel_health_check_connected(Data0) ->
|
||||||
|
#data{hc_workers = HCWorkers0} = Data0,
|
||||||
|
#{channel := #{previous_status := PreviousStatus} = CHCWorkers0} = HCWorkers0,
|
||||||
|
CHCWorkers = CHCWorkers0#{previous_status := #{}},
|
||||||
|
Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}},
|
||||||
%% Remove the added channels with a status different from connected or connecting
|
%% Remove the added channels with a status different from connected or connecting
|
||||||
|
CheckedChannels = [
|
||||||
|
{ChannelId, NewStatus}
|
||||||
|
|| {ChannelId, NewStatus} <- maps:to_list(Data0#data.added_channels),
|
||||||
|
is_map_key(ChannelId, PreviousStatus)
|
||||||
|
],
|
||||||
ChannelsToRemove = [
|
ChannelsToRemove = [
|
||||||
ChannelId
|
ChannelId
|
||||||
|| {ChannelId, _, NewStatus} <- AddedChannelsWithOldAndNewStatus,
|
|| {ChannelId, NewStatus} <- CheckedChannels,
|
||||||
not channel_status_is_channel_added(NewStatus)
|
not channel_status_is_channel_added(NewStatus)
|
||||||
],
|
],
|
||||||
Data1 = remove_channels_in_list(ChannelsToRemove, Data, true),
|
Data = remove_channels_in_list(ChannelsToRemove, Data1, true),
|
||||||
%% Raise/clear alarms
|
%% Raise/clear alarms
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun
|
fun
|
||||||
({ID, _OldStatus, #{status := ?status_connected}}) ->
|
({ID, #{status := ?status_connected}}) ->
|
||||||
_ = maybe_clear_alarm(ID);
|
_ = maybe_clear_alarm(ID);
|
||||||
({ID, OldStatus, NewStatus}) ->
|
({ID, NewStatus}) ->
|
||||||
|
OldStatus = maps:get(ID, PreviousStatus),
|
||||||
_ = maybe_alarm(NewStatus, ID, NewStatus, OldStatus)
|
_ = maybe_alarm(NewStatus, ID, NewStatus, OldStatus)
|
||||||
end,
|
end,
|
||||||
AddedChannelsWithOldAndNewStatus
|
CheckedChannels
|
||||||
),
|
),
|
||||||
%% Update the ChannelsMap
|
Data.
|
||||||
ChannelsMap = Data1#data.added_channels,
|
|
||||||
NewChannelsMap =
|
-spec start_channel_health_check(data(), channel_id()) -> data().
|
||||||
lists:foldl(
|
start_channel_health_check(#data{} = Data0, ChannelId) ->
|
||||||
fun({ChannelId, _, NewStatus}, Acc) ->
|
#data{hc_workers = HCWorkers0 = #{channel := CHCWorkers0}} = Data0,
|
||||||
maps:put(ChannelId, NewStatus, Acc)
|
WorkerRef = {_Pid, _Ref} = spawn_channel_health_check_worker(Data0, ChannelId),
|
||||||
|
HCWorkers = HCWorkers0#{channel := CHCWorkers0#{WorkerRef => ChannelId}},
|
||||||
|
Data0#data{hc_workers = HCWorkers}.
|
||||||
|
|
||||||
|
-spec spawn_channel_health_check_worker(data(), channel_id()) -> {pid(), reference()}.
|
||||||
|
spawn_channel_health_check_worker(#data{} = Data, ChannelId) ->
|
||||||
|
spawn_monitor(?MODULE, worker_channel_health_check, [Data, ChannelId]).
|
||||||
|
|
||||||
|
%% separated so it can be spec'ed and placate dialyzer tantrums...
|
||||||
|
-spec worker_channel_health_check(data(), channel_id()) -> no_return().
|
||||||
|
worker_channel_health_check(Data, ChannelId) ->
|
||||||
|
#data{id = ResId, mod = Mod, state = State} = Data,
|
||||||
|
RawStatus = emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State),
|
||||||
|
exit({ok, channel_status(RawStatus)}).
|
||||||
|
|
||||||
|
-spec handle_channel_health_check_worker_down(
|
||||||
|
data(), {pid(), reference()}, {ok, channel_status_map()}
|
||||||
|
) ->
|
||||||
|
gen_statem:event_handler_result(state(), data()).
|
||||||
|
handle_channel_health_check_worker_down(Data0, WorkerRef, ExitResult) ->
|
||||||
|
#data{
|
||||||
|
hc_workers = HCWorkers0 = #{channel := CHCWorkers0},
|
||||||
|
added_channels = AddedChannels0
|
||||||
|
} = Data0,
|
||||||
|
{ChannelId, CHCWorkers1} = maps:take(WorkerRef, CHCWorkers0),
|
||||||
|
case ExitResult of
|
||||||
|
{ok, NewStatus} ->
|
||||||
|
%% `emqx_resource:call_channel_health_check' catches all exceptions.
|
||||||
|
AddedChannels = maps:put(ChannelId, NewStatus, AddedChannels0)
|
||||||
end,
|
end,
|
||||||
ChannelsMap,
|
Data1 = Data0#data{added_channels = AddedChannels},
|
||||||
AddedChannelsWithOldAndNewStatus
|
{Replies, Data2} = reply_pending_channel_health_check_callers(ChannelId, NewStatus, Data1),
|
||||||
),
|
case CHCWorkers1 of
|
||||||
Data1#data{added_channels = NewChannelsMap}.
|
#{pending := [NextChannelId | Rest]} ->
|
||||||
|
CHCWorkers = CHCWorkers1#{pending := Rest},
|
||||||
|
HCWorkers = HCWorkers0#{channel := CHCWorkers},
|
||||||
|
Data3 = Data2#data{hc_workers = HCWorkers},
|
||||||
|
Data = start_channel_health_check(Data3, NextChannelId),
|
||||||
|
{keep_state, update_state(Data, Data0), Replies};
|
||||||
|
#{pending := []} ->
|
||||||
|
HCWorkers = HCWorkers0#{channel := CHCWorkers1},
|
||||||
|
Data3 = Data2#data{hc_workers = HCWorkers},
|
||||||
|
Data = continue_channel_health_check_connected(Data3),
|
||||||
|
{keep_state, update_state(Data, Data0), Replies}
|
||||||
|
end.
|
||||||
|
|
||||||
|
reply_pending_channel_health_check_callers(
|
||||||
|
ChannelId, Status, Data0 = #data{hc_pending_callers = Pending0}
|
||||||
|
) ->
|
||||||
|
#{channel := CPending0} = Pending0,
|
||||||
|
Pending = maps:get(ChannelId, CPending0, []),
|
||||||
|
Actions = [{reply, From, Status} || From <- Pending],
|
||||||
|
CPending = maps:remove(ChannelId, CPending0),
|
||||||
|
Data = Data0#data{hc_pending_callers = Pending0#{channel := CPending}},
|
||||||
|
{Actions, Data}.
|
||||||
|
|
||||||
get_config_for_channels(Data0, ChannelsWithoutConfig) ->
|
get_config_for_channels(Data0, ChannelsWithoutConfig) ->
|
||||||
ResId = Data0#data.id,
|
ResId = Data0#data.id,
|
||||||
|
@ -1181,8 +1305,14 @@ update_state(Data, _DataWas) ->
|
||||||
|
|
||||||
remove_runtime_data(#data{} = Data0) ->
|
remove_runtime_data(#data{} = Data0) ->
|
||||||
Data0#data{
|
Data0#data{
|
||||||
hc_workers = #{resource => #{}, channel => #{}},
|
hc_workers = #{
|
||||||
hc_pending_callers = #{resource => [], channel => []}
|
resource => #{},
|
||||||
|
channel => #{pending => [], previous_status => #{}}
|
||||||
|
},
|
||||||
|
hc_pending_callers = #{
|
||||||
|
resource => [],
|
||||||
|
channel => #{}
|
||||||
|
}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
health_check_interval(Opts) ->
|
health_check_interval(Opts) ->
|
||||||
|
|
|
@ -31,7 +31,12 @@
|
||||||
on_query_async/4,
|
on_query_async/4,
|
||||||
on_batch_query/3,
|
on_batch_query/3,
|
||||||
on_batch_query_async/4,
|
on_batch_query_async/4,
|
||||||
on_get_status/2
|
on_get_status/2,
|
||||||
|
|
||||||
|
on_add_channel/4,
|
||||||
|
on_remove_channel/3,
|
||||||
|
on_get_channels/1,
|
||||||
|
on_get_channel_status/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([counter_loop/0, set_callback_mode/1]).
|
-export([counter_loop/0, set_callback_mode/1]).
|
||||||
|
@ -40,6 +45,7 @@
|
||||||
-export([roots/0]).
|
-export([roots/0]).
|
||||||
|
|
||||||
-define(CM_KEY, {?MODULE, callback_mode}).
|
-define(CM_KEY, {?MODULE, callback_mode}).
|
||||||
|
-define(PT_CHAN_KEY(CONN_RES_ID), {?MODULE, chans, CONN_RES_ID}).
|
||||||
|
|
||||||
roots() ->
|
roots() ->
|
||||||
[
|
[
|
||||||
|
@ -71,12 +77,14 @@ on_start(InstId, #{name := Name} = Opts) ->
|
||||||
{ok, Opts#{
|
{ok, Opts#{
|
||||||
id => InstId,
|
id => InstId,
|
||||||
stop_error => StopError,
|
stop_error => StopError,
|
||||||
|
channels => #{},
|
||||||
pid => spawn_counter_process(Name, Register)
|
pid => spawn_counter_process(Name, Register)
|
||||||
}}.
|
}}.
|
||||||
|
|
||||||
on_stop(_InstId, #{stop_error := true}) ->
|
on_stop(_InstId, #{stop_error := true}) ->
|
||||||
{error, stop_error};
|
{error, stop_error};
|
||||||
on_stop(_InstId, #{pid := Pid}) ->
|
on_stop(InstId, #{pid := Pid}) ->
|
||||||
|
persistent_term:erase(?PT_CHAN_KEY(InstId)),
|
||||||
stop_counter_process(Pid).
|
stop_counter_process(Pid).
|
||||||
|
|
||||||
on_query(_InstId, get_state, State) ->
|
on_query(_InstId, get_state, State) ->
|
||||||
|
@ -295,6 +303,31 @@ on_get_status(_InstId, #{pid := Pid}) ->
|
||||||
false -> ?status_disconnected
|
false -> ?status_disconnected
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
on_add_channel(ConnResId, ConnSt0, ChanId, ChanCfg) ->
|
||||||
|
ConnSt = emqx_utils_maps:deep_put([channels, ChanId], ConnSt0, ChanCfg),
|
||||||
|
do_add_channel(ConnResId, ChanId, ChanCfg),
|
||||||
|
{ok, ConnSt}.
|
||||||
|
|
||||||
|
on_remove_channel(ConnResId, ConnSt0, ChanId) ->
|
||||||
|
ConnSt = emqx_utils_maps:deep_remove([channels, ChanId], ConnSt0),
|
||||||
|
do_remove_channel(ConnResId, ChanId),
|
||||||
|
{ok, ConnSt}.
|
||||||
|
|
||||||
|
on_get_channels(ConnResId) ->
|
||||||
|
persistent_term:get(?PT_CHAN_KEY(ConnResId), []).
|
||||||
|
|
||||||
|
on_get_channel_status(_ConnResId, ChanId, #{channels := Chans}) ->
|
||||||
|
case Chans of
|
||||||
|
#{ChanId := #{health_check_delay := Delay}} ->
|
||||||
|
?tp(connector_demo_channel_health_check_delay, #{}),
|
||||||
|
timer:sleep(Delay),
|
||||||
|
?status_connected;
|
||||||
|
#{ChanId := _ChanCfg} ->
|
||||||
|
?status_connected;
|
||||||
|
#{} ->
|
||||||
|
?status_disconnected
|
||||||
|
end.
|
||||||
|
|
||||||
spawn_counter_process(Name, Register) ->
|
spawn_counter_process(Name, Register) ->
|
||||||
Pid = spawn_link(?MODULE, counter_loop, []),
|
Pid = spawn_link(?MODULE, counter_loop, []),
|
||||||
true = maybe_register(Name, Pid, Register),
|
true = maybe_register(Name, Pid, Register),
|
||||||
|
@ -455,3 +488,11 @@ make_random_reply(N) ->
|
||||||
3 ->
|
3 ->
|
||||||
{error, {unrecoverable_error, N}}
|
{error, {unrecoverable_error, N}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
do_add_channel(ConnResId, ChanId, ChanCfg) ->
|
||||||
|
Chans = persistent_term:get(?PT_CHAN_KEY(ConnResId), []),
|
||||||
|
persistent_term:put(?PT_CHAN_KEY(ConnResId), [{ChanId, ChanCfg} | Chans]).
|
||||||
|
|
||||||
|
do_remove_channel(ConnResId, ChanId) ->
|
||||||
|
Chans = persistent_term:get(?PT_CHAN_KEY(ConnResId), []),
|
||||||
|
persistent_term:put(?PT_CHAN_KEY(ConnResId), proplists:delete(ChanId, Chans)).
|
||||||
|
|
|
@ -3141,6 +3141,55 @@ t_non_blocking_resource_health_check(_Config) ->
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_non_blocking_channel_health_check(_Config) ->
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
{ok, _} =
|
||||||
|
create(
|
||||||
|
?ID,
|
||||||
|
?DEFAULT_RESOURCE_GROUP,
|
||||||
|
?TEST_RESOURCE,
|
||||||
|
#{name => test_resource, health_check_error => {delay, 500}},
|
||||||
|
#{health_check_interval => 100}
|
||||||
|
),
|
||||||
|
ChanId = <<"chan">>,
|
||||||
|
ok =
|
||||||
|
emqx_resource_manager:add_channel(
|
||||||
|
?ID,
|
||||||
|
ChanId,
|
||||||
|
#{health_check_delay => 500}
|
||||||
|
),
|
||||||
|
|
||||||
|
%% concurrently attempt to health check the channel; should do it only once
|
||||||
|
%% for all callers
|
||||||
|
NumCallers = 20,
|
||||||
|
Expected = lists:duplicate(
|
||||||
|
NumCallers,
|
||||||
|
#{error => undefined, status => connected}
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
Expected,
|
||||||
|
emqx_utils:pmap(
|
||||||
|
fun(_) -> emqx_resource_manager:channel_health_check(?ID, ChanId) end,
|
||||||
|
lists:seq(1, NumCallers)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
|
||||||
|
NumCallers
|
||||||
|
end,
|
||||||
|
[
|
||||||
|
log_consistency_prop(),
|
||||||
|
fun(NumCallers, Trace) ->
|
||||||
|
%% shouldn't have one health check per caller
|
||||||
|
SubTrace = ?of_kind(connector_demo_channel_health_check_delay, Trace),
|
||||||
|
?assertMatch([_ | _], SubTrace),
|
||||||
|
?assert(length(SubTrace) < (NumCallers div 2), #{trace => Trace}),
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
]
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Helpers
|
%% Helpers
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Made channel (action/source) health checks non-blocking operations. This means that operations such as updating or removing an action/source data integration won't be blocked by a long-running health check.
|
Loading…
Reference in New Issue