chore: fix dialyzer warnings

This commit is contained in:
Thales Macedo Garitezi 2023-01-17 14:50:04 -03:00
parent fa01deb3eb
commit 4ed7bff33f
2 changed files with 9 additions and 4 deletions

View File

@ -80,6 +80,7 @@
-type from() :: pid() | reply_fun() | request_from(). -type from() :: pid() | reply_fun() | request_from().
-type request_from() :: undefined | gen_statem:from(). -type request_from() :: undefined | gen_statem:from().
-type state() :: blocked | running. -type state() :: blocked | running.
-type inflight_key() :: integer().
-type data() :: #{ -type data() :: #{
id := id(), id := id(),
index := index(), index := index(),
@ -248,7 +249,7 @@ blocked(info, Info, _Data) ->
keep_state_and_data. keep_state_and_data.
terminate(_Reason, #{id := Id, index := Index, queue := Q}) -> terminate(_Reason, #{id := Id, index := Index, queue := Q}) ->
replayq:close(Q), _ = replayq:close(Q),
emqx_resource_metrics:inflight_set(Id, Index, 0), emqx_resource_metrics:inflight_set(Id, Index, 0),
%% since we want volatile queues, this will be 0 after %% since we want volatile queues, this will be 0 after
%% termination. %% termination.
@ -376,7 +377,8 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
end. end.
%% Called during the `running' state only. %% Called during the `running' state only.
-spec handle_query_requests(?SEND_REQ(request_from(), request()), data()) -> data(). -spec handle_query_requests(?SEND_REQ(request_from(), request()), data()) ->
gen_statem:event_handler_result(state(), data()).
handle_query_requests(Request0, Data0) -> handle_query_requests(Request0, Data0) ->
{_Queries, Data} = collect_and_enqueue_query_requests(Request0, Data0), {_Queries, Data} = collect_and_enqueue_query_requests(Request0, Data0),
maybe_flush(Data). maybe_flush(Data).
@ -454,7 +456,9 @@ flush(Data0) ->
-spec do_flush(data(), #{ -spec do_flush(data(), #{
is_batch := boolean(), is_batch := boolean(),
batch := [?QUERY(from(), request(), boolean())], batch := [?QUERY(from(), request(), boolean())],
ack_ref := replayq:ack_ref() ack_ref := replayq:ack_ref(),
ref := inflight_key(),
new_queue := replayq:q()
}) -> }) ->
gen_statem:event_handler_result(state(), data()). gen_statem:event_handler_result(state(), data()).
do_flush( do_flush(
@ -1176,6 +1180,7 @@ cancel_flush_timer(St = #{tref := {TRef, _Ref}}) ->
_ = erlang:cancel_timer(TRef), _ = erlang:cancel_timer(TRef),
St#{tref => undefined}. St#{tref => undefined}.
-spec make_message_ref() -> inflight_key().
make_message_ref() -> make_message_ref() ->
erlang:monotonic_time(nanosecond). erlang:monotonic_time(nanosecond).

View File

@ -67,7 +67,7 @@ stop_workers(ResId, Opts) ->
WorkerPoolSize = worker_pool_size(Opts), WorkerPoolSize = worker_pool_size(Opts),
lists:foreach( lists:foreach(
fun(Idx) -> fun(Idx) ->
ensure_worker_removed(ResId, Idx), _ = ensure_worker_removed(ResId, Idx),
ensure_disk_queue_dir_absent(ResId, Idx) ensure_disk_queue_dir_absent(ResId, Idx)
end, end,
lists:seq(1, WorkerPoolSize) lists:seq(1, WorkerPoolSize)