feat(buffer_worker): decouple query mode from underlying connector call mode

Fixes https://emqx.atlassian.net/browse/EMQX-9129

Currently, if an user configures a bridge with query mode sync, then
all calls to the underlying driver/connector ("inner calls") will
always be synchronous, regardless of its support for async calls.

Since buffer workers always support async queries ("outer calls"), we
should decouple those two call modes (inner and outer), and avoid
exposing the inner call configuration to user to avoid complexity.

There are two situations when we want to force synchronous calls to
the underlying connector even if it supports async:

1) When using `simple_sync_query`, since we are bypassing the buffer
workers;
2) When retrying the inflight window, to avoid overwhelming the
driver.
This commit is contained in:
Thales Macedo Garitezi 2023-03-22 11:26:15 -03:00
parent 9049a225b7
commit f8d5d53908
9 changed files with 162 additions and 101 deletions

View File

@ -265,7 +265,7 @@ query(ResId, Request, Opts) ->
IsBufferSupported = is_buffer_supported(Module),
case {IsBufferSupported, QM} of
{true, _} ->
%% only Kafka so far
%% only Kafka producer so far
Opts1 = Opts#{is_buffer_supported => true},
emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1);
{false, sync} ->

View File

@ -140,7 +140,7 @@ simple_sync_query(Id, Request) ->
QueryOpts = simple_query_opts(),
emqx_resource_metrics:matched_inc(Id),
Ref = make_request_ref(),
Result = call_query(sync, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
Result = call_query(force_sync, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
_ = handle_query_result(Id, Result, _HasBeenSent = false),
Result.
@ -152,7 +152,7 @@ simple_async_query(Id, Request, QueryOpts0) ->
QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
emqx_resource_metrics:matched_inc(Id),
Ref = make_request_ref(),
Result = call_query(async, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
Result = call_query(async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
_ = handle_query_result(Id, Result, _HasBeenSent = false),
Result.
@ -377,7 +377,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
} = Data0,
?tp(buffer_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}),
QueryOpts = #{simple_query => false},
Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
Result = call_query(force_sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
ReplyResult =
case QueryOrBatch of
?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) ->
@ -566,7 +566,7 @@ do_flush(
%% unwrap when not batching (i.e., batch size == 1)
[?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) = Request] = Batch,
QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
Result = call_query(configured, Id, Index, Ref, Request, QueryOpts),
Result = call_query(async_if_possible, Id, Index, Ref, Request, QueryOpts),
Reply = ?REPLY(ReplyTo, HasBeenSent, Result),
case reply_caller(Id, Reply, QueryOpts) of
%% Failed; remove the request from the queue, as we cannot pop
@ -651,7 +651,7 @@ do_flush(#{queue := Q1} = Data0, #{
inflight_tid := InflightTID
} = Data0,
QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
Result = call_query(configured, Id, Index, Ref, Batch, QueryOpts),
Result = call_query(async_if_possible, Id, Index, Ref, Batch, QueryOpts),
case batch_reply_caller(Id, Result, Batch, QueryOpts) of
%% Failed; remove the request from the queue, as we cannot pop
%% from it again, but we'll retry it using the inflight table.
@ -883,17 +883,13 @@ handle_async_worker_down(Data0, Pid) ->
mark_inflight_items_as_retriable(Data, WorkerMRef),
{keep_state, Data}.
call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
?tp(call_query_enter, #{id => Id, query => Query, query_mode => QM0}),
-spec call_query(force_sync | async_if_possible, _, _, _, _, _) -> _.
call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
?tp(call_query_enter, #{id => Id, query => Query, query_mode => QM}),
case emqx_resource_manager:lookup_cached(Id) of
{ok, _Group, #{status := stopped}} ->
?RESOURCE_ERROR(stopped, "resource stopped or disabled");
{ok, _Group, Resource} ->
QM =
case QM0 =:= configured of
true -> maps:get(query_mode, Resource);
false -> QM0
end,
do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource);
{error, not_found} ->
?RESOURCE_ERROR(not_found, "resource not found")
@ -1511,9 +1507,9 @@ inc_sent_success(Id, _HasBeenSent = true) ->
inc_sent_success(Id, _HasBeenSent) ->
emqx_resource_metrics:success_inc(Id).
call_mode(sync, _) -> sync;
call_mode(async, always_sync) -> sync;
call_mode(async, async_if_possible) -> async.
call_mode(force_sync, _) -> sync;
call_mode(async_if_possible, always_sync) -> sync;
call_mode(async_if_possible, async_if_possible) -> async.
assert_ok_result(ok) ->
true;

View File

@ -146,6 +146,12 @@ on_query(_InstId, {sleep_before_reply, For}, #{pid := Pid}) ->
{error, timeout}
end.
on_query_async(_InstId, block, ReplyFun, #{pid := Pid}) ->
Pid ! {block, ReplyFun},
{ok, Pid};
on_query_async(_InstId, resume, ReplyFun, #{pid := Pid}) ->
Pid ! {resume, ReplyFun},
{ok, Pid};
on_query_async(_InstId, {inc_counter, N}, ReplyFun, #{pid := Pid}) ->
Pid ! {inc, N, ReplyFun},
{ok, Pid};
@ -274,6 +280,10 @@ counter_loop(
block ->
ct:pal("counter recv: ~p", [block]),
State#{status => blocked};
{block, ReplyFun} ->
ct:pal("counter recv: ~p", [block]),
apply_reply(ReplyFun, ok),
State#{status => blocked};
{block_now, ReplyFun} ->
ct:pal("counter recv: ~p", [block_now]),
apply_reply(
@ -284,6 +294,11 @@ counter_loop(
{messages, Msgs} = erlang:process_info(self(), messages),
ct:pal("counter recv: ~p, buffered msgs: ~p", [resume, length(Msgs)]),
State#{status => running};
{resume, ReplyFun} ->
{messages, Msgs} = erlang:process_info(self(), messages),
ct:pal("counter recv: ~p, buffered msgs: ~p", [resume, length(Msgs)]),
apply_reply(ReplyFun, ok),
State#{status => running};
{inc, N, ReplyFun} when Status == running ->
%ct:pal("async counter recv: ~p", [{inc, N}]),
apply_reply(ReplyFun, ok),

View File

@ -2561,6 +2561,84 @@ do_t_recursive_flush() ->
),
ok.
t_call_mode_uncoupled_from_query_mode(_Config) ->
DefaultOpts = #{
batch_size => 1,
batch_time => 5,
worker_pool_size => 1
},
?check_trace(
begin
%% We check that we can call the buffer workers with async
%% calls, even if the underlying connector itself only
%% supports sync calls.
emqx_connector_demo:set_callback_mode(always_sync),
{ok, _} = emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
DefaultOpts#{query_mode => async}
),
?tp_span(
async_query_sync_driver,
#{},
?assertMatch(
{ok, {ok, _}},
?wait_async_action(
emqx_resource:query(?ID, {inc_counter, 1}),
#{?snk_kind := buffer_worker_flush_ack},
500
)
)
),
?assertEqual(ok, emqx_resource:remove_local(?ID)),
%% And we check the converse: a connector that allows async
%% calls can be called synchronously, but the underlying
%% call should be async.
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
DefaultOpts#{query_mode => sync}
),
?tp_span(
sync_query_async_driver,
#{},
?assertEqual(ok, emqx_resource:query(?ID, {inc_counter, 2}))
),
?assertEqual(ok, emqx_resource:remove_local(?ID)),
?tp(sync_query_async_driver, #{}),
ok
end,
fun(Trace0) ->
Trace1 = trace_between_span(Trace0, async_query_sync_driver),
ct:pal("async query calling sync driver\n ~p", [Trace1]),
?assert(
?strict_causality(
#{?snk_kind := async_query, request := {inc_counter, 1}},
#{?snk_kind := call_query, call_mode := sync},
Trace1
)
),
Trace2 = trace_between_span(Trace0, sync_query_async_driver),
ct:pal("sync query calling async driver\n ~p", [Trace2]),
?assert(
?strict_causality(
#{?snk_kind := sync_query, request := {inc_counter, 2}},
#{?snk_kind := call_query_async},
Trace2
)
),
ok
end
).
%%------------------------------------------------------------------------------
%% Helpers
%%------------------------------------------------------------------------------
@ -2742,3 +2820,8 @@ assert_async_retry_fail_then_succeed_inflight(Trace) ->
)
),
ok.
trace_between_span(Trace0, Marker) ->
{Trace1, [_ | _]} = ?split_trace_at(#{?snk_kind := Marker, ?snk_span := {complete, _}}, Trace0),
{[_ | _], [_ | Trace2]} = ?split_trace_at(#{?snk_kind := Marker, ?snk_span := start}, Trace1),
Trace2.

View File

@ -0,0 +1,7 @@
Decouple the query mode from the underlying call mode for buffer
workers.
Prior to this change, setting the query mode of a resource
such as a bridge to `sync` would force the buffer to call the
underlying connector in a synchronous way, even if it supports async
calls.

View File

@ -291,7 +291,7 @@ t_setup_via_config_and_publish(Config) ->
end,
fun(Trace0) ->
Trace = ?of_kind(dynamo_connector_query_return, Trace0),
?assertMatch([#{result := {ok, _}}], Trace),
?assertMatch([#{result := ok}], Trace),
ok
end
),
@ -328,7 +328,7 @@ t_setup_via_http_api_and_publish(Config) ->
end,
fun(Trace0) ->
Trace = ?of_kind(dynamo_connector_query_return, Trace0),
?assertMatch([#{result := {ok, _}}], Trace),
?assertMatch([#{result := ok}], Trace),
ok
end
),

View File

@ -1013,7 +1013,6 @@ t_publish_timeout(Config) ->
do_econnrefused_or_timeout_test(Config, timeout).
do_econnrefused_or_timeout_test(Config, Error) ->
QueryMode = ?config(query_mode, Config),
ResourceId = ?config(resource_id, Config),
TelemetryTable = ?config(telemetry_table, Config),
Topic = <<"t/topic">>,
@ -1021,15 +1020,8 @@ do_econnrefused_or_timeout_test(Config, Error) ->
Message = emqx_message:make(Topic, Payload),
?check_trace(
begin
case {QueryMode, Error} of
{sync, _} ->
{_, {ok, _}} =
?wait_async_action(
emqx:publish(Message),
#{?snk_kind := gcp_pubsub_request_failed, recoverable_error := true},
15_000
);
{async, econnrefused} ->
case Error of
econnrefused ->
%% at the time of writing, async requests
%% are never considered expired by ehttpc
%% (even if they arrive late, or never
@ -1049,7 +1041,7 @@ do_econnrefused_or_timeout_test(Config, Error) ->
},
15_000
);
{async, timeout} ->
timeout ->
%% at the time of writing, async requests
%% are never considered expired by ehttpc
%% (even if they arrive late, or never
@ -1067,18 +1059,13 @@ do_econnrefused_or_timeout_test(Config, Error) ->
end
end,
fun(Trace) ->
case {QueryMode, Error} of
{sync, _} ->
case Error of
econnrefused ->
?assertMatch(
[#{reason := Error, connector := ResourceId} | _],
?of_kind(gcp_pubsub_request_failed, Trace)
);
{async, econnrefused} ->
?assertMatch(
[#{reason := Error, connector := ResourceId} | _],
?of_kind(gcp_pubsub_request_failed, Trace)
);
{async, timeout} ->
timeout ->
?assertMatch(
[_, _ | _],
?of_kind(gcp_pubsub_response, Trace)
@ -1088,11 +1075,11 @@ do_econnrefused_or_timeout_test(Config, Error) ->
end
),
case {Error, QueryMode} of
case Error of
%% apparently, async with disabled queue doesn't mark the
%% message as dropped; and since it never considers the
%% response expired, this succeeds.
{econnrefused, async} ->
econnrefused ->
wait_telemetry_event(TelemetryTable, queuing, ResourceId, #{
timeout => 10_000, n_events => 1
}),
@ -1114,7 +1101,7 @@ do_econnrefused_or_timeout_test(Config, Error) ->
} when Matched >= 1 andalso Inflight + Queueing + Dropped + Failed =< 2,
CurrentMetrics
);
{timeout, async} ->
timeout ->
wait_until_gauge_is(inflight, 0, _Timeout = 400),
wait_until_gauge_is(queuing, 0, _Timeout = 400),
assert_metrics(
@ -1129,21 +1116,6 @@ do_econnrefused_or_timeout_test(Config, Error) ->
late_reply => 2
},
ResourceId
);
{_, sync} ->
wait_until_gauge_is(queuing, 0, 500),
wait_until_gauge_is(inflight, 1, 500),
assert_metrics(
#{
dropped => 0,
failed => 0,
inflight => 1,
matched => 1,
queuing => 0,
retried => 0,
success => 0
},
ResourceId
)
end,
@ -1267,7 +1239,6 @@ t_failure_no_body(Config) ->
t_unrecoverable_error(Config) ->
ResourceId = ?config(resource_id, Config),
QueryMode = ?config(query_mode, Config),
TestPid = self(),
FailureNoBodyHandler =
fun(Req0, State) ->
@ -1298,33 +1269,16 @@ t_unrecoverable_error(Config) ->
Message = emqx_message:make(Topic, Payload),
?check_trace(
{_, {ok, _}} =
case QueryMode of
sync ->
?wait_async_action(
emqx:publish(Message),
#{?snk_kind := gcp_pubsub_request_failed},
5_000
);
async ->
?wait_async_action(
emqx:publish(Message),
#{?snk_kind := gcp_pubsub_response},
5_000
)
end,
?wait_async_action(
emqx:publish(Message),
#{?snk_kind := gcp_pubsub_response},
5_000
),
fun(Trace) ->
case QueryMode of
sync ->
?assertMatch(
[#{reason := killed}],
?of_kind(gcp_pubsub_request_failed, Trace)
);
async ->
?assertMatch(
[#{response := {error, killed}}],
?of_kind(gcp_pubsub_response, Trace)
)
end,
?assertMatch(
[#{response := {error, killed}}],
?of_kind(gcp_pubsub_response, Trace)
),
ok
end
),

View File

@ -532,10 +532,12 @@ t_start_ok(Config) ->
},
?check_trace(
begin
?assertEqual(ok, send_message(Config, SentData)),
case QueryMode of
async -> ct:sleep(500);
sync -> ok
async ->
?assertMatch(ok, send_message(Config, SentData)),
ct:sleep(500);
sync ->
?assertMatch({ok, 204, _}, send_message(Config, SentData))
end,
PersistedData = query_by_clientid(ClientId, Config),
Expected = #{
@ -689,10 +691,12 @@ t_const_timestamp(Config) ->
<<"payload">> => Payload,
<<"timestamp">> => erlang:system_time(millisecond)
},
?assertEqual(ok, send_message(Config, SentData)),
case QueryMode of
async -> ct:sleep(500);
sync -> ok
async ->
?assertMatch(ok, send_message(Config, SentData)),
ct:sleep(500);
sync ->
?assertMatch({ok, 204, _}, send_message(Config, SentData))
end,
PersistedData = query_by_clientid(ClientId, Config),
Expected = #{foo => <<"123">>},
@ -745,7 +749,12 @@ t_boolean_variants(Config) ->
<<"timestamp">> => erlang:system_time(millisecond),
<<"payload">> => Payload
},
?assertEqual(ok, send_message(Config, SentData)),
case QueryMode of
sync ->
?assertMatch({ok, 204, _}, send_message(Config, SentData));
async ->
?assertMatch(ok, send_message(Config, SentData))
end,
case QueryMode of
async -> ct:sleep(500);
sync -> ok
@ -841,10 +850,9 @@ t_bad_timestamp(Config) ->
);
{sync, false} ->
?assertEqual(
{error,
{unrecoverable_error, [
{error, {bad_timestamp, <<"bad_timestamp">>}}
]}},
{error, [
{error, {bad_timestamp, <<"bad_timestamp">>}}
]},
Return
);
{sync, true} ->
@ -964,7 +972,7 @@ t_write_failure(Config) ->
{error, {resource_error, #{reason := timeout}}},
send_message(Config, SentData)
),
#{?snk_kind := buffer_worker_flush_nack},
#{?snk_kind := handle_async_reply, action := nack},
1_000
);
async ->
@ -978,13 +986,13 @@ t_write_failure(Config) ->
fun(Trace0) ->
case QueryMode of
sync ->
Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
Trace = ?of_kind(handle_async_reply, Trace0),
?assertMatch([_ | _], Trace),
[#{result := Result} | _] = Trace,
?assert(
{error, {error, {closed, "The connection was lost."}}} =:= Result orelse
{error, {error, closed}} =:= Result orelse
{error, {recoverable_error, {error, econnrefused}}} =:= Result,
{error, {recoverable_error, econnrefused}} =:= Result,
#{got => Result}
);
async ->
@ -1006,7 +1014,6 @@ t_write_failure(Config) ->
ok.
t_missing_field(Config) ->
QueryMode = ?config(query_mode, Config),
BatchSize = ?config(batch_size, Config),
IsBatch = BatchSize > 1,
{ok, _} =
@ -1034,8 +1041,7 @@ t_missing_field(Config) ->
{ok, _} =
snabbkaffe:block_until(
?match_n_events(NEvents, #{
?snk_kind := influxdb_connector_send_query_error,
mode := QueryMode
?snk_kind := influxdb_connector_send_query_error
}),
_Timeout1 = 10_000
),

View File

@ -94,7 +94,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
emqx_resource:get_instance(PoolName),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
% % Perform query as further check that the resource is working as expected
?assertMatch(ok, emqx_resource:query(PoolName, test_query())),
?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query())),
?assertEqual(ok, emqx_resource:stop(PoolName)),
% Resource will be listed still, but state will be changed and healthcheck will fail
% as the worker no longer exists.
@ -116,7 +116,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
emqx_resource:get_instance(PoolName),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
?assertMatch(ok, emqx_resource:query(PoolName, test_query())),
?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query())),
% Stop and remove the resource in one go.
?assertEqual(ok, emqx_resource:remove_local(PoolName)),
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),