refactor(buffer_worker): rename trace points

This commit is contained in:
Zaiming (Stone) Shi 2023-01-27 17:27:27 +01:00
parent fc38ea9571
commit d47941601d
4 changed files with 22 additions and 29 deletions

View File

@ -960,13 +960,13 @@ handle_async_reply(
resource_id := Id, resource_id := Id,
worker_index := Index, worker_index := Index,
buffer_worker := Pid, buffer_worker := Pid,
query := ?QUERY(_, _, _, ExpireAt) = Query query := ?QUERY(_, _, _, ExpireAt) = _Query
} = ReplyContext, } = ReplyContext,
Result Result
) -> ) ->
?tp( ?tp(
buffer_worker_reply_after_query_enter, handle_async_reply_enter,
#{batch_or_query => [Query], ref => Ref} #{batch_or_query => [_Query], ref => Ref}
), ),
Now = now_(), Now = now_(),
case is_expired(ExpireAt, Now) of case is_expired(ExpireAt, Now) of
@ -975,7 +975,7 @@ handle_async_reply(
IsAcked = ack_inflight(InflightTID, Ref, Id, Index), IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
IsAcked andalso emqx_resource_metrics:late_reply_inc(Id), IsAcked andalso emqx_resource_metrics:late_reply_inc(Id),
IsFullBefore andalso ?MODULE:flush_worker(Pid), IsFullBefore andalso ?MODULE:flush_worker(Pid),
?tp(buffer_worker_reply_after_query_expired, #{expired => [Query]}), ?tp(handle_async_reply_expired, #{expired => [_Query]}),
ok; ok;
false -> false ->
do_handle_async_reply(ReplyContext, Result) do_handle_async_reply(ReplyContext, Result)
@ -989,7 +989,7 @@ do_handle_async_reply(
worker_index := Index, worker_index := Index,
buffer_worker := Pid, buffer_worker := Pid,
inflight_tid := InflightTID, inflight_tid := InflightTID,
query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = Query query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query
}, },
Result Result
) -> ) ->
@ -1000,9 +1000,9 @@ do_handle_async_reply(
Id, ?REPLY(ReplyTo, Sent, Result), QueryOpts Id, ?REPLY(ReplyTo, Sent, Result), QueryOpts
), ),
?tp(buffer_worker_reply_after_query, #{ ?tp(handle_async_reply, #{
action => Action, action => Action,
batch_or_query => [Query], batch_or_query => [_Query],
ref => Ref, ref => Ref,
result => Result result => Result
}), }),
@ -1028,7 +1028,7 @@ handle_async_batch_reply(
Result Result
) -> ) ->
?tp( ?tp(
buffer_worker_reply_after_query_enter, handle_async_reply_enter,
#{batch_or_query => Batch, ref => Ref} #{batch_or_query => Batch, ref => Ref}
), ),
Now = now_(), Now = now_(),
@ -1038,13 +1038,13 @@ handle_async_batch_reply(
IsAcked = ack_inflight(InflightTID, Ref, Id, Index), IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
IsAcked andalso emqx_resource_metrics:late_reply_inc(Id), IsAcked andalso emqx_resource_metrics:late_reply_inc(Id),
IsFullBefore andalso ?MODULE:flush_worker(Pid), IsFullBefore andalso ?MODULE:flush_worker(Pid),
?tp(buffer_worker_reply_after_query_expired, #{expired => Batch}), ?tp(handle_async_reply_expired, #{expired => Batch}),
ok; ok;
{NotExpired, Expired} -> {NotExpired, Expired} ->
NumExpired = length(Expired), NumExpired = length(Expired),
emqx_resource_metrics:late_reply_inc(Id, NumExpired), emqx_resource_metrics:late_reply_inc(Id, NumExpired),
NumExpired > 0 andalso NumExpired > 0 andalso
?tp(buffer_worker_reply_after_query_expired, #{expired => Expired}), ?tp(handle_async_reply_expired, #{expired => Expired}),
do_handle_async_batch_reply(ReplyContext#{batch := NotExpired}, Result) do_handle_async_batch_reply(ReplyContext#{batch := NotExpired}, Result)
end. end.
@ -1060,15 +1060,8 @@ do_handle_async_batch_reply(
}, },
Result Result
) -> ) ->
?tp(
buffer_worker_reply_after_query_enter,
#{batch_or_query => Batch, ref => Ref}
),
%% NOTE: 'inflight' is the count of messages that were sent async
%% but received no ACK, NOT the number of messages queued in the
%% inflight window.
{Action, PostFn} = batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts), {Action, PostFn} = batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts),
?tp(buffer_worker_reply_after_query, #{ ?tp(handle_async_reply, #{
action => Action, action => Action,
batch_or_query => Batch, batch_or_query => Batch,
ref => Ref, ref => Ref,

View File

@ -1718,7 +1718,7 @@ do_t_expiration_before_sending_partial_batch(QueryMode) ->
async -> async ->
{ok, _} = ?block_until( {ok, _} = ?block_until(
#{ #{
?snk_kind := buffer_worker_reply_after_query, ?snk_kind := handle_async_reply,
action := ack, action := ack,
batch_or_query := [{query, _, {inc_counter, 99}, _, _}] batch_or_query := [{query, _, {inc_counter, 99}, _, _}]
}, },
@ -1849,7 +1849,7 @@ do_t_expiration_async_after_reply(IsBatch) ->
?force_ordering( ?force_ordering(
#{?snk_kind := delay}, #{?snk_kind := delay},
#{ #{
?snk_kind := buffer_worker_reply_after_query_enter, ?snk_kind := handle_async_reply_enter,
batch_or_query := [{query, _, {inc_counter, 199}, _, _} | _] batch_or_query := [{query, _, {inc_counter, 199}, _, _} | _]
} }
), ),
@ -1874,7 +1874,7 @@ do_t_expiration_async_after_reply(IsBatch) ->
#{?snk_kind := buffer_worker_flush_potentially_partial}, 4 * TimeoutMS #{?snk_kind := buffer_worker_flush_potentially_partial}, 4 * TimeoutMS
), ),
{ok, _} = ?block_until( {ok, _} = ?block_until(
#{?snk_kind := buffer_worker_reply_after_query_expired}, 10 * TimeoutMS #{?snk_kind := handle_async_reply_expired}, 10 * TimeoutMS
), ),
unlink(Pid0), unlink(Pid0),
@ -1888,7 +1888,7 @@ do_t_expiration_async_after_reply(IsBatch) ->
expired := [{query, _, {inc_counter, 199}, _, _}] expired := [{query, _, {inc_counter, 199}, _, _}]
} }
], ],
?of_kind(buffer_worker_reply_after_query_expired, Trace) ?of_kind(handle_async_reply_expired, Trace)
), ),
wait_telemetry_event(success, #{n_events => 1, timeout => 4_000}), wait_telemetry_event(success, #{n_events => 1, timeout => 4_000}),
Metrics = tap_metrics(?LINE), Metrics = tap_metrics(?LINE),
@ -1936,7 +1936,7 @@ t_expiration_batch_all_expired_after_reply(_Config) ->
?force_ordering( ?force_ordering(
#{?snk_kind := delay}, #{?snk_kind := delay},
#{ #{
?snk_kind := buffer_worker_reply_after_query_enter, ?snk_kind := handle_async_reply_enter,
batch_or_query := [{query, _, {inc_counter, 199}, _, _} | _] batch_or_query := [{query, _, {inc_counter, 199}, _, _} | _]
} }
), ),
@ -1955,7 +1955,7 @@ t_expiration_batch_all_expired_after_reply(_Config) ->
end), end),
{ok, _} = ?block_until( {ok, _} = ?block_until(
#{?snk_kind := buffer_worker_reply_after_query_expired}, 10 * TimeoutMS #{?snk_kind := handle_async_reply_expired}, 10 * TimeoutMS
), ),
unlink(Pid0), unlink(Pid0),
@ -1969,7 +1969,7 @@ t_expiration_batch_all_expired_after_reply(_Config) ->
expired := [{query, _, {inc_counter, 199}, _, _}] expired := [{query, _, {inc_counter, 199}, _, _}]
} }
], ],
?of_kind(buffer_worker_reply_after_query_expired, Trace) ?of_kind(handle_async_reply_expired, Trace)
), ),
Metrics = tap_metrics(?LINE), Metrics = tap_metrics(?LINE),
?assertMatch( ?assertMatch(
@ -2335,7 +2335,7 @@ assert_async_retry_fail_then_succeed_inflight(Trace) ->
ct:pal(" ~p", [Trace]), ct:pal(" ~p", [Trace]),
?assert( ?assert(
?strict_causality( ?strict_causality(
#{?snk_kind := buffer_worker_reply_after_query, action := nack}, #{?snk_kind := handle_async_reply, action := nack},
#{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref}, #{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref},
Trace Trace
) )

View File

@ -227,7 +227,7 @@ render_timestamp(Template, Message) ->
%% Wolff producer never gives up retrying %% Wolff producer never gives up retrying
%% so there can only be 'ok' results. %% so there can only be 'ok' results.
on_kafka_ack(_Partition, Offset, {ReplyFn, Args}) when is_integer(Offset) -> on_kafka_ack(_Partition, Offset, {ReplyFn, Args}) when is_integer(Offset) ->
%% the ReplyFn is emqx_resource_worker:reply_after_query/8 %% the ReplyFn is emqx_resource_worker:handle_async_reply/2
apply(ReplyFn, Args ++ [ok]); apply(ReplyFn, Args ++ [ok]);
on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) -> on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) ->
%% wolff should bump the dropped_queue_full counter %% wolff should bump the dropped_queue_full counter

View File

@ -920,7 +920,7 @@ t_write_failure(Config) ->
async -> async ->
?wait_async_action( ?wait_async_action(
?assertEqual(ok, send_message(Config, SentData)), ?assertEqual(ok, send_message(Config, SentData)),
#{?snk_kind := buffer_worker_reply_after_query}, #{?snk_kind := handle_async_reply},
1_000 1_000
) )
end end
@ -938,7 +938,7 @@ t_write_failure(Config) ->
#{got => Result} #{got => Result}
); );
async -> async ->
Trace = ?of_kind(buffer_worker_reply_after_query, Trace0), Trace = ?of_kind(handle_async_reply, Trace0),
?assertMatch([#{action := nack} | _], Trace), ?assertMatch([#{action := nack} | _], Trace),
[#{result := Result} | _] = Trace, [#{result := Result} | _] = Trace,
?assert( ?assert(