fix(resource): make option 'queue_enabled' disabled by default

This commit is contained in:
Shawn 2022-07-29 23:26:54 +08:00
parent 0377d3cf61
commit d3950b9534
5 changed files with 17 additions and 20 deletions

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_authn, [ {application, emqx_authn, [
{description, "EMQX Authentication"}, {description, "EMQX Authentication"},
{vsn, "0.1.3"}, {vsn, "0.1.4"},
{modules, []}, {modules, []},
{registered, [emqx_authn_sup, emqx_authn_registry]}, {registered, [emqx_authn_sup, emqx_authn_registry]},
{applications, [kernel, stdlib, emqx_resource, ehttpc, epgsql, mysql, jose]}, {applications, [kernel, stdlib, emqx_resource, ehttpc, epgsql, mysql, jose]},

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_authz, [ {application, emqx_authz, [
{description, "An OTP application"}, {description, "An OTP application"},
{vsn, "0.1.3"}, {vsn, "0.1.4"},
{registered, []}, {registered, []},
{mod, {emqx_authz_app, []}}, {mod, {emqx_authz_app, []}},
{applications, [ {applications, [

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_connector, [ {application, emqx_connector, [
{description, "An OTP application"}, {description, "An OTP application"},
{vsn, "0.1.2"}, {vsn, "0.1.3"},
{registered, []}, {registered, []},
{mod, {emqx_connector_app, []}}, {mod, {emqx_connector_app, []}},
{applications, [ {applications, [

View File

@ -73,7 +73,7 @@
-callback batcher_flush(Acc :: [{from(), request()}], CbState :: term()) -> -callback batcher_flush(Acc :: [{from(), request()}], CbState :: term()) ->
{{from(), result()}, NewCbState :: term()}. {{from(), result()}, NewCbState :: term()}.
callback_mode() -> [state_functions, state_enter]. callback_mode() -> [state_functions].
start_link(Id, Index, Opts) -> start_link(Id, Index, Opts) ->
gen_statem:start_link({local, name(Id, Index)}, ?MODULE, {Id, Index, Opts}, []). gen_statem:start_link({local, name(Id, Index)}, ?MODULE, {Id, Index, Opts}, []).
@ -107,7 +107,7 @@ init({Id, Index, Opts}) ->
true = gproc_pool:connect_worker(Id, {Id, Index}), true = gproc_pool:connect_worker(Id, {Id, Index}),
BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE), BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE),
Queue = Queue =
case maps:get(queue_enabled, Opts, true) of case maps:get(queue_enabled, Opts, false) of
true -> true ->
replayq:open(#{ replayq:open(#{
dir => disk_queue_dir(Id, Index), dir => disk_queue_dir(Id, Index),
@ -131,8 +131,6 @@ init({Id, Index, Opts}) ->
}, },
{ok, blocked, St, {next_event, cast, resume}}. {ok, blocked, St, {next_event, cast, resume}}.
running(enter, _, _St) ->
keep_state_and_data;
running(cast, resume, _St) -> running(cast, resume, _St) ->
keep_state_and_data; keep_state_and_data;
running(cast, block, St) -> running(cast, block, St) ->
@ -149,8 +147,6 @@ running(info, Info, _St) ->
?SLOG(error, #{msg => unexpected_msg, info => Info}), ?SLOG(error, #{msg => unexpected_msg, info => Info}),
keep_state_and_data. keep_state_and_data.
blocked(enter, _, _St) ->
keep_state_and_data;
blocked(cast, block, _St) -> blocked(cast, block, _St) ->
keep_state_and_data; keep_state_and_data;
blocked(cast, resume, St) -> blocked(cast, resume, St) ->
@ -218,28 +214,25 @@ drop_head(Q) ->
ok = replayq:ack(Q1, AckRef), ok = replayq:ack(Q1, AckRef),
Q1. Q1.
query_or_acc(From, Request, #{batch_enabled := true} = St) -> query_or_acc(From, Request, #{batch_enabled := true, acc := Acc, acc_left := Left} = St0) ->
acc_query(From, Request, St);
query_or_acc(From, Request, #{batch_enabled := false} = St) ->
send_query(From, Request, St).
acc_query(From, Request, #{acc := Acc, acc_left := Left} = St0) ->
Acc1 = [?QUERY(From, Request) | Acc], Acc1 = [?QUERY(From, Request) | Acc],
St = St0#{acc := Acc1, acc_left := Left - 1}, St = St0#{acc := Acc1, acc_left := Left - 1},
case Left =< 1 of case Left =< 1 of
true -> flush(St); true -> flush(St);
false -> {keep_state, ensure_flush_timer(St)} false -> {keep_state, ensure_flush_timer(St)}
end. end;
query_or_acc(From, Request, #{batch_enabled := false, queue := Q, id := Id} = St) ->
send_query(From, Request, #{id := Id, queue := Q} = St) -> case send_query(From, Request, Id) of
Result = call_query(Id, Request),
case reply_caller(Id, ?REPLY(From, Request, Result), false) of
true -> true ->
{next_state, blocked, St#{queue := maybe_append_queue(Q, [?Q_ITEM(Request)])}}; {next_state, blocked, St#{queue := maybe_append_queue(Q, [?Q_ITEM(Request)])}};
false -> false ->
{keep_state, St} {keep_state, St}
end. end.
send_query(From, Request, Id) ->
Result = call_query(Id, Request),
reply_caller(Id, ?REPLY(From, Request, Result), false).
flush(#{acc := []} = St) -> flush(#{acc := []} = St) ->
{keep_state, St}; {keep_state, St};
flush( flush(

View File

@ -347,6 +347,10 @@ t_create_dry_run_local(_) ->
[] = ets:match(emqx_resource_manager, {{owner, '$1'}, '_'}). [] = ets:match(emqx_resource_manager, {{owner, '$1'}, '_'}).
create_dry_run_local_succ() -> create_dry_run_local_succ() ->
case whereis(test_resource) of
undefined -> ok;
Pid -> exit(Pid, kill)
end,
?assertEqual( ?assertEqual(
ok, ok,
emqx_resource:create_dry_run_local( emqx_resource:create_dry_run_local(