fix: incorrect message order when batch is enabled

This commit is contained in:
Shawn 2022-09-01 14:51:13 +08:00
parent 0ef0b68de4
commit 33c9c7d497
1 changed files with 23 additions and 6 deletions

View File

@ -303,11 +303,12 @@ flush(#{acc := []} = St) ->
flush( flush(
#{ #{
id := Id, id := Id,
acc := Batch, acc := Batch0,
batch_size := Size, batch_size := Size,
queue := Q0 queue := Q0
} = St } = St
) -> ) ->
Batch = lists:reverse(Batch0),
QueryOpts = #{ QueryOpts = #{
inflight_name => maps:get(name, St), inflight_name => maps:get(name, St),
inflight_window => maps:get(async_inflight_window, St) inflight_window => maps:get(async_inflight_window, St)
@ -393,12 +394,12 @@ handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), BlockWorker) ->
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'),
BlockWorker; BlockWorker;
handle_query_result(Id, {error, _}, BlockWorker) ->
emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'),
BlockWorker;
handle_query_result(Id, {error, {recoverable_error, _}}, _BlockWorker) -> handle_query_result(Id, {error, {recoverable_error, _}}, _BlockWorker) ->
emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', -1), emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', -1),
true; true;
handle_query_result(Id, {error, _}, BlockWorker) ->
emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'),
BlockWorker;
handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) -> handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) ->
true; true;
handle_query_result(Id, {async_return, {error, _}}, BlockWorker) -> handle_query_result(Id, {async_return, {error, _}}, BlockWorker) ->
@ -449,7 +450,15 @@ call_query(QM0, Id, Query, QueryOpts) ->
apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) -> apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) ->
?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}), ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}),
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'), ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'),
?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request); ?APPLY_RESOURCE(
begin
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'),
Result = Mod:on_query(Id, Request, ResSt),
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1),
Result
end,
Request
);
apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}),
Name = maps:get(inflight_name, QueryOpts, undefined), Name = maps:get(inflight_name, QueryOpts, undefined),
@ -475,7 +484,15 @@ apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) ->
?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
Requests = [Request || ?QUERY(_From, Request) <- Batch], Requests = [Request || ?QUERY(_From, Request) <- Batch],
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', length(Batch)), ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', length(Batch)),
?APPLY_RESOURCE(Mod:on_batch_query(Id, Requests, ResSt), Batch); ?APPLY_RESOURCE(
begin
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'),
Result = Mod:on_batch_query(Id, Requests, ResSt),
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1),
Result
end,
Batch
);
apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
Name = maps:get(inflight_name, QueryOpts, undefined), Name = maps:get(inflight_name, QueryOpts, undefined),