Merge pull request #12981 from kjellwinblad/kjell/fixup_trace_all_http_and_error
Make formatting of action result trace entries better
This commit is contained in:
commit
72f68afbca
|
@ -88,6 +88,21 @@ prepare_key_value(packet = K, V, PEncode) ->
|
|||
V
|
||||
end,
|
||||
{K, NewV};
|
||||
prepare_key_value(K, {recoverable_error, Msg} = OrgV, PEncode) ->
|
||||
try
|
||||
prepare_key_value(
|
||||
K,
|
||||
#{
|
||||
error_type => recoverable_error,
|
||||
msg => Msg,
|
||||
additional_info => <<"The operation may be retried.">>
|
||||
},
|
||||
PEncode
|
||||
)
|
||||
catch
|
||||
_:_ ->
|
||||
{K, OrgV}
|
||||
end;
|
||||
prepare_key_value(rule_ids = K, V, _PEncode) ->
|
||||
NewV =
|
||||
try
|
||||
|
|
|
@ -29,7 +29,8 @@
|
|||
on_query_async/4,
|
||||
on_batch_query/3,
|
||||
on_batch_query_async/4,
|
||||
on_get_status/2
|
||||
on_get_status/2,
|
||||
on_format_query_result/1
|
||||
]).
|
||||
|
||||
%% callbacks of ecpool
|
||||
|
@ -459,6 +460,11 @@ handle_result({error, Error}) ->
|
|||
handle_result(Res) ->
|
||||
Res.
|
||||
|
||||
on_format_query_result({ok, Result}) ->
|
||||
#{result => ok, info => Result};
|
||||
on_format_query_result(Result) ->
|
||||
Result.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% utils
|
||||
|
||||
|
|
|
@ -38,7 +38,8 @@
|
|||
on_get_channels/1,
|
||||
on_query/3,
|
||||
on_batch_query/3,
|
||||
on_get_status/2
|
||||
on_get_status/2,
|
||||
on_format_query_result/1
|
||||
]).
|
||||
|
||||
%% callbacks for ecpool
|
||||
|
@ -519,6 +520,13 @@ transform_and_log_clickhouse_result(ClickhouseErrorResult, ResourceID, SQL) ->
|
|||
to_error_tuple(ClickhouseErrorResult)
|
||||
end.
|
||||
|
||||
on_format_query_result(ok) ->
|
||||
#{result => ok, message => <<"">>};
|
||||
on_format_query_result({ok, Message}) ->
|
||||
#{result => ok, message => Message};
|
||||
on_format_query_result(Result) ->
|
||||
Result.
|
||||
|
||||
to_recoverable_error({error, Reason}) ->
|
||||
{error, {recoverable_error, Reason}};
|
||||
to_recoverable_error(Error) ->
|
||||
|
|
|
@ -26,7 +26,8 @@
|
|||
on_add_channel/4,
|
||||
on_remove_channel/3,
|
||||
on_get_channels/1,
|
||||
on_get_channel_status/3
|
||||
on_get_channel_status/3,
|
||||
on_format_query_result/1
|
||||
]).
|
||||
|
||||
-export([
|
||||
|
@ -184,6 +185,11 @@ on_batch_query(InstanceId, [{_ChannelId, _} | _] = Query, State) ->
|
|||
on_batch_query(_InstanceId, Query, _State) ->
|
||||
{error, {unrecoverable_error, {invalid_request, Query}}}.
|
||||
|
||||
on_format_query_result({ok, Result}) ->
|
||||
#{result => ok, info => Result};
|
||||
on_format_query_result(Result) ->
|
||||
Result.
|
||||
|
||||
health_check_timeout() ->
|
||||
2500.
|
||||
|
||||
|
|
|
@ -23,7 +23,8 @@
|
|||
on_add_channel/4,
|
||||
on_remove_channel/3,
|
||||
on_get_channels/1,
|
||||
on_get_channel_status/3
|
||||
on_get_channel_status/3,
|
||||
on_format_query_result/1
|
||||
]).
|
||||
|
||||
-export([
|
||||
|
@ -288,6 +289,9 @@ on_query_async(
|
|||
InstanceId, {ChannelId, Msg}, ReplyFunAndArgs, State
|
||||
).
|
||||
|
||||
on_format_query_result(Result) ->
|
||||
emqx_bridge_http_connector:on_format_query_result(Result).
|
||||
|
||||
on_add_channel(
|
||||
InstanceId,
|
||||
#{channels := Channels} = State0,
|
||||
|
|
|
@ -53,7 +53,8 @@
|
|||
on_add_channel/4,
|
||||
on_remove_channel/3,
|
||||
on_get_channels/1,
|
||||
on_get_channel_status/3
|
||||
on_get_channel_status/3,
|
||||
on_format_query_result/1
|
||||
]).
|
||||
|
||||
-export([reply_delegator/2]).
|
||||
|
@ -489,6 +490,11 @@ handle_result({error, Reason} = Result, _Request, QueryMode, ResourceId) ->
|
|||
handle_result({ok, _} = Result, _Request, _QueryMode, _ResourceId) ->
|
||||
Result.
|
||||
|
||||
on_format_query_result({ok, Info}) ->
|
||||
#{result => ok, info => Info};
|
||||
on_format_query_result(Result) ->
|
||||
Result.
|
||||
|
||||
reply_delegator(ReplyFunAndArgs, Response) ->
|
||||
case Response of
|
||||
{error, Reason} when
|
||||
|
|
|
@ -27,7 +27,8 @@
|
|||
on_batch_query/3,
|
||||
on_query_async/4,
|
||||
on_batch_query_async/4,
|
||||
on_get_status/2
|
||||
on_get_status/2,
|
||||
on_format_query_result/1
|
||||
]).
|
||||
-export([reply_callback/2]).
|
||||
|
||||
|
@ -453,6 +454,11 @@ do_query(InstId, Channel, Client, Points) ->
|
|||
end
|
||||
end.
|
||||
|
||||
on_format_query_result({ok, {affected_rows, Rows}}) ->
|
||||
#{result => ok, affected_rows => Rows};
|
||||
on_format_query_result(Result) ->
|
||||
Result.
|
||||
|
||||
do_async_query(InstId, Channel, Client, Points, ReplyFunAndArgs) ->
|
||||
?SLOG(info, #{
|
||||
msg => "greptimedb_write_point_async",
|
||||
|
|
|
@ -27,7 +27,8 @@
|
|||
on_batch_query/3,
|
||||
on_query_async/4,
|
||||
on_batch_query_async/4,
|
||||
on_get_status/2
|
||||
on_get_status/2,
|
||||
on_format_query_result/1
|
||||
]).
|
||||
-export([reply_callback/2]).
|
||||
|
||||
|
@ -209,6 +210,9 @@ on_batch_query_async(
|
|||
{error, {unrecoverable_error, Reason}}
|
||||
end.
|
||||
|
||||
on_format_query_result(Result) ->
|
||||
emqx_bridge_http_connector:on_format_query_result(Result).
|
||||
|
||||
on_get_status(_InstId, #{client := Client}) ->
|
||||
case influxdb:is_alive(Client) andalso ok =:= influxdb:check_auth(Client) of
|
||||
true ->
|
||||
|
|
|
@ -26,7 +26,8 @@
|
|||
on_add_channel/4,
|
||||
on_remove_channel/3,
|
||||
on_get_channels/1,
|
||||
on_get_channel_status/3
|
||||
on_get_channel_status/3,
|
||||
on_format_query_result/1
|
||||
]).
|
||||
|
||||
-export([
|
||||
|
@ -390,6 +391,9 @@ on_batch_query(
|
|||
Error
|
||||
end.
|
||||
|
||||
on_format_query_result(Result) ->
|
||||
emqx_bridge_http_connector:on_format_query_result(Result).
|
||||
|
||||
on_add_channel(
|
||||
InstanceId,
|
||||
#{iotdb_version := Version, channels := Channels} = OldState0,
|
||||
|
|
|
@ -39,7 +39,8 @@
|
|||
on_add_channel/4,
|
||||
on_remove_channel/3,
|
||||
on_get_channels/1,
|
||||
on_get_channel_status/3
|
||||
on_get_channel_status/3,
|
||||
on_format_query_result/1
|
||||
]).
|
||||
|
||||
-export([
|
||||
|
@ -318,6 +319,11 @@ handle_result({error, Reason} = Error, Requests, InstanceId) ->
|
|||
}),
|
||||
Error.
|
||||
|
||||
on_format_query_result({ok, Result}) ->
|
||||
#{result => ok, info => Result};
|
||||
on_format_query_result(Result) ->
|
||||
Result.
|
||||
|
||||
parse_template(Config) ->
|
||||
#{payload_template := PayloadTemplate, partition_key := PartitionKeyTemplate} = Config,
|
||||
Templates = #{send_message => PayloadTemplate, partition_key => PartitionKeyTemplate},
|
||||
|
|
|
@ -18,7 +18,8 @@
|
|||
on_get_status/2,
|
||||
on_query/3,
|
||||
on_start/2,
|
||||
on_stop/2
|
||||
on_stop/2,
|
||||
on_format_query_result/1
|
||||
]).
|
||||
|
||||
%%========================================================================================
|
||||
|
@ -85,6 +86,11 @@ on_query(InstanceId, {Channel, Message0}, #{channels := Channels, connector_stat
|
|||
on_query(InstanceId, Request, _State = #{connector_state := ConnectorState}) ->
|
||||
emqx_mongodb:on_query(InstanceId, Request, ConnectorState).
|
||||
|
||||
on_format_query_result({{Result, Info}, Documents}) ->
|
||||
#{result => Result, info => Info, documents => Documents};
|
||||
on_format_query_result(Result) ->
|
||||
Result.
|
||||
|
||||
on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) ->
|
||||
NewState = State#{channels => maps:remove(ChannelId, Channels)},
|
||||
{ok, NewState}.
|
||||
|
|
|
@ -27,7 +27,8 @@
|
|||
on_add_channel/4,
|
||||
on_remove_channel/3,
|
||||
on_get_channels/1,
|
||||
on_get_channel_status/3
|
||||
on_get_channel_status/3,
|
||||
on_format_query_result/1
|
||||
]).
|
||||
|
||||
-export([connector_examples/1]).
|
||||
|
@ -175,6 +176,11 @@ on_batch_query(
|
|||
Error
|
||||
end.
|
||||
|
||||
on_format_query_result({ok, StatusCode, BodyMap}) ->
|
||||
#{result => ok, status_code => StatusCode, body => BodyMap};
|
||||
on_format_query_result(Result) ->
|
||||
Result.
|
||||
|
||||
on_get_status(_InstanceId, #{server := Server}) ->
|
||||
Result =
|
||||
case opentsdb_connectivity(Server) of
|
||||
|
|
|
@ -20,7 +20,8 @@
|
|||
on_get_status/2,
|
||||
on_get_channel_status/3,
|
||||
on_query/3,
|
||||
on_query_async/4
|
||||
on_query_async/4,
|
||||
on_format_query_result/1
|
||||
]).
|
||||
|
||||
-type pulsar_client_id() :: atom().
|
||||
|
@ -234,6 +235,11 @@ on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn) ->
|
|||
}),
|
||||
pulsar:send(Producers, [PulsarMessage], #{callback_fn => AsyncReplyFn}).
|
||||
|
||||
on_format_query_result({ok, Info}) ->
|
||||
#{result => ok, info => Info};
|
||||
on_format_query_result(Result) ->
|
||||
Result.
|
||||
|
||||
%%-------------------------------------------------------------------------------------
|
||||
%% Internal fns
|
||||
%%-------------------------------------------------------------------------------------
|
||||
|
|
|
@ -39,7 +39,8 @@
|
|||
on_add_channel/4,
|
||||
on_remove_channel/3,
|
||||
on_get_channels/1,
|
||||
on_get_channel_status/3
|
||||
on_get_channel_status/3,
|
||||
on_format_query_result/1
|
||||
]).
|
||||
|
||||
%% callbacks for ecpool
|
||||
|
@ -320,6 +321,11 @@ on_batch_query(ResourceId, BatchRequests, State) ->
|
|||
),
|
||||
do_query(ResourceId, BatchRequests, ?SYNC_QUERY_MODE, State).
|
||||
|
||||
on_format_query_result({ok, Rows}) ->
|
||||
#{result => ok, rows => Rows};
|
||||
on_format_query_result(Result) ->
|
||||
Result.
|
||||
|
||||
on_get_status(_InstanceId, #{pool_name := PoolName} = _State) ->
|
||||
Health = emqx_resource_pool:health_check_workers(
|
||||
PoolName,
|
||||
|
|
|
@ -28,7 +28,8 @@
|
|||
on_add_channel/4,
|
||||
on_remove_channel/3,
|
||||
on_get_channels/1,
|
||||
on_get_channel_status/3
|
||||
on_get_channel_status/3,
|
||||
on_format_query_result/1
|
||||
]).
|
||||
|
||||
-export([connector_examples/1]).
|
||||
|
@ -215,6 +216,11 @@ on_batch_query(InstanceId, BatchReq, State) ->
|
|||
?SLOG(error, LogMeta#{msg => "invalid_request"}),
|
||||
{error, {unrecoverable_error, invalid_request}}.
|
||||
|
||||
on_format_query_result({ok, ResultMap}) ->
|
||||
#{result => ok, info => ResultMap};
|
||||
on_format_query_result(Result) ->
|
||||
Result.
|
||||
|
||||
on_get_status(_InstanceId, #{pool_name := PoolName} = State) ->
|
||||
case
|
||||
emqx_resource_pool:health_check_workers(
|
||||
|
|
|
@ -30,7 +30,8 @@
|
|||
on_stop/2,
|
||||
on_query/3,
|
||||
on_batch_query/3,
|
||||
on_get_status/2
|
||||
on_get_status/2,
|
||||
on_format_query_result/1
|
||||
]).
|
||||
|
||||
%% ecpool connect & reconnect
|
||||
|
@ -214,6 +215,13 @@ on_batch_query(
|
|||
}),
|
||||
{error, {unrecoverable_error, invalid_request}}.
|
||||
|
||||
on_format_query_result({ok, ColumnNames, Rows}) ->
|
||||
#{result => ok, column_names => ColumnNames, rows => Rows};
|
||||
on_format_query_result({ok, DataList}) ->
|
||||
#{result => ok, column_names_rows_list => DataList};
|
||||
on_format_query_result(Result) ->
|
||||
Result.
|
||||
|
||||
mysql_function(sql) ->
|
||||
query;
|
||||
mysql_function(prepared_query) ->
|
||||
|
|
|
@ -68,7 +68,7 @@
|
|||
{query, FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX}
|
||||
).
|
||||
-define(SIMPLE_QUERY(FROM, REQUEST, TRACE_CTX), ?QUERY(FROM, REQUEST, false, infinity, TRACE_CTX)).
|
||||
-define(REPLY(FROM, SENT, RESULT), {reply, FROM, SENT, RESULT}).
|
||||
-define(REPLY(FROM, SENT, RESULT, TRACE_CTX), {reply, FROM, SENT, RESULT, TRACE_CTX}).
|
||||
-define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef),
|
||||
{Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef}
|
||||
).
|
||||
|
@ -448,8 +448,8 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
|
|||
Result = call_query(force_sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
|
||||
{ShouldAck, PostFn, DeltaCounters} =
|
||||
case QueryOrBatch of
|
||||
?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt, _TraceCtx) ->
|
||||
Reply = ?REPLY(ReplyTo, HasBeenSent, Result),
|
||||
?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt, TraceCtx) ->
|
||||
Reply = ?REPLY(ReplyTo, HasBeenSent, Result, TraceCtx),
|
||||
reply_caller_defer_metrics(Id, Reply, QueryOpts);
|
||||
[?QUERY(_, _, _, _, _) | _] = Batch ->
|
||||
batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts)
|
||||
|
@ -662,10 +662,10 @@ do_flush(
|
|||
inflight_tid := InflightTID
|
||||
} = Data0,
|
||||
%% unwrap when not batching (i.e., batch size == 1)
|
||||
[?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt, _TraceCtx) = Request] = Batch,
|
||||
[?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt, TraceCtx) = Request] = Batch,
|
||||
QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
|
||||
Result = call_query(async_if_possible, Id, Index, Ref, Request, QueryOpts),
|
||||
Reply = ?REPLY(ReplyTo, HasBeenSent, Result),
|
||||
Reply = ?REPLY(ReplyTo, HasBeenSent, Result, TraceCtx),
|
||||
{ShouldAck, DeltaCounters} = reply_caller(Id, Reply, QueryOpts),
|
||||
Data1 = aggregate_counters(Data0, DeltaCounters),
|
||||
case ShouldAck of
|
||||
|
@ -856,15 +856,15 @@ batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) ->
|
|||
|
||||
expand_batch_reply(BatchResults, Batch) when is_list(BatchResults) ->
|
||||
lists:map(
|
||||
fun({?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT, _TraceCtx), Result}) ->
|
||||
?REPLY(FROM, SENT, Result)
|
||||
fun({?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT, TraceCtx), Result}) ->
|
||||
?REPLY(FROM, SENT, Result, TraceCtx)
|
||||
end,
|
||||
lists:zip(Batch, BatchResults)
|
||||
);
|
||||
expand_batch_reply(BatchResult, Batch) ->
|
||||
lists:map(
|
||||
fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT, _TraceCtx)) ->
|
||||
?REPLY(FROM, SENT, BatchResult)
|
||||
fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT, TraceCtx)) ->
|
||||
?REPLY(FROM, SENT, BatchResult, TraceCtx)
|
||||
end,
|
||||
Batch
|
||||
).
|
||||
|
@ -876,12 +876,14 @@ reply_caller(Id, Reply, QueryOpts) ->
|
|||
|
||||
%% Should only reply to the caller when the decision is final (not
|
||||
%% retriable). See comment on `handle_query_result_pure'.
|
||||
reply_caller_defer_metrics(Id, ?REPLY(undefined, HasBeenSent, Result), _QueryOpts) ->
|
||||
handle_query_result_pure(Id, Result, HasBeenSent);
|
||||
reply_caller_defer_metrics(Id, ?REPLY(ReplyTo, HasBeenSent, Result), QueryOpts) ->
|
||||
reply_caller_defer_metrics(Id, ?REPLY(undefined, HasBeenSent, Result, TraceCtx), _QueryOpts) ->
|
||||
handle_query_result_pure(Id, Result, HasBeenSent, TraceCtx);
|
||||
reply_caller_defer_metrics(Id, ?REPLY(ReplyTo, HasBeenSent, Result, TraceCtx), QueryOpts) ->
|
||||
IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
|
||||
IsUnrecoverableError = is_unrecoverable_error(Result),
|
||||
{ShouldAck, PostFn, DeltaCounters} = handle_query_result_pure(Id, Result, HasBeenSent),
|
||||
{ShouldAck, PostFn, DeltaCounters} = handle_query_result_pure(
|
||||
Id, Result, HasBeenSent, TraceCtx
|
||||
),
|
||||
case {ShouldAck, Result, IsUnrecoverableError, IsSimpleQuery} of
|
||||
{ack, {async_return, _}, true, _} ->
|
||||
ok = do_reply_caller(ReplyTo, Result);
|
||||
|
@ -921,7 +923,7 @@ batch_reply_dropped(Batch, Result) ->
|
|||
%% This is only called by `simple_{,a}sync_query', so we can bump the
|
||||
%% counters here.
|
||||
handle_query_result(Id, Result, HasBeenSent) ->
|
||||
{ShouldBlock, PostFn, DeltaCounters} = handle_query_result_pure(Id, Result, HasBeenSent),
|
||||
{ShouldBlock, PostFn, DeltaCounters} = handle_query_result_pure(Id, Result, HasBeenSent, #{}),
|
||||
PostFn(),
|
||||
bump_counters(Id, DeltaCounters),
|
||||
ShouldBlock.
|
||||
|
@ -932,37 +934,49 @@ handle_query_result(Id, Result, HasBeenSent) ->
|
|||
%% * the result is a success (or at least a delayed result)
|
||||
%% We also retry even sync requests. In that case, we shouldn't reply
|
||||
%% the caller until one of those final results above happen.
|
||||
-spec handle_query_result_pure(id(), term(), HasBeenSent :: boolean()) ->
|
||||
-spec handle_query_result_pure(id(), term(), HasBeenSent :: boolean(), TraceCTX :: map()) ->
|
||||
{ack | nack, function(), counters()}.
|
||||
handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(exception, Msg), _HasBeenSent) ->
|
||||
handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(exception, Msg), _HasBeenSent, TraceCTX) ->
|
||||
PostFn = fun() ->
|
||||
?SLOG(error, #{msg => "resource_exception", info => emqx_utils:redact(Msg)}),
|
||||
?TRACE(
|
||||
error,
|
||||
"ERROR",
|
||||
"resource_exception",
|
||||
(trace_ctx_map(TraceCTX))#{info => emqx_utils:redact(Msg)}
|
||||
),
|
||||
ok
|
||||
end,
|
||||
{nack, PostFn, #{}};
|
||||
handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent) when
|
||||
handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent, _TraceCTX) when
|
||||
NotWorking == not_connected; NotWorking == blocked
|
||||
->
|
||||
{nack, fun() -> ok end, #{}};
|
||||
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasBeenSent) ->
|
||||
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasBeenSent, TraceCTX) ->
|
||||
PostFn = fun() ->
|
||||
?SLOG(error, #{id => Id, msg => "resource_not_found", info => Msg}),
|
||||
?TRACE(
|
||||
error,
|
||||
"ERROR",
|
||||
"resource_not_found",
|
||||
(trace_ctx_map(TraceCTX))#{id => Id, info => Msg}
|
||||
),
|
||||
ok
|
||||
end,
|
||||
{ack, PostFn, #{dropped_resource_not_found => 1}};
|
||||
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent) ->
|
||||
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent, TraceCTX) ->
|
||||
PostFn = fun() ->
|
||||
?SLOG(error, #{id => Id, msg => "resource_stopped", info => Msg}),
|
||||
?TRACE(error, "ERROR", "resource_stopped", (trace_ctx_map(TraceCTX))#{id => Id, info => Msg}),
|
||||
ok
|
||||
end,
|
||||
{ack, PostFn, #{dropped_resource_stopped => 1}};
|
||||
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent) ->
|
||||
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent, TraceCTX) ->
|
||||
PostFn = fun() ->
|
||||
?SLOG(error, #{id => Id, msg => "other_resource_error", reason => Reason}),
|
||||
?TRACE(error, "ERROR", "other_resource_error", (trace_ctx_map(TraceCTX))#{
|
||||
id => Id, reason => Reason
|
||||
}),
|
||||
ok
|
||||
end,
|
||||
{nack, PostFn, #{}};
|
||||
handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent) ->
|
||||
handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent, TraceCTX) ->
|
||||
case is_unrecoverable_error(Error) of
|
||||
true ->
|
||||
PostFn =
|
||||
|
@ -979,14 +993,16 @@ handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent) ->
|
|||
false ->
|
||||
PostFn =
|
||||
fun() ->
|
||||
?SLOG(error, #{id => Id, msg => "send_error", reason => Reason}),
|
||||
?TRACE(error, "ERROR", "send_error", (trace_ctx_map(TraceCTX))#{
|
||||
id => Id, reason => Reason
|
||||
}),
|
||||
ok
|
||||
end,
|
||||
{nack, PostFn, #{}}
|
||||
end;
|
||||
handle_query_result_pure(Id, {async_return, Result}, HasBeenSent) ->
|
||||
handle_query_async_result_pure(Id, Result, HasBeenSent);
|
||||
handle_query_result_pure(_Id, Result, HasBeenSent) ->
|
||||
handle_query_result_pure(Id, {async_return, Result}, HasBeenSent, TraceCTX) ->
|
||||
handle_query_async_result_pure(Id, Result, HasBeenSent, TraceCTX);
|
||||
handle_query_result_pure(_Id, Result, HasBeenSent, _TraceCTX) ->
|
||||
PostFn = fun() ->
|
||||
assert_ok_result(Result),
|
||||
ok
|
||||
|
@ -998,9 +1014,9 @@ handle_query_result_pure(_Id, Result, HasBeenSent) ->
|
|||
end,
|
||||
{ack, PostFn, Counters}.
|
||||
|
||||
-spec handle_query_async_result_pure(id(), term(), HasBeenSent :: boolean()) ->
|
||||
-spec handle_query_async_result_pure(id(), term(), HasBeenSent :: boolean(), map()) ->
|
||||
{ack | nack, function(), counters()}.
|
||||
handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent) ->
|
||||
handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent, TraceCTX) ->
|
||||
case is_unrecoverable_error(Error) of
|
||||
true ->
|
||||
PostFn =
|
||||
|
@ -1016,16 +1032,18 @@ handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent) ->
|
|||
{ack, PostFn, Counters};
|
||||
false ->
|
||||
PostFn = fun() ->
|
||||
?SLOG(error, #{id => Id, msg => "async_send_error", reason => Reason}),
|
||||
?TRACE(error, "ERROR", "async_send_error", (trace_ctx_map(TraceCTX))#{
|
||||
id => Id, reason => Reason
|
||||
}),
|
||||
ok
|
||||
end,
|
||||
{nack, PostFn, #{}}
|
||||
end;
|
||||
handle_query_async_result_pure(_Id, {ok, Pid}, _HasBeenSent) when is_pid(Pid) ->
|
||||
handle_query_async_result_pure(_Id, {ok, Pid}, _HasBeenSent, _TraceCTX) when is_pid(Pid) ->
|
||||
{ack, fun() -> ok end, #{}};
|
||||
handle_query_async_result_pure(_Id, ok, _HasBeenSent) ->
|
||||
handle_query_async_result_pure(_Id, ok, _HasBeenSent, _TraceCTX) ->
|
||||
{ack, fun() -> ok end, #{}};
|
||||
handle_query_async_result_pure(Id, Results, HasBeenSent) when is_list(Results) ->
|
||||
handle_query_async_result_pure(Id, Results, HasBeenSent, TraceCTX) when is_list(Results) ->
|
||||
All = fun(L) ->
|
||||
case L of
|
||||
{ok, Pid} -> is_pid(Pid);
|
||||
|
@ -1037,17 +1055,26 @@ handle_query_async_result_pure(Id, Results, HasBeenSent) when is_list(Results) -
|
|||
{ack, fun() -> ok end, #{}};
|
||||
false ->
|
||||
PostFn = fun() ->
|
||||
?SLOG(error, #{
|
||||
id => Id,
|
||||
msg => "async_batch_send_error",
|
||||
reason => Results,
|
||||
has_been_sent => HasBeenSent
|
||||
}),
|
||||
?TRACE(
|
||||
error,
|
||||
"ERROR",
|
||||
"async_batch_send_error",
|
||||
(trace_ctx_map(TraceCTX))#{
|
||||
id => Id,
|
||||
reason => Results,
|
||||
has_been_sent => HasBeenSent
|
||||
}
|
||||
),
|
||||
ok
|
||||
end,
|
||||
{nack, PostFn, #{}}
|
||||
end.
|
||||
|
||||
trace_ctx_map(undefined) ->
|
||||
#{};
|
||||
trace_ctx_map(Map) ->
|
||||
Map.
|
||||
|
||||
-spec aggregate_counters(data(), counters()) -> data().
|
||||
aggregate_counters(Data = #{counters := OldCounters}, DeltaCounters) ->
|
||||
Counters = merge_counters(OldCounters, DeltaCounters),
|
||||
|
@ -1526,7 +1553,7 @@ do_handle_async_reply(
|
|||
request_ref := Ref,
|
||||
buffer_worker := BufferWorkerPid,
|
||||
inflight_tid := InflightTID,
|
||||
min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt, _TraceCtx) = _Query
|
||||
min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt, TraceCtx) = _Query
|
||||
},
|
||||
Result
|
||||
) ->
|
||||
|
@ -1534,7 +1561,7 @@ do_handle_async_reply(
|
|||
%% but received no ACK, NOT the number of messages queued in the
|
||||
%% inflight window.
|
||||
{Action, PostFn, DeltaCounters} = reply_caller_defer_metrics(
|
||||
Id, ?REPLY(ReplyTo, Sent, Result), QueryOpts
|
||||
Id, ?REPLY(ReplyTo, Sent, Result, TraceCtx), QueryOpts
|
||||
),
|
||||
|
||||
?tp(handle_async_reply, #{
|
||||
|
|
|
@ -757,14 +757,18 @@ do_inc_action_metrics(
|
|||
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown');
|
||||
do_inc_action_metrics(
|
||||
#{rule_id := RuleId, action_id := ActId} = TraceContext,
|
||||
{error, {recoverable_error, _}}
|
||||
{error, {recoverable_error, _}} = Reason
|
||||
) ->
|
||||
FormatterRes = #emqx_trace_format_func_data{
|
||||
function = fun trace_formatted_result/1,
|
||||
data = {ActId, Reason}
|
||||
},
|
||||
TraceContext1 = maps:remove(action_id, TraceContext),
|
||||
trace_action(ActId, "out_of_service", TraceContext1),
|
||||
trace_action(ActId, "out_of_service", TraceContext1#{reason => FormatterRes}),
|
||||
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
|
||||
do_inc_action_metrics(
|
||||
#{rule_id := RuleId, action_id := ActId} = TraceContext,
|
||||
{error, {unrecoverable_error, _} = Reason}
|
||||
{error, {unrecoverable_error, _}} = Reason
|
||||
) ->
|
||||
TraceContext1 = maps:remove(action_id, TraceContext),
|
||||
FormatterRes = #emqx_trace_format_func_data{
|
||||
|
@ -801,12 +805,12 @@ do_inc_action_metrics(#{rule_id := RuleId, action_id := ActId} = TraceContext, R
|
|||
trace_formatted_result({{bridge_v2, Type, _Name}, R}) ->
|
||||
ConnectorType = emqx_action_info:action_type_to_connector_type(Type),
|
||||
ResourceModule = emqx_connector_info:resource_callback_module(ConnectorType),
|
||||
emqx_resource:call_format_query_result(ResourceModule, R);
|
||||
clean_up_error_tuple(emqx_resource:call_format_query_result(ResourceModule, R));
|
||||
trace_formatted_result({{bridge, BridgeType, _BridgeName, _ResId}, R}) ->
|
||||
BridgeV2Type = emqx_action_info:bridge_v1_type_to_action_type(BridgeType),
|
||||
ConnectorType = emqx_action_info:action_type_to_connector_type(BridgeV2Type),
|
||||
ResourceModule = emqx_connector_info:resource_callback_module(ConnectorType),
|
||||
emqx_resource:call_format_query_result(ResourceModule, R);
|
||||
clean_up_error_tuple(emqx_resource:call_format_query_result(ResourceModule, R));
|
||||
trace_formatted_result({_, R}) ->
|
||||
R.
|
||||
|
||||
|
@ -819,6 +823,15 @@ is_ok_result(R) when is_tuple(R) ->
|
|||
is_ok_result(_) ->
|
||||
false.
|
||||
|
||||
clean_up_error_tuple({error, {unrecoverable_error, Reason}}) ->
|
||||
Reason;
|
||||
clean_up_error_tuple({error, {recoverable_error, Reason}}) ->
|
||||
Reason;
|
||||
clean_up_error_tuple({error, Reason}) ->
|
||||
Reason;
|
||||
clean_up_error_tuple(Result) ->
|
||||
Result.
|
||||
|
||||
parse_module_name(Name) when is_binary(Name) ->
|
||||
case ?IS_VALID_SQL_FUNC_PROVIDER_MODULE_NAME(Name) of
|
||||
true ->
|
||||
|
|
|
@ -87,6 +87,7 @@ end_per_testcase(_TestCase, _Config) ->
|
|||
emqx_bridge_v2_testlib:delete_all_bridges(),
|
||||
emqx_bridge_v2_testlib:delete_all_connectors(),
|
||||
emqx_common_test_helpers:call_janitor(),
|
||||
meck:unload(),
|
||||
ok.
|
||||
|
||||
t_basic_apply_rule_trace_ruleid(Config) ->
|
||||
|
@ -229,7 +230,6 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender) ->
|
|||
)
|
||||
|| #{<<"meta">> := Meta} <- LogEntries
|
||||
],
|
||||
emqx_trace:delete(TraceName),
|
||||
ok.
|
||||
|
||||
do_final_log_check(Action, Bin0) when is_binary(Action) ->
|
||||
|
@ -289,26 +289,14 @@ create_trace(TraceName, TraceType, TraceValue) ->
|
|||
end_at => End,
|
||||
formatter => json
|
||||
},
|
||||
{ok, _} = emqx_trace:create(Trace).
|
||||
{ok, _} = CreateRes = emqx_trace:create(Trace),
|
||||
emqx_common_test_helpers:on_exit(fun() ->
|
||||
ok = emqx_trace:delete(TraceName)
|
||||
end),
|
||||
CreateRes.
|
||||
|
||||
t_apply_rule_test_batch_separation_stop_after_render(_Config) ->
|
||||
MeckOpts = [passthrough, no_link, no_history, non_strict],
|
||||
catch meck:new(emqx_connector_info, MeckOpts),
|
||||
meck:expect(
|
||||
emqx_connector_info,
|
||||
hard_coded_test_connector_info_modules,
|
||||
0,
|
||||
[emqx_rule_engine_test_connector_info]
|
||||
),
|
||||
emqx_connector_info:clean_cache(),
|
||||
catch meck:new(emqx_action_info, MeckOpts),
|
||||
meck:expect(
|
||||
emqx_action_info,
|
||||
hard_coded_test_action_info_modules,
|
||||
0,
|
||||
[emqx_rule_engine_test_action_info]
|
||||
),
|
||||
emqx_action_info:clean_cache(),
|
||||
meck_in_test_connector(),
|
||||
{ok, _} = emqx_connector:create(rule_engine_test, ?FUNCTION_NAME, #{}),
|
||||
Name = atom_to_binary(?FUNCTION_NAME),
|
||||
ActionConf =
|
||||
|
@ -405,14 +393,242 @@ t_apply_rule_test_batch_separation_stop_after_render(_Config) ->
|
|||
)
|
||||
end
|
||||
),
|
||||
%% Cleanup
|
||||
ok = emqx_trace:delete(Name),
|
||||
ok = emqx_rule_engine:delete_rule(RuleID),
|
||||
ok = emqx_bridge_v2:remove(rule_engine_test, ?FUNCTION_NAME),
|
||||
ok = emqx_connector:remove(rule_engine_test, ?FUNCTION_NAME),
|
||||
[_, _] = meck:unload(),
|
||||
ok.
|
||||
|
||||
t_apply_rule_test_format_action_failed(_Config) ->
|
||||
MeckOpts = [passthrough, no_link, no_history, non_strict],
|
||||
catch meck:new(emqx_rule_engine_test_connector, MeckOpts),
|
||||
meck:expect(
|
||||
emqx_rule_engine_test_connector,
|
||||
on_query,
|
||||
3,
|
||||
{error, {unrecoverable_error, <<"MY REASON">>}}
|
||||
),
|
||||
CheckFun =
|
||||
fun(Bin0) ->
|
||||
%% The last line in the Bin should be the action_failed entry
|
||||
?assertNotEqual(nomatch, binary:match(Bin0, [<<"action_failed">>])),
|
||||
Bin1 = string:trim(Bin0),
|
||||
LastEntry = unicode:characters_to_binary(lists:last(string:split(Bin1, <<"\n">>, all))),
|
||||
LastEntryJSON = emqx_utils_json:decode(LastEntry, [return_maps]),
|
||||
?assertMatch(
|
||||
#{
|
||||
<<"level">> := <<"debug">>,
|
||||
<<"meta">> := #{
|
||||
<<"action_info">> := #{
|
||||
<<"name">> := _,
|
||||
<<"type">> := <<"rule_engine_test">>
|
||||
},
|
||||
<<"client_ids">> := [],
|
||||
<<"clientid">> := _,
|
||||
<<"reason">> := <<"MY REASON">>,
|
||||
<<"rule_id">> := _,
|
||||
<<"rule_ids">> := [],
|
||||
<<"rule_trigger_time">> := _,
|
||||
<<"rule_trigger_times">> := [],
|
||||
<<"stop_action_after_render">> := false,
|
||||
<<"trace_tag">> := <<"ACTION">>
|
||||
},
|
||||
<<"msg">> := <<"action_failed">>,
|
||||
<<"time">> := _
|
||||
},
|
||||
LastEntryJSON
|
||||
)
|
||||
end,
|
||||
do_apply_rule_test_format_action_failed_test(1, CheckFun).
|
||||
|
||||
t_apply_rule_test_format_action_out_of_service_query(_Config) ->
|
||||
Reason = <<"MY_RECOVERABLE_REASON">>,
|
||||
CheckFun = out_of_service_check_fun(<<"send_error">>, Reason),
|
||||
meck_test_connector_recoverable_errors(Reason),
|
||||
do_apply_rule_test_format_action_failed_test(1, CheckFun).
|
||||
|
||||
t_apply_rule_test_format_action_out_of_service_batch_query(_Config) ->
|
||||
Reason = <<"MY_RECOVERABLE_REASON">>,
|
||||
CheckFun = out_of_service_check_fun(<<"send_error">>, Reason),
|
||||
meck_test_connector_recoverable_errors(Reason),
|
||||
do_apply_rule_test_format_action_failed_test(10, CheckFun).
|
||||
|
||||
t_apply_rule_test_format_action_out_of_service_async_query(_Config) ->
|
||||
Reason = <<"MY_RECOVERABLE_REASON">>,
|
||||
CheckFun = out_of_service_check_fun(<<"async_send_error">>, Reason),
|
||||
meck_test_connector_recoverable_errors(Reason),
|
||||
meck:expect(
|
||||
emqx_rule_engine_test_connector,
|
||||
callback_mode,
|
||||
0,
|
||||
async_if_possible
|
||||
),
|
||||
do_apply_rule_test_format_action_failed_test(1, CheckFun).
|
||||
|
||||
t_apply_rule_test_format_action_out_of_service_async_batch_query(_Config) ->
|
||||
Reason = <<"MY_RECOVERABLE_REASON">>,
|
||||
CheckFun = out_of_service_check_fun(<<"async_send_error">>, Reason),
|
||||
meck_test_connector_recoverable_errors(Reason),
|
||||
meck:expect(
|
||||
emqx_rule_engine_test_connector,
|
||||
callback_mode,
|
||||
0,
|
||||
async_if_possible
|
||||
),
|
||||
do_apply_rule_test_format_action_failed_test(10, CheckFun).
|
||||
|
||||
out_of_service_check_fun(SendErrorMsg, Reason) ->
|
||||
fun(Bin0) ->
|
||||
%% The last line in the Bin should be the action_failed entry
|
||||
?assertNotEqual(nomatch, binary:match(Bin0, [<<"action_failed">>])),
|
||||
io:format("LOG:\n~s", [Bin0]),
|
||||
Bin1 = string:trim(Bin0),
|
||||
LastEntry = unicode:characters_to_binary(lists:last(string:split(Bin1, <<"\n">>, all))),
|
||||
LastEntryJSON = emqx_utils_json:decode(LastEntry, [return_maps]),
|
||||
?assertMatch(
|
||||
#{
|
||||
<<"level">> := <<"debug">>,
|
||||
<<"meta">> :=
|
||||
#{
|
||||
<<"action_info">> :=
|
||||
#{
|
||||
<<"name">> := _,
|
||||
<<"type">> := <<"rule_engine_test">>
|
||||
},
|
||||
<<"clientid">> := _,
|
||||
<<"reason">> := <<"request_expired">>,
|
||||
<<"rule_id">> := _,
|
||||
<<"rule_trigger_time">> := _,
|
||||
<<"stop_action_after_render">> := false,
|
||||
<<"trace_tag">> := <<"ACTION">>
|
||||
},
|
||||
<<"msg">> := <<"action_failed">>,
|
||||
<<"time">> := _
|
||||
},
|
||||
LastEntryJSON
|
||||
),
|
||||
%% We should have at least one entry containing Reason
|
||||
[ReasonLine | _] = find_lines_with(Bin1, Reason),
|
||||
ReasonEntryJSON = emqx_utils_json:decode(ReasonLine, [return_maps]),
|
||||
?assertMatch(
|
||||
#{
|
||||
<<"level">> := <<"debug">>,
|
||||
<<"meta">> :=
|
||||
#{
|
||||
<<"client_ids">> := [],
|
||||
<<"clientid">> := _,
|
||||
<<"id">> := _,
|
||||
<<"reason">> :=
|
||||
#{
|
||||
<<"additional_info">> := _,
|
||||
<<"error_type">> := <<"recoverable_error">>,
|
||||
<<"msg">> := <<"MY_RECOVERABLE_REASON">>
|
||||
},
|
||||
<<"rule_id">> := _,
|
||||
<<"rule_ids">> := [],
|
||||
<<"rule_trigger_time">> := _,
|
||||
<<"rule_trigger_times">> := [],
|
||||
<<"stop_action_after_render">> := false,
|
||||
<<"trace_tag">> := <<"ERROR">>
|
||||
},
|
||||
<<"msg">> := SendErrorMsg,
|
||||
<<"time">> := _
|
||||
},
|
||||
ReasonEntryJSON
|
||||
)
|
||||
end.
|
||||
|
||||
meck_test_connector_recoverable_errors(Reason) ->
|
||||
MeckOpts = [passthrough, no_link, no_history, non_strict],
|
||||
catch meck:new(emqx_rule_engine_test_connector, MeckOpts),
|
||||
meck:expect(
|
||||
emqx_rule_engine_test_connector,
|
||||
on_query,
|
||||
3,
|
||||
{error, {recoverable_error, Reason}}
|
||||
),
|
||||
meck:expect(
|
||||
emqx_rule_engine_test_connector,
|
||||
on_batch_query,
|
||||
3,
|
||||
{error, {recoverable_error, Reason}}
|
||||
),
|
||||
meck:expect(
|
||||
emqx_rule_engine_test_connector,
|
||||
on_query_async,
|
||||
4,
|
||||
{error, {recoverable_error, Reason}}
|
||||
),
|
||||
meck:expect(
|
||||
emqx_rule_engine_test_connector,
|
||||
on_batch_query_async,
|
||||
4,
|
||||
{error, {recoverable_error, Reason}}
|
||||
).
|
||||
|
||||
find_lines_with(Data, InLineText) ->
|
||||
% Split the binary data into lines
|
||||
Lines = re:split(Data, "\n", [{return, binary}]),
|
||||
|
||||
% Use a list comprehension to filter lines containing 'Reason'
|
||||
[Line || Line <- Lines, re:run(Line, InLineText, [multiline, {capture, none}]) =/= nomatch].
|
||||
|
||||
do_apply_rule_test_format_action_failed_test(BatchSize, CheckLastTraceEntryFun) ->
|
||||
meck_in_test_connector(),
|
||||
{ok, _} = emqx_connector:create(rule_engine_test, ?FUNCTION_NAME, #{}),
|
||||
Name = atom_to_binary(?FUNCTION_NAME),
|
||||
ActionConf =
|
||||
#{
|
||||
<<"connector">> => Name,
|
||||
<<"parameters">> => #{<<"values">> => #{}},
|
||||
<<"resource_opts">> => #{
|
||||
<<"batch_size">> => BatchSize,
|
||||
<<"batch_time">> => 10,
|
||||
<<"request_ttl">> => 200
|
||||
}
|
||||
},
|
||||
{ok, _} = emqx_bridge_v2:create(
|
||||
rule_engine_test,
|
||||
?FUNCTION_NAME,
|
||||
ActionConf
|
||||
),
|
||||
SQL = <<"SELECT payload.is_stop_after_render as stop_after_render FROM \"", Name/binary, "\"">>,
|
||||
{ok, RuleID} = create_rule_with_action(
|
||||
rule_engine_test,
|
||||
?FUNCTION_NAME,
|
||||
SQL
|
||||
),
|
||||
create_trace(Name, ruleid, RuleID),
|
||||
Now = erlang:system_time(second) - 10,
|
||||
%% Stop
|
||||
ParmsNoStopAfterRender = apply_rule_parms(false, Name),
|
||||
{ok, _} = call_apply_rule_api(RuleID, ParmsNoStopAfterRender),
|
||||
%% Just check that the log file is created as expected
|
||||
?retry(
|
||||
_Interval0 = 200,
|
||||
_NAttempts0 = 100,
|
||||
begin
|
||||
Bin = read_rule_trace_file(Name, ruleid, Now),
|
||||
CheckLastTraceEntryFun(Bin)
|
||||
end
|
||||
),
|
||||
ok.
|
||||
|
||||
meck_in_test_connector() ->
|
||||
MeckOpts = [passthrough, no_link, no_history, non_strict],
|
||||
catch meck:new(emqx_connector_info, MeckOpts),
|
||||
meck:expect(
|
||||
emqx_connector_info,
|
||||
hard_coded_test_connector_info_modules,
|
||||
0,
|
||||
[emqx_rule_engine_test_connector_info]
|
||||
),
|
||||
emqx_connector_info:clean_cache(),
|
||||
catch meck:new(emqx_action_info, MeckOpts),
|
||||
meck:expect(
|
||||
emqx_action_info,
|
||||
hard_coded_test_action_info_modules,
|
||||
0,
|
||||
[emqx_rule_engine_test_action_info]
|
||||
),
|
||||
emqx_action_info:clean_cache().
|
||||
|
||||
apply_rule_parms(StopAfterRender, Name) ->
|
||||
Payload = #{<<"is_stop_after_render">> => StopAfterRender},
|
||||
Context = #{
|
||||
|
@ -441,6 +657,9 @@ create_rule_with_action(ActionType, ActionName, SQL) ->
|
|||
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
|
||||
{ok, Res0} ->
|
||||
#{<<"id">> := RuleId} = emqx_utils_json:decode(Res0, [return_maps]),
|
||||
emqx_common_test_helpers:on_exit(fun() ->
|
||||
emqx_rule_engine:delete_rule(RuleId)
|
||||
end),
|
||||
{ok, RuleId};
|
||||
Error ->
|
||||
Error
|
||||
|
|
|
@ -29,7 +29,9 @@
|
|||
on_start/2,
|
||||
on_stop/2,
|
||||
on_query/3,
|
||||
on_query_async/4,
|
||||
on_batch_query/3,
|
||||
on_batch_query_async/4,
|
||||
on_get_status/2,
|
||||
on_add_channel/4,
|
||||
on_remove_channel/3,
|
||||
|
@ -85,6 +87,14 @@ on_query(
|
|||
) ->
|
||||
ok.
|
||||
|
||||
on_query_async(
|
||||
_InstId,
|
||||
_Query,
|
||||
_State,
|
||||
_Callback
|
||||
) ->
|
||||
ok.
|
||||
|
||||
on_batch_query(
|
||||
_InstId,
|
||||
[{ChannelId, _Req} | _] = Msg,
|
||||
|
@ -96,5 +106,13 @@ on_batch_query(
|
|||
emqx_trace:rendered_action_template(ChannelId, #{nothing_to_render => ok}),
|
||||
ok.
|
||||
|
||||
on_batch_query_async(
|
||||
_InstId,
|
||||
_Batch,
|
||||
_State,
|
||||
_Callback
|
||||
) ->
|
||||
ok.
|
||||
|
||||
on_get_status(_InstId, _State) ->
|
||||
connected.
|
||||
|
|
Loading…
Reference in New Issue