From d3950b9534ae9e220ad7e6a550f3f549ca4baf19 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 29 Jul 2022 23:26:54 +0800 Subject: [PATCH] fix(resource): make option 'queue_enabled' disabled by default --- apps/emqx_authn/src/emqx_authn.app.src | 2 +- apps/emqx_authz/src/emqx_authz.app.src | 2 +- .../emqx_connector/src/emqx_connector.app.src | 2 +- .../src/emqx_resource_worker.erl | 27 +++++++------------ .../test/emqx_resource_SUITE.erl | 4 +++ 5 files changed, 17 insertions(+), 20 deletions(-) diff --git a/apps/emqx_authn/src/emqx_authn.app.src b/apps/emqx_authn/src/emqx_authn.app.src index 8087e822f..ef67b9a14 100644 --- a/apps/emqx_authn/src/emqx_authn.app.src +++ b/apps/emqx_authn/src/emqx_authn.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_authn, [ {description, "EMQX Authentication"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {modules, []}, {registered, [emqx_authn_sup, emqx_authn_registry]}, {applications, [kernel, stdlib, emqx_resource, ehttpc, epgsql, mysql, jose]}, diff --git a/apps/emqx_authz/src/emqx_authz.app.src b/apps/emqx_authz/src/emqx_authz.app.src index ed19b15a8..e40b5e64c 100644 --- a/apps/emqx_authz/src/emqx_authz.app.src +++ b/apps/emqx_authz/src/emqx_authz.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_authz, [ {description, "An OTP application"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, []}, {mod, {emqx_authz_app, []}}, {applications, [ diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src index 007962da3..cce266966 100644 --- a/apps/emqx_connector/src/emqx_connector.app.src +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_connector, [ {description, "An OTP application"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, {mod, {emqx_connector_app, []}}, {applications, [ diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 9ab7fb749..055fbfc53 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -73,7 +73,7 @@ -callback batcher_flush(Acc :: [{from(), request()}], CbState :: term()) -> {{from(), result()}, NewCbState :: term()}. -callback_mode() -> [state_functions, state_enter]. +callback_mode() -> [state_functions]. start_link(Id, Index, Opts) -> gen_statem:start_link({local, name(Id, Index)}, ?MODULE, {Id, Index, Opts}, []). @@ -107,7 +107,7 @@ init({Id, Index, Opts}) -> true = gproc_pool:connect_worker(Id, {Id, Index}), BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE), Queue = - case maps:get(queue_enabled, Opts, true) of + case maps:get(queue_enabled, Opts, false) of true -> replayq:open(#{ dir => disk_queue_dir(Id, Index), @@ -131,8 +131,6 @@ init({Id, Index, Opts}) -> }, {ok, blocked, St, {next_event, cast, resume}}. -running(enter, _, _St) -> - keep_state_and_data; running(cast, resume, _St) -> keep_state_and_data; running(cast, block, St) -> @@ -149,8 +147,6 @@ running(info, Info, _St) -> ?SLOG(error, #{msg => unexpected_msg, info => Info}), keep_state_and_data. -blocked(enter, _, _St) -> - keep_state_and_data; blocked(cast, block, _St) -> keep_state_and_data; blocked(cast, resume, St) -> @@ -218,28 +214,25 @@ drop_head(Q) -> ok = replayq:ack(Q1, AckRef), Q1. -query_or_acc(From, Request, #{batch_enabled := true} = St) -> - acc_query(From, Request, St); -query_or_acc(From, Request, #{batch_enabled := false} = St) -> - send_query(From, Request, St). - -acc_query(From, Request, #{acc := Acc, acc_left := Left} = St0) -> +query_or_acc(From, Request, #{batch_enabled := true, acc := Acc, acc_left := Left} = St0) -> Acc1 = [?QUERY(From, Request) | Acc], St = St0#{acc := Acc1, acc_left := Left - 1}, case Left =< 1 of true -> flush(St); false -> {keep_state, ensure_flush_timer(St)} - end. - -send_query(From, Request, #{id := Id, queue := Q} = St) -> - Result = call_query(Id, Request), - case reply_caller(Id, ?REPLY(From, Request, Result), false) of + end; +query_or_acc(From, Request, #{batch_enabled := false, queue := Q, id := Id} = St) -> + case send_query(From, Request, Id) of true -> {next_state, blocked, St#{queue := maybe_append_queue(Q, [?Q_ITEM(Request)])}}; false -> {keep_state, St} end. +send_query(From, Request, Id) -> + Result = call_query(Id, Request), + reply_caller(Id, ?REPLY(From, Request, Result), false). + flush(#{acc := []} = St) -> {keep_state, St}; flush( diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 915c59611..278f556ef 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -347,6 +347,10 @@ t_create_dry_run_local(_) -> [] = ets:match(emqx_resource_manager, {{owner, '$1'}, '_'}). create_dry_run_local_succ() -> + case whereis(test_resource) of + undefined -> ok; + Pid -> exit(Pid, kill) + end, ?assertEqual( ok, emqx_resource:create_dry_run_local(