fix(actions): increment rule statistics even if channel is not installed

Fixes new bug posted after https://emqx.atlassian.net/browse/EMQX-11494 was already fixed.

Also reduces the usage of error throwing for flow control a bit.
This commit is contained in:
Thales Macedo Garitezi 2023-12-11 12:06:38 -03:00
parent b6c11be159
commit 2495f59c91
2 changed files with 129 additions and 25 deletions

View File

@ -21,6 +21,7 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-import(emqx_common_test_helpers, [on_exit/1]). -import(emqx_common_test_helpers, [on_exit/1]).
@ -343,7 +344,7 @@ t_send_message_through_rule(_) ->
BridgeName = my_test_bridge, BridgeName = my_test_bridge,
{ok, _} = emqx_bridge_v2:create(bridge_type(), BridgeName, bridge_config()), {ok, _} = emqx_bridge_v2:create(bridge_type(), BridgeName, bridge_config()),
%% Create a rule to send message to the bridge %% Create a rule to send message to the bridge
{ok, _} = emqx_rule_engine:create_rule( {ok, #{id := RuleId}} = emqx_rule_engine:create_rule(
#{ #{
sql => <<"select * from \"t/a\"">>, sql => <<"select * from \"t/a\"">>,
id => atom_to_binary(?FUNCTION_NAME), id => atom_to_binary(?FUNCTION_NAME),
@ -357,6 +358,7 @@ t_send_message_through_rule(_) ->
description => <<"bridge_v2 test rule">> description => <<"bridge_v2 test rule">>
} }
), ),
on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end),
%% Register name for this process %% Register name for this process
register(registered_process_name(), self()), register(registered_process_name(), self()),
%% Send message to the topic %% Send message to the topic
@ -371,7 +373,6 @@ t_send_message_through_rule(_) ->
ct:fail("Failed to receive message") ct:fail("Failed to receive message")
end, end,
unregister(registered_process_name()), unregister(registered_process_name()),
ok = emqx_rule_engine:delete_rule(atom_to_binary(?FUNCTION_NAME)),
ok = emqx_bridge_v2:remove(bridge_type(), BridgeName), ok = emqx_bridge_v2:remove(bridge_type(), BridgeName),
ok. ok.
@ -894,6 +895,95 @@ t_lookup_status_when_connecting(_Config) ->
?assertMatch(#{status := ?status_disconnected}, ChannelData), ?assertMatch(#{status := ?status_disconnected}, ChannelData),
ok. ok.
t_rule_pointing_to_non_operational_channel(_Config) ->
%% Check that, if a rule sends a message to an action that is not yet installed and
%% uses `simple_async_internal_buffer', then it eventually increments the rule's
%% failed counter.
ResponseETS = ets:new(response_ets, [public]),
ets:insert(ResponseETS, {on_get_status_value, ?status_connecting}),
OnGetStatusFun = wrap_fun(fun() ->
ets:lookup_element(ResponseETS, on_get_status_value, 2)
end),
ConnectorConfig = emqx_utils_maps:deep_merge(con_config(), #{
<<"on_get_status_fun">> => OnGetStatusFun,
<<"resource_opts">> => #{<<"start_timeout">> => 100}
}),
ConnectorName = ?FUNCTION_NAME,
ct:pal("connector config:\n ~p", [ConnectorConfig]),
?check_trace(
begin
%% FIXME: this should only matter for the action. yet, currently the query
%% mode from the connector is stored once by the resource manager and later
%% used to decide how to call the resource...
meck:new(con_mod(), [passthrough, no_history, non_strict]),
on_exit(fun() -> catch meck:unload([con_mod()]) end),
meck:expect(con_mod(), query_mode, 1, simple_async_internal_buffer),
meck:expect(con_mod(), callback_mode, 0, async_if_possible),
{ok, _} = emqx_connector:create(con_type(), ConnectorName, ConnectorConfig),
ActionName = my_test_action,
ChanStatusFun = wrap_fun(fun() -> ?status_disconnected end),
ActionConfig = (bridge_config())#{
<<"on_get_channel_status_fun">> => ChanStatusFun,
<<"connector">> => atom_to_binary(ConnectorName)
},
ct:pal("action config:\n ~p", [ActionConfig]),
{ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig),
?assertMatch(
{ok, #{
error := <<"Not installed">>,
status := ?status_connecting,
resource_data := #{status := ?status_connecting}
}},
emqx_bridge_v2:lookup(bridge_type(), ActionName)
),
{ok, #{id := RuleId}} = emqx_rule_engine:create_rule(
#{
sql => <<"select * from \"t/a\"">>,
id => atom_to_binary(?FUNCTION_NAME),
actions => [
<<
(atom_to_binary(bridge_type()))/binary,
":",
(atom_to_binary(ActionName))/binary
>>
]
}
),
on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end),
Msg = emqx_message:make(<<"t/a">>, <<"payload">>),
emqx:publish(Msg),
ActionId = emqx_bridge_v2:id(bridge_type(), ActionName, ConnectorName),
?assertEqual(1, emqx_resource_metrics:matched_get(ActionId)),
?assertEqual(1, emqx_resource_metrics:failed_get(ActionId)),
?retry(
_Sleep0 = 100,
_Attempts = 20,
?assertMatch(
#{
counters :=
#{
matched := 1,
'actions.failed' := 1
}
},
emqx_metrics_worker:get_metrics(rule_metrics, RuleId)
)
),
ok
end,
[]
),
ok.
%% Helper Functions %% Helper Functions
wait_until(Fun) -> wait_until(Fun) ->

View File

@ -1122,14 +1122,14 @@ pre_query_channel_check({Id, _} = _Request, Channels, QueryOpts) when
true -> true ->
ok; ok;
false -> false ->
maybe_throw_channel_not_installed(Id, QueryOpts) error_if_channel_is_not_installed(Id, QueryOpts)
end; end;
pre_query_channel_check({Id, _} = _Request, _Channels, QueryOpts) -> pre_query_channel_check({Id, _} = _Request, _Channels, QueryOpts) ->
maybe_throw_channel_not_installed(Id, QueryOpts); error_if_channel_is_not_installed(Id, QueryOpts);
pre_query_channel_check(_Request, _Channels, _QueryOpts) -> pre_query_channel_check(_Request, _Channels, _QueryOpts) ->
ok. ok.
maybe_throw_channel_not_installed(Id, QueryOpts) -> error_if_channel_is_not_installed(Id, QueryOpts) ->
%% Fail with a recoverable error if the channel is not installed and there are buffer %% Fail with a recoverable error if the channel is not installed and there are buffer
%% workers involved so that the operation can be retried. Otherwise, this is %% workers involved so that the operation can be retried. Otherwise, this is
%% unrecoverable. It is emqx_resource_manager's responsibility to ensure that the %% unrecoverable. It is emqx_resource_manager's responsibility to ensure that the
@ -1137,15 +1137,13 @@ maybe_throw_channel_not_installed(Id, QueryOpts) ->
IsSimpleQuery = maps:get(simple_query, QueryOpts, false), IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
case is_channel_id(Id) of case is_channel_id(Id) of
true when IsSimpleQuery -> true when IsSimpleQuery ->
error( {error,
{unrecoverable_error, {unrecoverable_error,
iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id]))} iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id]))}};
);
true -> true ->
error( {error,
{recoverable_error, {recoverable_error,
iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id]))} iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id]))}};
);
false -> false ->
ok ok
end. end.
@ -1201,8 +1199,12 @@ apply_query_fun(
?APPLY_RESOURCE( ?APPLY_RESOURCE(
call_query, call_query,
begin begin
pre_query_channel_check(Request, Channels, QueryOpts), case pre_query_channel_check(Request, Channels, QueryOpts) of
Mod:on_query(extract_connector_id(Id), Request, ResSt) ok ->
Mod:on_query(extract_connector_id(Id), Request, ResSt);
Error ->
Error
end
end, end,
Request Request
), ),
@ -1232,11 +1234,15 @@ apply_query_fun(
AsyncWorkerMRef = undefined, AsyncWorkerMRef = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef), InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef),
ok = inflight_append(InflightTID, InflightItem), ok = inflight_append(InflightTID, InflightItem),
pre_query_channel_check(Request, Channels, QueryOpts), case pre_query_channel_check(Request, Channels, QueryOpts) of
ok ->
Result = Mod:on_query_async( Result = Mod:on_query_async(
extract_connector_id(Id), Request, {ReplyFun, [ReplyContext]}, ResSt extract_connector_id(Id), Request, {ReplyFun, [ReplyContext]}, ResSt
), ),
{async_return, Result} {async_return, Result};
Error ->
maybe_reply_to(Error, QueryOpts)
end
end, end,
Request Request
); );
@ -1259,8 +1265,12 @@ apply_query_fun(
?APPLY_RESOURCE( ?APPLY_RESOURCE(
call_batch_query, call_batch_query,
begin begin
pre_query_channel_check(FirstRequest, Channels, QueryOpts), case pre_query_channel_check(FirstRequest, Channels, QueryOpts) of
Mod:on_batch_query(extract_connector_id(Id), Requests, ResSt) ok ->
Mod:on_batch_query(extract_connector_id(Id), Requests, ResSt);
Error ->
Error
end
end, end,
Batch Batch
), ),
@ -1301,11 +1311,15 @@ apply_query_fun(
AsyncWorkerMRef = undefined, AsyncWorkerMRef = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef), InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef),
ok = inflight_append(InflightTID, InflightItem), ok = inflight_append(InflightTID, InflightItem),
pre_query_channel_check(FirstRequest, Channels, QueryOpts), case pre_query_channel_check(FirstRequest, Channels, QueryOpts) of
ok ->
Result = Mod:on_batch_query_async( Result = Mod:on_batch_query_async(
extract_connector_id(Id), Requests, {ReplyFun, [ReplyContext]}, ResSt extract_connector_id(Id), Requests, {ReplyFun, [ReplyContext]}, ResSt
), ),
{async_return, Result} {async_return, Result};
Error ->
maybe_reply_to(Error, QueryOpts)
end
end, end,
Batch Batch
). ).