From f8d5d53908f71518dc3f16f6d07cc077b0fc3aea Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 22 Mar 2023 11:26:15 -0300 Subject: [PATCH] feat(buffer_worker): decouple query mode from underlying connector call mode Fixes https://emqx.atlassian.net/browse/EMQX-9129 Currently, if an user configures a bridge with query mode sync, then all calls to the underlying driver/connector ("inner calls") will always be synchronous, regardless of its support for async calls. Since buffer workers always support async queries ("outer calls"), we should decouple those two call modes (inner and outer), and avoid exposing the inner call configuration to user to avoid complexity. There are two situations when we want to force synchronous calls to the underlying connector even if it supports async: 1) When using `simple_sync_query`, since we are bypassing the buffer workers; 2) When retrying the inflight window, to avoid overwhelming the driver. --- apps/emqx_resource/src/emqx_resource.erl | 2 +- .../src/emqx_resource_buffer_worker.erl | 26 +++--- .../test/emqx_connector_demo.erl | 15 ++++ .../test/emqx_resource_SUITE.erl | 83 +++++++++++++++++++ changes/ce/feat-10206.en.md | 7 ++ .../test/emqx_ee_bridge_dynamo_SUITE.erl | 4 +- .../test/emqx_ee_bridge_gcp_pubsub_SUITE.erl | 82 ++++-------------- .../test/emqx_ee_bridge_influxdb_SUITE.erl | 40 +++++---- .../test/emqx_ee_connector_influxdb_SUITE.erl | 4 +- 9 files changed, 162 insertions(+), 101 deletions(-) create mode 100644 changes/ce/feat-10206.en.md diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 1ccb5ca71..2e72c2a28 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -265,7 +265,7 @@ query(ResId, Request, Opts) -> IsBufferSupported = is_buffer_supported(Module), case {IsBufferSupported, QM} of {true, _} -> - %% only Kafka so far + %% only Kafka producer so far Opts1 = Opts#{is_buffer_supported => true}, emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1); {false, sync} -> diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 8bfd77e61..4151cf430 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -140,7 +140,7 @@ simple_sync_query(Id, Request) -> QueryOpts = simple_query_opts(), emqx_resource_metrics:matched_inc(Id), Ref = make_request_ref(), - Result = call_query(sync, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts), + Result = call_query(force_sync, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts), _ = handle_query_result(Id, Result, _HasBeenSent = false), Result. @@ -152,7 +152,7 @@ simple_async_query(Id, Request, QueryOpts0) -> QueryOpts = maps:merge(simple_query_opts(), QueryOpts0), emqx_resource_metrics:matched_inc(Id), Ref = make_request_ref(), - Result = call_query(async, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts), + Result = call_query(async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts), _ = handle_query_result(Id, Result, _HasBeenSent = false), Result. @@ -377,7 +377,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> } = Data0, ?tp(buffer_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}), QueryOpts = #{simple_query => false}, - Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts), + Result = call_query(force_sync, Id, Index, Ref, QueryOrBatch, QueryOpts), ReplyResult = case QueryOrBatch of ?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) -> @@ -566,7 +566,7 @@ do_flush( %% unwrap when not batching (i.e., batch size == 1) [?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) = Request] = Batch, QueryOpts = #{inflight_tid => InflightTID, simple_query => false}, - Result = call_query(configured, Id, Index, Ref, Request, QueryOpts), + Result = call_query(async_if_possible, Id, Index, Ref, Request, QueryOpts), Reply = ?REPLY(ReplyTo, HasBeenSent, Result), case reply_caller(Id, Reply, QueryOpts) of %% Failed; remove the request from the queue, as we cannot pop @@ -651,7 +651,7 @@ do_flush(#{queue := Q1} = Data0, #{ inflight_tid := InflightTID } = Data0, QueryOpts = #{inflight_tid => InflightTID, simple_query => false}, - Result = call_query(configured, Id, Index, Ref, Batch, QueryOpts), + Result = call_query(async_if_possible, Id, Index, Ref, Batch, QueryOpts), case batch_reply_caller(Id, Result, Batch, QueryOpts) of %% Failed; remove the request from the queue, as we cannot pop %% from it again, but we'll retry it using the inflight table. @@ -883,17 +883,13 @@ handle_async_worker_down(Data0, Pid) -> mark_inflight_items_as_retriable(Data, WorkerMRef), {keep_state, Data}. -call_query(QM0, Id, Index, Ref, Query, QueryOpts) -> - ?tp(call_query_enter, #{id => Id, query => Query, query_mode => QM0}), +-spec call_query(force_sync | async_if_possible, _, _, _, _, _) -> _. +call_query(QM, Id, Index, Ref, Query, QueryOpts) -> + ?tp(call_query_enter, #{id => Id, query => Query, query_mode => QM}), case emqx_resource_manager:lookup_cached(Id) of {ok, _Group, #{status := stopped}} -> ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); {ok, _Group, Resource} -> - QM = - case QM0 =:= configured of - true -> maps:get(query_mode, Resource); - false -> QM0 - end, do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource); {error, not_found} -> ?RESOURCE_ERROR(not_found, "resource not found") @@ -1511,9 +1507,9 @@ inc_sent_success(Id, _HasBeenSent = true) -> inc_sent_success(Id, _HasBeenSent) -> emqx_resource_metrics:success_inc(Id). -call_mode(sync, _) -> sync; -call_mode(async, always_sync) -> sync; -call_mode(async, async_if_possible) -> async. +call_mode(force_sync, _) -> sync; +call_mode(async_if_possible, always_sync) -> sync; +call_mode(async_if_possible, async_if_possible) -> async. assert_ok_result(ok) -> true; diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index a863dbb78..a1393c574 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -146,6 +146,12 @@ on_query(_InstId, {sleep_before_reply, For}, #{pid := Pid}) -> {error, timeout} end. +on_query_async(_InstId, block, ReplyFun, #{pid := Pid}) -> + Pid ! {block, ReplyFun}, + {ok, Pid}; +on_query_async(_InstId, resume, ReplyFun, #{pid := Pid}) -> + Pid ! {resume, ReplyFun}, + {ok, Pid}; on_query_async(_InstId, {inc_counter, N}, ReplyFun, #{pid := Pid}) -> Pid ! {inc, N, ReplyFun}, {ok, Pid}; @@ -274,6 +280,10 @@ counter_loop( block -> ct:pal("counter recv: ~p", [block]), State#{status => blocked}; + {block, ReplyFun} -> + ct:pal("counter recv: ~p", [block]), + apply_reply(ReplyFun, ok), + State#{status => blocked}; {block_now, ReplyFun} -> ct:pal("counter recv: ~p", [block_now]), apply_reply( @@ -284,6 +294,11 @@ counter_loop( {messages, Msgs} = erlang:process_info(self(), messages), ct:pal("counter recv: ~p, buffered msgs: ~p", [resume, length(Msgs)]), State#{status => running}; + {resume, ReplyFun} -> + {messages, Msgs} = erlang:process_info(self(), messages), + ct:pal("counter recv: ~p, buffered msgs: ~p", [resume, length(Msgs)]), + apply_reply(ReplyFun, ok), + State#{status => running}; {inc, N, ReplyFun} when Status == running -> %ct:pal("async counter recv: ~p", [{inc, N}]), apply_reply(ReplyFun, ok), diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index e7c252fa9..ca91ae40d 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -2561,6 +2561,84 @@ do_t_recursive_flush() -> ), ok. +t_call_mode_uncoupled_from_query_mode(_Config) -> + DefaultOpts = #{ + batch_size => 1, + batch_time => 5, + worker_pool_size => 1 + }, + ?check_trace( + begin + %% We check that we can call the buffer workers with async + %% calls, even if the underlying connector itself only + %% supports sync calls. + emqx_connector_demo:set_callback_mode(always_sync), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + DefaultOpts#{query_mode => async} + ), + ?tp_span( + async_query_sync_driver, + #{}, + ?assertMatch( + {ok, {ok, _}}, + ?wait_async_action( + emqx_resource:query(?ID, {inc_counter, 1}), + #{?snk_kind := buffer_worker_flush_ack}, + 500 + ) + ) + ), + ?assertEqual(ok, emqx_resource:remove_local(?ID)), + + %% And we check the converse: a connector that allows async + %% calls can be called synchronously, but the underlying + %% call should be async. + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + DefaultOpts#{query_mode => sync} + ), + ?tp_span( + sync_query_async_driver, + #{}, + ?assertEqual(ok, emqx_resource:query(?ID, {inc_counter, 2})) + ), + ?assertEqual(ok, emqx_resource:remove_local(?ID)), + ?tp(sync_query_async_driver, #{}), + ok + end, + fun(Trace0) -> + Trace1 = trace_between_span(Trace0, async_query_sync_driver), + ct:pal("async query calling sync driver\n ~p", [Trace1]), + ?assert( + ?strict_causality( + #{?snk_kind := async_query, request := {inc_counter, 1}}, + #{?snk_kind := call_query, call_mode := sync}, + Trace1 + ) + ), + + Trace2 = trace_between_span(Trace0, sync_query_async_driver), + ct:pal("sync query calling async driver\n ~p", [Trace2]), + ?assert( + ?strict_causality( + #{?snk_kind := sync_query, request := {inc_counter, 2}}, + #{?snk_kind := call_query_async}, + Trace2 + ) + ), + + ok + end + ). + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ @@ -2742,3 +2820,8 @@ assert_async_retry_fail_then_succeed_inflight(Trace) -> ) ), ok. + +trace_between_span(Trace0, Marker) -> + {Trace1, [_ | _]} = ?split_trace_at(#{?snk_kind := Marker, ?snk_span := {complete, _}}, Trace0), + {[_ | _], [_ | Trace2]} = ?split_trace_at(#{?snk_kind := Marker, ?snk_span := start}, Trace1), + Trace2. diff --git a/changes/ce/feat-10206.en.md b/changes/ce/feat-10206.en.md new file mode 100644 index 000000000..014ea71f2 --- /dev/null +++ b/changes/ce/feat-10206.en.md @@ -0,0 +1,7 @@ +Decouple the query mode from the underlying call mode for buffer +workers. + +Prior to this change, setting the query mode of a resource +such as a bridge to `sync` would force the buffer to call the +underlying connector in a synchronous way, even if it supports async +calls. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl index 183002e61..c0d58c4f7 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl @@ -291,7 +291,7 @@ t_setup_via_config_and_publish(Config) -> end, fun(Trace0) -> Trace = ?of_kind(dynamo_connector_query_return, Trace0), - ?assertMatch([#{result := {ok, _}}], Trace), + ?assertMatch([#{result := ok}], Trace), ok end ), @@ -328,7 +328,7 @@ t_setup_via_http_api_and_publish(Config) -> end, fun(Trace0) -> Trace = ?of_kind(dynamo_connector_query_return, Trace0), - ?assertMatch([#{result := {ok, _}}], Trace), + ?assertMatch([#{result := ok}], Trace), ok end ), diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index 8424ddff0..709955666 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -1013,7 +1013,6 @@ t_publish_timeout(Config) -> do_econnrefused_or_timeout_test(Config, timeout). do_econnrefused_or_timeout_test(Config, Error) -> - QueryMode = ?config(query_mode, Config), ResourceId = ?config(resource_id, Config), TelemetryTable = ?config(telemetry_table, Config), Topic = <<"t/topic">>, @@ -1021,15 +1020,8 @@ do_econnrefused_or_timeout_test(Config, Error) -> Message = emqx_message:make(Topic, Payload), ?check_trace( begin - case {QueryMode, Error} of - {sync, _} -> - {_, {ok, _}} = - ?wait_async_action( - emqx:publish(Message), - #{?snk_kind := gcp_pubsub_request_failed, recoverable_error := true}, - 15_000 - ); - {async, econnrefused} -> + case Error of + econnrefused -> %% at the time of writing, async requests %% are never considered expired by ehttpc %% (even if they arrive late, or never @@ -1049,7 +1041,7 @@ do_econnrefused_or_timeout_test(Config, Error) -> }, 15_000 ); - {async, timeout} -> + timeout -> %% at the time of writing, async requests %% are never considered expired by ehttpc %% (even if they arrive late, or never @@ -1067,18 +1059,13 @@ do_econnrefused_or_timeout_test(Config, Error) -> end end, fun(Trace) -> - case {QueryMode, Error} of - {sync, _} -> + case Error of + econnrefused -> ?assertMatch( [#{reason := Error, connector := ResourceId} | _], ?of_kind(gcp_pubsub_request_failed, Trace) ); - {async, econnrefused} -> - ?assertMatch( - [#{reason := Error, connector := ResourceId} | _], - ?of_kind(gcp_pubsub_request_failed, Trace) - ); - {async, timeout} -> + timeout -> ?assertMatch( [_, _ | _], ?of_kind(gcp_pubsub_response, Trace) @@ -1088,11 +1075,11 @@ do_econnrefused_or_timeout_test(Config, Error) -> end ), - case {Error, QueryMode} of + case Error of %% apparently, async with disabled queue doesn't mark the %% message as dropped; and since it never considers the %% response expired, this succeeds. - {econnrefused, async} -> + econnrefused -> wait_telemetry_event(TelemetryTable, queuing, ResourceId, #{ timeout => 10_000, n_events => 1 }), @@ -1114,7 +1101,7 @@ do_econnrefused_or_timeout_test(Config, Error) -> } when Matched >= 1 andalso Inflight + Queueing + Dropped + Failed =< 2, CurrentMetrics ); - {timeout, async} -> + timeout -> wait_until_gauge_is(inflight, 0, _Timeout = 400), wait_until_gauge_is(queuing, 0, _Timeout = 400), assert_metrics( @@ -1129,21 +1116,6 @@ do_econnrefused_or_timeout_test(Config, Error) -> late_reply => 2 }, ResourceId - ); - {_, sync} -> - wait_until_gauge_is(queuing, 0, 500), - wait_until_gauge_is(inflight, 1, 500), - assert_metrics( - #{ - dropped => 0, - failed => 0, - inflight => 1, - matched => 1, - queuing => 0, - retried => 0, - success => 0 - }, - ResourceId ) end, @@ -1267,7 +1239,6 @@ t_failure_no_body(Config) -> t_unrecoverable_error(Config) -> ResourceId = ?config(resource_id, Config), - QueryMode = ?config(query_mode, Config), TestPid = self(), FailureNoBodyHandler = fun(Req0, State) -> @@ -1298,33 +1269,16 @@ t_unrecoverable_error(Config) -> Message = emqx_message:make(Topic, Payload), ?check_trace( {_, {ok, _}} = - case QueryMode of - sync -> - ?wait_async_action( - emqx:publish(Message), - #{?snk_kind := gcp_pubsub_request_failed}, - 5_000 - ); - async -> - ?wait_async_action( - emqx:publish(Message), - #{?snk_kind := gcp_pubsub_response}, - 5_000 - ) - end, + ?wait_async_action( + emqx:publish(Message), + #{?snk_kind := gcp_pubsub_response}, + 5_000 + ), fun(Trace) -> - case QueryMode of - sync -> - ?assertMatch( - [#{reason := killed}], - ?of_kind(gcp_pubsub_request_failed, Trace) - ); - async -> - ?assertMatch( - [#{response := {error, killed}}], - ?of_kind(gcp_pubsub_response, Trace) - ) - end, + ?assertMatch( + [#{response := {error, killed}}], + ?of_kind(gcp_pubsub_response, Trace) + ), ok end ), diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl index 2b2214df0..e8dd970f3 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl @@ -532,10 +532,12 @@ t_start_ok(Config) -> }, ?check_trace( begin - ?assertEqual(ok, send_message(Config, SentData)), case QueryMode of - async -> ct:sleep(500); - sync -> ok + async -> + ?assertMatch(ok, send_message(Config, SentData)), + ct:sleep(500); + sync -> + ?assertMatch({ok, 204, _}, send_message(Config, SentData)) end, PersistedData = query_by_clientid(ClientId, Config), Expected = #{ @@ -689,10 +691,12 @@ t_const_timestamp(Config) -> <<"payload">> => Payload, <<"timestamp">> => erlang:system_time(millisecond) }, - ?assertEqual(ok, send_message(Config, SentData)), case QueryMode of - async -> ct:sleep(500); - sync -> ok + async -> + ?assertMatch(ok, send_message(Config, SentData)), + ct:sleep(500); + sync -> + ?assertMatch({ok, 204, _}, send_message(Config, SentData)) end, PersistedData = query_by_clientid(ClientId, Config), Expected = #{foo => <<"123">>}, @@ -745,7 +749,12 @@ t_boolean_variants(Config) -> <<"timestamp">> => erlang:system_time(millisecond), <<"payload">> => Payload }, - ?assertEqual(ok, send_message(Config, SentData)), + case QueryMode of + sync -> + ?assertMatch({ok, 204, _}, send_message(Config, SentData)); + async -> + ?assertMatch(ok, send_message(Config, SentData)) + end, case QueryMode of async -> ct:sleep(500); sync -> ok @@ -841,10 +850,9 @@ t_bad_timestamp(Config) -> ); {sync, false} -> ?assertEqual( - {error, - {unrecoverable_error, [ - {error, {bad_timestamp, <<"bad_timestamp">>}} - ]}}, + {error, [ + {error, {bad_timestamp, <<"bad_timestamp">>}} + ]}, Return ); {sync, true} -> @@ -964,7 +972,7 @@ t_write_failure(Config) -> {error, {resource_error, #{reason := timeout}}}, send_message(Config, SentData) ), - #{?snk_kind := buffer_worker_flush_nack}, + #{?snk_kind := handle_async_reply, action := nack}, 1_000 ); async -> @@ -978,13 +986,13 @@ t_write_failure(Config) -> fun(Trace0) -> case QueryMode of sync -> - Trace = ?of_kind(buffer_worker_flush_nack, Trace0), + Trace = ?of_kind(handle_async_reply, Trace0), ?assertMatch([_ | _], Trace), [#{result := Result} | _] = Trace, ?assert( {error, {error, {closed, "The connection was lost."}}} =:= Result orelse {error, {error, closed}} =:= Result orelse - {error, {recoverable_error, {error, econnrefused}}} =:= Result, + {error, {recoverable_error, econnrefused}} =:= Result, #{got => Result} ); async -> @@ -1006,7 +1014,6 @@ t_write_failure(Config) -> ok. t_missing_field(Config) -> - QueryMode = ?config(query_mode, Config), BatchSize = ?config(batch_size, Config), IsBatch = BatchSize > 1, {ok, _} = @@ -1034,8 +1041,7 @@ t_missing_field(Config) -> {ok, _} = snabbkaffe:block_until( ?match_n_events(NEvents, #{ - ?snk_kind := influxdb_connector_send_query_error, - mode := QueryMode + ?snk_kind := influxdb_connector_send_query_error }), _Timeout1 = 10_000 ), diff --git a/lib-ee/emqx_ee_connector/test/emqx_ee_connector_influxdb_SUITE.erl b/lib-ee/emqx_ee_connector/test/emqx_ee_connector_influxdb_SUITE.erl index 72fc11a67..364821ea0 100644 --- a/lib-ee/emqx_ee_connector/test/emqx_ee_connector_influxdb_SUITE.erl +++ b/lib-ee/emqx_ee_connector/test/emqx_ee_connector_influxdb_SUITE.erl @@ -94,7 +94,7 @@ perform_lifecycle_check(PoolName, InitialConfig) -> emqx_resource:get_instance(PoolName), ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), % % Perform query as further check that the resource is working as expected - ?assertMatch(ok, emqx_resource:query(PoolName, test_query())), + ?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query())), ?assertEqual(ok, emqx_resource:stop(PoolName)), % Resource will be listed still, but state will be changed and healthcheck will fail % as the worker no longer exists. @@ -116,7 +116,7 @@ perform_lifecycle_check(PoolName, InitialConfig) -> {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = emqx_resource:get_instance(PoolName), ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), - ?assertMatch(ok, emqx_resource:query(PoolName, test_query())), + ?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query())), % Stop and remove the resource in one go. ?assertEqual(ok, emqx_resource:remove_local(PoolName)), ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),