From 5aac90ab4e808d650091bd3a2f2ea5f40eaf6394 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 26 Oct 2023 11:51:40 +0200 Subject: [PATCH] fix: don't send message to channels that are not operational --- .../test/emqx_bridge_v2_test_connector.erl | 8 -- .../src/emqx_resource_buffer_worker.erl | 81 +++++++------------ 2 files changed, 28 insertions(+), 61 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl index fd6e5698b..0e727f720 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl @@ -80,14 +80,6 @@ on_query( Channels = maps:get(channels, ConnectorState, #{}), %% Lookup the channel ChannelState = maps:get(ChannelId, Channels, not_found), - case ChannelState of - not_found -> - error( - {recoverable_error, <<"Unexpected type for batch message (Expected send_message)">>} - ); - _ -> - ok - end, SendTo = maps:get(send_to, ChannelState), SendTo ! Message, ok. diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 95fcd388d..9b5ee2151 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1107,54 +1107,29 @@ extract_connector_id(Id) -> is_channel_id(Id) -> extract_connector_id(Id) =/= Id. -%% Check if channel is installed in the connector state so that we -%% can add it if it's not installed. We will fail with a recoverable error if -%% the installation fails so that the query can be retried. -pre_query_channel_check({Id, _} = _Request, State, Channels, _Mod) when is_map_key(Id, Channels) -> - State; -pre_query_channel_check({Id, _} = _Request, State, _Channels, Mod) -> +%% Check if channel is installed in the connector state. +%% There is no need to query the conncector if the channel is not +%% installed as the query will fail anyway. +pre_query_channel_check({Id, _} = _Request, Channels) when + is_map_key(Id, Channels), + (map_get(Id, Channels) =:= connected orelse map_get(Id, Channels) =:= connecting) +-> + ok; +pre_query_channel_check({Id, _} = _Request, _Channels) -> + %% Fail with a recoverable error if the channel is not installed + %% so that the operation can be retried. It is emqx_resource_manager's + %% responsibility to ensure that the channel installation is retried. case is_channel_id(Id) of true -> - ResId = extract_connector_id(Id), - case emqx_resource:call_get_channel_config(ResId, Id, Mod) of - ChannelConfig when is_map(ChannelConfig) -> - add_channel(ResId, Id, ChannelConfig); - Error -> - %% Broken reference: this should not happen - erlang:error({unrecoverable_error, Error}) - end; - false -> - State - end; -pre_query_channel_check(_Request, State, _Channels, _Mod) -> - State. - -add_channel(ResId, ChannelId, ChannelConfig) -> - case emqx_resource_manager:add_channel(ResId, ChannelId, ChannelConfig) of - ok -> - read_new_state_from_resource_cache(ResId); - {error, Reason} -> - erlang:error( + error( {recoverable_error, - iolist_to_binary( - io_lib:format("channel:~p could not be installed in its connector: (~p)", [ - ChannelId, Reason - ]) - )} - ) - end. - -read_new_state_from_resource_cache(ResId) -> - case emqx_resource_manager:lookup_cached(ResId) of - {ok, _Group, #{status := stopped}} -> - error({recoverable_error, <<"resource stopped or disabled">>}); - {ok, _Group, #{status := connecting, error := unhealthy_target}} -> - error({unrecoverable_error, unhealthy_target}); - {ok, _Group, #{state := State}} -> - State; - {error, not_found} -> - error({recoverable_error, <<"resource not found">>}) - end. + iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id]))} + ); + false -> + ok + end; +pre_query_channel_check(_Request, _Channels) -> + ok. do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer @@ -1207,8 +1182,8 @@ apply_query_fun( ?APPLY_RESOURCE( call_query, begin - NewResSt = pre_query_channel_check(Request, ResSt, Channels, Mod), - Mod:on_query(extract_connector_id(Id), Request, NewResSt) + pre_query_channel_check(Request, Channels), + Mod:on_query(extract_connector_id(Id), Request, ResSt) end, Request ), @@ -1238,9 +1213,9 @@ apply_query_fun( AsyncWorkerMRef = undefined, InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef), ok = inflight_append(InflightTID, InflightItem), - NewResSt = pre_query_channel_check(Request, ResSt, Channels, Mod), + pre_query_channel_check(Request, Channels), Result = Mod:on_query_async( - extract_connector_id(Id), Request, {ReplyFun, [ReplyContext]}, NewResSt + extract_connector_id(Id), Request, {ReplyFun, [ReplyContext]}, ResSt ), {async_return, Result} end, @@ -1265,8 +1240,8 @@ apply_query_fun( ?APPLY_RESOURCE( call_batch_query, begin - NewResSt = pre_query_channel_check(FirstRequest, ResSt, Channels, Mod), - Mod:on_batch_query(extract_connector_id(Id), Requests, NewResSt) + pre_query_channel_check(FirstRequest, Channels), + Mod:on_batch_query(extract_connector_id(Id), Requests, ResSt) end, Batch ), @@ -1307,9 +1282,9 @@ apply_query_fun( AsyncWorkerMRef = undefined, InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef), ok = inflight_append(InflightTID, InflightItem), - NewResSt = pre_query_channel_check(FirstRequest, ResSt, Channels, Mod), + pre_query_channel_check(FirstRequest, Channels), Result = Mod:on_batch_query_async( - extract_connector_id(Id), Requests, {ReplyFun, [ReplyContext]}, NewResSt + extract_connector_id(Id), Requests, {ReplyFun, [ReplyContext]}, ResSt ), {async_return, Result} end,