diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index ce461427b..bb0334bea 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -913,14 +913,6 @@ t_rule_pointing_to_non_operational_channel(_Config) -> ct:pal("connector config:\n ~p", [ConnectorConfig]), ?check_trace( begin - %% FIXME: this should only matter for the action. yet, currently the query - %% mode from the connector is stored once by the resource manager and later - %% used to decide how to call the resource... - meck:new(con_mod(), [passthrough, no_history, non_strict]), - on_exit(fun() -> catch meck:unload([con_mod()]) end), - meck:expect(con_mod(), query_mode, 1, simple_async_internal_buffer), - meck:expect(con_mod(), callback_mode, 0, async_if_possible), - {ok, _} = emqx_connector:create(con_type(), ConnectorName, ConnectorConfig), ActionName = my_test_action, @@ -930,6 +922,12 @@ t_rule_pointing_to_non_operational_channel(_Config) -> <<"connector">> => atom_to_binary(ConnectorName) }, ct:pal("action config:\n ~p", [ActionConfig]), + + meck:new(con_mod(), [passthrough, no_history, non_strict]), + on_exit(fun() -> catch meck:unload([con_mod()]) end), + meck:expect(con_mod(), query_mode, 1, simple_async_internal_buffer), + meck:expect(con_mod(), callback_mode, 0, async_if_possible), + {ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig), ?assertMatch( @@ -984,6 +982,72 @@ t_rule_pointing_to_non_operational_channel(_Config) -> ok. +t_query_uses_action_query_mode(_Config) -> + %% Check that we compute the query mode from the action and not from the connector + %% when querying the resource. + + %% Set one query mode for the connector... + meck:new(con_mod(), [passthrough, no_history, non_strict]), + on_exit(fun() -> catch meck:unload([con_mod()]) end), + meck:expect(con_mod(), query_mode, 1, sync), + meck:expect(con_mod(), callback_mode, 0, always_sync), + + ConnectorConfig = emqx_utils_maps:deep_merge(con_config(), #{ + <<"resource_opts">> => #{<<"start_timeout">> => 100} + }), + ConnectorName = ?FUNCTION_NAME, + ct:pal("connector config:\n ~p", [ConnectorConfig]), + ?check_trace( + begin + {ok, _} = emqx_connector:create(con_type(), ConnectorName, ConnectorConfig), + + ActionName = my_test_action, + ActionConfig = (bridge_config())#{ + <<"connector">> => atom_to_binary(ConnectorName) + }, + ct:pal("action config:\n ~p", [ActionConfig]), + + %% ... now we use a quite different query mode for the action + meck:expect(con_mod(), query_mode, 1, simple_async_internal_buffer), + meck:expect(con_mod(), callback_mode, 0, async_if_possible), + + {ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig), + + {ok, #{id := RuleId}} = emqx_rule_engine:create_rule( + #{ + sql => <<"select * from \"t/a\"">>, + id => atom_to_binary(?FUNCTION_NAME), + actions => [ + << + (atom_to_binary(bridge_type()))/binary, + ":", + (atom_to_binary(ActionName))/binary + >> + ] + } + ), + on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end), + + Msg = emqx_message:make(<<"t/a">>, <<"payload">>), + {_, {ok, _}} = + ?wait_async_action( + emqx:publish(Msg), + #{?snk_kind := call_query}, + 2_000 + ), + + ok + end, + fun(Trace) -> + ?assertMatch( + [#{query_mode := simple_async_internal_buffer}], + ?of_kind(simple_query_override, Trace) + ), + ok + end + ), + ok. + %% Helper Functions wait_until(Fun) -> diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 9c439206a..0d3b9cf97 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1148,6 +1148,14 @@ error_if_channel_is_not_installed(Id, QueryOpts) -> ok end. +do_call_query(QM, Id, Index, Ref, Query, #{query_mode := ReqQM} = QueryOpts, Resource) when + ReqQM =:= simple_sync_internal_buffer; ReqQM =:= simple_async_internal_buffer +-> + %% The query overrides the query mode of the resource, send even in disconnected state + ?tp(simple_query_override, #{query_mode => ReqQM}), + #{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource, + CallMode = call_mode(QM, CBM), + apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts); do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer ->