diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl
index fa391151c..59b41c7fa 100644
--- a/apps/emqx_bridge/src/emqx_bridge.erl
+++ b/apps/emqx_bridge/src/emqx_bridge.erl
@@ -170,8 +170,11 @@ send_message(BridgeId, Message) ->
case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of
not_found ->
{error, {bridge_not_found, BridgeId}};
- #{enable := true} ->
- emqx_resource:query(ResId, {send_message, Message});
+ #{enable := true} = Config ->
+ Timeout = emqx_map_lib:deep_get(
+ [resource_opts, request_timeout], Config, timer:seconds(15)
+ ),
+ emqx_resource:query(ResId, {send_message, Message}, #{timeout => Timeout});
#{enable := false} ->
{error, {bridge_stopped, BridgeId}}
end.
diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl
index b3c2b8d85..d20d3bc10 100644
--- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl
+++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl
@@ -145,10 +145,12 @@ set_special_configs(_) ->
init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
+ ok = snabbkaffe:start_trace(),
Config.
end_per_testcase(_, _Config) ->
clear_resources(),
emqx_common_test_helpers:call_janitor(),
+ snabbkaffe:stop(),
ok.
clear_resources() ->
@@ -478,8 +480,6 @@ t_egress_custom_clientid_prefix(_Config) ->
end,
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
- {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
-
ok.
t_mqtt_conn_bridge_ingress_and_egress(_) ->
@@ -830,6 +830,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
<<"resource_opts">> => #{
<<"worker_pool_size">> => 2,
<<"query_mode">> => <<"sync">>,
+ <<"request_timeout">> => <<"500ms">>,
%% to make it check the healthy quickly
<<"health_check_interval">> => <<"0.5s">>
}
@@ -880,17 +881,14 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
ok = emqx_listeners:stop_listener('tcp:default'),
ct:sleep(1500),
- %% PUBLISH 2 messages to the 'local' broker, the message should
- ok = snabbkaffe:start_trace(),
+ %% PUBLISH 2 messages to the 'local' broker, the messages should
+ %% be enqueued and the resource will block
{ok, SRef} =
snabbkaffe:subscribe(
fun
- (
- #{
- ?snk_kind := call_query_enter,
- query := {query, _From, {send_message, #{}}, _Sent}
- }
- ) ->
+ (#{?snk_kind := resource_worker_retry_inflight_failed}) ->
+ true;
+ (#{?snk_kind := resource_worker_flush_nack}) ->
true;
(_) ->
false
@@ -903,7 +901,6 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
emqx:publish(emqx_message:make(LocalTopic, Payload1)),
emqx:publish(emqx_message:make(LocalTopic, Payload2)),
{ok, _} = snabbkaffe:receive_events(SRef),
- ok = snabbkaffe:stop(),
%% verify the metrics of the bridge, the message should be queued
{ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []),
@@ -920,7 +917,8 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
<<"matched">> := Matched,
<<"success">> := 1,
<<"failed">> := 0,
- <<"queuing">> := 2
+ <<"queuing">> := 1,
+ <<"inflight">> := 1
} when Matched >= 3,
maps:get(<<"metrics">>, DecodedMetrics1)
),
@@ -952,18 +950,17 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
ok.
assert_mqtt_msg_received(Topic, Payload) ->
- ?assert(
- receive
- {deliver, Topic, #message{payload = Payload}} ->
- ct:pal("Got mqtt message: ~p on topic ~p", [Payload, Topic]),
- true;
- Msg ->
- ct:pal("Unexpected Msg: ~p", [Msg]),
- false
- after 100 ->
- false
- end
- ).
+ ct:pal("checking if ~p has been received on ~p", [Payload, Topic]),
+ receive
+ {deliver, Topic, #message{payload = Payload}} ->
+ ct:pal("Got mqtt message: ~p on topic ~p", [Payload, Topic]),
+ ok;
+ Msg ->
+ ct:pal("Unexpected Msg: ~p", [Msg]),
+ assert_mqtt_msg_received(Topic, Payload)
+ after 100 ->
+ ct:fail("timeout waiting for ~p on topic ~p", [Payload, Topic])
+ end.
request(Method, Url, Body) ->
request(<<"connector_admin">>, Method, Url, Body).
diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl
index d68f624e4..7f84c665a 100644
--- a/apps/emqx_connector/src/emqx_connector_http.erl
+++ b/apps/emqx_connector/src/emqx_connector_http.erl
@@ -380,7 +380,8 @@ on_query_async(
NRequest,
Timeout,
{fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs]}
- ).
+ ),
+ {ok, Worker}.
on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
case do_get_status(PoolName, Timeout) of
diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl
index 585122539..71ed81dda 100644
--- a/apps/emqx_connector/src/emqx_connector_mqtt.erl
+++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl
@@ -198,7 +198,10 @@ on_query_async(
#{name := InstanceId}
) ->
?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
- emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplyFun, Args}).
+ %% this is a cast, currently.
+ ok = emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplyFun, Args}),
+ WorkerPid = get_worker_pid(InstanceId),
+ {ok, WorkerPid}.
on_get_status(_InstId, #{name := InstanceId}) ->
case emqx_connector_mqtt_worker:status(InstanceId) of
@@ -212,6 +215,12 @@ ensure_mqtt_worker_started(InstanceId, BridgeConf) ->
{error, Reason} -> {error, Reason}
end.
+%% mqtt workers, when created and called via bridge callbacks, are
+%% registered under `InstanceId'; `undefined' means none is registered.
+-spec get_worker_pid(atom()) -> pid() | undefined.
+get_worker_pid(InstanceId) ->
+    whereis(InstanceId).
+
make_sub_confs(EmptyMap, _Conf, _) when map_size(EmptyMap) == 0 ->
undefined;
make_sub_confs(undefined, _Conf, _) ->
diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl
index c144c48e9..066e053d4 100644
--- a/apps/emqx_connector/src/emqx_connector_mysql.erl
+++ b/apps/emqx_connector/src/emqx_connector_mysql.erl
@@ -192,7 +192,7 @@ on_batch_query(
{Key, _} ->
case maps:get(Key, Inserts, undefined) of
undefined ->
- {error, batch_select_not_implemented};
+ {error, {unrecoverable_error, batch_select_not_implemented}};
InsertSQL ->
Tokens = maps:get(Key, ParamsTokens),
on_batch_insert(InstId, BatchReq, InsertSQL, Tokens, State)
@@ -200,7 +200,7 @@ on_batch_query(
Request ->
LogMeta = #{connector => InstId, first_request => Request, state => State},
?SLOG(error, LogMeta#{msg => "invalid request"}),
- {error, invalid_request}
+ {error, {unrecoverable_error, invalid_request}}
end.
mysql_function(sql) ->
@@ -267,7 +267,7 @@ init_prepare(State = #{prepare_statement := Prepares, poolname := PoolName}) ->
maybe_prepare_sql(SQLOrKey, Prepares, PoolName) ->
case maps:is_key(SQLOrKey, Prepares) of
true -> prepare_sql(Prepares, PoolName);
- false -> {error, prepared_statement_invalid}
+ false -> {error, {unrecoverable_error, prepared_statement_invalid}}
end.
prepare_sql(Prepares, PoolName) when is_map(Prepares) ->
@@ -465,12 +465,12 @@ do_sql_query(SQLFunc, Conn, SQLOrKey, Data, Timeout, LogMeta) ->
LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}
),
{error, {recoverable_error, Reason}};
- {error, Reason} = Result ->
+ {error, Reason} ->
?SLOG(
error,
LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}
),
- Result;
+ {error, {unrecoverable_error, Reason}};
Result ->
?tp(
mysql_connector_query_return,
@@ -483,5 +483,5 @@ do_sql_query(SQLFunc, Conn, SQLOrKey, Data, Timeout, LogMeta) ->
error,
LogMeta#{msg => "mysql_connector_invalid_params", params => Data}
),
- {error, {invalid_params, Data}}
+ {error, {unrecoverable_error, {invalid_params, Data}}}
end.
diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl
index 9965ff3b4..34defb5e5 100644
--- a/apps/emqx_connector/src/emqx_connector_pgsql.erl
+++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl
@@ -153,7 +153,8 @@ on_query(
}),
Type = pgsql_query_type(TypeOrKey),
{NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State),
- on_sql_query(InstId, PoolName, Type, NameOrSQL2, Data).
+ Res = on_sql_query(InstId, PoolName, Type, NameOrSQL2, Data),
+ handle_result(Res).
pgsql_query_type(sql) ->
query;
@@ -182,23 +183,17 @@ on_batch_query(
msg => "batch prepare not implemented"
},
?SLOG(error, Log),
- {error, batch_prepare_not_implemented};
+ {error, {unrecoverable_error, batch_prepare_not_implemented}};
TokenList ->
{_, Datas} = lists:unzip(BatchReq),
Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas],
St = maps:get(BinKey, Sts),
- {_Column, Results} = on_sql_query(InstId, PoolName, execute_batch, St, Datas2),
- %% this local function only suits for the result of batch insert
- TransResult = fun
- Trans([{ok, Count} | T], Acc) ->
- Trans(T, Acc + Count);
- Trans([{error, _} = Error | _], _Acc) ->
- Error;
- Trans([], Acc) ->
- {ok, Acc}
- end,
-
- TransResult(Results, 0)
+ case on_sql_query(InstId, PoolName, execute_batch, St, Datas2) of
+ {error, Error} ->
+ {error, Error};
+ {_Column, Results} ->
+ handle_batch_result(Results, 0)
+ end
end;
_ ->
Log = #{
@@ -208,7 +203,7 @@ on_batch_query(
msg => "invalid request"
},
?SLOG(error, Log),
- {error, invalid_request}
+ {error, {unrecoverable_error, invalid_request}}
end.
proc_sql_params(query, SQLOrKey, Params, _State) ->
@@ -225,24 +220,38 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens})
end.
on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
- Result = ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover),
- case Result of
- {error, Reason} ->
+ try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of
+ {error, Reason} = Result ->
+ ?tp(
+ pgsql_connector_query_return,
+ #{error => Reason}
+ ),
?SLOG(error, #{
msg => "postgresql connector do sql query failed",
connector => InstId,
type => Type,
sql => NameOrSQL,
reason => Reason
- });
- _ ->
+ }),
+ Result;
+ Result ->
?tp(
pgsql_connector_query_return,
#{result => Result}
),
- ok
- end,
- Result.
+ Result
+ catch
+ error:function_clause:Stacktrace ->
+ ?SLOG(error, #{
+ msg => "postgresql connector do sql query failed",
+ connector => InstId,
+ type => Type,
+ sql => NameOrSQL,
+ reason => function_clause,
+ stacktrace => Stacktrace
+ }),
+ {error, {unrecoverable_error, invalid_request}}
+ end.
on_get_status(_InstId, #{poolname := Pool} = State) ->
case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
@@ -407,3 +416,28 @@ to_bin(Bin) when is_binary(Bin) ->
Bin;
to_bin(Atom) when is_atom(Atom) ->
erlang:atom_to_binary(Atom).
+
+%% Normalise the outcome of a single query for the caller of `on_query'.
+%% An error that already carries a `recoverable_error' or
+%% `unrecoverable_error' tag (for instance the one `on_sql_query/5'
+%% builds when it catches `function_clause') is returned untouched so it
+%% is never wrapped twice; any other error is tagged as unrecoverable.
+%% Anything that is not an `{error, _}' tuple is returned unchanged.
+handle_result({error, {unrecoverable_error, _Error}} = Res) ->
+    Res;
+handle_result({error, {recoverable_error, _Error}} = Res) ->
+    Res;
+handle_result({error, Error}) ->
+    {error, {unrecoverable_error, Error}};
+handle_result(Res) ->
+    Res.
+
+%% Fold the per-row results of a batch execution: sum the `{ok, Count}'
+%% entries into `Acc' and stop at the first `{error, _}', which is
+%% reported as unrecoverable. An exhausted list yields `{ok, Acc}'.
+handle_batch_result([{ok, Count} | Rest], Acc) ->
+    handle_batch_result(Rest, Acc + Count);
+handle_batch_result([{error, Error} | _Rest], _Acc) ->
+    {error, {unrecoverable_error, Error}};
+handle_batch_result([], Acc) ->
+    {ok, Acc}.
diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl
index 7fdf9d28d..4bb46bca3 100644
--- a/apps/emqx_connector/src/emqx_connector_redis.erl
+++ b/apps/emqx_connector/src/emqx_connector_redis.erl
@@ -207,11 +207,23 @@ do_query(InstId, Query, #{poolname := PoolName, type := Type} = State) ->
connector => InstId,
query => Query,
reason => Reason
- });
+ }),
+ case is_unrecoverable_error(Reason) of
+ true ->
+ {error, {unrecoverable_error, Reason}};
+ false ->
+ Result
+ end;
_ ->
- ok
- end,
- Result.
+ Result
+ end.
+
+is_unrecoverable_error(Results) when is_list(Results) ->
+ lists:any(fun is_unrecoverable_error/1, Results);
+is_unrecoverable_error({error, <<"ERR unknown command ", _/binary>>}) ->
+ true;
+is_unrecoverable_error(_) ->
+ false.
extract_eredis_cluster_workers(PoolName) ->
lists:flatten([
diff --git a/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl b/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl
index a1d8fe9d5..87d2b8e21 100644
--- a/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl
+++ b/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl
@@ -128,8 +128,12 @@ perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) ->
emqx_resource:query(PoolName, {cmds, [RedisCommand, RedisCommand]})
),
?assertMatch(
- {error, [{ok, <<"PONG">>}, {error, _}]},
- emqx_resource:query(PoolName, {cmds, [RedisCommand, [<<"INVALID_COMMAND">>]]})
+ {error, {unrecoverable_error, [{ok, <<"PONG">>}, {error, _}]}},
+ emqx_resource:query(
+ PoolName,
+ {cmds, [RedisCommand, [<<"INVALID_COMMAND">>]]},
+ #{timeout => 500}
+ )
),
?assertEqual(ok, emqx_resource:stop(PoolName)),
% Resource will be listed still, but state will be changed and healthcheck will fail
diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf
index 0b6cbd0a2..de76967ab 100644
--- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf
+++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf
@@ -89,6 +89,17 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise
}
}
+ request_timeout {
+ desc {
+ en: """Timeout for requests. If <code>query_mode</code> is <code>sync</code>, calls to the resource will be blocked for this amount of time before timing out."""
+ zh: """请求的超时。 如果<code>query_mode</code>是<code>sync</code>,对资源的调用将在超时前被阻断这一时间。"""
+ }
+ label {
+ en: """Request timeout"""
+ zh: """请求超时"""
+ }
+ }
+
enable_batch {
desc {
en: """Batch mode enabled."""
diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl
index aff66c287..0fd21bfcd 100644
--- a/apps/emqx_resource/src/emqx_resource.erl
+++ b/apps/emqx_resource/src/emqx_resource.erl
@@ -79,8 +79,7 @@
query/2,
query/3,
%% query the instance without batching and queuing messages.
- simple_sync_query/2,
- simple_async_query/3
+ simple_sync_query/2
]).
%% Direct calls to the callback module
@@ -278,10 +277,6 @@ query(ResId, Request, Opts) ->
simple_sync_query(ResId, Request) ->
emqx_resource_worker:simple_sync_query(ResId, Request).
--spec simple_async_query(resource_id(), Request :: term(), reply_fun()) -> Result :: term().
-simple_async_query(ResId, Request, ReplyFun) ->
- emqx_resource_worker:simple_async_query(ResId, Request, ReplyFun).
-
-spec start(resource_id()) -> ok | {error, Reason :: term()}.
start(ResId) ->
start(ResId, #{}).
diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl
index 697d5a84b..d7010c3bd 100644
--- a/apps/emqx_resource/src/emqx_resource_worker.erl
+++ b/apps/emqx_resource/src/emqx_resource_worker.erl
@@ -23,6 +23,7 @@
-include("emqx_resource_utils.hrl").
-include("emqx_resource_errors.hrl").
-include_lib("emqx/include/logger.hrl").
+-include_lib("stdlib/include/ms_transform.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-behaviour(gen_statem).
@@ -32,12 +33,12 @@
sync_query/3,
async_query/3,
block/1,
- resume/1
+ resume/1,
+ flush_worker/1
]).
-export([
- simple_sync_query/2,
- simple_async_query/3
+ simple_sync_query/2
]).
-export([
@@ -51,12 +52,12 @@
-export([queue_item_marshaller/1, estimate_size/1]).
--export([reply_after_query/7, batch_reply_after_query/7]).
+-export([reply_after_query/8, batch_reply_after_query/8]).
+
+-export([clear_disk_queue_dir/2]).
-elvis([{elvis_style, dont_repeat_yourself, disable}]).
--define(Q_ITEM(REQUEST), {q_item, REQUEST}).
-
-define(COLLECT_REQ_LIMIT, 1000).
-define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}).
-define(QUERY(FROM, REQUEST, SENT), {query, FROM, REQUEST, SENT}).
@@ -65,6 +66,11 @@
?REPLY(FROM, REQUEST, SENT, RESULT)
|| ?QUERY(FROM, REQUEST, SENT) <- BATCH
]).
+-define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerMRef),
+ {Ref, BatchOrQuery, IsRetriable, WorkerMRef}
+).
+-define(RETRY_IDX, 3).
+-define(WORKER_MREF_IDX, 4).
-type id() :: binary().
-type index() :: pos_integer().
@@ -73,15 +79,17 @@
-type from() :: pid() | reply_fun() | request_from().
-type request_from() :: undefined | gen_statem:from().
-type state() :: blocked | running.
+-type inflight_key() :: integer().
-type data() :: #{
- id => id(),
- index => index(),
- inflight_tid => ets:tid(),
- batch_size => pos_integer(),
- batch_time => timer:time(),
- queue => replayq:q(),
- resume_interval => timer:time(),
- tref => undefined | timer:tref()
+ id := id(),
+ index := index(),
+ inflight_tid := ets:tid(),
+ async_workers := #{pid() => reference()},
+ batch_size := pos_integer(),
+ batch_time := timer:time(),
+ queue := replayq:q(),
+ resume_interval := timer:time(),
+ tref := undefined | timer:tref()
}.
callback_mode() -> [state_functions, state_enter].
@@ -92,7 +100,7 @@ start_link(Id, Index, Opts) ->
-spec sync_query(id(), request(), query_opts()) -> Result :: term().
sync_query(Id, Request, Opts) ->
PickKey = maps:get(pick_key, Opts, self()),
- Timeout = maps:get(timeout, Opts, infinity),
+ Timeout = maps:get(timeout, Opts, timer:seconds(15)),
emqx_resource_metrics:matched_inc(Id),
pick_call(Id, PickKey, {query, Request, Opts}, Timeout).
@@ -112,25 +120,12 @@ simple_sync_query(Id, Request) ->
%% would mess up the metrics anyway. `undefined' is ignored by
%% `emqx_resource_metrics:*_shift/3'.
Index = undefined,
- QueryOpts = #{},
+ QueryOpts = #{simple_query => true},
emqx_resource_metrics:matched_inc(Id),
- Result = call_query(sync, Id, Index, ?QUERY(self(), Request, false), QueryOpts),
- _ = handle_query_result(Id, Result, false, false),
- Result.
-
--spec simple_async_query(id(), request(), reply_fun()) -> Result :: term().
-simple_async_query(Id, Request, ReplyFun) ->
- %% Note: since calling this function implies in bypassing the
- %% buffer workers, and each buffer worker index is used when
- %% collecting gauge metrics, we use this dummy index. If this
- %% call ends up calling buffering functions, that's a bug and
- %% would mess up the metrics anyway. `undefined' is ignored by
- %% `emqx_resource_metrics:*_shift/3'.
- Index = undefined,
- QueryOpts = #{},
- emqx_resource_metrics:matched_inc(Id),
- Result = call_query(async, Id, Index, ?QUERY(ReplyFun, Request, false), QueryOpts),
- _ = handle_query_result(Id, Result, false, false),
+ Ref = make_message_ref(),
+ Result = call_query(sync, Id, Index, Ref, ?QUERY(self(), Request, false), QueryOpts),
+ HasBeenSent = false,
+ _ = handle_query_result(Id, Result, HasBeenSent),
Result.
-spec block(pid()) -> ok.
@@ -141,6 +136,10 @@ block(ServerRef) ->
resume(ServerRef) ->
gen_statem:cast(ServerRef, resume).
+-spec flush_worker(pid()) -> ok.
+flush_worker(ServerRef) ->
+ gen_statem:cast(ServerRef, flush).
+
-spec init({id(), pos_integer(), map()}) -> gen_statem:init_result(state(), data()).
init({Id, Index, Opts}) ->
process_flag(trap_exit, true),
@@ -156,33 +155,37 @@ init({Id, Index, Opts}) ->
max_total_bytes => TotalBytes,
%% we don't want to retain the queue after
%% resource restarts.
- offload => true,
+ offload => {true, volatile},
seg_bytes => SegBytes,
sizer => fun ?MODULE:estimate_size/1
},
Queue = replayq:open(QueueOpts),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)),
emqx_resource_metrics:inflight_set(Id, Index, 0),
- InfltWinSZ = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
- InflightTID = inflight_new(InfltWinSZ, Id, Index),
- HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
- St = #{
+ InflightWinSize = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
+ InflightTID = inflight_new(InflightWinSize, Id, Index),
+ HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
+ Data = #{
id => Id,
index => Index,
inflight_tid => InflightTID,
+ async_workers => #{},
batch_size => BatchSize,
batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
queue => Queue,
- resume_interval => maps:get(resume_interval, Opts, HCItvl),
+ resume_interval => maps:get(resume_interval, Opts, HealthCheckInterval),
tref => undefined
},
- {ok, blocked, St, {next_event, cast, resume}}.
+ ?tp(resource_worker_init, #{id => Id, index => Index}),
+ {ok, running, Data}.
running(enter, _, St) ->
?tp(resource_worker_enter_running, #{}),
maybe_flush(St);
running(cast, resume, _St) ->
keep_state_and_data;
+running(cast, flush, Data) ->
+ flush(Data);
running(cast, block, St) ->
{next_state, blocked, St};
running(info, ?SEND_REQ(_From, _Req) = Request0, Data) ->
@@ -193,46 +196,48 @@ running(internal, flush, St) ->
flush(St);
running(info, {flush, _Ref}, _St) ->
keep_state_and_data;
+running(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when
+ is_map_key(Pid, AsyncWorkers0)
+->
+ ?SLOG(info, #{msg => async_worker_died, state => running, reason => Reason}),
+ handle_async_worker_down(Data0, Pid);
running(info, Info, _St) ->
?SLOG(error, #{msg => unexpected_msg, state => running, info => Info}),
keep_state_and_data.
blocked(enter, _, #{resume_interval := ResumeT} = _St) ->
?tp(resource_worker_enter_blocked, #{}),
- {keep_state_and_data, {state_timeout, ResumeT, resume}};
+ {keep_state_and_data, {state_timeout, ResumeT, unblock}};
blocked(cast, block, _St) ->
keep_state_and_data;
blocked(cast, resume, St) ->
- do_resume(St);
-blocked(state_timeout, resume, St) ->
- do_resume(St);
-blocked(info, ?SEND_REQ(ReqFrom, {query, Request, Opts}), Data0) ->
- #{
- id := Id,
- index := Index,
- queue := Q
- } = Data0,
- From =
- case ReqFrom of
- undefined -> maps:get(async_reply_fun, Opts, undefined);
- From1 -> From1
- end,
- Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
- HasBeenSent = false,
- _ = reply_caller(Id, ?REPLY(From, Request, HasBeenSent, Error)),
- NewQ = append_queue(Id, Index, Q, [?QUERY(From, Request, HasBeenSent)]),
- Data = Data0#{queue := NewQ},
+ resume_from_blocked(St);
+blocked(cast, flush, Data) ->
+ resume_from_blocked(Data);
+blocked(state_timeout, unblock, St) ->
+ resume_from_blocked(St);
+blocked(info, ?SEND_REQ(_ReqFrom, {query, _Request, _Opts}) = Request0, Data0) ->
+ {_Queries, Data} = collect_and_enqueue_query_requests(Request0, Data0),
{keep_state, Data};
blocked(info, {flush, _Ref}, _Data) ->
keep_state_and_data;
+blocked(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when
+ is_map_key(Pid, AsyncWorkers0)
+->
+ ?SLOG(info, #{msg => async_worker_died, state => blocked, reason => Reason}),
+ handle_async_worker_down(Data0, Pid);
blocked(info, Info, _Data) ->
?SLOG(error, #{msg => unexpected_msg, state => blocked, info => Info}),
keep_state_and_data.
terminate(_Reason, #{id := Id, index := Index, queue := Q}) ->
+ _ = replayq:close(Q),
emqx_resource_metrics:inflight_set(Id, Index, 0),
- emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q)),
- gproc_pool:disconnect_worker(Id, {Id, Index}).
+ %% since we want volatile queues, this will be 0 after
+ %% termination.
+ emqx_resource_metrics:queuing_set(Id, Index, 0),
+ gproc_pool:disconnect_worker(Id, {Id, Index}),
+ ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -282,112 +287,92 @@ pick_cast(Id, Key, Query) ->
ok
end).
-do_resume(#{id := Id, inflight_tid := InflightTID} = Data) ->
- case inflight_get_first(InflightTID) of
- empty ->
- retry_queue(Data);
+resume_from_blocked(Data) ->
+ #{inflight_tid := InflightTID} = Data,
+ case inflight_get_first_retriable(InflightTID) of
+ none ->
+ case is_inflight_full(InflightTID) of
+ true ->
+ {keep_state, Data};
+ false ->
+ {next_state, running, Data}
+ end;
{Ref, FirstQuery} ->
%% We retry msgs in inflight window sync, as if we send them
%% async, they will be appended to the end of inflight window again.
- retry_inflight_sync(Id, Ref, FirstQuery, InflightTID, Data)
- end.
-
-retry_queue(
- #{
- queue := Q0,
- id := Id,
- index := Index,
- batch_size := 1,
- inflight_tid := InflightTID,
- resume_interval := ResumeT
- } = Data0
-) ->
- %% no batching
- case get_first_n_from_queue(Q0, 1) of
- empty ->
- {next_state, running, Data0};
- {Q1, QAckRef, [?QUERY(_, Request, HasBeenSent) = Query]} ->
- QueryOpts = #{inflight_name => InflightTID},
- Result = call_query(configured, Id, Index, Query, QueryOpts),
- Reply = ?REPLY(undefined, Request, HasBeenSent, Result),
- case reply_caller(Id, Reply) of
+ case is_inflight_full(InflightTID) of
true ->
- {keep_state, Data0, {state_timeout, ResumeT, resume}};
+ {keep_state, Data};
false ->
- ok = replayq:ack(Q1, QAckRef),
- emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
- Data = Data0#{queue := Q1},
- retry_queue(Data)
- end
- end;
-retry_queue(
- #{
- queue := Q,
- id := Id,
- index := Index,
- batch_size := BatchSize,
- inflight_tid := InflightTID,
- resume_interval := ResumeT
- } = Data0
-) ->
- %% batching
- case get_first_n_from_queue(Q, BatchSize) of
- empty ->
- {next_state, running, Data0};
- {Q1, QAckRef, Batch0} ->
- QueryOpts = #{inflight_name => InflightTID},
- Result = call_query(configured, Id, Index, Batch0, QueryOpts),
- %% The caller has been replied with ?RESOURCE_ERROR(blocked, _) before saving into the queue,
- %% we now change the 'from' field to 'undefined' so it will not reply the caller again.
- Batch = [
- ?QUERY(undefined, Request, HasBeenSent0)
- || ?QUERY(_, Request, HasBeenSent0) <- Batch0
- ],
- case batch_reply_caller(Id, Result, Batch) of
- true ->
- ?tp(resource_worker_retry_queue_batch_failed, #{batch => Batch}),
- {keep_state, Data0, {state_timeout, ResumeT, resume}};
- false ->
- ?tp(resource_worker_retry_queue_batch_succeeded, #{batch => Batch}),
- ok = replayq:ack(Q1, QAckRef),
- emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
- Data = Data0#{queue := Q1},
- retry_queue(Data)
+ retry_inflight_sync(Ref, FirstQuery, Data)
end
end.
-retry_inflight_sync(
- Id,
- Ref,
- QueryOrBatch,
- InflightTID,
- #{index := Index, resume_interval := ResumeT} = Data0
-) ->
- QueryOpts = #{},
- %% if we are retrying an inflight query, it has been sent
- HasBeenSent = true,
- Result = call_query(sync, Id, Index, QueryOrBatch, QueryOpts),
- BlockWorker = false,
- case handle_query_result(Id, Result, HasBeenSent, BlockWorker) of
+retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
+ #{
+ id := Id,
+ inflight_tid := InflightTID,
+ index := Index,
+ resume_interval := ResumeT
+ } = Data0,
+ ?tp(resource_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}),
+ QueryOpts = #{simple_query => false},
+ Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
+ ReplyResult =
+ case QueryOrBatch of
+ ?QUERY(From, CoreReq, HasBeenSent) ->
+ Reply = ?REPLY(From, CoreReq, HasBeenSent, Result),
+ reply_caller_defer_metrics(Id, Reply, QueryOpts);
+ [?QUERY(_, _, _) | _] = Batch ->
+ batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts)
+ end,
+ case ReplyResult of
%% Send failed because resource is down
- true ->
- {keep_state, Data0, {state_timeout, ResumeT, resume}};
+ {nack, PostFn} ->
+ PostFn(),
+ ?tp(
+ resource_worker_retry_inflight_failed,
+ #{
+ ref => Ref,
+ query_or_batch => QueryOrBatch
+ }
+ ),
+ {keep_state, Data0, {state_timeout, ResumeT, unblock}};
%% Send ok or failed but the resource is working
- false ->
- inflight_drop(InflightTID, Ref, Id, Index),
- do_resume(Data0)
+ {ack, PostFn} ->
+ IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
+ %% we need to defer bumping the counters after
+ %% `ack_inflight' to avoid the race condition when an
+ %% inflight request might get completed concurrently with
+ %% the retry, bumping them twice. Since both inflight
+ %% requests (repeated and original) have the same `Ref',
+ %% we bump the counter when removing it from the table.
+ IsAcked andalso PostFn(),
+ ?tp(
+ resource_worker_retry_inflight_succeeded,
+ #{
+ ref => Ref,
+ query_or_batch => QueryOrBatch
+ }
+ ),
+ resume_from_blocked(Data0)
end.
%% Called during the `running' state only.
--spec handle_query_requests(?SEND_REQ(request_from(), request()), data()) -> data().
+-spec handle_query_requests(?SEND_REQ(request_from(), request()), data()) ->
+ gen_statem:event_handler_result(state(), data()).
handle_query_requests(Request0, Data0) ->
+ {_Queries, Data} = collect_and_enqueue_query_requests(Request0, Data0),
+ maybe_flush(Data).
+
+collect_and_enqueue_query_requests(Request0, Data0) ->
#{
id := Id,
index := Index,
queue := Q
} = Data0,
Requests = collect_requests([Request0], ?COLLECT_REQ_LIMIT),
- QueueItems =
+ Queries =
lists:map(
fun
(?SEND_REQ(undefined = _From, {query, Req, Opts})) ->
@@ -400,21 +385,21 @@ handle_query_requests(Request0, Data0) ->
end,
Requests
),
- NewQ = append_queue(Id, Index, Q, QueueItems),
+ NewQ = append_queue(Id, Index, Q, Queries),
Data = Data0#{queue := NewQ},
- maybe_flush(Data).
+ {Queries, Data}.
-maybe_flush(Data) ->
+maybe_flush(Data0) ->
#{
batch_size := BatchSize,
queue := Q
- } = Data,
+ } = Data0,
QueueCount = queue_count(Q),
case QueueCount >= BatchSize of
true ->
- flush(Data);
+ flush(Data0);
false ->
- {keep_state, ensure_flush_timer(Data)}
+ {keep_state, ensure_flush_timer(Data0)}
end.
%% Called during the `running' state only.
@@ -422,20 +407,30 @@ maybe_flush(Data) ->
flush(Data0) ->
#{
batch_size := BatchSize,
+ inflight_tid := InflightTID,
queue := Q0
} = Data0,
- case replayq:count(Q0) of
- 0 ->
- Data = cancel_flush_timer(Data0),
- {keep_state, Data};
- _ ->
- {Q1, QAckRef, Batch0} = replayq:pop(Q0, #{count_limit => BatchSize}),
- Batch = [Item || ?Q_ITEM(Item) <- Batch0],
+ Data1 = cancel_flush_timer(Data0),
+ case {queue_count(Q0), is_inflight_full(InflightTID)} of
+ {0, _} ->
+ {keep_state, Data1};
+ {_, true} ->
+ ?tp(resource_worker_flush_but_inflight_full, #{}),
+ Data2 = ensure_flush_timer(Data1),
+ {keep_state, Data2};
+ {_, false} ->
+ {Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}),
IsBatch = BatchSize =/= 1,
- do_flush(Data0, #{
+ %% We *must* use the new queue, because we currently can't
+ %% `nack' a `pop'.
+ %% Maybe we could re-open the queue?
+ Data2 = Data1#{queue := Q1},
+ Ref = make_message_ref(),
+ do_flush(Data2, #{
new_queue => Q1,
is_batch => IsBatch,
batch => Batch,
+ ref => Ref,
ack_ref => QAckRef
})
end.
@@ -443,10 +438,21 @@ flush(Data0) ->
-spec do_flush(data(), #{
is_batch := boolean(),
batch := [?QUERY(from(), request(), boolean())],
- ack_ref := replayq:ack_ref()
+ ack_ref := replayq:ack_ref(),
+ ref := inflight_key(),
+ new_queue := replayq:q()
}) ->
gen_statem:event_handler_result(state(), data()).
-do_flush(Data0, #{is_batch := false, batch := Batch, ack_ref := QAckRef, new_queue := Q1}) ->
+do_flush(
+ Data0,
+ #{
+ is_batch := false,
+ batch := Batch,
+ ref := Ref,
+ ack_ref := QAckRef,
+ new_queue := Q1
+ }
+) ->
#{
id := Id,
index := Index,
@@ -454,149 +460,321 @@ do_flush(Data0, #{is_batch := false, batch := Batch, ack_ref := QAckRef, new_que
} = Data0,
%% unwrap when not batching (i.e., batch size == 1)
[?QUERY(From, CoreReq, HasBeenSent) = Request] = Batch,
- QueryOpts = #{inflight_name => InflightTID},
- Result = call_query(configured, Id, Index, Request, QueryOpts),
- IsAsync = is_async(Id),
- Data1 = cancel_flush_timer(Data0),
+ QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
+ Result = call_query(configured, Id, Index, Ref, Request, QueryOpts),
Reply = ?REPLY(From, CoreReq, HasBeenSent, Result),
- case {reply_caller(Id, Reply), IsAsync} of
- %% failed and is not async; keep the request in the queue to
- %% be retried
- {true, false} ->
+ case reply_caller(Id, Reply, QueryOpts) of
+ %% Failed; remove the request from the queue, as we cannot pop
+ %% from it again, but we'll retry it using the inflight table.
+ nack ->
+ ok = replayq:ack(Q1, QAckRef),
+ %% we set it atomically just below; a limitation of having
+ %% to use tuples for atomic ets updates
+ IsRetriable = true,
+ WorkerMRef0 = undefined,
+ InflightItem = ?INFLIGHT_ITEM(Ref, Request, IsRetriable, WorkerMRef0),
+ %% we must append again to the table to ensure that the
+ %% request will be retried (i.e., it might not have been
+ %% inserted during `call_query' if the resource was down
+ %% and/or if it was a sync request).
+ inflight_append(InflightTID, InflightItem, Id, Index),
+ mark_inflight_as_retriable(InflightTID, Ref),
+ {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
+ store_async_worker_reference(InflightTID, Ref, WorkerMRef),
+ emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
+ ?tp(
+ resource_worker_flush_nack,
+ #{
+ ref => Ref,
+ is_retriable => IsRetriable,
+ batch_or_query => Request,
+ result => Result
+ }
+ ),
{next_state, blocked, Data1};
- %% failed and is async; remove the request from the queue, as
- %% it is already in inflight table
- {true, true} ->
+ %% Success; just ack.
+ ack ->
ok = replayq:ack(Q1, QAckRef),
- emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
- Data = Data1#{queue := Q1},
- {next_state, blocked, Data};
- %% success; just ack
- {false, _} ->
- ok = replayq:ack(Q1, QAckRef),
- emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
- Data2 = Data1#{queue := Q1},
- case replayq:count(Q1) > 0 of
+ %% Async requests are acked later when the async worker
+ %% calls the corresponding callback function. Also, we
+ %% must ensure the async worker is being monitored for
+ %% such requests.
+ IsUnrecoverableError = is_unrecoverable_error(Result),
+ case is_async_return(Result) of
+ true when IsUnrecoverableError ->
+ ack_inflight(InflightTID, Ref, Id, Index);
true ->
- {keep_state, Data2, [{next_event, internal, flush}]};
+ ok;
false ->
- {keep_state, Data2}
+ ack_inflight(InflightTID, Ref, Id, Index)
+ end,
+ {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
+ store_async_worker_reference(InflightTID, Ref, WorkerMRef),
+ emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
+ ?tp(
+ resource_worker_flush_ack,
+ #{
+ batch_or_query => Request,
+ result => Result
+ }
+ ),
+ case queue_count(Q1) > 0 of
+ true ->
+ {keep_state, Data1, [{next_event, internal, flush}]};
+ false ->
+ {keep_state, Data1}
end
end;
-do_flush(Data0, #{is_batch := true, batch := Batch, ack_ref := QAckRef, new_queue := Q1}) ->
+do_flush(Data0, #{
+ is_batch := true,
+ batch := Batch,
+ ref := Ref,
+ ack_ref := QAckRef,
+ new_queue := Q1
+}) ->
#{
id := Id,
index := Index,
batch_size := BatchSize,
inflight_tid := InflightTID
} = Data0,
- QueryOpts = #{inflight_name => InflightTID},
- Result = call_query(configured, Id, Index, Batch, QueryOpts),
- IsAsync = is_async(Id),
- Data1 = cancel_flush_timer(Data0),
- case {batch_reply_caller(Id, Result, Batch), IsAsync} of
- %% failed and is not async; keep the request in the queue to
- %% be retried
- {true, false} ->
+ QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
+ Result = call_query(configured, Id, Index, Ref, Batch, QueryOpts),
+ case batch_reply_caller(Id, Result, Batch, QueryOpts) of
+ %% Failed; remove the request from the queue, as we cannot pop
+ %% from it again, but we'll retry it using the inflight table.
+ nack ->
+ ok = replayq:ack(Q1, QAckRef),
+ %% we set it atomically just below; a limitation of having
+ %% to use tuples for atomic ets updates
+ IsRetriable = true,
+ WorkerMRef0 = undefined,
+ InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef0),
+ %% we must append again to the table to ensure that the
+ %% request will be retried (i.e., it might not have been
+ %% inserted during `call_query' if the resource was down
+ %% and/or if it was a sync request).
+ inflight_append(InflightTID, InflightItem, Id, Index),
+ mark_inflight_as_retriable(InflightTID, Ref),
+ {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
+ store_async_worker_reference(InflightTID, Ref, WorkerMRef),
+ emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
+ ?tp(
+ resource_worker_flush_nack,
+ #{
+ ref => Ref,
+ is_retriable => IsRetriable,
+ batch_or_query => Batch,
+ result => Result
+ }
+ ),
{next_state, blocked, Data1};
- %% failed and is async; remove the request from the queue, as
- %% it is already in inflight table
- {true, true} ->
+ %% Success; just ack.
+ ack ->
ok = replayq:ack(Q1, QAckRef),
+ %% Async requests are acked later when the async worker
+ %% calls the corresponding callback function. Also, we
+ %% must ensure the async worker is being monitored for
+ %% such requests.
+ IsUnrecoverableError = is_unrecoverable_error(Result),
+ case is_async_return(Result) of
+ true when IsUnrecoverableError ->
+ ack_inflight(InflightTID, Ref, Id, Index);
+ true ->
+ ok;
+ false ->
+ ack_inflight(InflightTID, Ref, Id, Index)
+ end,
+ {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
+ store_async_worker_reference(InflightTID, Ref, WorkerMRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
- Data = Data1#{queue := Q1},
- {next_state, blocked, Data};
- %% success; just ack
- {false, _} ->
- ok = replayq:ack(Q1, QAckRef),
- emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
- CurrentCount = replayq:count(Q1),
- Data2 = Data1#{queue := Q1},
+ ?tp(
+ resource_worker_flush_ack,
+ #{
+ batch_or_query => Batch,
+ result => Result
+ }
+ ),
+ CurrentCount = queue_count(Q1),
case {CurrentCount > 0, CurrentCount >= BatchSize} of
{false, _} ->
- {keep_state, Data2};
+ {keep_state, Data1};
{true, true} ->
- {keep_state, Data2, [{next_event, internal, flush}]};
+ {keep_state, Data1, [{next_event, internal, flush}]};
{true, false} ->
- Data3 = ensure_flush_timer(Data2),
- {keep_state, Data3}
+ Data2 = ensure_flush_timer(Data1),
+ {keep_state, Data2}
end
end.
-batch_reply_caller(Id, BatchResult, Batch) ->
- lists:foldl(
- fun(Reply, BlockWorker) ->
- reply_caller(Id, Reply, BlockWorker)
- end,
- false,
- %% the `Mod:on_batch_query/3` returns a single result for a batch,
- %% so we need to expand
- ?EXPAND(BatchResult, Batch)
- ).
+%% Eager variant of `batch_reply_caller_defer_metrics/4': runs the
+%% deferred post-function right away and returns only the decision
+%% (`ack' | `nack'; the local variable is still named `ShouldBlock').
+batch_reply_caller(Id, BatchResult, Batch, QueryOpts) ->
+ {ShouldBlock, PostFn} = batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts),
+ PostFn(),
+ ShouldBlock.
-reply_caller(Id, Reply) ->
- BlockWorker = false,
- reply_caller(Id, Reply, BlockWorker).
+%% Processes the reply for every query of a batch while deferring the
+%% logging/metrics side effects.  Returns `{Decision, PostFn}':
+%%   * `Decision' (`ack' | `nack') is the one computed for the last
+%%     expanded reply; the fold discards the decisions of the earlier
+%%     ones (an empty expansion yields the initial `ack').
+%%   * `PostFn' runs every collected post-function.  They are prepended
+%%     while folding, so they execute in reverse reply order.
+batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) ->
+ {ShouldAck, PostFns} =
+ lists:foldl(
+ fun(Reply, {_ShouldAck, PostFns}) ->
+ {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts),
+ {ShouldAck, [PostFn | PostFns]}
+ end,
+ {ack, []},
+ %% the `Mod:on_batch_query/3` returns a single result for a batch,
+ %% so we need to expand
+ ?EXPAND(BatchResult, Batch)
+ ),
+ PostFn = fun() -> lists:foreach(fun(F) -> F() end, PostFns) end,
+ {ShouldAck, PostFn}.
-reply_caller(Id, ?REPLY(undefined, _, HasBeenSent, Result), BlockWorker) ->
- handle_query_result(Id, Result, HasBeenSent, BlockWorker);
-reply_caller(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result), BlockWorker) when
+%% Eager variant of `reply_caller_defer_metrics/3': runs the deferred
+%% post-function immediately and returns only the `ack' | `nack'
+%% decision.
+reply_caller(Id, Reply, QueryOpts) ->
+ {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts),
+ PostFn(),
+ ShouldAck.
+
+%% Should only reply to the caller when the decision is final (not
+%% retriable). See comment on `handle_query_result_pure'.
+%%
+%% Returns `{ack | nack, PostFn}' exactly as computed by
+%% `handle_query_result_pure/3'; the only extra work done here is the
+%% (possible) reply.  The three clauses differ in how the reply is
+%% delivered:
+%%   * `undefined' caller: nobody to reply to;
+%%   * `{ReplyFun, Args}': `ReplyFun' is applied to `Args ++ [Result]';
+%%   * anything else is treated as a `gen_statem' `From'.
+reply_caller_defer_metrics(Id, ?REPLY(undefined, _, HasBeenSent, Result), _QueryOpts) ->
+ handle_query_result_pure(Id, Result, HasBeenSent);
+reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result), QueryOpts) when
is_function(ReplyFun)
->
- _ =
- case Result of
- {async_return, _} -> no_reply_for_now;
- _ -> apply(ReplyFun, Args ++ [Result])
- end,
- handle_query_result(Id, Result, HasBeenSent, BlockWorker);
-reply_caller(Id, ?REPLY(From, _, HasBeenSent, Result), BlockWorker) ->
- gen_statem:reply(From, Result),
- handle_query_result(Id, Result, HasBeenSent, BlockWorker).
+ IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
+ IsUnrecoverableError = is_unrecoverable_error(Result),
+ {ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent),
+ %% Clause order matters: the first matching row wins.
+ case {ShouldAck, Result, IsUnrecoverableError, IsSimpleQuery} of
+ {ack, {async_return, _}, true, _} ->
+ %% async call that already failed for good: reply now
+ apply(ReplyFun, Args ++ [Result]),
+ ok;
+ {ack, {async_return, _}, false, _} ->
+ %% async call handed over to the resource: no reply from here
+ ok;
+ {_, _, _, true} ->
+ %% simple queries always get a reply, even on `nack'
+ apply(ReplyFun, Args ++ [Result]),
+ ok;
+ {nack, _, _, _} ->
+ %% not final yet: stay silent
+ ok;
+ {ack, _, _, _} ->
+ %% final result: reply
+ apply(ReplyFun, Args ++ [Result]),
+ ok
+ end,
+ {ShouldAck, PostFn};
+reply_caller_defer_metrics(Id, ?REPLY(From, _, HasBeenSent, Result), QueryOpts) ->
+ IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
+ IsUnrecoverableError = is_unrecoverable_error(Result),
+ {ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent),
+ %% Same decision table as the `{ReplyFun, Args}' clause above, but
+ %% the reply goes through `gen_statem:reply/2'.
+ case {ShouldAck, Result, IsUnrecoverableError, IsSimpleQuery} of
+ {ack, {async_return, _}, true, _} ->
+ gen_statem:reply(From, Result),
+ ok;
+ {ack, {async_return, _}, false, _} ->
+ ok;
+ {_, _, _, true} ->
+ gen_statem:reply(From, Result),
+ ok;
+ {nack, _, _, _} ->
+ ok;
+ {ack, _, _, _} ->
+ gen_statem:reply(From, Result),
+ ok
+ end,
+ {ShouldAck, PostFn}.
-handle_query_result(Id, ?RESOURCE_ERROR_M(exception, Msg), HasBeenSent, BlockWorker) ->
- ?SLOG(error, #{msg => resource_exception, info => Msg}),
- inc_sent_failed(Id, HasBeenSent),
- BlockWorker;
-handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent, _) when
+%% Eager variant of `handle_query_result_pure/3': runs the deferred
+%% logging/metrics function immediately and returns only the decision
+%% (`ack' | `nack'; the local variable is still named `ShouldBlock').
+handle_query_result(Id, Result, HasBeenSent) ->
+ {ShouldBlock, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent),
+ PostFn(),
+ ShouldBlock.
+
+%% Maps a query result to `{ack | nack, PostFn}'.  Nothing is logged
+%% and no metric is bumped here: those side effects are wrapped in
+%% `PostFn' so that the caller decides if and when they run.
+%%
+%% We should always retry (nack), except when:
+%% * resource is not found
+%% * resource is stopped
+%% * the error is explicitly tagged as `unrecoverable_error'
+%% * the result is a success (or at least a delayed result)
+%% We also retry even sync requests. In that case, we shouldn't reply
+%% to the caller until one of the final results above happens.
+handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(exception, Msg), _HasBeenSent) ->
+ PostFn = fun() ->
+ ?SLOG(error, #{msg => resource_exception, info => Msg}),
+ ok
+ end,
+ {nack, PostFn};
+handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent) when
NotWorking == not_connected; NotWorking == blocked
->
- true;
-handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasBeenSent, BlockWorker) ->
- ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}),
- emqx_resource_metrics:dropped_resource_not_found_inc(Id),
- BlockWorker;
-handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent, BlockWorker) ->
- ?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}),
- emqx_resource_metrics:dropped_resource_stopped_inc(Id),
- BlockWorker;
-handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent, BlockWorker) ->
- ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}),
- emqx_resource_metrics:dropped_other_inc(Id),
- BlockWorker;
-handle_query_result(Id, {error, {recoverable_error, Reason}}, _HasBeenSent, _BlockWorker) ->
+ {nack, fun() -> ok end};
+handle_query_result_pure(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasBeenSent) ->
+ PostFn = fun() ->
+ ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}),
+ emqx_resource_metrics:dropped_resource_not_found_inc(Id),
+ ok
+ end,
+ {ack, PostFn};
+handle_query_result_pure(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent) ->
+ PostFn = fun() ->
+ ?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}),
+ emqx_resource_metrics:dropped_resource_stopped_inc(Id),
+ ok
+ end,
+ {ack, PostFn};
+handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent) ->
+ PostFn = fun() ->
+ ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}),
+ ok
+ end,
+ {nack, PostFn};
+handle_query_result_pure(Id, {error, {unrecoverable_error, Reason}}, HasBeenSent) ->
+ PostFn = fun() ->
+ ?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}),
+ inc_sent_failed(Id, HasBeenSent),
+ ok
+ end,
+ {ack, PostFn};
+handle_query_result_pure(Id, {error, {recoverable_error, Reason}}, _HasBeenSent) ->
%% the message will be queued in replayq or inflight window,
%% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not
%% sent this message.
- ?SLOG(warning, #{id => Id, msg => recoverable_error, reason => Reason}),
- true;
-handle_query_result(Id, {error, Reason}, HasBeenSent, BlockWorker) ->
- ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}),
- inc_sent_failed(Id, HasBeenSent),
- BlockWorker;
-handle_query_result(_Id, {async_return, inflight_full}, _HasBeenSent, _BlockWorker) ->
- true;
-handle_query_result(Id, {async_return, {error, Msg}}, HasBeenSent, BlockWorker) ->
- ?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}),
- inc_sent_failed(Id, HasBeenSent),
- BlockWorker;
-handle_query_result(_Id, {async_return, ok}, _HasBeenSent, BlockWorker) ->
- BlockWorker;
-handle_query_result(Id, Result, HasBeenSent, BlockWorker) ->
- assert_ok_result(Result),
- inc_sent_success(Id, HasBeenSent),
- BlockWorker.
+ PostFn = fun() ->
+ ?SLOG(warning, #{id => Id, msg => recoverable_error, reason => Reason}),
+ ok
+ end,
+ {nack, PostFn};
+handle_query_result_pure(Id, {error, Reason}, _HasBeenSent) ->
+ PostFn = fun() ->
+ ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}),
+ ok
+ end,
+ {nack, PostFn};
+handle_query_result_pure(Id, {async_return, {error, {unrecoverable_error, Reason}}}, HasBeenSent) ->
+ PostFn = fun() ->
+ ?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}),
+ inc_sent_failed(Id, HasBeenSent),
+ ok
+ end,
+ {ack, PostFn};
+handle_query_result_pure(Id, {async_return, {error, Msg}}, _HasBeenSent) ->
+ PostFn = fun() ->
+ ?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}),
+ ok
+ end,
+ {nack, PostFn};
+handle_query_result_pure(_Id, {async_return, ok}, _HasBeenSent) ->
+ {ack, fun() -> ok end};
+handle_query_result_pure(_Id, {async_return, {ok, Pid}}, _HasBeenSent) when is_pid(Pid) ->
+ {ack, fun() -> ok end};
+%% Anything that reaches this clause is treated as a success; the
+%% assertion on the result shape only runs when `PostFn' is invoked.
+handle_query_result_pure(Id, Result, HasBeenSent) ->
+ PostFn = fun() ->
+ assert_ok_result(Result),
+ inc_sent_success(Id, HasBeenSent),
+ ok
+ end,
+ {ack, PostFn}.
-call_query(QM0, Id, Index, Query, QueryOpts) ->
+%% Forgets the async worker `Pid' and marks every inflight item that
+%% carries its monitor reference as retriable.
+%% `Pid' must be a key of `async_workers': `maps:take/2' returns
+%% `error' for an unknown key, which would fail the match below.
+%% NOTE(review): presumably called when the worker's monitor fires
+%% (`DOWN' message) -- confirm at the call site.
+handle_async_worker_down(Data0, Pid) ->
+ #{async_workers := AsyncWorkers0} = Data0,
+ {WorkerMRef, AsyncWorkers} = maps:take(Pid, AsyncWorkers0),
+ Data = Data0#{async_workers := AsyncWorkers},
+ mark_inflight_items_as_retriable(Data, WorkerMRef),
+ {keep_state, Data}.
+
+call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
?tp(call_query_enter, #{id => Id, query => Query}),
case emqx_resource_manager:ets_lookup(Id) of
{ok, _Group, #{mod := Mod, state := ResSt, status := connected} = Data} ->
@@ -605,8 +783,9 @@ call_query(QM0, Id, Index, Query, QueryOpts) ->
configured -> maps:get(query_mode, Data);
_ -> QM0
end,
- CM = maps:get(callback_mode, Data),
- apply_query_fun(call_mode(QM, CM), Mod, Id, Index, Query, ResSt, QueryOpts);
+ CBM = maps:get(callback_mode, Data),
+ CallMode = call_mode(QM, CBM),
+ apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, QueryOpts);
{ok, _Group, #{status := stopped}} ->
?RESOURCE_ERROR(stopped, "resource stopped or disabled");
{ok, _Group, #{status := S}} when S == connecting; S == disconnected ->
@@ -633,158 +812,202 @@ call_query(QM0, Id, Index, Query, QueryOpts) ->
end
).
-apply_query_fun(sync, Mod, Id, _Index, ?QUERY(_, Request, _) = _Query, ResSt, _QueryOpts) ->
- ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}),
+%% Dispatches a single query or a batch to the resource callback module
+%% `Mod', in either `sync' or `async' call mode (four clauses):
+%%   * sync  + single query -> `Mod:on_query/3'
+%%   * async + single query -> `Mod:on_query_async/4'
+%%   * sync  + batch        -> `Mod:on_batch_query/3'
+%%   * async + batch        -> `Mod:on_batch_query_async/4'
+%% The async clauses first record the request in the inflight table
+%% under `Ref' (as not retriable, with no worker monitor) and wrap the
+%% callback's return value as `{async_return, Result}'.  `Ref' is
+%% unused by the sync clauses.
+apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _) = _Query, ResSt, _QueryOpts) ->
+ ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}),
?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request);
-apply_query_fun(async, Mod, Id, Index, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) ->
- ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}),
- InflightTID = maps:get(inflight_name, QueryOpts, undefined),
+apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) ->
+ ?tp(call_query_async, #{
+ id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async
+ }),
+ InflightTID = maps:get(inflight_tid, QueryOpts, undefined),
?APPLY_RESOURCE(
call_query_async,
- case is_inflight_full(InflightTID) of
- true ->
- {async_return, inflight_full};
- false ->
- ReplyFun = fun ?MODULE:reply_after_query/7,
- Ref = make_message_ref(),
- Args = [self(), Id, Index, InflightTID, Ref, Query],
- ok = inflight_append(InflightTID, Ref, Query, Id, Index),
- Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt),
- {async_return, Result}
+ begin
+ %% `Args' holds 7 of the 8 arguments of `reply_after_query/8';
+ %% the query result is expected to be supplied as the last one.
+ ReplyFun = fun ?MODULE:reply_after_query/8,
+ Args = [self(), Id, Index, InflightTID, Ref, Query, QueryOpts],
+ IsRetriable = false,
+ WorkerMRef = undefined,
+ InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef),
+ ok = inflight_append(InflightTID, InflightItem, Id, Index),
+ Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt),
+ {async_return, Result}
end,
Request
);
-apply_query_fun(sync, Mod, Id, _Index, [?QUERY(_, _, _) | _] = Batch, ResSt, _QueryOpts) ->
- ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
+apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, _QueryOpts) ->
+ ?tp(call_batch_query, #{
+ id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync
+ }),
Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch);
-apply_query_fun(async, Mod, Id, Index, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) ->
- ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
- InflightTID = maps:get(inflight_name, QueryOpts, undefined),
+apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) ->
+ ?tp(call_batch_query_async, #{
+ id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async
+ }),
+ InflightTID = maps:get(inflight_tid, QueryOpts, undefined),
?APPLY_RESOURCE(
call_batch_query_async,
- case is_inflight_full(InflightTID) of
- true ->
- {async_return, inflight_full};
- false ->
- ReplyFun = fun ?MODULE:batch_reply_after_query/7,
- Ref = make_message_ref(),
- ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch]},
- Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
- ok = inflight_append(InflightTID, Ref, Batch, Id, Index),
- Result = Mod:on_batch_query_async(Id, Requests, ReplyFunAndArgs, ResSt),
- {async_return, Result}
+ begin
+ %% same scheme as the single-query clause: 7 arguments are
+ %% bound here, the batch result comes last.
+ ReplyFun = fun ?MODULE:batch_reply_after_query/8,
+ ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch, QueryOpts]},
+ Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
+ IsRetriable = false,
+ WorkerMRef = undefined,
+ InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
+ ok = inflight_append(InflightTID, InflightItem, Id, Index),
+ Result = Mod:on_batch_query_async(Id, Requests, ReplyFunAndArgs, ResSt),
+ {async_return, Result}
end,
Batch
).
-reply_after_query(Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBeenSent), Result) ->
+%% Completion callback for a single async query (it is the `ReplyFun'
+%% that `apply_query_fun/8' passes to `Mod:on_query_async/4').
+%%   * `nack': the inflight item is flagged as retriable and the worker
+%%     `Pid' is asked to block.  `PostFn' is not run on this path.
+%%   * `ack': the item is removed from the inflight table.  `PostFn'
+%%     runs only if this call actually removed it, and the worker is
+%%     asked to flush only if the table was full before the removal.
+reply_after_query(
+ Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBeenSent), QueryOpts, Result
+) ->
%% NOTE: 'inflight' is the count of messages that were sent async
%% but received no ACK, NOT the number of messages queued in the
%% inflight window.
- case reply_caller(Id, ?REPLY(From, Request, HasBeenSent, Result)) of
- true ->
+ {Action, PostFn} = reply_caller_defer_metrics(
+ Id, ?REPLY(From, Request, HasBeenSent, Result), QueryOpts
+ ),
+ case Action of
+ nack ->
+ %% Keep retrying.
+ ?tp(resource_worker_reply_after_query, #{
+ action => Action,
+ batch_or_query => ?QUERY(From, Request, HasBeenSent),
+ ref => Ref,
+ result => Result
+ }),
+ mark_inflight_as_retriable(InflightTID, Ref),
?MODULE:block(Pid);
- false ->
- drop_inflight_and_resume(Pid, InflightTID, Ref, Id, Index)
+ ack ->
+ ?tp(resource_worker_reply_after_query, #{
+ action => Action,
+ batch_or_query => ?QUERY(From, Request, HasBeenSent),
+ ref => Ref,
+ result => Result
+ }),
+ %% fullness must be sampled before the item is removed
+ IsFullBefore = is_inflight_full(InflightTID),
+ IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
+ IsAcked andalso PostFn(),
+ IsFullBefore andalso ?MODULE:flush_worker(Pid),
+ ok
end.
-batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, Result) ->
+%% Completion callback for an async batch (it is the `ReplyFun' that
+%% `apply_query_fun/8' passes to `Mod:on_batch_query_async/4').
+%%   * `nack': the inflight item is flagged as retriable and the worker
+%%     `Pid' is asked to block.  `PostFn' is not run on this path.
+%%   * `ack': the batch is removed from the inflight table.  `PostFn'
+%%     runs only if this call actually removed it, and the worker is
+%%     asked to flush only if the table was full before the removal.
+batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) ->
%% NOTE: 'inflight' is the count of messages that were sent async
%% but received no ACK, NOT the number of messages queued in the
%% inflight window.
- case batch_reply_caller(Id, Result, Batch) of
- true ->
+ {Action, PostFn} = batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts),
+ case Action of
+ nack ->
+ %% Keep retrying.
+ ?tp(resource_worker_reply_after_query, #{
+ action => nack,
+ batch_or_query => Batch,
+ ref => Ref,
+ result => Result
+ }),
+ mark_inflight_as_retriable(InflightTID, Ref),
?MODULE:block(Pid);
- false ->
- drop_inflight_and_resume(Pid, InflightTID, Ref, Id, Index)
- end.
-
-drop_inflight_and_resume(Pid, InflightTID, Ref, Id, Index) ->
- case is_inflight_full(InflightTID) of
- true ->
- inflight_drop(InflightTID, Ref, Id, Index),
- ?MODULE:resume(Pid);
- false ->
- inflight_drop(InflightTID, Ref, Id, Index)
+ ack ->
+ ?tp(resource_worker_reply_after_query, #{
+ action => ack,
+ batch_or_query => Batch,
+ ref => Ref,
+ result => Result
+ }),
+ %% fullness must be sampled before the batch is removed
+ IsFullBefore = is_inflight_full(InflightTID),
+ IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
+ IsAcked andalso PostFn(),
+ IsFullBefore andalso ?MODULE:flush_worker(Pid),
+ ok
end.
%%==============================================================================
%% operations for queue
-queue_item_marshaller(?Q_ITEM(_) = I) ->
- term_to_binary(I);
+%% Two-way (de)serialiser for queue items: a binary is decoded with
+%% `binary_to_term/1', any other term is encoded with
+%% `term_to_binary/1'.  The direction is chosen solely by
+%% `is_binary/1', so an item that is itself a binary would be decoded
+%% instead of encoded.
+%% NOTE(review): decoding does not use the `safe' option; presumably
+%% the binaries only ever come from this node's own queue -- confirm.
queue_item_marshaller(Bin) when is_binary(Bin) ->
- binary_to_term(Bin).
+ binary_to_term(Bin);
+queue_item_marshaller(Item) ->
+ term_to_binary(Item).
+%% Size estimate of a queue item, in bytes.  `erlang:external_size/1'
+%% computes the maximum size of the term in the external term format
+%% without actually encoding it.
estimate_size(QItem) ->
- size(queue_item_marshaller(QItem)).
+ erlang:external_size(QItem).
-spec append_queue(id(), index(), replayq:q(), [queue_query()]) -> replayq:q().
-append_queue(Id, Index, Q, Queries) ->
+%% Appends `Queries' to the queue `Q' and returns the updated queue.
+%% If the queue overflows after the append, items are popped from its
+%% front (and acked, i.e. dropped) until the overflowing bytes are
+%% reclaimed.  The `queuing' gauge is refreshed afterwards.
+%%
+%% The guard rejects raw binaries as queue items: the marshaller picks
+%% its direction with `is_binary/1', so a binary item would be decoded
+%% instead of encoded.  Only the first item is inspected, and an empty
+%% list is accepted.  (The guard used to test `Q', the queue handle,
+%% which never is a binary, so it could not catch anything.)
+append_queue(Id, Index, Q, Queries) when Queries =:= []; not is_binary(hd(Queries)) ->
+ %% append first, then trim: the overflow is computed with the new
+ %% items already accounted for.
+ Q0 = replayq:append(Q, Queries),
Q2 =
- case replayq:overflow(Q) of
+ case replayq:overflow(Q0) of
Overflow when Overflow =< 0 ->
- Q;
+ Q0;
Overflow ->
PopOpts = #{bytes_limit => Overflow, count_limit => 999999999},
- {Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts),
+ {Q1, QAckRef, Items2} = replayq:pop(Q0, PopOpts),
ok = replayq:ack(Q1, QAckRef),
Dropped = length(Items2),
+ %% NOTE(review): the metric is bumped once per overflow, not
+ %% once per dropped item (`Dropped' is only logged) -- confirm
+ %% this is intended.
emqx_resource_metrics:dropped_queue_full_inc(Id),
?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}),
Q1
end,
- Items = [?Q_ITEM(X) || X <- Queries],
- Q3 = replayq:append(Q2, Items),
- emqx_resource_metrics:queuing_set(Id, Index, replayq:count(Q3)),
- ?tp(resource_worker_appended_to_queue, #{id => Id, items => Queries}),
- Q3.
-
--spec get_first_n_from_queue(replayq:q(), pos_integer()) ->
- empty | {replayq:q(), replayq:ack_ref(), [?Q_ITEM(?QUERY(_From, _Request, _HasBeenSent))]}.
-get_first_n_from_queue(Q, N) ->
- case replayq:count(Q) of
- 0 ->
- empty;
- _ ->
- {NewQ, QAckRef, Items} = replayq:pop(Q, #{count_limit => N}),
- Queries = [X || ?Q_ITEM(X) <- Items],
- {NewQ, QAckRef, Queries}
- end.
+ emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q2)),
+ ?tp(
+ resource_worker_appended_to_queue,
+ #{
+ id => Id,
+ items => Queries,
+ queue_count => queue_count(Q2)
+ }
+ ),
+ Q2.
%%==============================================================================
%% the inflight queue for async query
--define(MAX_SIZE_REF, -1).
--define(SIZE_REF, -2).
+-define(MAX_SIZE_REF, max_size).
+-define(SIZE_REF, size).
+-define(INITIAL_TIME_REF, initial_time).
+-define(INITIAL_MONOTONIC_TIME_REF, initial_monotonic_time).
+
+%% Creates the inflight table (a public `ordered_set') and seeds it
+%% with its metadata rows, each stored as a `{Key, Value}' pair under an
+%% atom key: the maximum window size, the current message count
+%% (starting at 0) and the creation time in both system and monotonic
+%% clocks.  Returns the table id.
inflight_new(InfltWinSZ, Id, Index) ->
TableId = ets:new(
emqx_resource_worker_inflight_tab,
[ordered_set, public, {write_concurrency, true}]
),
- inflight_append(TableId, ?MAX_SIZE_REF, {max_size, InfltWinSZ}, Id, Index),
+ inflight_append(TableId, {?MAX_SIZE_REF, InfltWinSZ}, Id, Index),
%% we use this counter because we might deal with batches as
%% elements.
- inflight_append(TableId, ?SIZE_REF, 0, Id, Index),
+ inflight_append(TableId, {?SIZE_REF, 0}, Id, Index),
+ %% NOTE(review): `erlang:system_time/0' is in native time units while
+ %% the monotonic timestamp below is in nanoseconds -- confirm that
+ %% readers of these rows account for the difference.
+ inflight_append(TableId, {?INITIAL_TIME_REF, erlang:system_time()}, Id, Index),
+ inflight_append(
+ TableId, {?INITIAL_MONOTONIC_TIME_REF, erlang:monotonic_time(nanosecond)}, Id, Index
+ ),
TableId.
-inflight_get_first(InflightTID) ->
- case ets:next(InflightTID, ?MAX_SIZE_REF) of
- '$end_of_table' ->
- empty;
- Ref ->
- case ets:lookup(InflightTID, Ref) of
- [Object] ->
- Object;
- [] ->
- %% it might have been dropped
- inflight_get_first(InflightTID)
+%% Returns `{Ref, BatchOrQuery}' for one inflight item whose retriable
+%% flag is `true', or `none' when there is no such item.  At most one
+%% match is requested from `ets:select/3'.  The table is created as an
+%% `ordered_set' in `inflight_new/3', so the match returned is
+%% presumably the retriable item with the smallest `Ref'.
+-spec inflight_get_first_retriable(ets:tid()) ->
+ none | {integer(), [?QUERY(_, _, _)] | ?QUERY(_, _, _)}.
+inflight_get_first_retriable(InflightTID) ->
+ MatchSpec =
+ ets:fun2ms(
+ fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, _WorkerMRef)) when
+ IsRetriable =:= true
+ ->
+ {Ref, BatchOrQuery}
end
+ ),
+ case ets:select(InflightTID, MatchSpec, _Limit = 1) of
+ '$end_of_table' ->
+ none;
+ {[{Ref, BatchOrQuery}], _Continuation} ->
+ {Ref, BatchOrQuery}
end.
is_inflight_full(undefined) ->
false;
is_inflight_full(InflightTID) ->
- [{_, {max_size, MaxSize}}] = ets:lookup(InflightTID, ?MAX_SIZE_REF),
+ [{_, MaxSize}] = ets:lookup(InflightTID, ?MAX_SIZE_REF),
%% we consider number of batches rather than number of messages
%% because one batch request may hold several messages.
Size = inflight_num_batches(InflightTID),
@@ -803,38 +1026,111 @@ inflight_num_msgs(InflightTID) ->
[{_, Size}] = ets:lookup(InflightTID, ?SIZE_REF),
Size.
-inflight_append(undefined, _Ref, _Query, _Id, _Index) ->
+%% Inserts a row into the inflight table.  Clauses, in order:
+%%   1. no table (`undefined'): nothing to do;
+%%   2. an inflight item holding a batch;
+%%   3. an inflight item holding a single query;
+%%   4. any other `{Key, Value}' pair, i.e. a metadata row.
+%% For clauses 2 and 3 the queries are first marked as sent and the row
+%% is written with `ets:insert_new/2': if `Ref' is already present the
+%% existing row is left untouched and the message counter is not
+%% bumped, which makes repeated appends of the same `Ref' harmless.
+inflight_append(undefined, _InflightItem, _Id, _Index) ->
ok;
-inflight_append(InflightTID, Ref, [?QUERY(_, _, _) | _] = Batch0, Id, Index) ->
+inflight_append(
+ InflightTID,
+ ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch0, IsRetriable, WorkerMRef),
+ Id,
+ Index
+) ->
Batch = mark_as_sent(Batch0),
- ets:insert(InflightTID, {Ref, Batch}),
+ InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
+ IsNew = ets:insert_new(InflightTID, InflightItem),
BatchSize = length(Batch),
- ets:update_counter(InflightTID, ?SIZE_REF, {2, BatchSize}),
+ IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, BatchSize}),
emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
+ ?tp(resource_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
ok;
-inflight_append(InflightTID, Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, Id, Index) ->
+inflight_append(
+ InflightTID,
+ ?INFLIGHT_ITEM(Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, IsRetriable, WorkerMRef),
+ Id,
+ Index
+) ->
Query = mark_as_sent(Query0),
- ets:insert(InflightTID, {Ref, Query}),
- ets:update_counter(InflightTID, ?SIZE_REF, {2, 1}),
+ InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef),
+ IsNew = ets:insert_new(InflightTID, InflightItem),
+ IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, 1}),
emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
+ ?tp(resource_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
ok;
-inflight_append(InflightTID, Ref, Data, _Id, _Index) ->
+inflight_append(InflightTID, {Ref, Data}, _Id, _Index) ->
ets:insert(InflightTID, {Ref, Data}),
%% this is a metadata row being inserted; therefore, we don't bump
%% the inflight metric.
ok.
-inflight_drop(undefined, _, _Id, _Index) ->
+%% a request was already appended and originally not retriable, but an
+%% error occurred and it is now retriable.
+%% Sets the retry flag of the row stored under `Ref'.  A missing row is
+%% not an error: the result of `ets:update_element/3' is ignored.  With
+%% no inflight table (`undefined') this is a no-op.
+mark_inflight_as_retriable(undefined, _Ref) ->
ok;
-inflight_drop(InflightTID, Ref, Id, Index) ->
+mark_inflight_as_retriable(InflightTID, Ref) ->
+ _ = ets:update_element(InflightTID, Ref, {?RETRY_IDX, true}),
+ ok.
+
+%% Track each worker pid only once.
+%% Returns `{Data, WorkerMRef}'.  When the result is
+%% `{async_return, {ok, WorkerPid}}':
+%%   * a pid already present in `async_workers' reuses its stored
+%%     monitor reference and leaves `Data' unchanged;
+%%   * a new pid is monitored and added to `async_workers'.
+%% Any other result yields `{Data, undefined}'.
+ensure_async_worker_monitored(
+ Data0 = #{async_workers := AsyncWorkers}, {async_return, {ok, WorkerPid}} = _Result
+) when
+ is_pid(WorkerPid), is_map_key(WorkerPid, AsyncWorkers)
+->
+ WorkerMRef = maps:get(WorkerPid, AsyncWorkers),
+ {Data0, WorkerMRef};
+ensure_async_worker_monitored(
+ Data0 = #{async_workers := AsyncWorkers0}, {async_return, {ok, WorkerPid}}
+) when
+ is_pid(WorkerPid)
+->
+ WorkerMRef = monitor(process, WorkerPid),
+ AsyncWorkers = AsyncWorkers0#{WorkerPid => WorkerMRef},
+ Data = Data0#{async_workers := AsyncWorkers},
+ {Data, WorkerMRef};
+ensure_async_worker_monitored(Data0, _Result) ->
+ {Data0, undefined}.
+
+%% Records, in the inflight row stored under `Ref', the monitor
+%% reference of the async worker handling that request.  Nothing is
+%% done when there is no inflight table or no monitor reference.  A
+%% missing row is tolerated: the result of `ets:update_element/3' is
+%% ignored.
+store_async_worker_reference(undefined = _InflightTID, _Ref, _WorkerMRef) ->
+ ok;
+store_async_worker_reference(_InflightTID, _Ref, undefined = _WorkerRef) ->
+ ok;
+store_async_worker_reference(InflightTID, Ref, WorkerMRef) when
+ is_reference(WorkerMRef)
+->
+ _ = ets:update_element(
+ InflightTID, Ref, {?WORKER_MREF_IDX, WorkerMRef}
+ ),
+ ok.
+
+%% Removes the inflight row stored under `Ref' and returns whether a
+%% request or batch was actually removed (`false' when the row was
+%% absent or when there is no inflight table).  On removal the message
+%% counter is decreased by the number of queries in the row; the
+%% `{2, -Count, 0, 0}' update keeps the counter from dropping below 0.
+%% The inflight gauge is refreshed in either case.
+ack_inflight(undefined, _Ref, _Id, _Index) ->
+ false;
+ack_inflight(InflightTID, Ref, Id, Index) ->
Count =
case ets:take(InflightTID, Ref) of
- [{Ref, ?QUERY(_, _, _)}] -> 1;
- [{Ref, [?QUERY(_, _, _) | _] = Batch}] -> length(Batch);
- _ -> 0
+ [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _), _IsRetriable, _WorkerMRef)] ->
+ 1;
+ [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] ->
+ length(Batch);
+ _ ->
+ 0
end,
- Count > 0 andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
+ IsAcked = Count > 0,
+ IsAcked andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
+ IsAcked.
+
+%% Flags as retriable every inflight item whose stored worker monitor
+%% reference equals `WorkerMRef'.  The rows are rewritten in place with
+%% `ets:select_replace/2'; key, payload and monitor reference are kept,
+%% only the retry flag changes.
+mark_inflight_items_as_retriable(Data, WorkerMRef) ->
+ #{inflight_tid := InflightTID} = Data,
+ IsRetriable = true,
+ MatchSpec =
+ ets:fun2ms(
+ fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, _IsRetriable, WorkerMRef0)) when
+ WorkerMRef =:= WorkerMRef0
+ ->
+ ?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerMRef0)
+ end
+ ),
+ _NumAffected = ets:select_replace(InflightTID, MatchSpec),
+ ?tp(resource_worker_worker_down_update, #{num_affected => _NumAffected}),
ok.
%%==============================================================================
@@ -872,9 +1168,18 @@ queue_count(Q) ->
disk_queue_dir(Id, Index) ->
QDir0 = binary_to_list(Id) ++ ":" ++ integer_to_list(Index),
- QDir = filename:join([emqx:data_dir(), "resource_worker", node(), QDir0]),
+ QDir = filename:join([emqx:data_dir(), "bufs", node(), QDir0]),
emqx_misc:safe_filename(QDir).
+clear_disk_queue_dir(Id, Index) ->
+ ReplayQDir = disk_queue_dir(Id, Index),
+ case file:del_dir_r(ReplayQDir) of
+ {error, enoent} ->
+ ok;
+ Res ->
+ Res
+ end.
+
ensure_flush_timer(Data = #{tref := undefined, batch_time := T}) ->
Ref = make_ref(),
TRef = erlang:send_after(T, self(), {flush, Ref}),
@@ -888,8 +1193,9 @@ cancel_flush_timer(St = #{tref := {TRef, _Ref}}) ->
_ = erlang:cancel_timer(TRef),
St#{tref => undefined}.
+-spec make_message_ref() -> inflight_key().
make_message_ref() ->
- erlang:unique_integer([monotonic, positive]).
+ erlang:monotonic_time(nanosecond).
collect_requests(Acc, Limit) ->
Count = length(Acc),
@@ -911,10 +1217,21 @@ mark_as_sent(?QUERY(From, Req, _)) ->
HasBeenSent = true,
?QUERY(From, Req, HasBeenSent).
-is_async(ResourceId) ->
- case emqx_resource_manager:ets_lookup(ResourceId) of
- {ok, _Group, #{query_mode := QM, callback_mode := CM}} ->
- call_mode(QM, CM) =:= async;
- _ ->
- false
- end.
+is_unrecoverable_error({error, {unrecoverable_error, _}}) ->
+ true;
+is_unrecoverable_error({error, {recoverable_error, _}}) ->
+ false;
+is_unrecoverable_error({async_return, Result}) ->
+ is_unrecoverable_error(Result);
+is_unrecoverable_error({error, _}) ->
+ %% TODO: delete this clause.
+ %% Ideally all errors except for 'unrecoverable_error' should be
+ %% retried, including DB schema errors.
+ true;
+is_unrecoverable_error(_) ->
+ false.
+
+is_async_return({async_return, _}) ->
+ true;
+is_async_return(_) ->
+ false.
diff --git a/apps/emqx_resource/src/emqx_resource_worker_sup.erl b/apps/emqx_resource/src/emqx_resource_worker_sup.erl
index b6557620c..8b0ce2c65 100644
--- a/apps/emqx_resource/src/emqx_resource_worker_sup.erl
+++ b/apps/emqx_resource/src/emqx_resource_worker_sup.erl
@@ -67,7 +67,8 @@ stop_workers(ResId, Opts) ->
WorkerPoolSize = worker_pool_size(Opts),
lists:foreach(
fun(Idx) ->
- ensure_worker_removed(ResId, Idx)
+ _ = ensure_worker_removed(ResId, Idx),
+ ensure_disk_queue_dir_absent(ResId, Idx)
end,
lists:seq(1, WorkerPoolSize)
),
@@ -127,6 +128,10 @@ ensure_worker_removed(ResId, Idx) ->
{error, Reason}
end.
+ensure_disk_queue_dir_absent(ResourceId, Index) ->
+ ok = emqx_resource_worker:clear_disk_queue_dir(ResourceId, Index),
+ ok.
+
ensure_worker_pool_removed(ResId) ->
try
gproc_pool:delete(ResId)
diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl
index d105b21ef..ea5ee97ca 100644
--- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl
+++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl
@@ -48,6 +48,7 @@ fields("creation_opts") ->
{health_check_interval, fun health_check_interval/1},
{auto_restart_interval, fun auto_restart_interval/1},
{query_mode, fun query_mode/1},
+ {request_timeout, fun request_timeout/1},
{async_inflight_window, fun async_inflight_window/1},
{enable_batch, fun enable_batch/1},
{batch_size, fun batch_size/1},
@@ -80,6 +81,11 @@ query_mode(default) -> async;
query_mode(required) -> false;
query_mode(_) -> undefined.
+request_timeout(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]);
+request_timeout(desc) -> ?DESC("request_timeout");
+request_timeout(default) -> <<"15s">>;
+request_timeout(_) -> undefined.
+
enable_batch(type) -> boolean();
enable_batch(required) -> false;
enable_batch(default) -> true;
diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl
index 692895548..c2b0c5733 100644
--- a/apps/emqx_resource/test/emqx_connector_demo.erl
+++ b/apps/emqx_resource/test/emqx_connector_demo.erl
@@ -85,9 +85,25 @@ on_query(_InstId, get_state_failed, State) ->
on_query(_InstId, block, #{pid := Pid}) ->
Pid ! block,
ok;
+on_query(_InstId, block_now, #{pid := Pid}) ->
+ Pid ! block,
+ {error, {resource_error, #{reason => blocked, msg => blocked}}};
on_query(_InstId, resume, #{pid := Pid}) ->
Pid ! resume,
ok;
+on_query(_InstId, {big_payload, Payload}, #{pid := Pid}) ->
+ ReqRef = make_ref(),
+ From = {self(), ReqRef},
+ Pid ! {From, {big_payload, Payload}},
+ receive
+ {ReqRef, ok} ->
+ ?tp(connector_demo_big_payload, #{payload => Payload}),
+ ok;
+ {ReqRef, incorrect_status} ->
+ {error, {recoverable_error, incorrect_status}}
+ after 1000 ->
+ {error, timeout}
+ end;
on_query(_InstId, {inc_counter, N}, #{pid := Pid}) ->
ReqRef = make_ref(),
From = {self(), ReqRef},
@@ -122,10 +138,16 @@ on_query(_InstId, get_counter, #{pid := Pid}) ->
on_query_async(_InstId, {inc_counter, N}, ReplyFun, #{pid := Pid}) ->
Pid ! {inc, N, ReplyFun},
- ok;
+ {ok, Pid};
on_query_async(_InstId, get_counter, ReplyFun, #{pid := Pid}) ->
Pid ! {get, ReplyFun},
- ok.
+ {ok, Pid};
+on_query_async(_InstId, block_now, ReplyFun, #{pid := Pid}) ->
+ Pid ! {block_now, ReplyFun},
+ {ok, Pid};
+on_query_async(_InstId, {big_payload, Payload}, ReplyFun, #{pid := Pid}) ->
+ Pid ! {big_payload, Payload, ReplyFun},
+ {ok, Pid}.
on_batch_query(InstId, BatchReq, State) ->
%% Requests can be either 'get_counter' or 'inc_counter', but
@@ -134,17 +156,22 @@ on_batch_query(InstId, BatchReq, State) ->
{inc_counter, _} ->
batch_inc_counter(sync, InstId, BatchReq, State);
get_counter ->
- batch_get_counter(sync, InstId, State)
+ batch_get_counter(sync, InstId, State);
+ {big_payload, _Payload} ->
+ batch_big_payload(sync, InstId, BatchReq, State)
end.
on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, State) ->
- %% Requests can be either 'get_counter' or 'inc_counter', but
- %% cannot be mixed.
+ %% Requests can be of multiple types, but cannot be mixed.
case hd(BatchReq) of
{inc_counter, _} ->
batch_inc_counter({async, ReplyFunAndArgs}, InstId, BatchReq, State);
get_counter ->
- batch_get_counter({async, ReplyFunAndArgs}, InstId, State)
+ batch_get_counter({async, ReplyFunAndArgs}, InstId, State);
+ block_now ->
+ on_query_async(InstId, block_now, ReplyFunAndArgs, State);
+ {big_payload, _Payload} ->
+ batch_big_payload({async, ReplyFunAndArgs}, InstId, BatchReq, State)
end.
batch_inc_counter(CallMode, InstId, BatchReq, State) ->
@@ -171,6 +198,19 @@ batch_get_counter(sync, InstId, State) ->
batch_get_counter({async, ReplyFunAndArgs}, InstId, State) ->
on_query_async(InstId, get_counter, ReplyFunAndArgs, State).
+batch_big_payload(sync, InstId, Batch, State) ->
+ [Res | _] = lists:map(
+ fun(Req = {big_payload, _}) -> on_query(InstId, Req, State) end,
+ Batch
+ ),
+ Res;
+batch_big_payload({async, ReplyFunAndArgs}, InstId, Batch, State = #{pid := Pid}) ->
+ lists:foreach(
+ fun(Req = {big_payload, _}) -> on_query_async(InstId, Req, ReplyFunAndArgs, State) end,
+ Batch
+ ),
+ {ok, Pid}.
+
on_get_status(_InstId, #{health_check_error := true}) ->
disconnected;
on_get_status(_InstId, #{pid := Pid}) ->
@@ -186,7 +226,11 @@ spawn_counter_process(Name, Register) ->
Pid.
counter_loop() ->
- counter_loop(#{counter => 0, status => running, incorrect_status_count => 0}).
+ counter_loop(#{
+ counter => 0,
+ status => running,
+ incorrect_status_count => 0
+ }).
counter_loop(
#{
@@ -200,6 +244,12 @@ counter_loop(
block ->
ct:pal("counter recv: ~p", [block]),
State#{status => blocked};
+ {block_now, ReplyFun} ->
+ ct:pal("counter recv: ~p", [block_now]),
+ apply_reply(
+ ReplyFun, {error, {resource_error, #{reason => blocked, msg => blocked}}}
+ ),
+ State#{status => blocked};
resume ->
{messages, Msgs} = erlang:process_info(self(), messages),
ct:pal("counter recv: ~p, buffered msgs: ~p", [resume, length(Msgs)]),
@@ -209,6 +259,9 @@ counter_loop(
apply_reply(ReplyFun, ok),
?tp(connector_demo_inc_counter_async, #{n => N}),
State#{counter => Num + N};
+ {big_payload, _Payload, ReplyFun} when Status == blocked ->
+ apply_reply(ReplyFun, {error, blocked}),
+ State;
{{FromPid, ReqRef}, {inc, N}} when Status == running ->
%ct:pal("sync counter recv: ~p", [{inc, N}]),
FromPid ! {ReqRef, ok},
@@ -216,6 +269,12 @@ counter_loop(
{{FromPid, ReqRef}, {inc, _N}} when Status == blocked ->
FromPid ! {ReqRef, incorrect_status},
State#{incorrect_status_count := IncorrectCount + 1};
+ {{FromPid, ReqRef}, {big_payload, _Payload}} when Status == blocked ->
+ FromPid ! {ReqRef, incorrect_status},
+ State#{incorrect_status_count := IncorrectCount + 1};
+ {{FromPid, ReqRef}, {big_payload, _Payload}} when Status == running ->
+ FromPid ! {ReqRef, ok},
+ State;
{get, ReplyFun} ->
apply_reply(ReplyFun, Num),
State;
diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl
index cdec414c9..97bc8da66 100644
--- a/apps/emqx_resource/test/emqx_resource_SUITE.erl
+++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl
@@ -411,22 +411,18 @@ t_query_counter_async_inflight(_) ->
%% send async query to make the inflight window full
?check_trace(
- ?TRACE_OPTS,
- inc_counter_in_parallel(WindowSize, ReqOpts),
+ {_, {ok, _}} =
+ ?wait_async_action(
+ inc_counter_in_parallel(WindowSize, ReqOpts),
+ #{?snk_kind := resource_worker_flush_but_inflight_full},
+ 1_000
+ ),
fun(Trace) ->
QueryTrace = ?of_kind(call_query_async, Trace),
?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
end
),
tap_metrics(?LINE),
-
- %% this will block the resource_worker as the inflight window is full now
- {ok, {ok, _}} =
- ?wait_async_action(
- emqx_resource:query(?ID, {inc_counter, 2}),
- #{?snk_kind := resource_worker_enter_blocked},
- 1_000
- ),
?assertMatch(0, ets:info(Tab0, size)),
tap_metrics(?LINE),
@@ -436,17 +432,16 @@ t_query_counter_async_inflight(_) ->
ets:insert(Tab, {Ref, Result}),
?tp(tmp_query_inserted, #{})
end,
- {ok, {ok, _}} =
- ?wait_async_action(
- emqx_resource:query(?ID, {inc_counter, 3}, #{
- async_reply_fun => {Insert, [Tab0, tmp_query]}
- }),
- #{?snk_kind := tmp_query_inserted},
- 1_000
- ),
%% since this counts as a failure, it'll be enqueued and retried
%% later, when the resource is unblocked.
- ?assertMatch([{_, {error, {resource_error, #{reason := blocked}}}}], ets:take(Tab0, tmp_query)),
+ {ok, {ok, _}} =
+ ?wait_async_action(
+ emqx_resource:query(?ID, {inc_counter, 99}, #{
+ async_reply_fun => {Insert, [Tab0, tmp_query]}
+ }),
+ #{?snk_kind := resource_worker_appended_to_queue},
+ 1_000
+ ),
tap_metrics(?LINE),
%% all responses should be received after the resource is resumed.
@@ -455,46 +450,49 @@ t_query_counter_async_inflight(_) ->
%% +1 because the tmp_query above will be retried and succeed
%% this time.
WindowSize + 1,
- _Timeout = 60_000
+ _Timeout0 = 10_000
),
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
tap_metrics(?LINE),
{ok, _} = snabbkaffe:receive_events(SRef0),
+ tap_metrics(?LINE),
%% since the previous tmp_query was enqueued to be retried, we
%% take it again from the table; this time, it should have
%% succeeded.
?assertMatch([{tmp_query, ok}], ets:take(Tab0, tmp_query)),
- ?assertEqual(WindowSize, ets:info(Tab0, size)),
- tap_metrics(?LINE),
%% send async query, this time everything should be ok.
Num = 10,
?check_trace(
- ?TRACE_OPTS,
begin
{ok, SRef} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
Num,
- _Timeout = 60_000
+ _Timeout0 = 10_000
),
- inc_counter_in_parallel(Num, ReqOpts),
+ inc_counter_in_parallel_increasing(Num, 1, ReqOpts),
{ok, _} = snabbkaffe:receive_events(SRef),
ok
end,
fun(Trace) ->
QueryTrace = ?of_kind(call_query_async, Trace),
- ?assertMatch([#{query := {query, _, {inc_counter, _}, _}} | _], QueryTrace)
+ ?assertMatch([#{query := {query, _, {inc_counter, _}, _}} | _], QueryTrace),
+ ?assertEqual(WindowSize + Num, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
+ tap_metrics(?LINE),
+ ok
end
),
- ?assertEqual(WindowSize + Num, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
- tap_metrics(?LINE),
%% block the resource
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
%% again, send async query to make the inflight window full
?check_trace(
- ?TRACE_OPTS,
- inc_counter_in_parallel(WindowSize, ReqOpts),
+ {_, {ok, _}} =
+ ?wait_async_action(
+ inc_counter_in_parallel(WindowSize, ReqOpts),
+ #{?snk_kind := resource_worker_flush_but_inflight_full},
+ 1_000
+ ),
fun(Trace) ->
QueryTrace = ?of_kind(call_query_async, Trace),
?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
@@ -502,17 +500,18 @@ t_query_counter_async_inflight(_) ->
),
%% this will block the resource_worker
- ok = emqx_resource:query(?ID, {inc_counter, 1}),
+ ok = emqx_resource:query(?ID, {inc_counter, 4}),
Sent = WindowSize + Num + WindowSize,
{ok, SRef1} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
WindowSize,
- _Timeout = 60_000
+ _Timeout0 = 10_000
),
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
{ok, _} = snabbkaffe:receive_events(SRef1),
- ?assertEqual(Sent, ets:info(Tab0, size)),
+ ?assertEqual(Sent, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
+ tap_metrics(?LINE),
{ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]),
@@ -572,7 +571,7 @@ t_query_counter_async_inflight_batch(_) ->
end,
ReqOpts = fun() -> #{async_reply_fun => {Insert0, [Tab0, make_ref()]}} end,
BatchSize = 2,
- WindowSize = 3,
+ WindowSize = 15,
{ok, _} = emqx_resource:create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
@@ -594,16 +593,12 @@ t_query_counter_async_inflight_batch(_) ->
%% send async query to make the inflight window full
NumMsgs = BatchSize * WindowSize,
?check_trace(
- begin
- {ok, SRef} = snabbkaffe:subscribe(
- ?match_event(#{?snk_kind := call_batch_query_async}),
- WindowSize,
- _Timeout = 60_000
+ {_, {ok, _}} =
+ ?wait_async_action(
+ inc_counter_in_parallel(NumMsgs, ReqOpts),
+ #{?snk_kind := resource_worker_flush_but_inflight_full},
+ 5_000
),
- inc_counter_in_parallel(NumMsgs, ReqOpts),
- {ok, _} = snabbkaffe:receive_events(SRef),
- ok
- end,
fun(Trace) ->
QueryTrace = ?of_kind(call_batch_query_async, Trace),
?assertMatch(
@@ -628,7 +623,7 @@ t_query_counter_async_inflight_batch(_) ->
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:query(?ID, {inc_counter, 2}),
- #{?snk_kind := resource_worker_enter_blocked},
+ #{?snk_kind := resource_worker_flush_but_inflight_full},
5_000
),
?assertMatch(0, ets:info(Tab0, size)),
@@ -644,17 +639,16 @@ t_query_counter_async_inflight_batch(_) ->
ets:insert(Tab, {Ref, Result}),
?tp(tmp_query_inserted, #{})
end,
+ %% since this counts as a failure, it'll be enqueued and retried
+ %% later, when the resource is unblocked.
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:query(?ID, {inc_counter, 3}, #{
async_reply_fun => {Insert, [Tab0, tmp_query]}
}),
- #{?snk_kind := tmp_query_inserted},
+ #{?snk_kind := resource_worker_appended_to_queue},
1_000
),
- %% since this counts as a failure, it'll be enqueued and retried
- %% later, when the resource is unblocked.
- ?assertMatch([{_, {error, {resource_error, #{reason := blocked}}}}], ets:take(Tab0, tmp_query)),
tap_metrics(?LINE),
%% all responses should be received after the resource is resumed.
@@ -663,7 +657,7 @@ t_query_counter_async_inflight_batch(_) ->
%% +1 because the tmp_query above will be retried and succeed
%% this time.
WindowSize + 1,
- _Timeout = 60_000
+ 10_000
),
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
tap_metrics(?LINE),
@@ -684,7 +678,7 @@ t_query_counter_async_inflight_batch(_) ->
{ok, SRef} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
NumBatches1,
- _Timeout = 60_000
+ 10_000
),
inc_counter_in_parallel(NumMsgs1, ReqOpts),
{ok, _} = snabbkaffe:receive_events(SRef),
@@ -709,8 +703,12 @@ t_query_counter_async_inflight_batch(_) ->
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
%% again, send async query to make the inflight window full
?check_trace(
- ?TRACE_OPTS,
- inc_counter_in_parallel(WindowSize, ReqOpts),
+ {_, {ok, _}} =
+ ?wait_async_action(
+ inc_counter_in_parallel(NumMsgs, ReqOpts),
+ #{?snk_kind := resource_worker_flush_but_inflight_full},
+ 5_000
+ ),
fun(Trace) ->
QueryTrace = ?of_kind(call_batch_query_async, Trace),
?assertMatch(
@@ -723,15 +721,15 @@ t_query_counter_async_inflight_batch(_) ->
%% this will block the resource_worker
ok = emqx_resource:query(?ID, {inc_counter, 1}),
- Sent = NumMsgs + NumMsgs1 + WindowSize,
+ Sent = NumMsgs + NumMsgs1 + NumMsgs,
{ok, SRef1} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
WindowSize,
- _Timeout = 60_000
+ 10_000
),
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
{ok, _} = snabbkaffe:receive_events(SRef1),
- ?assertEqual(Sent, ets:info(Tab0, size)),
+ ?assertEqual(Sent, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
{ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]),
@@ -774,10 +772,8 @@ t_healthy_timeout(_) ->
%% the ?TEST_RESOURCE always returns the `Mod:on_get_status/2` 300ms later.
#{health_check_interval => 200}
),
- ?assertMatch(
- ?RESOURCE_ERROR(not_connected),
- emqx_resource:query(?ID, get_state)
- ),
+ ?assertError(timeout, emqx_resource:query(?ID, get_state, #{timeout => 1_000})),
+ ?assertMatch({ok, _Group, #{status := disconnected}}, emqx_resource_manager:ets_lookup(?ID)),
ok = emqx_resource:remove_local(?ID).
t_healthy(_) ->
@@ -842,6 +838,8 @@ t_stop_start(_) ->
?assert(is_process_alive(Pid0)),
%% metrics are reset when recreating
+ %% depending on timing, might show the request we just did.
+ ct:sleep(500),
?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)),
ok = emqx_resource:stop(?ID),
@@ -861,6 +859,7 @@ t_stop_start(_) ->
?assert(is_process_alive(Pid1)),
%% now stop while resetting the metrics
+ ct:sleep(500),
emqx_resource_metrics:inflight_set(?ID, WorkerID0, 1),
emqx_resource_metrics:inflight_set(?ID, WorkerID1, 4),
?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)),
@@ -1067,7 +1066,7 @@ t_retry_batch(_Config) ->
%% batch shall remain enqueued.
{ok, _} =
snabbkaffe:block_until(
- ?match_n_events(2, #{?snk_kind := resource_worker_retry_queue_batch_failed}),
+ ?match_n_events(2, #{?snk_kind := resource_worker_retry_inflight_failed}),
5_000
),
%% should not have increased the matched count with the retries
@@ -1079,7 +1078,7 @@ t_retry_batch(_Config) ->
{ok, {ok, _}} =
?wait_async_action(
ok = emqx_resource:simple_sync_query(?ID, resume),
- #{?snk_kind := resource_worker_retry_queue_batch_succeeded},
+ #{?snk_kind := resource_worker_retry_inflight_succeeded},
5_000
),
%% 1 more because of the `resume' call
@@ -1116,6 +1115,390 @@ t_retry_batch(_Config) ->
),
ok.
+t_delete_and_re_create_with_same_name(_Config) ->
+ NumBufferWorkers = 2,
+ {ok, _} = emqx_resource:create(
+ ?ID,
+ ?DEFAULT_RESOURCE_GROUP,
+ ?TEST_RESOURCE,
+ #{name => test_resource},
+ #{
+ query_mode => sync,
+ batch_size => 1,
+ worker_pool_size => NumBufferWorkers,
+ queue_seg_bytes => 100,
+ resume_interval => 1_000
+ }
+ ),
+ %% pre-condition: we should have just created a new queue
+ Queuing0 = emqx_resource_metrics:queuing_get(?ID),
+ Inflight0 = emqx_resource_metrics:inflight_get(?ID),
+ ?assertEqual(0, Queuing0),
+ ?assertEqual(0, Inflight0),
+ ?check_trace(
+ begin
+ ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
+ NumRequests = 10,
+ {ok, SRef} = snabbkaffe:subscribe(
+ ?match_event(#{?snk_kind := resource_worker_enter_blocked}),
+ NumBufferWorkers,
+ _Timeout = 5_000
+ ),
+ %% ensure replayq offloads to disk
+ Payload = binary:copy(<<"a">>, 119),
+ lists:foreach(
+ fun(N) ->
+ spawn_link(fun() ->
+ {error, _} =
+ emqx_resource:query(
+ ?ID,
+ {big_payload, <<(integer_to_binary(N))/binary, Payload/binary>>}
+ )
+ end)
+ end,
+ lists:seq(1, NumRequests)
+ ),
+
+ {ok, _} = snabbkaffe:receive_events(SRef),
+
+ %% ensure that stuff got enqueued into disk
+ tap_metrics(?LINE),
+ Queuing1 = emqx_resource_metrics:queuing_get(?ID),
+ Inflight1 = emqx_resource_metrics:inflight_get(?ID),
+ ?assert(Queuing1 > 0),
+ ?assertEqual(2, Inflight1),
+
+ %% now, we delete the resource
+ process_flag(trap_exit, true),
+ ok = emqx_resource:remove_local(?ID),
+ ?assertEqual({error, not_found}, emqx_resource_manager:lookup(?ID)),
+
+ %% re-create the resource with the *same name*
+ {{ok, _}, {ok, _Events}} =
+ ?wait_async_action(
+ emqx_resource:create(
+ ?ID,
+ ?DEFAULT_RESOURCE_GROUP,
+ ?TEST_RESOURCE,
+ #{name => test_resource},
+ #{
+ query_mode => async,
+ batch_size => 1,
+ worker_pool_size => 2,
+ queue_seg_bytes => 100,
+ resume_interval => 1_000
+ }
+ ),
+ #{?snk_kind := resource_worker_enter_running},
+ 5_000
+ ),
+
+ %% it shouldn't have anything enqueued, as it's a fresh resource
+ Queuing2 = emqx_resource_metrics:queuing_get(?ID),
+ Inflight2 = emqx_resource_metrics:inflight_get(?ID),
+ ?assertEqual(0, Queuing2),
+ ?assertEqual(0, Inflight2),
+
+ ok
+ end,
+ []
+ ),
+ ok.
+
+%% check that, if we configure a max queue size too small, then we
+%% never send requests and always overflow.
+t_always_overflow(_Config) ->
+ {ok, _} = emqx_resource:create(
+ ?ID,
+ ?DEFAULT_RESOURCE_GROUP,
+ ?TEST_RESOURCE,
+ #{name => test_resource},
+ #{
+ query_mode => sync,
+ batch_size => 1,
+ worker_pool_size => 1,
+ max_queue_bytes => 1,
+ resume_interval => 1_000
+ }
+ ),
+ ?check_trace(
+ begin
+ Payload = binary:copy(<<"a">>, 100),
+ %% since it's sync and it should never send a request, this
+ %% errors with `timeout'.
+ ?assertError(
+ timeout,
+ emqx_resource:query(
+ ?ID,
+ {big_payload, Payload},
+ #{timeout => 500}
+ )
+ ),
+ ok
+ end,
+ fun(Trace) ->
+ ?assertEqual([], ?of_kind(call_query_enter, Trace)),
+ ok
+ end
+ ),
+ ok.
+
+t_retry_sync_inflight(_Config) ->
+ ResumeInterval = 1_000,
+ emqx_connector_demo:set_callback_mode(always_sync),
+ {ok, _} = emqx_resource:create(
+ ?ID,
+ ?DEFAULT_RESOURCE_GROUP,
+ ?TEST_RESOURCE,
+ #{name => test_resource},
+ #{
+ query_mode => sync,
+ batch_size => 1,
+ worker_pool_size => 1,
+ resume_interval => ResumeInterval
+ }
+ ),
+ QueryOpts = #{},
+ ?check_trace(
+ begin
+ %% now really make the resource go into `blocked' state.
+ %% this results in a retriable error when sync.
+ ok = emqx_resource:simple_sync_query(?ID, block),
+ TestPid = self(),
+ {_, {ok, _}} =
+ ?wait_async_action(
+ spawn_link(fun() ->
+ Res = emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts),
+ TestPid ! {res, Res}
+ end),
+ #{?snk_kind := resource_worker_retry_inflight_failed},
+ ResumeInterval * 2
+ ),
+ {ok, {ok, _}} =
+ ?wait_async_action(
+ ok = emqx_resource:simple_sync_query(?ID, resume),
+ #{?snk_kind := resource_worker_retry_inflight_succeeded},
+ ResumeInterval * 3
+ ),
+ receive
+ {res, Res} ->
+ ?assertEqual(ok, Res)
+ after 5_000 ->
+ ct:fail("no response")
+ end,
+ ok
+ end,
+ [fun ?MODULE:assert_sync_retry_fail_then_succeed_inflight/1]
+ ),
+ ok.
+
+t_retry_sync_inflight_batch(_Config) ->
+ ResumeInterval = 1_000,
+ emqx_connector_demo:set_callback_mode(always_sync),
+ {ok, _} = emqx_resource:create(
+ ?ID,
+ ?DEFAULT_RESOURCE_GROUP,
+ ?TEST_RESOURCE,
+ #{name => test_resource},
+ #{
+ query_mode => sync,
+ batch_size => 2,
+ batch_time => 200,
+ worker_pool_size => 1,
+ resume_interval => ResumeInterval
+ }
+ ),
+ QueryOpts = #{},
+ ?check_trace(
+ begin
+ %% make the resource go into `blocked' state. this
+ %% results in a retriable error when sync.
+ ok = emqx_resource:simple_sync_query(?ID, block),
+ process_flag(trap_exit, true),
+ TestPid = self(),
+ {_, {ok, _}} =
+ ?wait_async_action(
+ spawn_link(fun() ->
+ Res = emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts),
+ TestPid ! {res, Res}
+ end),
+ #{?snk_kind := resource_worker_retry_inflight_failed},
+ ResumeInterval * 2
+ ),
+ {ok, {ok, _}} =
+ ?wait_async_action(
+ ok = emqx_resource:simple_sync_query(?ID, resume),
+ #{?snk_kind := resource_worker_retry_inflight_succeeded},
+ ResumeInterval * 3
+ ),
+ receive
+ {res, Res} ->
+ ?assertEqual(ok, Res)
+ after 5_000 ->
+ ct:fail("no response")
+ end,
+ ok
+ end,
+ [fun ?MODULE:assert_sync_retry_fail_then_succeed_inflight/1]
+ ),
+ ok.
+
+t_retry_async_inflight(_Config) ->
+ ResumeInterval = 1_000,
+ emqx_connector_demo:set_callback_mode(async_if_possible),
+ {ok, _} = emqx_resource:create(
+ ?ID,
+ ?DEFAULT_RESOURCE_GROUP,
+ ?TEST_RESOURCE,
+ #{name => test_resource},
+ #{
+ query_mode => async,
+ batch_size => 1,
+ worker_pool_size => 1,
+ resume_interval => ResumeInterval
+ }
+ ),
+ QueryOpts = #{},
+ ?check_trace(
+ begin
+ %% block
+ ok = emqx_resource:simple_sync_query(?ID, block),
+
+ %% then send an async request; that should be retriable.
+ {ok, {ok, _}} =
+ ?wait_async_action(
+ emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts),
+ #{?snk_kind := resource_worker_retry_inflight_failed},
+ ResumeInterval * 2
+ ),
+
+ %% will reply with success after the resource is healed
+ {ok, {ok, _}} =
+ ?wait_async_action(
+ emqx_resource:simple_sync_query(?ID, resume),
+ #{?snk_kind := resource_worker_enter_running},
+ ResumeInterval * 2
+ ),
+ ok
+ end,
+ [fun ?MODULE:assert_async_retry_fail_then_succeed_inflight/1]
+ ),
+ ok.
+
+t_retry_async_inflight_batch(_Config) ->
+ ResumeInterval = 1_000,
+ emqx_connector_demo:set_callback_mode(async_if_possible),
+ {ok, _} = emqx_resource:create(
+ ?ID,
+ ?DEFAULT_RESOURCE_GROUP,
+ ?TEST_RESOURCE,
+ #{name => test_resource},
+ #{
+ query_mode => async,
+ batch_size => 2,
+ batch_time => 200,
+ worker_pool_size => 1,
+ resume_interval => ResumeInterval
+ }
+ ),
+ QueryOpts = #{},
+ ?check_trace(
+ begin
+ %% block
+ ok = emqx_resource:simple_sync_query(?ID, block),
+
+ %% then send an async request; that should be retriable.
+ {ok, {ok, _}} =
+ ?wait_async_action(
+ emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts),
+ #{?snk_kind := resource_worker_retry_inflight_failed},
+ ResumeInterval * 2
+ ),
+
+ %% will reply with success after the resource is healed
+ {ok, {ok, _}} =
+ ?wait_async_action(
+ emqx_resource:simple_sync_query(?ID, resume),
+ #{?snk_kind := resource_worker_enter_running},
+ ResumeInterval * 2
+ ),
+ ok
+ end,
+ [fun ?MODULE:assert_async_retry_fail_then_succeed_inflight/1]
+ ),
+ ok.
+
+%% check that we monitor async worker pids and abort their inflight
+%% requests if they die.
+t_async_pool_worker_death(_Config) ->
+ ResumeInterval = 1_000,
+ NumBufferWorkers = 2,
+ emqx_connector_demo:set_callback_mode(async_if_possible),
+ {ok, _} = emqx_resource:create(
+ ?ID,
+ ?DEFAULT_RESOURCE_GROUP,
+ ?TEST_RESOURCE,
+ #{name => test_resource},
+ #{
+ query_mode => async,
+ batch_size => 1,
+ worker_pool_size => NumBufferWorkers,
+ resume_interval => ResumeInterval
+ }
+ ),
+ Tab0 = ets:new(?FUNCTION_NAME, [bag, public]),
+ Insert0 = fun(Tab, Ref, Result) ->
+ ct:pal("inserting ~p", [{Ref, Result}]),
+ ets:insert(Tab, {Ref, Result})
+ end,
+ ReqOpts = fun() -> #{async_reply_fun => {Insert0, [Tab0, make_ref()]}} end,
+ ?check_trace(
+ begin
+ ok = emqx_resource:simple_sync_query(?ID, block),
+
+ NumReqs = 10,
+ {ok, SRef0} =
+ snabbkaffe:subscribe(
+ ?match_event(#{?snk_kind := resource_worker_appended_to_inflight}),
+ NumReqs,
+ 1_000
+ ),
+ inc_counter_in_parallel_increasing(NumReqs, 1, ReqOpts),
+ {ok, _} = snabbkaffe:receive_events(SRef0),
+
+ Inflight0 = emqx_resource_metrics:inflight_get(?ID),
+ ?assertEqual(NumReqs, Inflight0),
+
+ %% grab one of the worker pids and kill it
+ {ok, SRef1} =
+ snabbkaffe:subscribe(
+ ?match_event(#{?snk_kind := resource_worker_worker_down_update}),
+ NumBufferWorkers,
+ 10_000
+ ),
+ {ok, #{pid := Pid0}} = emqx_resource:simple_sync_query(?ID, get_state),
+ MRef = monitor(process, Pid0),
+ ct:pal("will kill ~p", [Pid0]),
+ exit(Pid0, kill),
+ receive
+ {'DOWN', MRef, process, Pid0, killed} ->
+ ct:pal("~p killed", [Pid0]),
+ ok
+ after 200 ->
+ ct:fail("worker should have died")
+ end,
+
+ %% inflight requests should have been marked as retriable
+ {ok, _} = snabbkaffe:receive_events(SRef1),
+ Inflight1 = emqx_resource_metrics:inflight_get(?ID),
+ ?assertEqual(NumReqs, Inflight1),
+
+ ok
+ end,
+ []
+ ),
+ ok.
+
%%------------------------------------------------------------------------------
%% Helpers
%%------------------------------------------------------------------------------
@@ -1136,6 +1519,30 @@ inc_counter_in_parallel(N, Opts0) ->
end)
|| _ <- lists:seq(1, N)
],
+ [
+ receive
+ {complete, Pid} -> ok
+ after 1000 ->
+ ct:fail({wait_for_query_timeout, Pid})
+ end
+ || Pid <- Pids
+ ],
+ ok.
+
+inc_counter_in_parallel_increasing(N, StartN, Opts0) ->
+ Parent = self(),
+ Pids = [
+ erlang:spawn(fun() ->
+ Opts =
+ case is_function(Opts0) of
+ true -> Opts0();
+ false -> Opts0
+ end,
+ emqx_resource:query(?ID, {inc_counter, M}, Opts),
+ Parent ! {complete, self()}
+ end)
+ || M <- lists:seq(StartN, StartN + N - 1)
+ ],
[
receive
{complete, Pid} -> ok
@@ -1156,3 +1563,43 @@ tap_metrics(Line) ->
{ok, _, #{metrics := #{counters := C, gauges := G}}} = emqx_resource:get_instance(?ID),
ct:pal("metrics (l. ~b): ~p", [Line, #{counters => C, gauges => G}]),
#{counters => C, gauges => G}.
+
+assert_sync_retry_fail_then_succeed_inflight(Trace) ->
+ ct:pal(" ~p", [Trace]),
+ ?assert(
+ ?strict_causality(
+ #{?snk_kind := resource_worker_flush_nack, ref := _Ref},
+ #{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref},
+ Trace
+ )
+ ),
+ %% not strict causality because it might retry more than once
+ %% before restoring the resource health.
+ ?assert(
+ ?causality(
+ #{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref},
+ #{?snk_kind := resource_worker_retry_inflight_succeeded, ref := _Ref},
+ Trace
+ )
+ ),
+ ok.
+
+assert_async_retry_fail_then_succeed_inflight(Trace) ->
+ ct:pal(" ~p", [Trace]),
+ ?assert(
+ ?strict_causality(
+ #{?snk_kind := resource_worker_reply_after_query, action := nack, ref := _Ref},
+ #{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref},
+ Trace
+ )
+ ),
+ %% not strict causality because it might retry more than once
+ %% before restoring the resource health.
+ ?assert(
+ ?causality(
+ #{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref},
+ #{?snk_kind := resource_worker_retry_inflight_succeeded, ref := _Ref},
+ Trace
+ )
+ ),
+ ok.
diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl
index f0591a2e6..6d96e3883 100644
--- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl
+++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl
@@ -92,7 +92,6 @@ values(common, Protocol, SupportUint, TypeOpts) ->
"bool=${payload.bool}">>,
precision => ms,
resource_opts => #{
- enable_batch => false,
batch_size => 100,
batch_time => <<"20ms">>
},
diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl
index 9977e70a7..47a4646de 100644
--- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl
+++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl
@@ -108,6 +108,7 @@ end_per_group(_Group, _Config) ->
init_per_testcase(TestCase, Config0) when
TestCase =:= t_publish_success_batch
->
+ ct:timetrap({seconds, 30}),
case ?config(batch_size, Config0) of
1 ->
[{skip_due_to_no_batching, true}];
@@ -120,6 +121,7 @@ init_per_testcase(TestCase, Config0) when
[{telemetry_table, Tid} | Config]
end;
init_per_testcase(TestCase, Config0) ->
+ ct:timetrap({seconds, 30}),
{ok, _} = start_echo_http_server(),
delete_all_bridges(),
Tid = install_telemetry_handler(TestCase),
@@ -287,6 +289,7 @@ gcp_pubsub_config(Config) ->
" pool_size = 1\n"
" pipelining = ~b\n"
" resource_opts = {\n"
+ " request_timeout = 500ms\n"
" worker_pool_size = 1\n"
" query_mode = ~s\n"
" batch_size = ~b\n"
@@ -512,14 +515,16 @@ install_telemetry_handler(TestCase) ->
wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) ->
Events = receive_all_events(GaugeName, Timeout),
- case lists:last(Events) of
+ case length(Events) > 0 andalso lists:last(Events) of
#{measurements := #{gauge_set := ExpectedValue}} ->
ok;
#{measurements := #{gauge_set := Value}} ->
ct:fail(
"gauge ~p didn't reach expected value ~p; last value: ~p",
[GaugeName, ExpectedValue, Value]
- )
+ );
+ false ->
+ ct:pal("no ~p gauge events received!", [GaugeName])
end.
receive_all_events(EventName, Timeout) ->
@@ -609,6 +614,8 @@ t_publish_success(Config) ->
ResourceId,
#{n_events => ExpectedInflightEvents, timeout => 5_000}
),
+ wait_until_gauge_is(queuing, 0, 500),
+ wait_until_gauge_is(inflight, 0, 500),
assert_metrics(
#{
dropped => 0,
@@ -657,6 +664,8 @@ t_publish_success_local_topic(Config) ->
ResourceId,
#{n_events => ExpectedInflightEvents, timeout => 5_000}
),
+ wait_until_gauge_is(queuing, 0, 500),
+ wait_until_gauge_is(inflight, 0, 500),
assert_metrics(
#{
dropped => 0,
@@ -743,6 +752,8 @@ t_publish_templated(Config) ->
ResourceId,
#{n_events => ExpectedInflightEvents, timeout => 5_000}
),
+ wait_until_gauge_is(queuing, 0, 500),
+ wait_until_gauge_is(inflight, 0, 500),
assert_metrics(
#{
dropped => 0,
@@ -1124,19 +1135,17 @@ do_econnrefused_or_timeout_test(Config, Error) ->
ResourceId
);
{_, sync} ->
- wait_telemetry_event(TelemetryTable, queuing, ResourceId, #{
- timeout => 10_000, n_events => 2
- }),
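- %% even waiting, hard to avoid flakiness... simpler to just sleep
- %% a bit until stabilization.
+ %% even waiting for telemetry events, it is hard to avoid flakiness...
+ %% simpler to check the last gauge values observed within the timeout.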
- ct:sleep(200),
+ wait_until_gauge_is(queuing, 0, 500),
+ wait_until_gauge_is(inflight, 1, 500),
assert_metrics(
#{
dropped => 0,
failed => 0,
- inflight => 0,
+ inflight => 1,
matched => 1,
- queuing => 1,
+ queuing => 0,
retried => 0,
success => 0
},
@@ -1264,7 +1273,6 @@ t_failure_no_body(Config) ->
t_unrecoverable_error(Config) ->
ResourceId = ?config(resource_id, Config),
- TelemetryTable = ?config(telemetry_table, Config),
QueryMode = ?config(query_mode, Config),
TestPid = self(),
FailureNoBodyHandler =
@@ -1326,26 +1334,14 @@ t_unrecoverable_error(Config) ->
ok
end
),
- wait_telemetry_event(TelemetryTable, failed, ResourceId),
- ExpectedInflightEvents =
- case QueryMode of
- sync -> 1;
- async -> 3
- end,
- wait_telemetry_event(
- TelemetryTable,
- inflight,
- ResourceId,
- #{n_events => ExpectedInflightEvents, timeout => 5_000}
- ),
- %% even waiting, hard to avoid flakiness... simpler to just sleep
- %% a bit until stabilization.
- ct:sleep(200),
+
+ wait_until_gauge_is(queuing, 0, _Timeout = 400),
+ wait_until_gauge_is(inflight, 1, _Timeout = 400),
assert_metrics(
#{
dropped => 0,
- failed => 1,
- inflight => 0,
+ failed => 0,
+ inflight => 1,
matched => 1,
queuing => 0,
retried => 0,
diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl
index bb87a9f37..0372c21ea 100644
--- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl
+++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl
@@ -778,15 +778,25 @@ t_bad_timestamp(Config) ->
{async, false} ->
?assertEqual(ok, Return),
?assertMatch(
- [#{error := [{error, {bad_timestamp, [<<"bad_timestamp">>]}}]}],
+ [
+ #{
+ error := [
+ {error, {bad_timestamp, [<<"bad_timestamp">>]}}
+ ]
+ }
+ ],
?of_kind(influxdb_connector_send_query_error, Trace)
);
{sync, false} ->
?assertEqual(
- {error, [{error, {bad_timestamp, [<<"bad_timestamp">>]}}]}, Return
+ {error,
+ {unrecoverable_error, [
+ {error, {bad_timestamp, [<<"bad_timestamp">>]}}
+ ]}},
+ Return
);
{sync, true} ->
- ?assertEqual({error, points_trans_failed}, Return)
+ ?assertEqual({error, {unrecoverable_error, points_trans_failed}}, Return)
end,
ok
end
@@ -894,11 +904,19 @@ t_write_failure(Config) ->
},
?check_trace(
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
- send_message(Config, SentData)
- end),
- fun(Result, _Trace) ->
case QueryMode of
sync ->
+ ?assertError(timeout, send_message(Config, SentData));
+ async ->
+ ?assertEqual(ok, send_message(Config, SentData))
+ end
+ end),
+ fun(Trace0) ->
+ case QueryMode of
+ sync ->
+ Trace = ?of_kind(resource_worker_flush_nack, Trace0),
+ ?assertMatch([_ | _], Trace),
+ [#{result := Result} | _] = Trace,
?assert(
{error, {error, {closed, "The connection was lost."}}} =:= Result orelse
{error, {error, closed}} =:= Result orelse
@@ -906,7 +924,7 @@ t_write_failure(Config) ->
#{got => Result}
);
async ->
- ?assertEqual(ok, Result)
+ ok
end,
ok
end
@@ -938,11 +956,7 @@ t_missing_field(Config) ->
begin
emqx:publish(Msg0),
emqx:publish(Msg1),
- NEvents =
- case IsBatch of
- true -> 1;
- false -> 2
- end,
+ NEvents = 1,
{ok, _} =
snabbkaffe:block_until(
?match_n_events(NEvents, #{
@@ -964,7 +978,7 @@ t_missing_field(Config) ->
);
false ->
?assertMatch(
- [#{error := [{error, no_fields}]}, #{error := [{error, no_fields}]} | _],
+ [#{error := [{error, no_fields}]} | _],
?of_kind(influxdb_connector_send_query_error, Trace)
)
end,
diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl
index 812c4ee85..ce38c357d 100644
--- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl
+++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl
@@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
-%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_bridge_mysql_SUITE).
@@ -170,6 +170,7 @@ mysql_config(BridgeType, Config) ->
" password = ~p\n"
" sql = ~p\n"
" resource_opts = {\n"
+ " request_timeout = 500ms\n"
" batch_size = ~b\n"
" query_mode = ~s\n"
" }\n"
@@ -397,20 +398,32 @@ t_write_failure(Config) ->
ProxyName = ?config(proxy_name, Config),
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
+ QueryMode = ?config(query_mode, Config),
{ok, _} = create_bridge(Config),
Val = integer_to_binary(erlang:unique_integer()),
SentData = #{payload => Val, timestamp => 1668602148000},
?check_trace(
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
- send_message(Config, SentData)
+ case QueryMode of
+ sync ->
+ ?assertError(timeout, send_message(Config, SentData));
+ async ->
+ send_message(Config, SentData)
+ end
end),
- fun
- ({error, {resource_error, _}}, _Trace) ->
- ok;
- ({error, {recoverable_error, disconnected}}, _Trace) ->
- ok;
- (_, _Trace) ->
- ?assert(false)
+ fun(Trace0) ->
+ ct:pal("trace: ~p", [Trace0]),
+ Trace = ?of_kind(resource_worker_flush_nack, Trace0),
+ ?assertMatch([#{result := {error, _}} | _], Trace),
+ [#{result := {error, Error}} | _] = Trace,
+ case Error of
+ {resource_error, _} ->
+ ok;
+ {recoverable_error, disconnected} ->
+ ok;
+ _ ->
+ ct:fail("unexpected error: ~p", [Error])
+ end
end
),
ok.
@@ -424,10 +437,10 @@ t_write_timeout(Config) ->
{ok, _} = create_bridge(Config),
Val = integer_to_binary(erlang:unique_integer()),
SentData = #{payload => Val, timestamp => 1668602148000},
- Timeout = 10,
+ Timeout = 1000,
emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
- ?assertMatch(
- {error, {resource_error, _}},
+ ?assertError(
+ timeout,
query_resource(Config, {send_message, SentData, [], Timeout})
)
end),
@@ -443,7 +456,7 @@ t_simple_sql_query(Config) ->
BatchSize = ?config(batch_size, Config),
IsBatch = BatchSize > 1,
case IsBatch of
- true -> ?assertEqual({error, batch_select_not_implemented}, Result);
+ true -> ?assertEqual({error, {unrecoverable_error, batch_select_not_implemented}}, Result);
false -> ?assertEqual({ok, [<<"T">>], [[1]]}, Result)
end,
ok.
@@ -459,10 +472,16 @@ t_missing_data(Config) ->
case IsBatch of
true ->
?assertMatch(
- {error, {1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}, Result
+ {error,
+ {unrecoverable_error,
+ {1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}},
+ Result
);
false ->
- ?assertMatch({error, {1048, _, <<"Column 'arrived' cannot be null">>}}, Result)
+ ?assertMatch(
+ {error, {unrecoverable_error, {1048, _, <<"Column 'arrived' cannot be null">>}}},
+ Result
+ )
end,
ok.
@@ -476,8 +495,10 @@ t_bad_sql_parameter(Config) ->
BatchSize = ?config(batch_size, Config),
IsBatch = BatchSize > 1,
case IsBatch of
- true -> ?assertEqual({error, invalid_request}, Result);
- false -> ?assertEqual({error, {invalid_params, [bad_parameter]}}, Result)
+ true ->
+ ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
+ false ->
+ ?assertEqual({error, {unrecoverable_error, {invalid_params, [bad_parameter]}}}, Result)
end,
ok.
@@ -491,8 +512,8 @@ t_unprepared_statement_query(Config) ->
BatchSize = ?config(batch_size, Config),
IsBatch = BatchSize > 1,
case IsBatch of
- true -> ?assertEqual({error, invalid_request}, Result);
- false -> ?assertEqual({error, prepared_statement_invalid}, Result)
+ true -> ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
+ false -> ?assertEqual({error, {unrecoverable_error, prepared_statement_invalid}}, Result)
end,
ok.
diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl
index c2ff6fa8f..f39ecc1dc 100644
--- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl
+++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl
@@ -191,6 +191,7 @@ pgsql_config(BridgeType, Config) ->
" password = ~p\n"
" sql = ~p\n"
" resource_opts = {\n"
+ " request_timeout = 500ms\n"
" batch_size = ~b\n"
" query_mode = ~s\n"
" }\n"
@@ -415,20 +416,32 @@ t_write_failure(Config) ->
ProxyName = ?config(proxy_name, Config),
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
+ QueryMode = ?config(query_mode, Config),
{ok, _} = create_bridge(Config),
Val = integer_to_binary(erlang:unique_integer()),
SentData = #{payload => Val, timestamp => 1668602148000},
?check_trace(
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
- send_message(Config, SentData)
+ case QueryMode of
+ sync ->
+ ?assertError(timeout, send_message(Config, SentData));
+ async ->
+ send_message(Config, SentData)
+ end
end),
- fun
- ({error, {resource_error, _}}, _Trace) ->
- ok;
- ({error, {recoverable_error, disconnected}}, _Trace) ->
- ok;
- (_, _Trace) ->
- ?assert(false)
+ fun(Trace0) ->
+ ct:pal("trace: ~p", [Trace0]),
+ Trace = ?of_kind(resource_worker_flush_nack, Trace0),
+ ?assertMatch([#{result := {error, _}} | _], Trace),
+ [#{result := {error, Error}} | _] = Trace,
+ case Error of
+ {resource_error, _} ->
+ ok;
+ disconnected ->
+ ok;
+ _ ->
+ ct:fail("unexpected error: ~p", [Error])
+ end
end
),
ok.
@@ -442,12 +455,9 @@ t_write_timeout(Config) ->
{ok, _} = create_bridge(Config),
Val = integer_to_binary(erlang:unique_integer()),
SentData = #{payload => Val, timestamp => 1668602148000},
- Timeout = 10,
+ Timeout = 1000,
emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
- ?assertMatch(
- {error, {resource_error, _}},
- query_resource(Config, {send_message, SentData, [], Timeout})
- )
+ ?assertError(timeout, query_resource(Config, {send_message, SentData, [], Timeout}))
end),
ok.
@@ -459,7 +469,7 @@ t_simple_sql_query(Config) ->
Request = {sql, <<"SELECT count(1) AS T">>},
Result = query_resource(Config, Request),
case ?config(enable_batch, Config) of
- true -> ?assertEqual({error, batch_prepare_not_implemented}, Result);
+ true -> ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result);
false -> ?assertMatch({ok, _, [{1}]}, Result)
end,
ok.
@@ -471,7 +481,8 @@ t_missing_data(Config) ->
),
Result = send_message(Config, #{}),
?assertMatch(
- {error, {error, error, <<"23502">>, not_null_violation, _, _}}, Result
+ {error, {unrecoverable_error, {error, error, <<"23502">>, not_null_violation, _, _}}},
+ Result
),
ok.
@@ -484,10 +495,10 @@ t_bad_sql_parameter(Config) ->
Result = query_resource(Config, Request),
case ?config(enable_batch, Config) of
true ->
- ?assertEqual({error, invalid_request}, Result);
+ ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
false ->
?assertMatch(
- {error, {resource_error, _}}, Result
+ {error, {unrecoverable_error, _}}, Result
)
end,
ok.
diff --git a/lib-ee/emqx_ee_connector/rebar.config b/lib-ee/emqx_ee_connector/rebar.config
index 262641d44..3af1868c7 100644
--- a/lib-ee/emqx_ee_connector/rebar.config
+++ b/lib-ee/emqx_ee_connector/rebar.config
@@ -1,7 +1,7 @@
{erl_opts, [debug_info]}.
{deps, [
{hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}},
- {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.6"}}},
+ {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.7"}}},
{emqx, {path, "../../apps/emqx"}}
]}.
diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl
index 37d193edf..041bdec08 100644
--- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl
+++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl
@@ -178,7 +178,7 @@ on_query(BridgeId, {send_message, Selected}, State) ->
{send_message, map()},
{ReplyFun :: function(), Args :: list()},
state()
-) -> ok.
+) -> {ok, pid()}.
on_query_async(BridgeId, {send_message, Selected}, ReplyFunAndArgs, State) ->
Requests = [{send_message, Selected}],
?TRACE(
@@ -210,7 +210,7 @@ on_batch_query(BridgeId, Requests, State) ->
[{send_message, map()}],
{ReplyFun :: function(), Args :: list()},
state()
-) -> ok.
+) -> {ok, pid()}.
on_batch_query_async(BridgeId, Requests, ReplyFunAndArgs, State) ->
?TRACE(
"QUERY_ASYNC",
@@ -496,7 +496,7 @@ do_send_requests_sync(State, Requests, ResourceId) ->
[{send_message, map()}],
{ReplyFun :: function(), Args :: list()},
resource_id()
-) -> ok.
+) -> {ok, pid()}.
do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) ->
#{
pool_name := PoolName,
@@ -531,7 +531,8 @@ do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) ->
Request,
RequestTimeout,
{fun ?MODULE:reply_delegator/3, [ResourceId, ReplyFunAndArgs]}
- ).
+ ),
+ {ok, Worker}.
-spec reply_delegator(
resource_id(),
diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl
index 553e5369f..0d21b381e 100644
--- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl
+++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl
@@ -56,13 +56,13 @@ on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, c
#{points => Points, batch => false, mode => sync}
),
do_query(InstId, Client, Points);
- {error, ErrorPoints} = Err ->
+ {error, ErrorPoints} ->
?tp(
influxdb_connector_send_query_error,
#{batch => false, mode => sync, error => ErrorPoints}
),
log_error_points(InstId, ErrorPoints),
- Err
+ {error, {unrecoverable_error, ErrorPoints}}
end.
%% Once a Batched Data trans to points failed.
@@ -80,7 +80,7 @@ on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client
influxdb_connector_send_query_error,
#{batch => true, mode => sync, error => Reason}
),
- {error, Reason}
+ {error, {unrecoverable_error, Reason}}
end.
on_query_async(
@@ -123,7 +123,7 @@ on_batch_query_async(
influxdb_connector_send_query_error,
#{batch => true, mode => async, error => Reason}
),
- {error, Reason}
+ {error, {unrecoverable_error, Reason}}
end.
on_get_status(_InstId, #{client := Client}) ->
@@ -356,7 +356,7 @@ do_async_query(InstId, Client, Points, ReplyFunAndArgs) ->
connector => InstId,
points => Points
}),
- ok = influxdb:write_async(Client, Points, ReplyFunAndArgs).
+ {ok, _WorkerPid} = influxdb:write_async(Client, Points, ReplyFunAndArgs).
%% -------------------------------------------------------------------------------------------------
%% Tags & Fields Config Trans
diff --git a/mix.exs b/mix.exs
index e124116c8..7deab2625 100644
--- a/mix.exs
+++ b/mix.exs
@@ -58,7 +58,7 @@ defmodule EMQXUmbrella.MixProject do
{:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true},
{:minirest, github: "emqx/minirest", tag: "1.3.7", override: true},
{:ecpool, github: "emqx/ecpool", tag: "0.5.2", override: true},
- {:replayq, github: "emqx/replayq", tag: "0.3.5", override: true},
+ {:replayq, github: "emqx/replayq", tag: "0.3.6", override: true},
{:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},
{:emqtt, github: "emqx/emqtt", tag: "1.7.0-rc.2", override: true},
{:rulesql, github: "emqx/rulesql", tag: "0.1.4"},
@@ -131,7 +131,7 @@ defmodule EMQXUmbrella.MixProject do
defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do
[
{:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"},
- {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.6", override: true},
+ {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.7", override: true},
{:wolff, github: "kafka4beam/wolff", tag: "1.7.4"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.2", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"},
diff --git a/rebar.config b/rebar.config
index bfdc3e4ee..c2fc678dd 100644
--- a/rebar.config
+++ b/rebar.config
@@ -60,7 +60,7 @@
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.7"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}}
- , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.5"}}}
+ , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.6"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.7.0-rc.2"}}}
, {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.4"}}}