refactor: remove redundant `BlockWorker` arg, change boolean to ack/nack

`BlockWorker` was always false (ack).  Also, changed the return to
something more semantic than a boolean to avoid [boolean
blindness](https://runtimeverification.com/blog/code-smell-boolean-blindness/)
This commit is contained in:
Thales Macedo Garitezi 2023-01-13 15:58:45 -03:00
parent 478fcc6ffd
commit bd95a95409
1 changed files with 46 additions and 53 deletions

View File

@ -117,8 +117,7 @@ simple_sync_query(Id, Request) ->
Ref = make_message_ref(), Ref = make_message_ref(),
Result = call_query(sync, Id, Index, Ref, ?QUERY(self(), Request, false), QueryOpts), Result = call_query(sync, Id, Index, Ref, ?QUERY(self(), Request, false), QueryOpts),
HasBeenSent = false, HasBeenSent = false,
BlockWorker = false, _ = handle_query_result(Id, Result, HasBeenSent),
_ = handle_query_result(Id, Result, HasBeenSent, BlockWorker),
Result. Result.
-spec simple_async_query(id(), request(), reply_fun()) -> Result :: term(). -spec simple_async_query(id(), request(), reply_fun()) -> Result :: term().
@ -135,8 +134,7 @@ simple_async_query(Id, Request, ReplyFun) ->
Ref = make_message_ref(), Ref = make_message_ref(),
Result = call_query(async, Id, Index, Ref, ?QUERY(ReplyFun, Request, false), QueryOpts), Result = call_query(async, Id, Index, Ref, ?QUERY(ReplyFun, Request, false), QueryOpts),
HasBeenSent = false, HasBeenSent = false,
BlockWorker = false, _ = handle_query_result(Id, Result, HasBeenSent),
_ = handle_query_result(Id, Result, HasBeenSent, BlockWorker),
Result. Result.
-spec block(pid()) -> ok. -spec block(pid()) -> ok.
@ -304,15 +302,14 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
%% if we are retrying an inflight query, it has been sent %% if we are retrying an inflight query, it has been sent
HasBeenSent = true, HasBeenSent = true,
Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts), Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
BlockWorker = false, case handle_query_result_pure(Id, Result, HasBeenSent) of
case handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker) of
%% Send failed because resource is down %% Send failed because resource is down
{true, PostFn} -> {nack, PostFn} ->
PostFn(), PostFn(),
?tp(resource_worker_retry_inflight_failed, #{query_or_batch => QueryOrBatch}), ?tp(resource_worker_retry_inflight_failed, #{query_or_batch => QueryOrBatch}),
{keep_state, Data0, {state_timeout, ResumeT, unblock}}; {keep_state, Data0, {state_timeout, ResumeT, unblock}};
%% Send ok or failed but the resource is working %% Send ok or failed but the resource is working
{false, PostFn} -> {ack, PostFn} ->
IsAcked = ack_inflight(InflightTID, Ref, Id, Index), IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
%% we need to defer bumping the counters after %% we need to defer bumping the counters after
%% `inflight_drop' to avoid the race condition when an %% `inflight_drop' to avoid the race condition when an
@ -427,7 +424,7 @@ do_flush(
%% from it again. But we must ensure it's in the inflight %% from it again. But we must ensure it's in the inflight
%% table, even if it's full, so we don't lose the request. %% table, even if it's full, so we don't lose the request.
%% And only in that case. %% And only in that case.
true -> nack ->
ok = replayq:ack(Q1, QAckRef), ok = replayq:ack(Q1, QAckRef),
ShouldPreserveInInflight = ShouldPreserveInInflight =
is_inflight_full_result(Result) orelse is_inflight_full_result(Result) orelse
@ -436,7 +433,7 @@ do_flush(
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
{next_state, blocked, Data0}; {next_state, blocked, Data0};
%% Success; just ack. %% Success; just ack.
false -> ack ->
ok = replayq:ack(Q1, QAckRef), ok = replayq:ack(Q1, QAckRef),
is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index), is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
@ -467,7 +464,7 @@ do_flush(Data0, #{
%% from it again. But we must ensure it's in the inflight %% from it again. But we must ensure it's in the inflight
%% table, even if it's full, so we don't lose the request. %% table, even if it's full, so we don't lose the request.
%% And only in that case. %% And only in that case.
true -> nack ->
ok = replayq:ack(Q1, QAckRef), ok = replayq:ack(Q1, QAckRef),
ShouldPreserveInInflight = ShouldPreserveInInflight =
is_inflight_full_result(Result) orelse is_inflight_full_result(Result) orelse
@ -476,7 +473,7 @@ do_flush(Data0, #{
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
{next_state, blocked, Data0}; {next_state, blocked, Data0};
%% Success; just ack. %% Success; just ack.
false -> ack ->
ok = replayq:ack(Q1, QAckRef), ok = replayq:ack(Q1, QAckRef),
is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index), is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
@ -499,11 +496,11 @@ batch_reply_caller(Id, BatchResult, Batch) ->
batch_reply_caller_defer_metrics(Id, BatchResult, Batch) -> batch_reply_caller_defer_metrics(Id, BatchResult, Batch) ->
lists:foldl( lists:foldl(
fun(Reply, {BlockWorker, PostFns}) -> fun(Reply, {_ShouldBlock, PostFns}) ->
{ShouldBlock, PostFn} = reply_caller_defer_metrics(Id, Reply, BlockWorker), {ShouldBlock, PostFn} = reply_caller_defer_metrics(Id, Reply),
{ShouldBlock, [PostFn | PostFns]} {ShouldBlock, [PostFn | PostFns]}
end, end,
{false, []}, {ack, []},
%% the `Mod:on_batch_query/3` returns a single result for a batch, %% the `Mod:on_batch_query/3` returns a single result for a batch,
%% so we need to expand %% so we need to expand
?EXPAND(BatchResult, Batch) ?EXPAND(BatchResult, Batch)
@ -514,13 +511,9 @@ reply_caller(Id, Reply) ->
PostFn(), PostFn(),
ShouldBlock. ShouldBlock.
reply_caller_defer_metrics(Id, Reply) -> reply_caller_defer_metrics(Id, ?REPLY(undefined, _, HasBeenSent, Result)) ->
BlockWorker = false, handle_query_result_pure(Id, Result, HasBeenSent);
reply_caller_defer_metrics(Id, Reply, BlockWorker). reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result)) when
reply_caller_defer_metrics(Id, ?REPLY(undefined, _, HasBeenSent, Result), BlockWorker) ->
handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker);
reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result), BlockWorker) when
is_function(ReplyFun) is_function(ReplyFun)
-> ->
_ = _ =
@ -528,49 +521,49 @@ reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result),
{async_return, _} -> no_reply_for_now; {async_return, _} -> no_reply_for_now;
_ -> apply(ReplyFun, Args ++ [Result]) _ -> apply(ReplyFun, Args ++ [Result])
end, end,
handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker); handle_query_result_pure(Id, Result, HasBeenSent);
reply_caller_defer_metrics(Id, ?REPLY(From, _, HasBeenSent, Result), BlockWorker) -> reply_caller_defer_metrics(Id, ?REPLY(From, _, HasBeenSent, Result)) ->
gen_statem:reply(From, Result), gen_statem:reply(From, Result),
handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker). handle_query_result_pure(Id, Result, HasBeenSent).
handle_query_result(Id, Result, HasBeenSent, BlockWorker) -> handle_query_result(Id, Result, HasBeenSent) ->
{ShouldBlock, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker), {ShouldBlock, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent),
PostFn(), PostFn(),
ShouldBlock. ShouldBlock.
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(exception, Msg), HasBeenSent, BlockWorker) -> handle_query_result_pure(Id, ?RESOURCE_ERROR_M(exception, Msg), HasBeenSent) ->
PostFn = fun() -> PostFn = fun() ->
?SLOG(error, #{msg => resource_exception, info => Msg}), ?SLOG(error, #{msg => resource_exception, info => Msg}),
inc_sent_failed(Id, HasBeenSent), inc_sent_failed(Id, HasBeenSent),
ok ok
end, end,
{BlockWorker, PostFn}; {ack, PostFn};
handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent, _) when handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent) when
NotWorking == not_connected; NotWorking == blocked NotWorking == not_connected; NotWorking == blocked
-> ->
{true, fun() -> ok end}; {nack, fun() -> ok end};
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasBeenSent, BlockWorker) -> handle_query_result_pure(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasBeenSent) ->
PostFn = fun() -> PostFn = fun() ->
?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}), ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}),
emqx_resource_metrics:dropped_resource_not_found_inc(Id), emqx_resource_metrics:dropped_resource_not_found_inc(Id),
ok ok
end, end,
{BlockWorker, PostFn}; {ack, PostFn};
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent, BlockWorker) -> handle_query_result_pure(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent) ->
PostFn = fun() -> PostFn = fun() ->
?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}), ?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}),
emqx_resource_metrics:dropped_resource_stopped_inc(Id), emqx_resource_metrics:dropped_resource_stopped_inc(Id),
ok ok
end, end,
{BlockWorker, PostFn}; {ack, PostFn};
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent, BlockWorker) -> handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent) ->
PostFn = fun() -> PostFn = fun() ->
?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}), ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}),
emqx_resource_metrics:dropped_other_inc(Id), emqx_resource_metrics:dropped_other_inc(Id),
ok ok
end, end,
{BlockWorker, PostFn}; {ack, PostFn};
handle_query_result_pure(Id, {error, {recoverable_error, Reason}}, _HasBeenSent, _BlockWorker) -> handle_query_result_pure(Id, {error, {recoverable_error, Reason}}, _HasBeenSent) ->
%% the message will be queued in replayq or inflight window, %% the message will be queued in replayq or inflight window,
%% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not %% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not
%% sent this message. %% sent this message.
@ -578,32 +571,32 @@ handle_query_result_pure(Id, {error, {recoverable_error, Reason}}, _HasBeenSent,
?SLOG(warning, #{id => Id, msg => recoverable_error, reason => Reason}), ?SLOG(warning, #{id => Id, msg => recoverable_error, reason => Reason}),
ok ok
end, end,
{true, PostFn}; {nack, PostFn};
handle_query_result_pure(Id, {error, Reason}, HasBeenSent, BlockWorker) -> handle_query_result_pure(Id, {error, Reason}, HasBeenSent) ->
PostFn = fun() -> PostFn = fun() ->
?SLOG(error, #{id => Id, msg => send_error, reason => Reason}), ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}),
inc_sent_failed(Id, HasBeenSent), inc_sent_failed(Id, HasBeenSent),
ok ok
end, end,
{BlockWorker, PostFn}; {ack, PostFn};
handle_query_result_pure(_Id, {async_return, inflight_full}, _HasBeenSent, _BlockWorker) -> handle_query_result_pure(_Id, {async_return, inflight_full}, _HasBeenSent) ->
{true, fun() -> ok end}; {nack, fun() -> ok end};
handle_query_result_pure(Id, {async_return, {error, Msg}}, HasBeenSent, BlockWorker) -> handle_query_result_pure(Id, {async_return, {error, Msg}}, HasBeenSent) ->
PostFn = fun() -> PostFn = fun() ->
?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}), ?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}),
inc_sent_failed(Id, HasBeenSent), inc_sent_failed(Id, HasBeenSent),
ok ok
end, end,
{BlockWorker, PostFn}; {ack, PostFn};
handle_query_result_pure(_Id, {async_return, ok}, _HasBeenSent, BlockWorker) -> handle_query_result_pure(_Id, {async_return, ok}, _HasBeenSent) ->
{BlockWorker, fun() -> ok end}; {ack, fun() -> ok end};
handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker) -> handle_query_result_pure(Id, Result, HasBeenSent) ->
PostFn = fun() -> PostFn = fun() ->
assert_ok_result(Result), assert_ok_result(Result),
inc_sent_success(Id, HasBeenSent), inc_sent_success(Id, HasBeenSent),
ok ok
end, end,
{BlockWorker, PostFn}. {ack, PostFn}.
is_inflight_full_result({async_return, inflight_full}) -> is_inflight_full_result({async_return, inflight_full}) ->
true; true;
@ -732,10 +725,10 @@ reply_after_query(Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBee
%% but received no ACK, NOT the number of messages queued in the %% but received no ACK, NOT the number of messages queued in the
%% inflight window. %% inflight window.
case reply_caller_defer_metrics(Id, ?REPLY(From, Request, HasBeenSent, Result)) of case reply_caller_defer_metrics(Id, ?REPLY(From, Request, HasBeenSent, Result)) of
{true, PostFn} -> {nack, PostFn} ->
PostFn(), PostFn(),
?MODULE:block(Pid); ?MODULE:block(Pid);
{false, PostFn} -> {ack, PostFn} ->
IsAcked = ack_inflight_and_resume(Pid, InflightTID, Ref, Id, Index), IsAcked = ack_inflight_and_resume(Pid, InflightTID, Ref, Id, Index),
IsAcked andalso PostFn(), IsAcked andalso PostFn(),
ok ok
@ -746,10 +739,10 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, Result) ->
%% but received no ACK, NOT the number of messages queued in the %% but received no ACK, NOT the number of messages queued in the
%% inflight window. %% inflight window.
case batch_reply_caller_defer_metrics(Id, Result, Batch) of case batch_reply_caller_defer_metrics(Id, Result, Batch) of
{true, PostFns} -> {nack, PostFns} ->
lists:foreach(fun(F) -> F() end, PostFns), lists:foreach(fun(F) -> F() end, PostFns),
?MODULE:block(Pid); ?MODULE:block(Pid);
{false, PostFns} -> {ack, PostFns} ->
IsAcked = ack_inflight_and_resume(Pid, InflightTID, Ref, Id, Index), IsAcked = ack_inflight_and_resume(Pid, InflightTID, Ref, Id, Index),
IsAcked andalso lists:foreach(fun(F) -> F() end, PostFns), IsAcked andalso lists:foreach(fun(F) -> F() end, PostFns),
ok ok