diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 1ccb5ca71..2e72c2a28 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -265,7 +265,7 @@ query(ResId, Request, Opts) -> IsBufferSupported = is_buffer_supported(Module), case {IsBufferSupported, QM} of {true, _} -> - %% only Kafka so far + %% only Kafka producer so far Opts1 = Opts#{is_buffer_supported => true}, emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1); {false, sync} -> diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 648587c25..2b41218ba 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -142,7 +142,7 @@ simple_sync_query(Id, Request) -> QueryOpts = simple_query_opts(), emqx_resource_metrics:matched_inc(Id), Ref = make_request_ref(), - Result = call_query(sync, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts), + Result = call_query(force_sync, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts), _ = handle_query_result(Id, Result, _HasBeenSent = false), Result. @@ -154,7 +154,7 @@ simple_async_query(Id, Request, QueryOpts0) -> QueryOpts = maps:merge(simple_query_opts(), QueryOpts0), emqx_resource_metrics:matched_inc(Id), Ref = make_request_ref(), - Result = call_query(async, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts), + Result = call_query(async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts), _ = handle_query_result(Id, Result, _HasBeenSent = false), Result. @@ -381,7 +381,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> } = Data0, ?tp(buffer_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}), QueryOpts = #{simple_query => false}, - Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts), + Result = call_query(force_sync, Id, Index, Ref, QueryOrBatch, QueryOpts), ReplyResult = case QueryOrBatch of ?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) -> @@ -570,7 +570,7 @@ do_flush( %% unwrap when not batching (i.e., batch size == 1) [?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) = Request] = Batch, QueryOpts = #{inflight_tid => InflightTID, simple_query => false}, - Result = call_query(configured, Id, Index, Ref, Request, QueryOpts), + Result = call_query(async_if_possible, Id, Index, Ref, Request, QueryOpts), Reply = ?REPLY(ReplyTo, HasBeenSent, Result), case reply_caller(Id, Reply, QueryOpts) of %% Failed; remove the request from the queue, as we cannot pop @@ -655,7 +655,7 @@ do_flush(#{queue := Q1} = Data0, #{ inflight_tid := InflightTID } = Data0, QueryOpts = #{inflight_tid => InflightTID, simple_query => false}, - Result = call_query(configured, Id, Index, Ref, Batch, QueryOpts), + Result = call_query(async_if_possible, Id, Index, Ref, Batch, QueryOpts), case batch_reply_caller(Id, Result, Batch, QueryOpts) of %% Failed; remove the request from the queue, as we cannot pop %% from it again, but we'll retry it using the inflight table. @@ -887,17 +887,13 @@ handle_async_worker_down(Data0, Pid) -> mark_inflight_items_as_retriable(Data, WorkerMRef), {keep_state, Data}. -call_query(QM0, Id, Index, Ref, Query, QueryOpts) -> - ?tp(call_query_enter, #{id => Id, query => Query, query_mode => QM0}), +-spec call_query(force_sync | async_if_possible, _, _, _, _, _) -> _. +call_query(QM, Id, Index, Ref, Query, QueryOpts) -> + ?tp(call_query_enter, #{id => Id, query => Query, query_mode => QM}), case emqx_resource_manager:lookup_cached(Id) of {ok, _Group, #{status := stopped}} -> ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); {ok, _Group, Resource} -> - QM = - case QM0 =:= configured of - true -> maps:get(query_mode, Resource); - false -> QM0 - end, do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource); {error, not_found} -> ?RESOURCE_ERROR(not_found, "resource not found") @@ -1515,9 +1511,9 @@ inc_sent_success(Id, _HasBeenSent = true) -> inc_sent_success(Id, _HasBeenSent) -> emqx_resource_metrics:success_inc(Id). -call_mode(sync, _) -> sync; -call_mode(async, always_sync) -> sync; -call_mode(async, async_if_possible) -> async. +call_mode(force_sync, _) -> sync; +call_mode(async_if_possible, always_sync) -> sync; +call_mode(async_if_possible, async_if_possible) -> async. assert_ok_result(ok) -> true; diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index a863dbb78..a1393c574 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -146,6 +146,12 @@ on_query(_InstId, {sleep_before_reply, For}, #{pid := Pid}) -> {error, timeout} end. +on_query_async(_InstId, block, ReplyFun, #{pid := Pid}) -> + Pid ! {block, ReplyFun}, + {ok, Pid}; +on_query_async(_InstId, resume, ReplyFun, #{pid := Pid}) -> + Pid ! {resume, ReplyFun}, + {ok, Pid}; on_query_async(_InstId, {inc_counter, N}, ReplyFun, #{pid := Pid}) -> Pid ! {inc, N, ReplyFun}, {ok, Pid}; @@ -274,6 +280,10 @@ counter_loop( block -> ct:pal("counter recv: ~p", [block]), State#{status => blocked}; + {block, ReplyFun} -> + ct:pal("counter recv: ~p", [block]), + apply_reply(ReplyFun, ok), + State#{status => blocked}; {block_now, ReplyFun} -> ct:pal("counter recv: ~p", [block_now]), apply_reply( @@ -284,6 +294,11 @@ counter_loop( {messages, Msgs} = erlang:process_info(self(), messages), ct:pal("counter recv: ~p, buffered msgs: ~p", [resume, length(Msgs)]), State#{status => running}; + {resume, ReplyFun} -> + {messages, Msgs} = erlang:process_info(self(), messages), + ct:pal("counter recv: ~p, buffered msgs: ~p", [resume, length(Msgs)]), + apply_reply(ReplyFun, ok), + State#{status => running}; {inc, N, ReplyFun} when Status == running -> %ct:pal("async counter recv: ~p", [{inc, N}]), apply_reply(ReplyFun, ok), diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index e7c252fa9..ca91ae40d 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -2561,6 +2561,84 @@ do_t_recursive_flush() -> ), ok. +t_call_mode_uncoupled_from_query_mode(_Config) -> + DefaultOpts = #{ + batch_size => 1, + batch_time => 5, + worker_pool_size => 1 + }, + ?check_trace( + begin + %% We check that we can call the buffer workers with async + %% calls, even if the underlying connector itself only + %% supports sync calls. + emqx_connector_demo:set_callback_mode(always_sync), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + DefaultOpts#{query_mode => async} + ), + ?tp_span( + async_query_sync_driver, + #{}, + ?assertMatch( + {ok, {ok, _}}, + ?wait_async_action( + emqx_resource:query(?ID, {inc_counter, 1}), + #{?snk_kind := buffer_worker_flush_ack}, + 500 + ) + ) + ), + ?assertEqual(ok, emqx_resource:remove_local(?ID)), + + %% And we check the converse: a connector that allows async + %% calls can be called synchronously, but the underlying + %% call should be async. + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + DefaultOpts#{query_mode => sync} + ), + ?tp_span( + sync_query_async_driver, + #{}, + ?assertEqual(ok, emqx_resource:query(?ID, {inc_counter, 2})) + ), + ?assertEqual(ok, emqx_resource:remove_local(?ID)), + ?tp(sync_query_async_driver, #{}), + ok + end, + fun(Trace0) -> + Trace1 = trace_between_span(Trace0, async_query_sync_driver), + ct:pal("async query calling sync driver\n ~p", [Trace1]), + ?assert( + ?strict_causality( + #{?snk_kind := async_query, request := {inc_counter, 1}}, + #{?snk_kind := call_query, call_mode := sync}, + Trace1 + ) + ), + + Trace2 = trace_between_span(Trace0, sync_query_async_driver), + ct:pal("sync query calling async driver\n ~p", [Trace2]), + ?assert( + ?strict_causality( + #{?snk_kind := sync_query, request := {inc_counter, 2}}, + #{?snk_kind := call_query_async}, + Trace2 + ) + ), + + ok + end + ). + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ @@ -2742,3 +2820,8 @@ assert_async_retry_fail_then_succeed_inflight(Trace) -> ) ), ok. + +trace_between_span(Trace0, Marker) -> + {Trace1, [_ | _]} = ?split_trace_at(#{?snk_kind := Marker, ?snk_span := {complete, _}}, Trace0), + {[_ | _], [_ | Trace2]} = ?split_trace_at(#{?snk_kind := Marker, ?snk_span := start}, Trace1), + Trace2. diff --git a/changes/ce/feat-10206.en.md b/changes/ce/feat-10206.en.md new file mode 100644 index 000000000..014ea71f2 --- /dev/null +++ b/changes/ce/feat-10206.en.md @@ -0,0 +1,7 @@ +Decouple the query mode from the underlying call mode for buffer +workers. + +Prior to this change, setting the query mode of a resource +such as a bridge to `sync` would force the buffer to call the +underlying connector in a synchronous way, even if it supports async +calls. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl index 183002e61..c0d58c4f7 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl @@ -291,7 +291,7 @@ t_setup_via_config_and_publish(Config) -> end, fun(Trace0) -> Trace = ?of_kind(dynamo_connector_query_return, Trace0), - ?assertMatch([#{result := {ok, _}}], Trace), + ?assertMatch([#{result := ok}], Trace), ok end ), @@ -328,7 +328,7 @@ t_setup_via_http_api_and_publish(Config) -> end, fun(Trace0) -> Trace = ?of_kind(dynamo_connector_query_return, Trace0), - ?assertMatch([#{result := {ok, _}}], Trace), + ?assertMatch([#{result := ok}], Trace), ok end ), diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index f9968ee96..75d2d2d8c 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -1023,7 +1023,6 @@ t_publish_timeout(Config) -> do_econnrefused_or_timeout_test(Config, timeout). do_econnrefused_or_timeout_test(Config, Error) -> - QueryMode = ?config(query_mode, Config), ResourceId = ?config(resource_id, Config), TelemetryTable = ?config(telemetry_table, Config), Topic = <<"t/topic">>, @@ -1031,15 +1030,8 @@ do_econnrefused_or_timeout_test(Config, Error) -> Message = emqx_message:make(Topic, Payload), ?check_trace( begin - case {QueryMode, Error} of - {sync, _} -> - {_, {ok, _}} = - ?wait_async_action( - emqx:publish(Message), - #{?snk_kind := gcp_pubsub_request_failed, recoverable_error := true}, - 15_000 - ); - {async, econnrefused} -> + case Error of + econnrefused -> %% at the time of writing, async requests %% are never considered expired by ehttpc %% (even if they arrive late, or never @@ -1059,7 +1051,7 @@ do_econnrefused_or_timeout_test(Config, Error) -> }, 15_000 ); - {async, timeout} -> + timeout -> %% at the time of writing, async requests %% are never considered expired by ehttpc %% (even if they arrive late, or never @@ -1077,18 +1069,13 @@ do_econnrefused_or_timeout_test(Config, Error) -> end end, fun(Trace) -> - case {QueryMode, Error} of - {sync, _} -> + case Error of + econnrefused -> ?assertMatch( [#{reason := Error, connector := ResourceId} | _], ?of_kind(gcp_pubsub_request_failed, Trace) ); - {async, econnrefused} -> - ?assertMatch( - [#{reason := Error, connector := ResourceId} | _], - ?of_kind(gcp_pubsub_request_failed, Trace) - ); - {async, timeout} -> + timeout -> ?assertMatch( [_, _ | _], ?of_kind(gcp_pubsub_response, Trace) @@ -1098,11 +1085,11 @@ do_econnrefused_or_timeout_test(Config, Error) -> end ), - case {Error, QueryMode} of + case Error of %% apparently, async with disabled queue doesn't mark the %% message as dropped; and since it never considers the %% response expired, this succeeds. - {econnrefused, async} -> + econnrefused -> wait_telemetry_event(TelemetryTable, queuing, ResourceId, #{ timeout => 10_000, n_events => 1 }), @@ -1124,7 +1111,7 @@ do_econnrefused_or_timeout_test(Config, Error) -> } when Matched >= 1 andalso Inflight + Queueing + Dropped + Failed =< 2, CurrentMetrics ); - {timeout, async} -> + timeout -> wait_until_gauge_is(inflight, 0, _Timeout = 400), wait_until_gauge_is(queuing, 0, _Timeout = 400), assert_metrics( @@ -1139,21 +1126,6 @@ do_econnrefused_or_timeout_test(Config, Error) -> late_reply => 2 }, ResourceId - ); - {_, sync} -> - wait_until_gauge_is(queuing, 0, 500), - wait_until_gauge_is(inflight, 1, 500), - assert_metrics( - #{ - dropped => 0, - failed => 0, - inflight => 1, - matched => 1, - queuing => 0, - retried => 0, - success => 0 - }, - ResourceId ) end, @@ -1277,7 +1249,6 @@ t_failure_no_body(Config) -> t_unrecoverable_error(Config) -> ResourceId = ?config(resource_id, Config), - QueryMode = ?config(query_mode, Config), TestPid = self(), FailureNoBodyHandler = fun(Req0, State) -> @@ -1308,33 +1279,16 @@ t_unrecoverable_error(Config) -> Message = emqx_message:make(Topic, Payload), ?check_trace( {_, {ok, _}} = - case QueryMode of - sync -> - ?wait_async_action( - emqx:publish(Message), - #{?snk_kind := gcp_pubsub_request_failed}, - 5_000 - ); - async -> - ?wait_async_action( - emqx:publish(Message), - #{?snk_kind := gcp_pubsub_response}, - 5_000 - ) - end, + ?wait_async_action( + emqx:publish(Message), + #{?snk_kind := gcp_pubsub_response}, + 5_000 + ), fun(Trace) -> - case QueryMode of - sync -> - ?assertMatch( - [#{reason := killed}], - ?of_kind(gcp_pubsub_request_failed, Trace) - ); - async -> - ?assertMatch( - [#{response := {error, killed}}], - ?of_kind(gcp_pubsub_response, Trace) - ) - end, + ?assertMatch( + [#{response := {error, killed}}], + ?of_kind(gcp_pubsub_response, Trace) + ), ok end ), diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl index 2b2214df0..e8dd970f3 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl @@ -532,10 +532,12 @@ t_start_ok(Config) -> }, ?check_trace( begin - ?assertEqual(ok, send_message(Config, SentData)), case QueryMode of - async -> ct:sleep(500); - sync -> ok + async -> + ?assertMatch(ok, send_message(Config, SentData)), + ct:sleep(500); + sync -> + ?assertMatch({ok, 204, _}, send_message(Config, SentData)) end, PersistedData = query_by_clientid(ClientId, Config), Expected = #{ @@ -689,10 +691,12 @@ t_const_timestamp(Config) -> <<"payload">> => Payload, <<"timestamp">> => erlang:system_time(millisecond) }, - ?assertEqual(ok, send_message(Config, SentData)), case QueryMode of - async -> ct:sleep(500); - sync -> ok + async -> + ?assertMatch(ok, send_message(Config, SentData)), + ct:sleep(500); + sync -> + ?assertMatch({ok, 204, _}, send_message(Config, SentData)) end, PersistedData = query_by_clientid(ClientId, Config), Expected = #{foo => <<"123">>}, @@ -745,7 +749,12 @@ t_boolean_variants(Config) -> <<"timestamp">> => erlang:system_time(millisecond), <<"payload">> => Payload }, - ?assertEqual(ok, send_message(Config, SentData)), + case QueryMode of + sync -> + ?assertMatch({ok, 204, _}, send_message(Config, SentData)); + async -> + ?assertMatch(ok, send_message(Config, SentData)) + end, case QueryMode of async -> ct:sleep(500); sync -> ok @@ -841,10 +850,9 @@ t_bad_timestamp(Config) -> ); {sync, false} -> ?assertEqual( - {error, - {unrecoverable_error, [ - {error, {bad_timestamp, <<"bad_timestamp">>}} - ]}}, + {error, [ + {error, {bad_timestamp, <<"bad_timestamp">>}} + ]}, Return ); {sync, true} -> @@ -964,7 +972,7 @@ t_write_failure(Config) -> {error, {resource_error, #{reason := timeout}}}, send_message(Config, SentData) ), - #{?snk_kind := buffer_worker_flush_nack}, + #{?snk_kind := handle_async_reply, action := nack}, 1_000 ); async -> @@ -978,13 +986,13 @@ t_write_failure(Config) -> fun(Trace0) -> case QueryMode of sync -> - Trace = ?of_kind(buffer_worker_flush_nack, Trace0), + Trace = ?of_kind(handle_async_reply, Trace0), ?assertMatch([_ | _], Trace), [#{result := Result} | _] = Trace, ?assert( {error, {error, {closed, "The connection was lost."}}} =:= Result orelse {error, {error, closed}} =:= Result orelse - {error, {recoverable_error, {error, econnrefused}}} =:= Result, + {error, {recoverable_error, econnrefused}} =:= Result, #{got => Result} ); async -> @@ -1006,7 +1014,6 @@ t_write_failure(Config) -> ok. t_missing_field(Config) -> - QueryMode = ?config(query_mode, Config), BatchSize = ?config(batch_size, Config), IsBatch = BatchSize > 1, {ok, _} = @@ -1034,8 +1041,7 @@ t_missing_field(Config) -> {ok, _} = snabbkaffe:block_until( ?match_n_events(NEvents, #{ - ?snk_kind := influxdb_connector_send_query_error, - mode := QueryMode + ?snk_kind := influxdb_connector_send_query_error }), _Timeout1 = 10_000 ), diff --git a/lib-ee/emqx_ee_connector/test/emqx_ee_connector_influxdb_SUITE.erl b/lib-ee/emqx_ee_connector/test/emqx_ee_connector_influxdb_SUITE.erl index 72fc11a67..364821ea0 100644 --- a/lib-ee/emqx_ee_connector/test/emqx_ee_connector_influxdb_SUITE.erl +++ b/lib-ee/emqx_ee_connector/test/emqx_ee_connector_influxdb_SUITE.erl @@ -94,7 +94,7 @@ perform_lifecycle_check(PoolName, InitialConfig) -> emqx_resource:get_instance(PoolName), ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), % % Perform query as further check that the resource is working as expected - ?assertMatch(ok, emqx_resource:query(PoolName, test_query())), + ?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query())), ?assertEqual(ok, emqx_resource:stop(PoolName)), % Resource will be listed still, but state will be changed and healthcheck will fail % as the worker no longer exists. @@ -116,7 +116,7 @@ perform_lifecycle_check(PoolName, InitialConfig) -> {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = emqx_resource:get_instance(PoolName), ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), - ?assertMatch(ok, emqx_resource:query(PoolName, test_query())), + ?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query())), % Stop and remove the resource in one go. ?assertEqual(ok, emqx_resource:remove_local(PoolName)), ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),