fix: issues found during PR review (thanks @thalesmg and @zmstone)
* Simpler handling of true and false in best effort JSON formatter * inet:ntoa/1 to format IP addresses * Made a record for lazy formatted trace values and formatter to improve maintainability * Added callback to format return value from connector * Extended test case to check that the format return value callback works * Added handling of "lazy" trace entry data to the text formatter. Do we need to handle this data in the normal log formatters as well?
This commit is contained in:
parent
e32745bca6
commit
5bfe31b691
|
@ -35,6 +35,11 @@
|
|||
end_at :: integer() | undefined | '_'
|
||||
}).
|
||||
|
||||
-record(emqx_trace_format_func_data, {
|
||||
function :: fun((any()) -> any()),
|
||||
data :: any()
|
||||
}).
|
||||
|
||||
-define(SHARD, ?COMMON_SHARD).
|
||||
-define(MAX_SIZE, 30).
|
||||
|
||||
|
|
|
@ -229,14 +229,7 @@ best_effort_json_obj(Map, Config) ->
|
|||
do_format_msg("~p", [Map], Config)
|
||||
end.
|
||||
|
||||
json_value(true, _Config) ->
|
||||
true;
|
||||
json_value(false, _Config) ->
|
||||
false;
|
||||
json_value(V, Config) ->
|
||||
json(V, Config).
|
||||
|
||||
json(A, _) when is_atom(A) -> atom_to_binary(A, utf8);
|
||||
json(A, _) when is_atom(A) -> A;
|
||||
json(I, _) when is_integer(I) -> I;
|
||||
json(F, _) when is_float(F) -> F;
|
||||
json(P, C) when is_pid(P) -> json(pid_to_list(P), C);
|
||||
|
@ -324,7 +317,7 @@ json_kv(K0, V, Config) ->
|
|||
K = json_key(K0),
|
||||
case is_map(V) of
|
||||
true -> {K, best_effort_json_obj(V, Config)};
|
||||
false -> {K, json_value(V, Config)}
|
||||
false -> {K, json(V, Config)}
|
||||
end.
|
||||
|
||||
json_key(A) when is_atom(A) -> json_key(atom_to_binary(A, utf8));
|
||||
|
|
|
@ -15,9 +15,11 @@
|
|||
%%--------------------------------------------------------------------
|
||||
-module(emqx_trace_formatter).
|
||||
-include("emqx_mqtt.hrl").
|
||||
-include("emqx_trace.hrl").
|
||||
|
||||
-export([format/2]).
|
||||
-export([format_meta_map/1]).
|
||||
-export([evaluate_lazy_values/1]).
|
||||
|
||||
%% logger_formatter:config/0 is not exported.
|
||||
-type config() :: map().
|
||||
|
@ -28,18 +30,35 @@
|
|||
LogEvent :: logger:log_event(),
|
||||
Config :: config().
|
||||
format(
|
||||
#{level := debug, meta := Meta = #{trace_tag := Tag}, msg := Msg},
|
||||
#{level := debug, meta := Meta0 = #{trace_tag := Tag}, msg := Msg},
|
||||
#{payload_encode := PEncode}
|
||||
) ->
|
||||
Meta1 = evaluate_lazy_values(Meta0),
|
||||
Time = emqx_utils_calendar:now_to_rfc3339(microsecond),
|
||||
ClientId = to_iolist(maps:get(clientid, Meta, "")),
|
||||
Peername = maps:get(peername, Meta, ""),
|
||||
MetaBin = format_meta(Meta, PEncode),
|
||||
ClientId = to_iolist(maps:get(clientid, Meta1, "")),
|
||||
Peername = maps:get(peername, Meta1, ""),
|
||||
MetaBin = format_meta(Meta1, PEncode),
|
||||
Msg1 = to_iolist(Msg),
|
||||
Tag1 = to_iolist(Tag),
|
||||
[Time, " [", Tag1, "] ", ClientId, "@", Peername, " msg: ", Msg1, ", ", MetaBin, "\n"];
|
||||
format(Event, Config) ->
|
||||
emqx_logger_textfmt:format(Event, Config).
|
||||
emqx_logger_textfmt:format(evaluate_lazy_values(Event), Config).
|
||||
|
||||
evaluate_lazy_values(Map) when is_map(Map) ->
|
||||
maps:map(fun evaluate_lazy_values_kv/2, Map);
|
||||
evaluate_lazy_values(V) ->
|
||||
V.
|
||||
|
||||
evaluate_lazy_values_kv(_K, #emqx_trace_format_func_data{function = Formatter, data = V}) ->
|
||||
try
|
||||
NewV = Formatter(V),
|
||||
evaluate_lazy_values(NewV)
|
||||
catch
|
||||
_:_ ->
|
||||
V
|
||||
end;
|
||||
evaluate_lazy_values_kv(_K, V) ->
|
||||
evaluate_lazy_values(V).
|
||||
|
||||
format_meta_map(Meta) ->
|
||||
Encode = emqx_trace_handler:payload_encode(),
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
-module(emqx_trace_json_formatter).
|
||||
|
||||
-include("emqx_mqtt.hrl").
|
||||
-include("emqx_trace.hrl").
|
||||
|
||||
-export([format/2]).
|
||||
|
||||
|
@ -30,15 +31,16 @@
|
|||
LogEvent :: logger:log_event(),
|
||||
Config :: config().
|
||||
format(
|
||||
LogMap,
|
||||
LogMap0,
|
||||
#{payload_encode := PEncode}
|
||||
) ->
|
||||
LogMap1 = emqx_trace_formatter:evaluate_lazy_values(LogMap0),
|
||||
%% We just make some basic transformations on the input LogMap and then do
|
||||
%% an external call to create the JSON text
|
||||
Time = emqx_utils_calendar:now_to_rfc3339(microsecond),
|
||||
LogMap1 = LogMap#{time => Time},
|
||||
LogMap2 = prepare_log_map(LogMap1, PEncode),
|
||||
[emqx_logger_jsonfmt:best_effort_json(LogMap2, [force_utf8]), "\n"].
|
||||
LogMap2 = LogMap1#{time => Time},
|
||||
LogMap3 = prepare_log_map(LogMap2, PEncode),
|
||||
[emqx_logger_jsonfmt:best_effort_json(LogMap3, [force_utf8]), "\n"].
|
||||
|
||||
%%%-----------------------------------------------------------------
|
||||
%%% Helper Functions
|
||||
|
@ -48,55 +50,26 @@ prepare_log_map(LogMap, PEncode) ->
|
|||
NewKeyValuePairs = [prepare_key_value(K, V, PEncode) || {K, V} <- maps:to_list(LogMap)],
|
||||
maps:from_list(NewKeyValuePairs).
|
||||
|
||||
prepare_key_value(K, {Formatter, V}, PEncode) when is_function(Formatter, 1) ->
|
||||
%% A cusom formatter is provided with the value
|
||||
try
|
||||
NewV = Formatter(V),
|
||||
prepare_key_value(K, NewV, PEncode)
|
||||
catch
|
||||
_:_ ->
|
||||
{K, V}
|
||||
end;
|
||||
prepare_key_value(K, {ok, {Formatter, V}}, PEncode) when is_function(Formatter, 1) ->
|
||||
%% Unwrap
|
||||
prepare_key_value(K, {Formatter, V}, PEncode);
|
||||
prepare_key_value(host, {I1, I2, I3, I4}, _PEncode) when
|
||||
prepare_key_value(host, {I1, I2, I3, I4} = IP, _PEncode) when
|
||||
is_integer(I1),
|
||||
is_integer(I2),
|
||||
is_integer(I3),
|
||||
is_integer(I4)
|
||||
->
|
||||
%% We assume this is an IP address
|
||||
{host,
|
||||
unicode:characters_to_binary([
|
||||
integer_to_binary(I1),
|
||||
<<".">>,
|
||||
integer_to_binary(I2),
|
||||
<<".">>,
|
||||
integer_to_binary(I3),
|
||||
<<".">>,
|
||||
integer_to_binary(I4)
|
||||
])};
|
||||
prepare_key_value(K, {ok, StatusCode, Headers}, PEncode) when
|
||||
is_integer(StatusCode), StatusCode >= 200, StatusCode < 300, is_list(Headers)
|
||||
{host, unicode:characters_to_binary(inet:ntoa(IP))};
|
||||
prepare_key_value(host, {I1, I2, I3, I4, I5, I6, I7, I8} = IP, _PEncode) when
|
||||
is_integer(I1),
|
||||
is_integer(I2),
|
||||
is_integer(I3),
|
||||
is_integer(I4),
|
||||
is_integer(I5),
|
||||
is_integer(I6),
|
||||
is_integer(I7),
|
||||
is_integer(I8)
|
||||
->
|
||||
prepare_key_value(K, {ok, StatusCode, Headers, <<"">>}, PEncode);
|
||||
prepare_key_value(K, {ok, StatusCode, Headers, Body}, PEncode) when
|
||||
is_integer(StatusCode), StatusCode >= 200, StatusCode < 300, is_list(Headers)
|
||||
->
|
||||
%% We assume this is that response of an HTTP request
|
||||
prepare_key_value(
|
||||
K,
|
||||
#{
|
||||
result => ok,
|
||||
response => #{
|
||||
status => StatusCode,
|
||||
headers => Headers,
|
||||
body => Body
|
||||
}
|
||||
},
|
||||
PEncode
|
||||
);
|
||||
%% We assume this is an IP address
|
||||
{host, unicode:characters_to_binary(inet:ntoa(IP))};
|
||||
prepare_key_value(payload = K, V, PEncode) ->
|
||||
NewV =
|
||||
try
|
||||
|
|
|
@ -27,6 +27,8 @@
|
|||
-export([execute/2]).
|
||||
-endif.
|
||||
|
||||
-include_lib("emqx/include/emqx_trace.hrl").
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
@ -107,7 +109,10 @@ do_query(Table, Query0, Templates, TraceRenderedCTX) ->
|
|||
Query = apply_template(Query0, Templates),
|
||||
emqx_trace:rendered_action_template_with_ctx(TraceRenderedCTX, #{
|
||||
table => Table,
|
||||
query => {fun trace_format_query/1, Query}
|
||||
query => #emqx_trace_format_func_data{
|
||||
function = fun trace_format_query/1,
|
||||
data = Query
|
||||
}
|
||||
}),
|
||||
execute(Query, Table)
|
||||
catch
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
-include_lib("hocon/include/hoconsc.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
-include_lib("emqx/include/emqx_trace.hrl").
|
||||
|
||||
-behaviour(emqx_resource).
|
||||
|
||||
|
@ -35,7 +36,8 @@
|
|||
on_add_channel/4,
|
||||
on_remove_channel/3,
|
||||
on_get_channels/1,
|
||||
on_get_channel_status/3
|
||||
on_get_channel_status/3,
|
||||
on_format_query_result/1
|
||||
]).
|
||||
|
||||
-export([reply_delegator/3]).
|
||||
|
@ -414,24 +416,6 @@ on_query(
|
|||
Result
|
||||
end.
|
||||
|
||||
maybe_trace_format_result(Res) ->
|
||||
%% If rule tracing is active, then we know that the connector is used by an
|
||||
%% action and that the result is used for tracing only. This is why we can
|
||||
%% add a function to lazily format the trace entry.
|
||||
case emqx_trace:is_rule_trace_active() of
|
||||
true ->
|
||||
{ok, {fun trace_format_result/1, Res}};
|
||||
false ->
|
||||
Res
|
||||
end.
|
||||
|
||||
trace_format_result({ok, Status, Headers, Body}) ->
|
||||
#{status => Status, headers => Headers, body => Body};
|
||||
trace_format_result({ok, Status, Headers}) ->
|
||||
#{status => Status, headers => Headers};
|
||||
trace_format_result(Result) ->
|
||||
Result.
|
||||
|
||||
%% BridgeV1 entrypoint
|
||||
on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
|
||||
case maps:get(request, State, undefined) of
|
||||
|
@ -533,9 +517,15 @@ trace_rendered_action_template(ActionId, Scheme, Host, Port, Method, NRequest, T
|
|||
port => Port,
|
||||
path => Path,
|
||||
method => Method,
|
||||
headers => {fun emqx_utils_redact:redact_headers/1, Headers},
|
||||
headers => #emqx_trace_format_func_data{
|
||||
function = fun emqx_utils_redact:redact_headers/1,
|
||||
data = Headers
|
||||
},
|
||||
timeout => Timeout,
|
||||
url => {fun render_url/1, {Scheme, Host, Port, Path}}
|
||||
url => #emqx_trace_format_func_data{
|
||||
function = fun render_url/1,
|
||||
data = {Scheme, Host, Port, Path}
|
||||
}
|
||||
}
|
||||
);
|
||||
{Path, Headers, Body} ->
|
||||
|
@ -546,10 +536,19 @@ trace_rendered_action_template(ActionId, Scheme, Host, Port, Method, NRequest, T
|
|||
port => Port,
|
||||
path => Path,
|
||||
method => Method,
|
||||
headers => {fun emqx_utils_redact:redact_headers/1, Headers},
|
||||
headers => #emqx_trace_format_func_data{
|
||||
function = fun emqx_utils_redact:redact_headers/1,
|
||||
data = Headers
|
||||
},
|
||||
timeout => Timeout,
|
||||
body => {fun log_format_body/1, Body},
|
||||
url => {fun render_url/1, {Scheme, Host, Port, Path}}
|
||||
body => #emqx_trace_format_func_data{
|
||||
function = fun log_format_body/1,
|
||||
data = Body
|
||||
},
|
||||
url => #emqx_trace_format_func_data{
|
||||
function = fun render_url/1,
|
||||
data = {Scheme, Host, Port, Path}
|
||||
}
|
||||
}
|
||||
)
|
||||
end.
|
||||
|
@ -645,6 +644,26 @@ on_get_channel_status(
|
|||
%% XXX: Reuse the connector status
|
||||
on_get_status(InstId, State).
|
||||
|
||||
on_format_query_result({ok, Status, Headers, Body}) ->
|
||||
#{
|
||||
result => ok,
|
||||
response => #{
|
||||
status => Status,
|
||||
headers => Headers,
|
||||
body => Body
|
||||
}
|
||||
};
|
||||
on_format_query_result({ok, Status, Headers}) ->
|
||||
#{
|
||||
result => ok,
|
||||
response => #{
|
||||
status => Status,
|
||||
headers => Headers
|
||||
}
|
||||
};
|
||||
on_format_query_result(Result) ->
|
||||
Result.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -877,9 +896,9 @@ transform_result(Result) ->
|
|||
{error, _Reason} ->
|
||||
Result;
|
||||
{ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 ->
|
||||
maybe_trace_format_result(Result);
|
||||
Result;
|
||||
{ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 ->
|
||||
maybe_trace_format_result(Result);
|
||||
Result;
|
||||
{ok, _TooManyRequests = StatusCode = 429, Headers} ->
|
||||
{error, {recoverable_error, #{status_code => StatusCode, headers => Headers}}};
|
||||
{ok, _ServiceUnavailable = StatusCode = 503, Headers} ->
|
||||
|
|
|
@ -20,7 +20,8 @@
|
|||
on_query/3,
|
||||
on_batch_query/3,
|
||||
on_get_status/2,
|
||||
on_get_channel_status/3
|
||||
on_get_channel_status/3,
|
||||
on_format_query_result/1
|
||||
]).
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
|
@ -125,27 +126,11 @@ on_query(
|
|||
redis_bridge_connector_send_done,
|
||||
#{instance_id => InstId, cmd => Cmd, batch => false, mode => sync, result => Result}
|
||||
),
|
||||
maybe_trace_format_result(Result);
|
||||
Result;
|
||||
Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
maybe_trace_format_result(Res) ->
|
||||
%% If rule tracing is active, then we know that the connector is used by an
|
||||
%% action and that the result is used for tracing only. This is why we can
|
||||
%% add a function to lazily format the trace entry.
|
||||
case emqx_trace:is_rule_trace_active() of
|
||||
true ->
|
||||
{ok, {fun trace_format_result/1, Res}};
|
||||
false ->
|
||||
Res
|
||||
end.
|
||||
|
||||
trace_format_result({ok, Msg}) ->
|
||||
#{result => ok, message => Msg};
|
||||
trace_format_result(Res) ->
|
||||
Res.
|
||||
|
||||
on_batch_query(
|
||||
InstId, BatchData, _State = #{channels := Channels, conn_st := RedisConnSt}
|
||||
) ->
|
||||
|
@ -172,11 +157,16 @@ on_batch_query(
|
|||
result => Result
|
||||
}
|
||||
),
|
||||
maybe_trace_format_result(Result);
|
||||
Result;
|
||||
Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
on_format_query_result({ok, Msg}) ->
|
||||
#{result => ok, message => Msg};
|
||||
on_format_query_result(Res) ->
|
||||
Res.
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
%% private helpers
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("snabbkaffe/include/trace.hrl").
|
||||
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
||||
-include_lib("emqx/include/emqx_trace.hrl").
|
||||
-include("emqx_bridge_s3.hrl").
|
||||
|
||||
-behaviour(emqx_resource).
|
||||
|
@ -320,7 +321,10 @@ run_simple_upload(
|
|||
emqx_trace:rendered_action_template(ChannelID, #{
|
||||
bucket => Bucket,
|
||||
key => Key,
|
||||
content => {fun unicode:characters_to_binary/1, Content}
|
||||
content => #emqx_trace_format_func_data{
|
||||
function = fun unicode:characters_to_binary/1,
|
||||
data = Content
|
||||
}
|
||||
}),
|
||||
case emqx_s3_client:put_object(Client, Key, UploadOpts, Content) of
|
||||
ok ->
|
||||
|
|
|
@ -38,7 +38,8 @@
|
|||
on_add_channel/4,
|
||||
on_remove_channel/3,
|
||||
on_get_channels/1,
|
||||
on_get_channel_status/3
|
||||
on_get_channel_status/3,
|
||||
on_format_query_result/1
|
||||
]).
|
||||
|
||||
-export([connect/1]).
|
||||
|
@ -693,11 +694,11 @@ handle_result({error, Error}) ->
|
|||
TranslatedError = translate_to_log_context(Error),
|
||||
{error, {unrecoverable_error, export_error(TranslatedError)}};
|
||||
handle_result(Res) ->
|
||||
maybe_trace_format_result(Res).
|
||||
Res.
|
||||
|
||||
trace_format_result({ok, Cnt}) when is_integer(Cnt) ->
|
||||
on_format_query_result({ok, Cnt}) when is_integer(Cnt) ->
|
||||
#{result => ok, affected_rows => Cnt};
|
||||
trace_format_result(Res) ->
|
||||
on_format_query_result(Res) ->
|
||||
Res.
|
||||
|
||||
handle_batch_result([{ok, Count} | Rest], Acc) ->
|
||||
|
@ -706,18 +707,7 @@ handle_batch_result([{error, Error} | _Rest], _Acc) ->
|
|||
TranslatedError = translate_to_log_context(Error),
|
||||
{error, {unrecoverable_error, export_error(TranslatedError)}};
|
||||
handle_batch_result([], Acc) ->
|
||||
maybe_trace_format_result({ok, Acc}).
|
||||
|
||||
maybe_trace_format_result(Res) ->
|
||||
%% If rule tracing is active, then we know that the connector is used by an
|
||||
%% action and that the result is used for tracing only. This is why we can
|
||||
%% add a function to lazily format the trace entry.
|
||||
case emqx_trace:is_rule_trace_active() of
|
||||
true ->
|
||||
{ok, {fun trace_format_result/1, Res}};
|
||||
false ->
|
||||
Res
|
||||
end.
|
||||
{ok, Acc}.
|
||||
|
||||
translate_to_log_context({error, Reason}) ->
|
||||
translate_to_log_context(Reason);
|
||||
|
|
|
@ -86,7 +86,9 @@
|
|||
forget_allocated_resources/1,
|
||||
deallocate_resource/2,
|
||||
%% Get channel config from resource
|
||||
call_get_channel_config/3
|
||||
call_get_channel_config/3,
|
||||
% Call the format query result function
|
||||
call_format_query_result/2
|
||||
]).
|
||||
|
||||
%% Direct calls to the callback module
|
||||
|
@ -154,7 +156,8 @@
|
|||
on_add_channel/4,
|
||||
on_remove_channel/3,
|
||||
on_get_channels/1,
|
||||
query_mode/1
|
||||
query_mode/1,
|
||||
on_format_query_result/1
|
||||
]).
|
||||
|
||||
%% when calling emqx_resource:start/1
|
||||
|
@ -230,6 +233,14 @@
|
|||
ResId :: term()
|
||||
) -> [term()].
|
||||
|
||||
%% When given the result of a on_*query call this function should return a
|
||||
%% version of the result that is suitable for JSON trace logging. This
|
||||
%% typically means converting Erlang tuples to maps with appropriate names for
|
||||
%% the values in the tuple.
|
||||
-callback on_format_query_result(
|
||||
QueryResult :: term()
|
||||
) -> term().
|
||||
|
||||
-define(SAFE_CALL(EXPR),
|
||||
(fun() ->
|
||||
try
|
||||
|
@ -551,6 +562,14 @@ call_get_channel_config(ResId, ChannelId, Mod) ->
|
|||
<<"on_get_channels callback function not available for resource id", ResId/binary>>}
|
||||
end.
|
||||
|
||||
call_format_query_result(Mod, Result) ->
|
||||
case erlang:function_exported(Mod, on_format_query_result, 1) of
|
||||
true ->
|
||||
Mod:on_format_query_result(Result);
|
||||
false ->
|
||||
Result
|
||||
end.
|
||||
|
||||
-spec call_stop(resource_id(), module(), resource_state()) -> term().
|
||||
call_stop(ResId, Mod, ResourceState) ->
|
||||
?SAFE_CALL(begin
|
||||
|
|
|
@ -767,21 +767,49 @@ do_inc_action_metrics(
|
|||
{error, {unrecoverable_error, _} = Reason}
|
||||
) ->
|
||||
TraceContext1 = maps:remove(action_id, TraceContext),
|
||||
trace_action(ActId, "action_failed", maps:merge(#{reason => Reason}, TraceContext1)),
|
||||
FormatterRes = #emqx_trace_format_func_data{
|
||||
function = fun trace_formatted_result/1,
|
||||
data = {ActId, Reason}
|
||||
},
|
||||
trace_action(ActId, "action_failed", maps:merge(#{reason => FormatterRes}, TraceContext1)),
|
||||
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
|
||||
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown');
|
||||
do_inc_action_metrics(#{rule_id := RuleId, action_id := ActId} = TraceContext, R) ->
|
||||
TraceContext1 = maps:remove(action_id, TraceContext),
|
||||
FormatterRes = #emqx_trace_format_func_data{
|
||||
function = fun trace_formatted_result/1,
|
||||
data = {ActId, R}
|
||||
},
|
||||
case is_ok_result(R) of
|
||||
false ->
|
||||
trace_action(ActId, "action_failed", maps:merge(#{reason => R}, TraceContext1)),
|
||||
trace_action(
|
||||
ActId,
|
||||
"action_failed",
|
||||
maps:merge(#{reason => FormatterRes}, TraceContext1)
|
||||
),
|
||||
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
|
||||
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown');
|
||||
true ->
|
||||
trace_action(ActId, "action_success", maps:merge(#{result => R}, TraceContext1)),
|
||||
trace_action(
|
||||
ActId,
|
||||
"action_success",
|
||||
maps:merge(#{result => FormatterRes}, TraceContext1)
|
||||
),
|
||||
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success')
|
||||
end.
|
||||
|
||||
trace_formatted_result({{bridge_v2, Type, _Name}, R}) ->
|
||||
ConnectorType = emqx_action_info:action_type_to_connector_type(Type),
|
||||
ResourceModule = emqx_connector_info:resource_callback_module(ConnectorType),
|
||||
emqx_resource:call_format_query_result(ResourceModule, R);
|
||||
trace_formatted_result({{bridge, BridgeType, _BridgeName, _ResId}, R}) ->
|
||||
BridgeV2Type = emqx_action_info:bridge_v1_type_to_action_type(BridgeType),
|
||||
ConnectorType = emqx_action_info:action_type_to_connector_type(BridgeV2Type),
|
||||
ResourceModule = emqx_connector_info:resource_callback_module(ConnectorType),
|
||||
emqx_resource:call_format_query_result(ResourceModule, R);
|
||||
trace_formatted_result({_, R}) ->
|
||||
R.
|
||||
|
||||
is_ok_result(ok) ->
|
||||
true;
|
||||
is_ok_result({async_return, R}) ->
|
||||
|
|
|
@ -210,7 +210,8 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender) ->
|
|||
begin
|
||||
Bin = read_rule_trace_file(TraceName, TraceType, Now),
|
||||
io:format("THELOG3:~n~s", [Bin]),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"action_success">>]))
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"action_success">>])),
|
||||
do_final_log_check(Action, Bin)
|
||||
end
|
||||
)
|
||||
end,
|
||||
|
@ -231,6 +232,51 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender) ->
|
|||
emqx_trace:delete(TraceName),
|
||||
ok.
|
||||
|
||||
do_final_log_check(Action, Bin0) when is_binary(Action) ->
|
||||
%% The last line in the Bin should be the action_success entry
|
||||
Bin1 = string:trim(Bin0),
|
||||
LastEntry = unicode:characters_to_binary(lists:last(string:split(Bin1, <<"\n">>, all))),
|
||||
LastEntryJSON = emqx_utils_json:decode(LastEntry, [return_maps]),
|
||||
%% Check that lazy formatting of the action result works correctly
|
||||
?assertMatch(
|
||||
#{
|
||||
<<"level">> := <<"debug">>,
|
||||
<<"meta">> :=
|
||||
#{
|
||||
<<"action_info">> :=
|
||||
#{
|
||||
<<"name">> := <<"emqx_bridge_http_test_lib">>,
|
||||
<<"type">> := <<"http">>
|
||||
},
|
||||
<<"clientid">> := <<"c_emqx">>,
|
||||
<<"result">> :=
|
||||
#{
|
||||
<<"response">> :=
|
||||
#{
|
||||
<<"body">> := <<"hello">>,
|
||||
<<"headers">> :=
|
||||
#{
|
||||
<<"content-type">> := <<"text/plain">>,
|
||||
<<"date">> := _,
|
||||
<<"server">> := _
|
||||
},
|
||||
<<"status">> := 200
|
||||
},
|
||||
<<"result">> := <<"ok">>
|
||||
},
|
||||
<<"rule_id">> := _,
|
||||
<<"rule_trigger_time">> := _,
|
||||
<<"stop_action_after_render">> := false,
|
||||
<<"trace_tag">> := <<"ACTION">>
|
||||
},
|
||||
<<"msg">> := <<"action_success">>,
|
||||
<<"time">> := _
|
||||
},
|
||||
LastEntryJSON
|
||||
);
|
||||
do_final_log_check(_, _) ->
|
||||
ok.
|
||||
|
||||
create_trace(TraceName, TraceType, TraceValue) ->
|
||||
Now = erlang:system_time(second) - 10,
|
||||
Start = Now,
|
||||
|
|
Loading…
Reference in New Issue