fix: don't send message to channels that are not operational

This commit is contained in:
Kjell Winblad 2023-10-26 11:51:40 +02:00 committed by Zaiming (Stone) Shi
parent bba5b42c99
commit 5aac90ab4e
2 changed files with 28 additions and 61 deletions

View File

@ -80,14 +80,6 @@ on_query(
Channels = maps:get(channels, ConnectorState, #{}), Channels = maps:get(channels, ConnectorState, #{}),
%% Lookup the channel %% Lookup the channel
ChannelState = maps:get(ChannelId, Channels, not_found), ChannelState = maps:get(ChannelId, Channels, not_found),
case ChannelState of
not_found ->
error(
{recoverable_error, <<"Unexpected type for batch message (Expected send_message)">>}
);
_ ->
ok
end,
SendTo = maps:get(send_to, ChannelState), SendTo = maps:get(send_to, ChannelState),
SendTo ! Message, SendTo ! Message,
ok. ok.

View File

@ -1107,54 +1107,29 @@ extract_connector_id(Id) ->
is_channel_id(Id) -> is_channel_id(Id) ->
extract_connector_id(Id) =/= Id. extract_connector_id(Id) =/= Id.
%% Check if channel is installed in the connector state so that we %% Check if channel is installed in the connector state.
%% can add it if it's not installed. We will fail with a recoverable error if %% There is no need to query the conncector if the channel is not
%% the installation fails so that the query can be retried. %% installed as the query will fail anyway.
pre_query_channel_check({Id, _} = _Request, State, Channels, _Mod) when is_map_key(Id, Channels) -> pre_query_channel_check({Id, _} = _Request, Channels) when
State; is_map_key(Id, Channels),
pre_query_channel_check({Id, _} = _Request, State, _Channels, Mod) -> (map_get(Id, Channels) =:= connected orelse map_get(Id, Channels) =:= connecting)
->
ok;
pre_query_channel_check({Id, _} = _Request, _Channels) ->
%% Fail with a recoverable error if the channel is not installed
%% so that the operation can be retried. It is emqx_resource_manager's
%% responsibility to ensure that the channel installation is retried.
case is_channel_id(Id) of case is_channel_id(Id) of
true -> true ->
ResId = extract_connector_id(Id), error(
case emqx_resource:call_get_channel_config(ResId, Id, Mod) of
ChannelConfig when is_map(ChannelConfig) ->
add_channel(ResId, Id, ChannelConfig);
Error ->
%% Broken reference: this should not happen
erlang:error({unrecoverable_error, Error})
end;
false ->
State
end;
pre_query_channel_check(_Request, State, _Channels, _Mod) ->
State.
add_channel(ResId, ChannelId, ChannelConfig) ->
case emqx_resource_manager:add_channel(ResId, ChannelId, ChannelConfig) of
ok ->
read_new_state_from_resource_cache(ResId);
{error, Reason} ->
erlang:error(
{recoverable_error, {recoverable_error,
iolist_to_binary( iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id]))}
io_lib:format("channel:~p could not be installed in its connector: (~p)", [ );
ChannelId, Reason false ->
]) ok
)} end;
) pre_query_channel_check(_Request, _Channels) ->
end. ok.
read_new_state_from_resource_cache(ResId) ->
case emqx_resource_manager:lookup_cached(ResId) of
{ok, _Group, #{status := stopped}} ->
error({recoverable_error, <<"resource stopped or disabled">>});
{ok, _Group, #{status := connecting, error := unhealthy_target}} ->
error({unrecoverable_error, unhealthy_target});
{ok, _Group, #{state := State}} ->
State;
{error, not_found} ->
error({recoverable_error, <<"resource not found">>})
end.
do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when
ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer
@ -1207,8 +1182,8 @@ apply_query_fun(
?APPLY_RESOURCE( ?APPLY_RESOURCE(
call_query, call_query,
begin begin
NewResSt = pre_query_channel_check(Request, ResSt, Channels, Mod), pre_query_channel_check(Request, Channels),
Mod:on_query(extract_connector_id(Id), Request, NewResSt) Mod:on_query(extract_connector_id(Id), Request, ResSt)
end, end,
Request Request
), ),
@ -1238,9 +1213,9 @@ apply_query_fun(
AsyncWorkerMRef = undefined, AsyncWorkerMRef = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef), InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef),
ok = inflight_append(InflightTID, InflightItem), ok = inflight_append(InflightTID, InflightItem),
NewResSt = pre_query_channel_check(Request, ResSt, Channels, Mod), pre_query_channel_check(Request, Channels),
Result = Mod:on_query_async( Result = Mod:on_query_async(
extract_connector_id(Id), Request, {ReplyFun, [ReplyContext]}, NewResSt extract_connector_id(Id), Request, {ReplyFun, [ReplyContext]}, ResSt
), ),
{async_return, Result} {async_return, Result}
end, end,
@ -1265,8 +1240,8 @@ apply_query_fun(
?APPLY_RESOURCE( ?APPLY_RESOURCE(
call_batch_query, call_batch_query,
begin begin
NewResSt = pre_query_channel_check(FirstRequest, ResSt, Channels, Mod), pre_query_channel_check(FirstRequest, Channels),
Mod:on_batch_query(extract_connector_id(Id), Requests, NewResSt) Mod:on_batch_query(extract_connector_id(Id), Requests, ResSt)
end, end,
Batch Batch
), ),
@ -1307,9 +1282,9 @@ apply_query_fun(
AsyncWorkerMRef = undefined, AsyncWorkerMRef = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef), InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef),
ok = inflight_append(InflightTID, InflightItem), ok = inflight_append(InflightTID, InflightItem),
NewResSt = pre_query_channel_check(FirstRequest, ResSt, Channels, Mod), pre_query_channel_check(FirstRequest, Channels),
Result = Mod:on_batch_query_async( Result = Mod:on_batch_query_async(
extract_connector_id(Id), Requests, {ReplyFun, [ReplyContext]}, NewResSt extract_connector_id(Id), Requests, {ReplyFun, [ReplyContext]}, ResSt
), ),
{async_return, Result} {async_return, Result}
end, end,