From 4ed7bff33f9c27fe91a6924302cf7ad358d302fc Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 17 Jan 2023 14:50:04 -0300 Subject: [PATCH] chore: fix dialyzer warnings --- apps/emqx_resource/src/emqx_resource_worker.erl | 11 ++++++++--- apps/emqx_resource/src/emqx_resource_worker_sup.erl | 2 +- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 2ef1cbed4..51a95424a 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -80,6 +80,7 @@ -type from() :: pid() | reply_fun() | request_from(). -type request_from() :: undefined | gen_statem:from(). -type state() :: blocked | running. +-type inflight_key() :: integer(). -type data() :: #{ id := id(), index := index(), @@ -248,7 +249,7 @@ blocked(info, Info, _Data) -> keep_state_and_data. terminate(_Reason, #{id := Id, index := Index, queue := Q}) -> - replayq:close(Q), + _ = replayq:close(Q), emqx_resource_metrics:inflight_set(Id, Index, 0), %% since we want volatile queues, this will be 0 after %% termination. @@ -376,7 +377,8 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> end. %% Called during the `running' state only. --spec handle_query_requests(?SEND_REQ(request_from(), request()), data()) -> data(). +-spec handle_query_requests(?SEND_REQ(request_from(), request()), data()) -> + gen_statem:event_handler_result(state(), data()). handle_query_requests(Request0, Data0) -> {_Queries, Data} = collect_and_enqueue_query_requests(Request0, Data0), maybe_flush(Data). @@ -454,7 +456,9 @@ flush(Data0) -> -spec do_flush(data(), #{ is_batch := boolean(), batch := [?QUERY(from(), request(), boolean())], - ack_ref := replayq:ack_ref() + ack_ref := replayq:ack_ref(), + ref := inflight_key(), + new_queue := replayq:q() }) -> gen_statem:event_handler_result(state(), data()). do_flush( @@ -1176,6 +1180,7 @@ cancel_flush_timer(St = #{tref := {TRef, _Ref}}) -> _ = erlang:cancel_timer(TRef), St#{tref => undefined}. +-spec make_message_ref() -> inflight_key(). make_message_ref() -> erlang:monotonic_time(nanosecond). diff --git a/apps/emqx_resource/src/emqx_resource_worker_sup.erl b/apps/emqx_resource/src/emqx_resource_worker_sup.erl index a5de76eca..8b0ce2c65 100644 --- a/apps/emqx_resource/src/emqx_resource_worker_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_worker_sup.erl @@ -67,7 +67,7 @@ stop_workers(ResId, Opts) -> WorkerPoolSize = worker_pool_size(Opts), lists:foreach( fun(Idx) -> - ensure_worker_removed(ResId, Idx), + _ = ensure_worker_removed(ResId, Idx), ensure_disk_queue_dir_absent(ResId, Idx) end, lists:seq(1, WorkerPoolSize)