From 2495f59c9174e4a458d8581224f90cb8e20498a1 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 11 Dec 2023 12:06:38 -0300 Subject: [PATCH 1/2] fix(actions): increment rule statistics even if channel is not installed Fixes a new bug that was reported after https://emqx.atlassian.net/browse/EMQX-11494 had already been fixed. Also slightly reduces the use of thrown errors for flow control. --- .../emqx_bridge/test/emqx_bridge_v2_SUITE.erl | 94 ++++++++++++++++++- .../src/emqx_resource_buffer_worker.erl | 60 +++++++----- 2 files changed, 129 insertions(+), 25 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index 791997fc3..ce461427b 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -21,6 +21,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -import(emqx_common_test_helpers, [on_exit/1]). @@ -343,7 +344,7 @@ t_send_message_through_rule(_) -> BridgeName = my_test_bridge, {ok, _} = emqx_bridge_v2:create(bridge_type(), BridgeName, bridge_config()), %% Create a rule to send message to the bridge - {ok, _} = emqx_rule_engine:create_rule( + {ok, #{id := RuleId}} = emqx_rule_engine:create_rule( #{ sql => <<"select * from \"t/a\"">>, id => atom_to_binary(?FUNCTION_NAME), @@ -357,6 +358,7 @@ t_send_message_through_rule(_) -> description => <<"bridge_v2 test rule">> } ), + on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end), %% Register name for this process register(registered_process_name(), self()), %% Send message to the topic @@ -371,7 +373,6 @@ t_send_message_through_rule(_) -> ct:fail("Failed to receive message") end, unregister(registered_process_name()), - ok = emqx_rule_engine:delete_rule(atom_to_binary(?FUNCTION_NAME)), ok = emqx_bridge_v2:remove(bridge_type(), BridgeName), ok.
@@ -894,6 +895,95 @@ t_lookup_status_when_connecting(_Config) -> ?assertMatch(#{status := ?status_disconnected}, ChannelData), ok. +t_rule_pointing_to_non_operational_channel(_Config) -> + %% Check that, if a rule sends a message to an action that is not yet installed and + %% uses `simple_async_internal_buffer', then it eventually increments the rule's + %% failed counter. + ResponseETS = ets:new(response_ets, [public]), + ets:insert(ResponseETS, {on_get_status_value, ?status_connecting}), + OnGetStatusFun = wrap_fun(fun() -> + ets:lookup_element(ResponseETS, on_get_status_value, 2) + end), + + ConnectorConfig = emqx_utils_maps:deep_merge(con_config(), #{ + <<"on_get_status_fun">> => OnGetStatusFun, + <<"resource_opts">> => #{<<"start_timeout">> => 100} + }), + ConnectorName = ?FUNCTION_NAME, + ct:pal("connector config:\n ~p", [ConnectorConfig]), + ?check_trace( + begin + %% FIXME: this should only matter for the action. yet, currently the query + %% mode from the connector is stored once by the resource manager and later + %% used to decide how to call the resource... 
+ meck:new(con_mod(), [passthrough, no_history, non_strict]), + on_exit(fun() -> catch meck:unload([con_mod()]) end), + meck:expect(con_mod(), query_mode, 1, simple_async_internal_buffer), + meck:expect(con_mod(), callback_mode, 0, async_if_possible), + + {ok, _} = emqx_connector:create(con_type(), ConnectorName, ConnectorConfig), + + ActionName = my_test_action, + ChanStatusFun = wrap_fun(fun() -> ?status_disconnected end), + ActionConfig = (bridge_config())#{ + <<"on_get_channel_status_fun">> => ChanStatusFun, + <<"connector">> => atom_to_binary(ConnectorName) + }, + ct:pal("action config:\n ~p", [ActionConfig]), + {ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig), + + ?assertMatch( + {ok, #{ + error := <<"Not installed">>, + status := ?status_connecting, + resource_data := #{status := ?status_connecting} + }}, + emqx_bridge_v2:lookup(bridge_type(), ActionName) + ), + + {ok, #{id := RuleId}} = emqx_rule_engine:create_rule( + #{ + sql => <<"select * from \"t/a\"">>, + id => atom_to_binary(?FUNCTION_NAME), + actions => [ + << + (atom_to_binary(bridge_type()))/binary, + ":", + (atom_to_binary(ActionName))/binary + >> + ] + } + ), + on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end), + + Msg = emqx_message:make(<<"t/a">>, <<"payload">>), + emqx:publish(Msg), + + ActionId = emqx_bridge_v2:id(bridge_type(), ActionName, ConnectorName), + ?assertEqual(1, emqx_resource_metrics:matched_get(ActionId)), + ?assertEqual(1, emqx_resource_metrics:failed_get(ActionId)), + ?retry( + _Sleep0 = 100, + _Attempts = 20, + ?assertMatch( + #{ + counters := + #{ + matched := 1, + 'actions.failed' := 1 + } + }, + emqx_metrics_worker:get_metrics(rule_metrics, RuleId) + ) + ), + + ok + end, + [] + ), + + ok. 
+ %% Helper Functions wait_until(Fun) -> diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 98e1a785e..9c439206a 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1122,14 +1122,14 @@ pre_query_channel_check({Id, _} = _Request, Channels, QueryOpts) when true -> ok; false -> - maybe_throw_channel_not_installed(Id, QueryOpts) + error_if_channel_is_not_installed(Id, QueryOpts) end; pre_query_channel_check({Id, _} = _Request, _Channels, QueryOpts) -> - maybe_throw_channel_not_installed(Id, QueryOpts); + error_if_channel_is_not_installed(Id, QueryOpts); pre_query_channel_check(_Request, _Channels, _QueryOpts) -> ok. -maybe_throw_channel_not_installed(Id, QueryOpts) -> +error_if_channel_is_not_installed(Id, QueryOpts) -> %% Fail with a recoverable error if the channel is not installed and there are buffer %% workers involved so that the operation can be retried. Otherwise, this is %% unrecoverable. It is emqx_resource_manager's responsibility to ensure that the @@ -1137,15 +1137,13 @@ maybe_throw_channel_not_installed(Id, QueryOpts) -> IsSimpleQuery = maps:get(simple_query, QueryOpts, false), case is_channel_id(Id) of true when IsSimpleQuery -> - error( + {error, {unrecoverable_error, - iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id]))} - ); + iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id]))}}; true -> - error( + {error, {recoverable_error, - iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id]))} - ); + iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id]))}}; false -> ok end. 
@@ -1201,8 +1199,12 @@ apply_query_fun( ?APPLY_RESOURCE( call_query, begin - pre_query_channel_check(Request, Channels, QueryOpts), - Mod:on_query(extract_connector_id(Id), Request, ResSt) + case pre_query_channel_check(Request, Channels, QueryOpts) of + ok -> + Mod:on_query(extract_connector_id(Id), Request, ResSt); + Error -> + Error + end end, Request ), @@ -1232,11 +1234,15 @@ apply_query_fun( AsyncWorkerMRef = undefined, InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef), ok = inflight_append(InflightTID, InflightItem), - pre_query_channel_check(Request, Channels, QueryOpts), - Result = Mod:on_query_async( - extract_connector_id(Id), Request, {ReplyFun, [ReplyContext]}, ResSt - ), - {async_return, Result} + case pre_query_channel_check(Request, Channels, QueryOpts) of + ok -> + Result = Mod:on_query_async( + extract_connector_id(Id), Request, {ReplyFun, [ReplyContext]}, ResSt + ), + {async_return, Result}; + Error -> + maybe_reply_to(Error, QueryOpts) + end end, Request ); @@ -1259,8 +1265,12 @@ apply_query_fun( ?APPLY_RESOURCE( call_batch_query, begin - pre_query_channel_check(FirstRequest, Channels, QueryOpts), - Mod:on_batch_query(extract_connector_id(Id), Requests, ResSt) + case pre_query_channel_check(FirstRequest, Channels, QueryOpts) of + ok -> + Mod:on_batch_query(extract_connector_id(Id), Requests, ResSt); + Error -> + Error + end end, Batch ), @@ -1301,11 +1311,15 @@ apply_query_fun( AsyncWorkerMRef = undefined, InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef), ok = inflight_append(InflightTID, InflightItem), - pre_query_channel_check(FirstRequest, Channels, QueryOpts), - Result = Mod:on_batch_query_async( - extract_connector_id(Id), Requests, {ReplyFun, [ReplyContext]}, ResSt - ), - {async_return, Result} + case pre_query_channel_check(FirstRequest, Channels, QueryOpts) of + ok -> + Result = Mod:on_batch_query_async( + extract_connector_id(Id), Requests, {ReplyFun, [ReplyContext]}, ResSt + ), + 
{async_return, Result}; + Error -> + maybe_reply_to(Error, QueryOpts) + end end, Batch ). From b4a5c141add5646a9e2d8f0a3df2001e761672c7 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 11 Dec 2023 14:06:01 -0300 Subject: [PATCH 2/2] fix(actions): use action query mode instead of connector's query mode --- .../emqx_bridge/test/emqx_bridge_v2_SUITE.erl | 80 +++++++++++++++++-- .../src/emqx_resource_buffer_worker.erl | 8 ++ 2 files changed, 80 insertions(+), 8 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index ce461427b..bb0334bea 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -913,14 +913,6 @@ t_rule_pointing_to_non_operational_channel(_Config) -> ct:pal("connector config:\n ~p", [ConnectorConfig]), ?check_trace( begin - %% FIXME: this should only matter for the action. yet, currently the query - %% mode from the connector is stored once by the resource manager and later - %% used to decide how to call the resource... 
- meck:new(con_mod(), [passthrough, no_history, non_strict]), - on_exit(fun() -> catch meck:unload([con_mod()]) end), - meck:expect(con_mod(), query_mode, 1, simple_async_internal_buffer), - meck:expect(con_mod(), callback_mode, 0, async_if_possible), - {ok, _} = emqx_connector:create(con_type(), ConnectorName, ConnectorConfig), ActionName = my_test_action, @@ -930,6 +922,12 @@ t_rule_pointing_to_non_operational_channel(_Config) -> <<"connector">> => atom_to_binary(ConnectorName) }, ct:pal("action config:\n ~p", [ActionConfig]), + + meck:new(con_mod(), [passthrough, no_history, non_strict]), + on_exit(fun() -> catch meck:unload([con_mod()]) end), + meck:expect(con_mod(), query_mode, 1, simple_async_internal_buffer), + meck:expect(con_mod(), callback_mode, 0, async_if_possible), + {ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig), ?assertMatch( @@ -984,6 +982,72 @@ t_rule_pointing_to_non_operational_channel(_Config) -> ok. +t_query_uses_action_query_mode(_Config) -> + %% Check that we compute the query mode from the action and not from the connector + %% when querying the resource. + + %% Set one query mode for the connector... + meck:new(con_mod(), [passthrough, no_history, non_strict]), + on_exit(fun() -> catch meck:unload([con_mod()]) end), + meck:expect(con_mod(), query_mode, 1, sync), + meck:expect(con_mod(), callback_mode, 0, always_sync), + + ConnectorConfig = emqx_utils_maps:deep_merge(con_config(), #{ + <<"resource_opts">> => #{<<"start_timeout">> => 100} + }), + ConnectorName = ?FUNCTION_NAME, + ct:pal("connector config:\n ~p", [ConnectorConfig]), + ?check_trace( + begin + {ok, _} = emqx_connector:create(con_type(), ConnectorName, ConnectorConfig), + + ActionName = my_test_action, + ActionConfig = (bridge_config())#{ + <<"connector">> => atom_to_binary(ConnectorName) + }, + ct:pal("action config:\n ~p", [ActionConfig]), + + %% ... 
now we use a quite different query mode for the action + meck:expect(con_mod(), query_mode, 1, simple_async_internal_buffer), + meck:expect(con_mod(), callback_mode, 0, async_if_possible), + + {ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig), + + {ok, #{id := RuleId}} = emqx_rule_engine:create_rule( + #{ + sql => <<"select * from \"t/a\"">>, + id => atom_to_binary(?FUNCTION_NAME), + actions => [ + << + (atom_to_binary(bridge_type()))/binary, + ":", + (atom_to_binary(ActionName))/binary + >> + ] + } + ), + on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end), + + Msg = emqx_message:make(<<"t/a">>, <<"payload">>), + {_, {ok, _}} = + ?wait_async_action( + emqx:publish(Msg), + #{?snk_kind := call_query}, + 2_000 + ), + + ok + end, + fun(Trace) -> + ?assertMatch( + [#{query_mode := simple_async_internal_buffer}], + ?of_kind(simple_query_override, Trace) + ), + ok + end + ), + ok. + %% Helper Functions wait_until(Fun) -> diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 9c439206a..0d3b9cf97 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1148,6 +1148,14 @@ error_if_channel_is_not_installed(Id, QueryOpts) -> ok end. 
+do_call_query(QM, Id, Index, Ref, Query, #{query_mode := ReqQM} = QueryOpts, Resource) when + ReqQM =:= simple_sync_internal_buffer; ReqQM =:= simple_async_internal_buffer +-> + %% The query overrides the query mode of the resource, send even in disconnected state + ?tp(simple_query_override, #{query_mode => ReqQM}), + #{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource, + CallMode = call_mode(QM, CBM), + apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts); do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer ->