fix(rule tracing): make sure that recoverable errors are traced

This commit is contained in:
Kjell Winblad 2024-05-07 15:44:54 +02:00
parent feecc36607
commit 09ee7ec0e2
4 changed files with 234 additions and 83 deletions

View File

@ -88,6 +88,21 @@ prepare_key_value(packet = K, V, PEncode) ->
V V
end, end,
{K, NewV}; {K, NewV};
prepare_key_value(K, {recoverable_error, Msg} = OrgV, PEncode) ->
try
prepare_key_value(
K,
#{
error_type => recoverable_error,
msg => Msg,
additional_info => <<"The operation may be retried.">>
},
PEncode
)
catch
_:_ ->
{K, OrgV}
end;
prepare_key_value(rule_ids = K, V, _PEncode) -> prepare_key_value(rule_ids = K, V, _PEncode) ->
NewV = NewV =
try try

View File

@ -68,7 +68,7 @@
{query, FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX} {query, FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX}
). ).
-define(SIMPLE_QUERY(FROM, REQUEST, TRACE_CTX), ?QUERY(FROM, REQUEST, false, infinity, TRACE_CTX)). -define(SIMPLE_QUERY(FROM, REQUEST, TRACE_CTX), ?QUERY(FROM, REQUEST, false, infinity, TRACE_CTX)).
-define(REPLY(FROM, SENT, RESULT), {reply, FROM, SENT, RESULT}). -define(REPLY(FROM, SENT, RESULT, TRACE_CTX), {reply, FROM, SENT, RESULT, TRACE_CTX}).
-define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef), -define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef),
{Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef} {Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef}
). ).
@ -448,8 +448,8 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
Result = call_query(force_sync, Id, Index, Ref, QueryOrBatch, QueryOpts), Result = call_query(force_sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
{ShouldAck, PostFn, DeltaCounters} = {ShouldAck, PostFn, DeltaCounters} =
case QueryOrBatch of case QueryOrBatch of
?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt, _TraceCtx) -> ?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt, TraceCtx) ->
Reply = ?REPLY(ReplyTo, HasBeenSent, Result), Reply = ?REPLY(ReplyTo, HasBeenSent, Result, TraceCtx),
reply_caller_defer_metrics(Id, Reply, QueryOpts); reply_caller_defer_metrics(Id, Reply, QueryOpts);
[?QUERY(_, _, _, _, _) | _] = Batch -> [?QUERY(_, _, _, _, _) | _] = Batch ->
batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts) batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts)
@ -662,10 +662,10 @@ do_flush(
inflight_tid := InflightTID inflight_tid := InflightTID
} = Data0, } = Data0,
%% unwrap when not batching (i.e., batch size == 1) %% unwrap when not batching (i.e., batch size == 1)
[?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt, _TraceCtx) = Request] = Batch, [?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt, TraceCtx) = Request] = Batch,
QueryOpts = #{inflight_tid => InflightTID, simple_query => false}, QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
Result = call_query(async_if_possible, Id, Index, Ref, Request, QueryOpts), Result = call_query(async_if_possible, Id, Index, Ref, Request, QueryOpts),
Reply = ?REPLY(ReplyTo, HasBeenSent, Result), Reply = ?REPLY(ReplyTo, HasBeenSent, Result, TraceCtx),
{ShouldAck, DeltaCounters} = reply_caller(Id, Reply, QueryOpts), {ShouldAck, DeltaCounters} = reply_caller(Id, Reply, QueryOpts),
Data1 = aggregate_counters(Data0, DeltaCounters), Data1 = aggregate_counters(Data0, DeltaCounters),
case ShouldAck of case ShouldAck of
@ -856,15 +856,15 @@ batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) ->
expand_batch_reply(BatchResults, Batch) when is_list(BatchResults) -> expand_batch_reply(BatchResults, Batch) when is_list(BatchResults) ->
lists:map( lists:map(
fun({?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT, _TraceCtx), Result}) -> fun({?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT, TraceCtx), Result}) ->
?REPLY(FROM, SENT, Result) ?REPLY(FROM, SENT, Result, TraceCtx)
end, end,
lists:zip(Batch, BatchResults) lists:zip(Batch, BatchResults)
); );
expand_batch_reply(BatchResult, Batch) -> expand_batch_reply(BatchResult, Batch) ->
lists:map( lists:map(
fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT, _TraceCtx)) -> fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT, TraceCtx)) ->
?REPLY(FROM, SENT, BatchResult) ?REPLY(FROM, SENT, BatchResult, TraceCtx)
end, end,
Batch Batch
). ).
@ -876,12 +876,14 @@ reply_caller(Id, Reply, QueryOpts) ->
%% Should only reply to the caller when the decision is final (not %% Should only reply to the caller when the decision is final (not
%% retriable). See comment on `handle_query_result_pure'. %% retriable). See comment on `handle_query_result_pure'.
reply_caller_defer_metrics(Id, ?REPLY(undefined, HasBeenSent, Result), _QueryOpts) -> reply_caller_defer_metrics(Id, ?REPLY(undefined, HasBeenSent, Result, TraceCtx), _QueryOpts) ->
handle_query_result_pure(Id, Result, HasBeenSent); handle_query_result_pure(Id, Result, HasBeenSent, TraceCtx);
reply_caller_defer_metrics(Id, ?REPLY(ReplyTo, HasBeenSent, Result), QueryOpts) -> reply_caller_defer_metrics(Id, ?REPLY(ReplyTo, HasBeenSent, Result, TraceCtx), QueryOpts) ->
IsSimpleQuery = maps:get(simple_query, QueryOpts, false), IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
IsUnrecoverableError = is_unrecoverable_error(Result), IsUnrecoverableError = is_unrecoverable_error(Result),
{ShouldAck, PostFn, DeltaCounters} = handle_query_result_pure(Id, Result, HasBeenSent), {ShouldAck, PostFn, DeltaCounters} = handle_query_result_pure(
Id, Result, HasBeenSent, TraceCtx
),
case {ShouldAck, Result, IsUnrecoverableError, IsSimpleQuery} of case {ShouldAck, Result, IsUnrecoverableError, IsSimpleQuery} of
{ack, {async_return, _}, true, _} -> {ack, {async_return, _}, true, _} ->
ok = do_reply_caller(ReplyTo, Result); ok = do_reply_caller(ReplyTo, Result);
@ -921,7 +923,7 @@ batch_reply_dropped(Batch, Result) ->
%% This is only called by `simple_{,a}sync_query', so we can bump the %% This is only called by `simple_{,a}sync_query', so we can bump the
%% counters here. %% counters here.
handle_query_result(Id, Result, HasBeenSent) -> handle_query_result(Id, Result, HasBeenSent) ->
{ShouldBlock, PostFn, DeltaCounters} = handle_query_result_pure(Id, Result, HasBeenSent), {ShouldBlock, PostFn, DeltaCounters} = handle_query_result_pure(Id, Result, HasBeenSent, #{}),
PostFn(), PostFn(),
bump_counters(Id, DeltaCounters), bump_counters(Id, DeltaCounters),
ShouldBlock. ShouldBlock.
@ -932,37 +934,49 @@ handle_query_result(Id, Result, HasBeenSent) ->
%% * the result is a success (or at least a delayed result) %% * the result is a success (or at least a delayed result)
%% We also retry even sync requests. In that case, we shouldn't reply %% We also retry even sync requests. In that case, we shouldn't reply
%% the caller until one of those final results above happen. %% the caller until one of those final results above happen.
-spec handle_query_result_pure(id(), term(), HasBeenSent :: boolean()) -> -spec handle_query_result_pure(id(), term(), HasBeenSent :: boolean(), TraceCTX :: map()) ->
{ack | nack, function(), counters()}. {ack | nack, function(), counters()}.
handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(exception, Msg), _HasBeenSent) -> handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(exception, Msg), _HasBeenSent, TraceCTX) ->
PostFn = fun() -> PostFn = fun() ->
?SLOG(error, #{msg => "resource_exception", info => emqx_utils:redact(Msg)}), ?TRACE(
error,
"ERROR",
"resource_exception",
(trace_ctx_map(TraceCTX))#{info => emqx_utils:redact(Msg)}
),
ok ok
end, end,
{nack, PostFn, #{}}; {nack, PostFn, #{}};
handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent) when handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent, _TraceCTX) when
NotWorking == not_connected; NotWorking == blocked NotWorking == not_connected; NotWorking == blocked
-> ->
{nack, fun() -> ok end, #{}}; {nack, fun() -> ok end, #{}};
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasBeenSent) -> handle_query_result_pure(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasBeenSent, TraceCTX) ->
PostFn = fun() -> PostFn = fun() ->
?SLOG(error, #{id => Id, msg => "resource_not_found", info => Msg}), ?TRACE(
error,
"ERROR",
"resource_not_found",
(trace_ctx_map(TraceCTX))#{id => Id, info => Msg}
),
ok ok
end, end,
{ack, PostFn, #{dropped_resource_not_found => 1}}; {ack, PostFn, #{dropped_resource_not_found => 1}};
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent) -> handle_query_result_pure(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent, TraceCTX) ->
PostFn = fun() -> PostFn = fun() ->
?SLOG(error, #{id => Id, msg => "resource_stopped", info => Msg}), ?TRACE(error, "ERROR", "resource_stopped", (trace_ctx_map(TraceCTX))#{id => Id, info => Msg}),
ok ok
end, end,
{ack, PostFn, #{dropped_resource_stopped => 1}}; {ack, PostFn, #{dropped_resource_stopped => 1}};
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent) -> handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent, TraceCTX) ->
PostFn = fun() -> PostFn = fun() ->
?SLOG(error, #{id => Id, msg => "other_resource_error", reason => Reason}), ?TRACE(error, "ERROR", "other_resource_error", (trace_ctx_map(TraceCTX))#{
id => Id, reason => Reason
}),
ok ok
end, end,
{nack, PostFn, #{}}; {nack, PostFn, #{}};
handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent) -> handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent, TraceCTX) ->
case is_unrecoverable_error(Error) of case is_unrecoverable_error(Error) of
true -> true ->
PostFn = PostFn =
@ -979,14 +993,16 @@ handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent) ->
false -> false ->
PostFn = PostFn =
fun() -> fun() ->
?SLOG(error, #{id => Id, msg => "send_error", reason => Reason}), ?TRACE(error, "ERROR", "send_error", (trace_ctx_map(TraceCTX))#{
id => Id, reason => Reason
}),
ok ok
end, end,
{nack, PostFn, #{}} {nack, PostFn, #{}}
end; end;
handle_query_result_pure(Id, {async_return, Result}, HasBeenSent) -> handle_query_result_pure(Id, {async_return, Result}, HasBeenSent, TraceCTX) ->
handle_query_async_result_pure(Id, Result, HasBeenSent); handle_query_async_result_pure(Id, Result, HasBeenSent, TraceCTX);
handle_query_result_pure(_Id, Result, HasBeenSent) -> handle_query_result_pure(_Id, Result, HasBeenSent, _TraceCTX) ->
PostFn = fun() -> PostFn = fun() ->
assert_ok_result(Result), assert_ok_result(Result),
ok ok
@ -998,9 +1014,9 @@ handle_query_result_pure(_Id, Result, HasBeenSent) ->
end, end,
{ack, PostFn, Counters}. {ack, PostFn, Counters}.
-spec handle_query_async_result_pure(id(), term(), HasBeenSent :: boolean()) -> -spec handle_query_async_result_pure(id(), term(), HasBeenSent :: boolean(), map()) ->
{ack | nack, function(), counters()}. {ack | nack, function(), counters()}.
handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent) -> handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent, TraceCTX) ->
case is_unrecoverable_error(Error) of case is_unrecoverable_error(Error) of
true -> true ->
PostFn = PostFn =
@ -1016,16 +1032,18 @@ handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent) ->
{ack, PostFn, Counters}; {ack, PostFn, Counters};
false -> false ->
PostFn = fun() -> PostFn = fun() ->
?SLOG(error, #{id => Id, msg => "async_send_error", reason => Reason}), ?TRACE(error, "ERROR", "async_send_error", (trace_ctx_map(TraceCTX))#{
id => Id, reason => Reason
}),
ok ok
end, end,
{nack, PostFn, #{}} {nack, PostFn, #{}}
end; end;
handle_query_async_result_pure(_Id, {ok, Pid}, _HasBeenSent) when is_pid(Pid) -> handle_query_async_result_pure(_Id, {ok, Pid}, _HasBeenSent, _TraceCTX) when is_pid(Pid) ->
{ack, fun() -> ok end, #{}}; {ack, fun() -> ok end, #{}};
handle_query_async_result_pure(_Id, ok, _HasBeenSent) -> handle_query_async_result_pure(_Id, ok, _HasBeenSent, _TraceCTX) ->
{ack, fun() -> ok end, #{}}; {ack, fun() -> ok end, #{}};
handle_query_async_result_pure(Id, Results, HasBeenSent) when is_list(Results) -> handle_query_async_result_pure(Id, Results, HasBeenSent, TraceCTX) when is_list(Results) ->
All = fun(L) -> All = fun(L) ->
case L of case L of
{ok, Pid} -> is_pid(Pid); {ok, Pid} -> is_pid(Pid);
@ -1037,17 +1055,26 @@ handle_query_async_result_pure(Id, Results, HasBeenSent) when is_list(Results) -
{ack, fun() -> ok end, #{}}; {ack, fun() -> ok end, #{}};
false -> false ->
PostFn = fun() -> PostFn = fun() ->
?SLOG(error, #{ ?TRACE(
id => Id, error,
msg => "async_batch_send_error", "ERROR",
reason => Results, "async_batch_send_error",
has_been_sent => HasBeenSent (trace_ctx_map(TraceCTX))#{
}), id => Id,
reason => Results,
has_been_sent => HasBeenSent
}
),
ok ok
end, end,
{nack, PostFn, #{}} {nack, PostFn, #{}}
end. end.
trace_ctx_map(undefined) ->
#{};
trace_ctx_map(Map) ->
Map.
-spec aggregate_counters(data(), counters()) -> data(). -spec aggregate_counters(data(), counters()) -> data().
aggregate_counters(Data = #{counters := OldCounters}, DeltaCounters) -> aggregate_counters(Data = #{counters := OldCounters}, DeltaCounters) ->
Counters = merge_counters(OldCounters, DeltaCounters), Counters = merge_counters(OldCounters, DeltaCounters),
@ -1526,7 +1553,7 @@ do_handle_async_reply(
request_ref := Ref, request_ref := Ref,
buffer_worker := BufferWorkerPid, buffer_worker := BufferWorkerPid,
inflight_tid := InflightTID, inflight_tid := InflightTID,
min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt, _TraceCtx) = _Query min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt, TraceCtx) = _Query
}, },
Result Result
) -> ) ->
@ -1534,7 +1561,7 @@ do_handle_async_reply(
%% but received no ACK, NOT the number of messages queued in the %% but received no ACK, NOT the number of messages queued in the
%% inflight window. %% inflight window.
{Action, PostFn, DeltaCounters} = reply_caller_defer_metrics( {Action, PostFn, DeltaCounters} = reply_caller_defer_metrics(
Id, ?REPLY(ReplyTo, Sent, Result), QueryOpts Id, ?REPLY(ReplyTo, Sent, Result, TraceCtx), QueryOpts
), ),
?tp(handle_async_reply, #{ ?tp(handle_async_reply, #{

View File

@ -399,7 +399,7 @@ t_apply_rule_test_batch_separation_stop_after_render(_Config) ->
t_apply_rule_test_format_action_failed(_Config) -> t_apply_rule_test_format_action_failed(_Config) ->
MeckOpts = [passthrough, no_link, no_history, non_strict], MeckOpts = [passthrough, no_link, no_history, non_strict],
catch meck:new(emqx_connector_info, MeckOpts), catch meck:new(emqx_rule_engine_test_connector, MeckOpts),
meck:expect( meck:expect(
emqx_rule_engine_test_connector, emqx_rule_engine_test_connector,
on_query, on_query,
@ -408,8 +408,8 @@ t_apply_rule_test_format_action_failed(_Config) ->
), ),
CheckFun = CheckFun =
fun(Bin0) -> fun(Bin0) ->
%% The last line in the Bin should be the action_failed entry
?assertNotEqual(nomatch, binary:match(Bin0, [<<"action_failed">>])), ?assertNotEqual(nomatch, binary:match(Bin0, [<<"action_failed">>])),
%% The last line in the Bin should be the action_success entry
Bin1 = string:trim(Bin0), Bin1 = string:trim(Bin0),
LastEntry = unicode:characters_to_binary(lists:last(string:split(Bin1, <<"\n">>, all))), LastEntry = unicode:characters_to_binary(lists:last(string:split(Bin1, <<"\n">>, all))),
LastEntryJSON = emqx_utils_json:decode(LastEntry, [return_maps]), LastEntryJSON = emqx_utils_json:decode(LastEntry, [return_maps]),
@ -437,50 +437,141 @@ t_apply_rule_test_format_action_failed(_Config) ->
LastEntryJSON LastEntryJSON
) )
end, end,
do_apply_rule_test_format_action_failed_test(CheckFun). do_apply_rule_test_format_action_failed_test(1, CheckFun).
t_apply_rule_test_format_action_out_of_service(_Config) -> t_apply_rule_test_format_action_out_of_service_query(_Config) ->
Reason = <<"MY_RECOVERABLE_REASON">>,
CheckFun = out_of_service_check_fun(<<"send_error">>, Reason),
meck_test_connector_recoverable_errors(Reason),
do_apply_rule_test_format_action_failed_test(1, CheckFun).
t_apply_rule_test_format_action_out_of_service_batch_query(_Config) ->
Reason = <<"MY_RECOVERABLE_REASON">>,
CheckFun = out_of_service_check_fun(<<"send_error">>, Reason),
meck_test_connector_recoverable_errors(Reason),
do_apply_rule_test_format_action_failed_test(10, CheckFun).
t_apply_rule_test_format_action_out_of_service_async_query(_Config) ->
Reason = <<"MY_RECOVERABLE_REASON">>,
CheckFun = out_of_service_check_fun(<<"async_send_error">>, Reason),
meck_test_connector_recoverable_errors(Reason),
meck:expect(
emqx_rule_engine_test_connector,
callback_mode,
0,
async_if_possible
),
do_apply_rule_test_format_action_failed_test(1, CheckFun).
t_apply_rule_test_format_action_out_of_service_async_batch_query(_Config) ->
Reason = <<"MY_RECOVERABLE_REASON">>,
CheckFun = out_of_service_check_fun(<<"async_send_error">>, Reason),
meck_test_connector_recoverable_errors(Reason),
meck:expect(
emqx_rule_engine_test_connector,
callback_mode,
0,
async_if_possible
),
do_apply_rule_test_format_action_failed_test(10, CheckFun).
out_of_service_check_fun(SendErrorMsg, Reason) ->
fun(Bin0) ->
%% The last line in the Bin should be the action_failed entry
?assertNotEqual(nomatch, binary:match(Bin0, [<<"action_failed">>])),
io:format("LOG:\n~s", [Bin0]),
Bin1 = string:trim(Bin0),
LastEntry = unicode:characters_to_binary(lists:last(string:split(Bin1, <<"\n">>, all))),
LastEntryJSON = emqx_utils_json:decode(LastEntry, [return_maps]),
?assertMatch(
#{
<<"level">> := <<"debug">>,
<<"meta">> :=
#{
<<"action_info">> :=
#{
<<"name">> := _,
<<"type">> := <<"rule_engine_test">>
},
<<"clientid">> := _,
<<"reason">> := <<"request_expired">>,
<<"rule_id">> := _,
<<"rule_trigger_time">> := _,
<<"stop_action_after_render">> := false,
<<"trace_tag">> := <<"ACTION">>
},
<<"msg">> := <<"action_failed">>,
<<"time">> := _
},
LastEntryJSON
),
%% We should have at least one entry containing Reason
[ReasonLine | _] = find_lines_with(Bin1, Reason),
ReasonEntryJSON = emqx_utils_json:decode(ReasonLine, [return_maps]),
?assertMatch(
#{
<<"level">> := <<"debug">>,
<<"meta">> :=
#{
<<"client_ids">> := [],
<<"clientid">> := _,
<<"id">> := _,
<<"reason">> :=
#{
<<"additional_info">> := _,
<<"error_type">> := <<"recoverable_error">>,
<<"msg">> := <<"MY_RECOVERABLE_REASON">>
},
<<"rule_id">> := _,
<<"rule_ids">> := [],
<<"rule_trigger_time">> := _,
<<"rule_trigger_times">> := [],
<<"stop_action_after_render">> := false,
<<"trace_tag">> := <<"ERROR">>
},
<<"msg">> := SendErrorMsg,
<<"time">> := _
},
ReasonEntryJSON
)
end.
meck_test_connector_recoverable_errors(Reason) ->
MeckOpts = [passthrough, no_link, no_history, non_strict], MeckOpts = [passthrough, no_link, no_history, non_strict],
catch meck:new(emqx_connector_info, MeckOpts), catch meck:new(emqx_rule_engine_test_connector, MeckOpts),
meck:expect( meck:expect(
emqx_rule_engine_test_connector, emqx_rule_engine_test_connector,
on_query, on_query,
3, 3,
{error, {recoverable_error, <<"MY RECOVERABLE REASON">>}} {error, {recoverable_error, Reason}}
), ),
CheckFun = meck:expect(
fun(Bin0) -> emqx_rule_engine_test_connector,
?assertNotEqual(nomatch, binary:match(Bin0, [<<"action_failed">>])), on_batch_query,
%% The last line in the Bin should be the action_success entry 3,
Bin1 = string:trim(Bin0), {error, {recoverable_error, Reason}}
LastEntry = unicode:characters_to_binary(lists:last(string:split(Bin1, <<"\n">>, all))), ),
LastEntryJSON = emqx_utils_json:decode(LastEntry, [return_maps]), meck:expect(
?assertMatch( emqx_rule_engine_test_connector,
#{ on_query_async,
<<"level">> := <<"debug">>, 4,
<<"meta">> := {error, {recoverable_error, Reason}}
#{ ),
<<"action_info">> := meck:expect(
#{ emqx_rule_engine_test_connector,
<<"name">> := _, on_batch_query_async,
<<"type">> := <<"rule_engine_test">> 4,
}, {error, {recoverable_error, Reason}}
<<"clientid">> := _, ).
<<"reason">> := <<"request_expired">>,
<<"rule_id">> := _,
<<"rule_trigger_time">> := _,
<<"stop_action_after_render">> := false,
<<"trace_tag">> := <<"ACTION">>
},
<<"msg">> := <<"action_failed">>,
<<"time">> := _
},
LastEntryJSON
)
end,
do_apply_rule_test_format_action_failed_test(CheckFun).
do_apply_rule_test_format_action_failed_test(CheckLastTraceEntryFun) -> find_lines_with(Data, InLineText) ->
% Split the binary data into lines
Lines = re:split(Data, "\n", [{return, binary}]),
% Use a list comprehension to filter lines containing 'Reason'
[Line || Line <- Lines, re:run(Line, InLineText, [multiline, {capture, none}]) =/= nomatch].
do_apply_rule_test_format_action_failed_test(BatchSize, CheckLastTraceEntryFun) ->
meck_in_test_connector(), meck_in_test_connector(),
{ok, _} = emqx_connector:create(rule_engine_test, ?FUNCTION_NAME, #{}), {ok, _} = emqx_connector:create(rule_engine_test, ?FUNCTION_NAME, #{}),
Name = atom_to_binary(?FUNCTION_NAME), Name = atom_to_binary(?FUNCTION_NAME),
@ -489,8 +580,8 @@ do_apply_rule_test_format_action_failed_test(CheckLastTraceEntryFun) ->
<<"connector">> => Name, <<"connector">> => Name,
<<"parameters">> => #{<<"values">> => #{}}, <<"parameters">> => #{<<"values">> => #{}},
<<"resource_opts">> => #{ <<"resource_opts">> => #{
<<"batch_size">> => 1, <<"batch_size">> => BatchSize,
<<"batch_time">> => 0, <<"batch_time">> => 10,
<<"request_ttl">> => 200 <<"request_ttl">> => 200
} }
}, },

View File

@ -29,7 +29,9 @@
on_start/2, on_start/2,
on_stop/2, on_stop/2,
on_query/3, on_query/3,
on_query_async/4,
on_batch_query/3, on_batch_query/3,
on_batch_query_async/4,
on_get_status/2, on_get_status/2,
on_add_channel/4, on_add_channel/4,
on_remove_channel/3, on_remove_channel/3,
@ -85,6 +87,14 @@ on_query(
) -> ) ->
ok. ok.
on_query_async(
_InstId,
_Query,
_State,
_Callback
) ->
ok.
on_batch_query( on_batch_query(
_InstId, _InstId,
[{ChannelId, _Req} | _] = Msg, [{ChannelId, _Req} | _] = Msg,
@ -96,5 +106,13 @@ on_batch_query(
emqx_trace:rendered_action_template(ChannelId, #{nothing_to_render => ok}), emqx_trace:rendered_action_template(ChannelId, #{nothing_to_render => ok}),
ok. ok.
on_batch_query_async(
_InstId,
_Batch,
_State,
_Callback
) ->
ok.
on_get_status(_InstId, _State) -> on_get_status(_InstId, _State) ->
connected. connected.