From 5ca90ccced49556ff9b842cb45f7aeab8b0656b2 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 23 Apr 2024 09:43:11 +0200 Subject: [PATCH 01/27] fix: improve structure of log trace entries for HTTP action Fixes: https://emqx.atlassian.net/browse/EMQX-12025 --- .../emqx_trace/emqx_trace_json_formatter.erl | 15 +++++++++++++++ .../src/emqx_bridge_http_connector.erl | 17 +++++++++++------ 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl b/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl index 35b09b9b0..8f748ed9f 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl @@ -48,6 +48,21 @@ prepare_log_map(LogMap, PEncode) -> NewKeyValuePairs = [prepare_key_value(K, V, PEncode) || {K, V} <- maps:to_list(LogMap)], maps:from_list(NewKeyValuePairs). +prepare_key_value(K, {Formatter, V}, PEncode) when is_function(Formatter, 1) -> + %% A cusom formatter is provided with the value + try + NewV = Formatter(V), + prepare_key_value(K, NewV, PEncode) + catch + _:_ -> + {K, V} + end; +prepare_key_value(K, {ok, Status, Headers, Body}, PEncode) when + is_integer(Status), is_list(Headers), is_binary(Body) +-> + %% This is unlikely anything else then info about a HTTP request so we make + %% it more structured + prepare_key_value(K, #{status => Status, headers => Headers, body => Body}, PEncode); prepare_key_value(payload = K, V, PEncode) -> NewV = try diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index 9be7457e1..76f2686a1 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -359,7 +359,7 @@ on_query(InstId, {Method, Request, Timeout}, State) -> on_query( InstId, {ActionId, KeyOrNum, Method, Request, Timeout, Retry}, - #{base_path := BasePath} = State + #{base_path := BasePath, host := Host} = State ) -> ?TRACE( "QUERY", @@ -373,7 +373,7 @@ on_query( } ), NRequest = formalize_request(Method, BasePath, Request), - trace_rendered_action_template(ActionId, Method, NRequest, Timeout), + trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout), Worker = resolve_pool_worker(State, KeyOrNum), Result0 = ehttpc:request( Worker, @@ -469,7 +469,7 @@ on_query_async( InstId, {ActionId, KeyOrNum, Method, Request, Timeout}, ReplyFunAndArgs, - #{base_path := BasePath} = State + #{base_path := BasePath, host := Host} = State ) -> Worker = resolve_pool_worker(State, KeyOrNum), ?TRACE( @@ -483,7 +483,7 @@ on_query_async( } ), NRequest = formalize_request(Method, BasePath, Request), - trace_rendered_action_template(ActionId, Method, NRequest, Timeout), + trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout), MaxAttempts = maps:get(max_attempts, State, 3), Context = #{ attempt => 1, @@ -503,12 +503,13 @@ on_query_async( ), {ok, Worker}. -trace_rendered_action_template(ActionId, Method, NRequest, Timeout) -> +trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout) -> case NRequest of {Path, Headers} -> emqx_trace:rendered_action_template( ActionId, #{ + host => Host, path => Path, method => Method, headers => emqx_utils_redact:redact_headers(Headers), @@ -519,15 +520,19 @@ trace_rendered_action_template(ActionId, Method, NRequest, Timeout) -> emqx_trace:rendered_action_template( ActionId, #{ + host => Host, path => Path, method => Method, headers => emqx_utils_redact:redact_headers(Headers), timeout => Timeout, - body => Body + body => {fun log_format_body/1, Body} } ) end. +log_format_body(Body) -> + unicode:characters_to_binary(Body). + resolve_pool_worker(State, undefined) -> resolve_pool_worker(State, self()); resolve_pool_worker(#{pool_name := PoolName} = State, Key) -> From 1f676ce035e8be9b731ce7d7f35f0370341436be Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 23 Apr 2024 10:36:05 +0200 Subject: [PATCH 02/27] feat: add stop after render and after render trace to mqtt action --- .../src/emqx_bridge_mqtt_connector.erl | 29 ++++++++-- .../src/emqx_bridge_mqtt_egress.erl | 55 ++++++++++++++----- 2 files changed, 64 insertions(+), 20 deletions(-) diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index 900f6143f..d61950513 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -264,7 +264,7 @@ on_query( ), Channels = maps:get(installed_channels, State), ChannelConfig = maps:get(ChannelId, Channels), - handle_send_result(with_egress_client(PoolName, send, [Msg, ChannelConfig])); + handle_send_result(with_egress_client(ChannelId, PoolName, send, [Msg, ChannelConfig])); on_query(ResourceId, {_ChannelId, Msg}, #{}) -> ?SLOG(error, #{ msg => "forwarding_unavailable", @@ -283,7 +283,7 @@ on_query_async( Callback = {fun on_async_result/2, [CallbackIn]}, Channels = maps:get(installed_channels, State), ChannelConfig = maps:get(ChannelId, Channels), - Result = with_egress_client(PoolName, send_async, [Msg, Callback, ChannelConfig]), + Result = with_egress_client(ChannelId, PoolName, send_async, [Msg, Callback, ChannelConfig]), case Result of ok -> ok; @@ -300,8 +300,25 @@ on_query_async(ResourceId, {_ChannelId, Msg}, _Callback, #{}) -> reason => "Egress is not configured" }). -with_egress_client(ResourceId, Fun, Args) -> - ecpool:pick_and_do(ResourceId, {emqx_bridge_mqtt_egress, Fun, Args}, no_handover). +with_egress_client(ActionID, ResourceId, Fun, Args) -> + LogMetaData = logger:get_process_metadata(), + TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ActionID}, + TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext}, + ecpool:pick_and_do( + ResourceId, {emqx_bridge_mqtt_egress, Fun, [TraceRenderedFunc | Args]}, no_handover + ). + +trace_render_result(RenderResult, #{trace_ctx := LogMetaData, action_id := ActionID}) -> + OldMetaData = logger:get_process_metadata(), + try + logger:set_process_metadata(LogMetaData), + emqx_trace:rendered_action_template( + ActionID, + RenderResult + ) + after + logger:set_process_metadata(OldMetaData) + end. on_async_result(Callback, Result) -> apply_callback_function(Callback, handle_send_result(Result)). @@ -322,7 +339,9 @@ handle_send_result({ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}}) -> handle_send_result({ok, Reply}) -> {error, classify_reply(Reply)}; handle_send_result({error, Reason}) -> - {error, classify_error(Reason)}. + {error, classify_error(Reason)}; +handle_send_result({unrecoverable_error, Reason}) -> + {error, {unrecoverable_error, Reason}}. classify_reply(Reply = #{reason_code := _}) -> {unrecoverable_error, Reply}. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl index d23899ef1..80d38bc78 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl @@ -22,13 +22,16 @@ -export([ config/1, - send/3, - send_async/4 + send/4, + send_async/5 ]). -type message() :: emqx_types:message() | map(). -type callback() :: {function(), [_Arg]} | {module(), atom(), [_Arg]}. -type remote_message() :: #mqtt_msg{}. +-type trace_rendered_func() :: { + fun((RenderResult :: any(), CTX :: map()) -> any()), TraceCTX :: map() +}. -type egress() :: #{ local => #{ @@ -42,25 +45,37 @@ config(#{remote := RC = #{}} = Conf) -> Conf#{remote => emqx_bridge_mqtt_msg:parse(RC)}. --spec send(pid(), message(), egress()) -> ok. -send(Pid, MsgIn, Egress) -> - emqtt:publish(Pid, export_msg(MsgIn, Egress)). +-spec send(pid(), trace_rendered_func(), message(), egress()) -> ok. +send(Pid, TraceRenderedFunc, MsgIn, Egress) -> + try + emqtt:publish(Pid, export_msg(MsgIn, Egress, TraceRenderedFunc)) + catch + error:{unrecoverable_error, Reason} -> + {unrecoverable_error, Reason} + end. --spec send_async(pid(), message(), callback(), egress()) -> +-spec send_async(pid(), trace_rendered_func(), message(), callback(), egress()) -> ok | {ok, pid()}. -send_async(Pid, MsgIn, Callback, Egress) -> - ok = emqtt:publish_async(Pid, export_msg(MsgIn, Egress), _Timeout = infinity, Callback), - {ok, Pid}. +send_async(Pid, TraceRenderedFunc, MsgIn, Callback, Egress) -> + try + ok = emqtt:publish_async( + Pid, export_msg(MsgIn, Egress, TraceRenderedFunc), _Timeout = infinity, Callback + ), + {ok, Pid} + catch + error:{unrecoverable_error, Reason} -> + {unrecoverable_error, Reason} + end. -export_msg(Msg, #{remote := Remote}) -> - to_remote_msg(Msg, Remote). +export_msg(Msg, #{remote := Remote}, TraceRenderedFunc) -> + to_remote_msg(Msg, Remote, TraceRenderedFunc). --spec to_remote_msg(message(), emqx_bridge_mqtt_msg:msgvars()) -> +-spec to_remote_msg(message(), emqx_bridge_mqtt_msg:msgvars(), trace_rendered_func()) -> remote_message(). -to_remote_msg(#message{flags = Flags} = Msg, Vars) -> +to_remote_msg(#message{flags = Flags} = Msg, Vars, TraceRenderedFunc) -> {EventMsg, _} = emqx_rule_events:eventmsg_publish(Msg), - to_remote_msg(EventMsg#{retain => maps:get(retain, Flags, false)}, Vars); -to_remote_msg(Msg = #{}, Remote) -> + to_remote_msg(EventMsg#{retain => maps:get(retain, Flags, false)}, Vars, TraceRenderedFunc); +to_remote_msg(Msg = #{}, Remote, {TraceRenderedFun, TraceRenderedCTX}) -> #{ topic := Topic, payload := Payload, @@ -68,6 +83,16 @@ to_remote_msg(Msg = #{}, Remote) -> retain := Retain } = emqx_bridge_mqtt_msg:render(Msg, Remote), PubProps = maps:get(pub_props, Msg, #{}), + TraceRenderedFun( + #{ + qos => QoS, + retain => Retain, + topic => Topic, + props => PubProps, + payload => Payload + }, + TraceRenderedCTX + ), #mqtt_msg{ qos = QoS, retain = Retain, From b02ed4e6ec0fe97c4bc04186d663560ffc113648 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 23 Apr 2024 11:09:08 +0200 Subject: [PATCH 03/27] feat: add stop after render and after render trace to pgsql action --- apps/emqx/src/emqx_trace/emqx_trace.erl | 7 +++++-- apps/emqx_postgresql/src/emqx_postgresql.erl | 14 +++++++++++--- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index 7bbe59b2b..4152fdbaa 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -87,7 +87,7 @@ unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) -> unsubscribe(Topic, SubOpts) -> ?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}). -rendered_action_template(ActionID, RenderResult) -> +rendered_action_template(ActionID, RenderResult) when is_binary(ActionID) -> TraceResult = ?TRACE( "QUERY_RENDER", "action_template_rendered", @@ -111,7 +111,10 @@ rendered_action_template(ActionID, RenderResult) -> _ -> ok end, - TraceResult. + TraceResult; +rendered_action_template(_ActionID, _RenderResult) -> + %% We do nothing if we don't get a valid Action ID + ok. log(List, Msg, Meta) -> log(debug, List, Msg, Meta). diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index f27ec8615..761c9c0f6 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -304,7 +304,7 @@ on_query( }), Type = pgsql_query_type(TypeOrKey), {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State), - Res = on_sql_query(InstId, PoolName, Type, NameOrSQL2, Data), + Res = on_sql_query(TypeOrKey, InstId, PoolName, Type, NameOrSQL2, Data), ?tp(postgres_bridge_connector_on_query_return, #{instance_id => InstId, result => Res}), handle_result(Res). @@ -337,7 +337,7 @@ on_batch_query( {_Statement, RowTemplate} -> PrepStatement = get_prepared_statement(BinKey, State), Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq], - case on_sql_query(InstId, PoolName, execute_batch, PrepStatement, Rows) of + case on_sql_query(Key, InstId, PoolName, execute_batch, PrepStatement, Rows) of {error, _Error} = Result -> handle_result(Result); {_Column, Results} -> @@ -386,7 +386,15 @@ get_prepared_statement(Key, #{prepares := PrepStatements}) -> BinKey = to_bin(Key), maps:get(BinKey, PrepStatements). -on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) -> +on_sql_query(Key, InstId, PoolName, Type, NameOrSQL, Data) -> + emqx_trace:rendered_action_template( + Key, + #{ + statement_type => Type, + statement_or_name => NameOrSQL, + data => Data + } + ), try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of {error, Reason} -> ?tp( From 32c27f171140f878810b696e8db2e8491b44b365 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 23 Apr 2024 12:16:18 +0200 Subject: [PATCH 04/27] feat: add stop after render and after render trace to kafka action --- .../src/emqx_bridge_kafka_impl_producer.erl | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 6bb1690ff..16bca153a 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -319,6 +319,9 @@ on_query( emqx_bridge_kafka_impl_producer_sync_query, #{headers_config => KafkaHeaders, instance_id => InstId} ), + emqx_trace:rendered_action_template(MessageTag, #{ + message => KafkaMessage, send_type => sync + }), do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) catch throw:{bad_kafka_header, _} = Error -> @@ -376,6 +379,9 @@ on_query_async( emqx_bridge_kafka_impl_producer_async_query, #{headers_config => KafkaHeaders, instance_id => InstId} ), + emqx_trace:rendered_action_template(MessageTag, #{ + message => KafkaMessage, send_type => async + }), do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) catch error:{invalid_partition_count, _Count, _Partitioner} -> From 7ad354f41210aeae82106f45d6ebb9d0d5603495 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 23 Apr 2024 12:18:28 +0200 Subject: [PATCH 05/27] feat: add stop after render and after render trace to clickhouse action --- .../src/emqx_bridge_clickhouse_connector.erl | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl index 942f7590b..2c824aa95 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl @@ -386,7 +386,7 @@ on_query( SimplifiedRequestType = query_type(RequestType), Templates = get_templates(RequestType, State), SQL = get_sql(SimplifiedRequestType, Templates, DataOrSQL), - ClickhouseResult = execute_sql_in_clickhouse_server(PoolName, SQL), + ClickhouseResult = execute_sql_in_clickhouse_server(RequestType, PoolName, SQL), transform_and_log_clickhouse_result(ClickhouseResult, ResourceID, SQL). get_templates(ChannId, State) -> @@ -398,7 +398,7 @@ get_templates(ChannId, State) -> end. get_sql(channel_message, #{send_message_template := PreparedSQL}, Data) -> - emqx_placeholder:proc_tmpl(PreparedSQL, Data); + emqx_placeholder:proc_tmpl(PreparedSQL, Data, #{return => full_binary}); get_sql(_, _, SQL) -> SQL. @@ -425,7 +425,7 @@ on_batch_query(ResourceID, BatchReq, #{pool_name := PoolName} = State) -> %% Create batch insert SQL statement SQL = objects_to_sql(ObjectsToInsert, Templates), %% Do the actual query in the database - ResultFromClickhouse = execute_sql_in_clickhouse_server(PoolName, SQL), + ResultFromClickhouse = execute_sql_in_clickhouse_server(ChannId, PoolName, SQL), %% Transform the result to a better format transform_and_log_clickhouse_result(ResultFromClickhouse, ResourceID, SQL). @@ -464,7 +464,8 @@ objects_to_sql(_, _) -> %% This function is used by on_query/3 and on_batch_query/3 to send a query to %% the database server and receive a result -execute_sql_in_clickhouse_server(PoolName, SQL) -> +execute_sql_in_clickhouse_server(Id, PoolName, SQL) -> + emqx_trace:rendered_action_template(Id, #{rendered_sql => SQL}), ecpool:pick_and_do( PoolName, {?MODULE, execute_sql_in_clickhouse_server_using_connection, [SQL]}, From b2811f96b252c001bdfdf006ff3bd176993425a1 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 23 Apr 2024 15:27:16 +0200 Subject: [PATCH 06/27] refactor(rule trace): simplify function for setting trace meta data This commit simplifies a function to set trace meta data in line with a suggestion from @zmstone: https://github.com/emqx/emqx/pull/12912#discussion_r1576053856 --- .../src/emqx_rule_runtime.erl | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index f99341a9b..e2f01321a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -141,22 +141,24 @@ apply_rule(Rule = #{id := RuleID}, Columns, Envs) -> set_process_trace_metadata(RuleID, #{clientid := ClientID} = Columns) -> logger:update_process_metadata(#{ - clientid => ClientID - }), - set_process_trace_metadata(RuleID, maps:remove(clientid, Columns)); + clientid => ClientID, + rule_id => RuleID, + rule_trigger_time => rule_trigger_time(Columns) + }); set_process_trace_metadata(RuleID, Columns) -> - EventTimestamp = - case Columns of - #{timestamp := Timestamp} -> - Timestamp; - _ -> - erlang:system_time(millisecond) - end, logger:update_process_metadata(#{ rule_id => RuleID, - rule_trigger_time => EventTimestamp + rule_trigger_time => rule_trigger_time(Columns) }). +rule_trigger_time(Columns) -> + case Columns of + #{timestamp := Timestamp} -> + Timestamp; + _ -> + erlang:system_time(millisecond) + end. + reset_process_trace_metadata(#{clientid := _ClientID}) -> Meta = logger:get_process_metadata(), Meta1 = maps:remove(clientid, Meta), From 120b35ac7521c34181915633f4b6d5e462429fe0 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 23 Apr 2024 15:54:46 +0200 Subject: [PATCH 07/27] feat: add stop after render and after render trace to mysql action --- .../emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl | 8 ++++++-- apps/emqx_mysql/src/emqx_mysql.app.src | 2 +- apps/emqx_mysql/src/emqx_mysql.erl | 2 ++ 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl index 6720e1fb7..da9377814 100644 --- a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl +++ b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl @@ -104,10 +104,12 @@ on_query( #{channels := Channels, connector_state := ConnectorState} ) when is_binary(Channel) -> ChannelConfig = maps:get(Channel, Channels), + MergedState0 = maps:merge(ConnectorState, ChannelConfig), + MergedState1 = MergedState0#{channel_id => Channel}, Result = emqx_mysql:on_query( InstanceId, Request, - maps:merge(ConnectorState, ChannelConfig) + MergedState1 ), ?tp(mysql_connector_on_query_return, #{instance_id => InstanceId, result => Result}), Result; @@ -121,10 +123,12 @@ on_batch_query( ) when is_binary(element(1, Req)) -> Channel = element(1, Req), ChannelConfig = maps:get(Channel, Channels), + MergedState0 = maps:merge(ConnectorState, ChannelConfig), + MergedState1 = MergedState0#{channel_id => Channel}, Result = emqx_mysql:on_batch_query( InstanceId, BatchRequest, - maps:merge(ConnectorState, ChannelConfig) + MergedState1 ), ?tp(mysql_connector_on_batch_query_return, #{instance_id => InstanceId, result => Result}), Result; diff --git a/apps/emqx_mysql/src/emqx_mysql.app.src b/apps/emqx_mysql/src/emqx_mysql.app.src index f23d7b092..9637cc473 100644 --- a/apps/emqx_mysql/src/emqx_mysql.app.src +++ b/apps/emqx_mysql/src/emqx_mysql.app.src @@ -1,6 +1,6 @@ {application, emqx_mysql, [ {description, "EMQX MySQL Database Connector"}, - {vsn, "0.1.8"}, + {vsn, "0.1.9"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_mysql/src/emqx_mysql.erl b/apps/emqx_mysql/src/emqx_mysql.erl index e77965d67..ff851558a 100644 --- a/apps/emqx_mysql/src/emqx_mysql.erl +++ b/apps/emqx_mysql/src/emqx_mysql.erl @@ -498,6 +498,8 @@ on_sql_query( ) -> LogMeta = #{connector => InstId, sql => SQLOrKey, state => State}, ?TRACE("QUERY", "mysql_connector_received", LogMeta), + ChannelID = maps:get(channel_id, State, no_channel), + emqx_trace:rendered_action_template(ChannelID, #{sql => SQLOrKey}), Worker = ecpool:get_client(PoolName), case ecpool_worker:client(Worker) of {ok, Conn} -> From 810aa68b02ed1a3ca1646b28b20e74e4676532a9 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 23 Apr 2024 17:58:55 +0200 Subject: [PATCH 08/27] feat: add stop after render and after render trace to dynamo action --- .../src/emqx_bridge_dynamo_connector.erl | 25 ++++++++++++++++++- .../emqx_bridge_dynamo_connector_client.erl | 17 +++++++------ .../src/emqx_bridge_mqtt_connector.erl | 6 ++++- 3 files changed, 39 insertions(+), 9 deletions(-) diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl index 372472dda..f9a87ccf7 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl @@ -246,12 +246,17 @@ do_query( table := Table, templates := Templates } = ChannelState, + LogMetaData = logger:get_process_metadata(), + TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ChannelId}, + TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext}, Result = case ensuare_dynamo_keys(Query, ChannelState) of true -> ecpool:pick_and_do( PoolName, - {emqx_bridge_dynamo_connector_client, query, [Table, QueryTuple, Templates]}, + {emqx_bridge_dynamo_connector_client, query, [ + Table, QueryTuple, Templates, TraceRenderedFunc + ]}, no_handover ); _ -> @@ -259,6 +264,8 @@ do_query( end, case Result of + {error, {unrecoverable_error, {action_stopped_after_template_rendering, _}}} = Error -> + Error; {error, Reason} -> ?tp( dynamo_connector_query_return, @@ -291,6 +298,22 @@ do_query( Result end. +trace_render_result(RenderResult, #{trace_ctx := LogMetaData, action_id := ActionID}) -> + OldMetaData = + case logger:get_process_metadata() of + undefined -> #{}; + M -> M + end, + try + logger:set_process_metadata(LogMetaData), + emqx_trace:rendered_action_template( + ActionID, + RenderResult + ) + after + logger:set_process_metadata(OldMetaData) + end. + get_channel_id([{ChannelId, _Req} | _]) -> ChannelId; get_channel_id({ChannelId, _Req}) -> diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl index 4f924ef67..1d1ad3760 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl @@ -10,7 +10,7 @@ -export([ start_link/1, is_connected/2, - query/4 + query/5 ]). %% gen_server callbacks @@ -40,8 +40,8 @@ is_connected(Pid, Timeout) -> {false, Error} end. -query(Pid, Table, Query, Templates) -> - gen_server:call(Pid, {query, Table, Query, Templates}, infinity). +query(Pid, Table, Query, Templates, TraceRenderedFunc) -> + gen_server:call(Pid, {query, Table, Query, Templates, TraceRenderedFunc}, infinity). %%-------------------------------------------------------------------- %% @doc @@ -77,14 +77,14 @@ handle_call(is_connected, _From, State) -> {false, Error} end, {reply, IsConnected, State}; -handle_call({query, Table, Query, Templates}, _From, State) -> - Result = do_query(Table, Query, Templates), +handle_call({query, Table, Query, Templates, TraceRenderedFunc}, _From, State) -> + Result = do_query(Table, Query, Templates, TraceRenderedFunc), {reply, Result, State}; handle_call(_Request, _From, State) -> {reply, ok, State}. handle_cast({query, Table, Query, Templates, {ReplyFun, [Context]}}, State) -> - Result = do_query(Table, Query, Templates), + Result = do_query(Table, Query, Templates, {fun(_, _) -> ok end, none}), ReplyFun(Context, Result), {noreply, State}; handle_cast(_Request, State) -> @@ -102,11 +102,14 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== -do_query(Table, Query0, Templates) -> +do_query(Table, Query0, Templates, {TraceRenderedFun, TraceRenderedCTX}) -> try Query = apply_template(Query0, Templates), + TraceRenderedFun(#{table => Table, query => Query}, TraceRenderedCTX), execute(Query, Table) catch + error:{unrecoverable_error, Reason} -> + {error, {unrecoverable_error, Reason}}; _Type:Reason -> {error, {unrecoverable_error, {invalid_request, Reason}}} end. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index d61950513..d3ffd6c92 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -309,7 +309,11 @@ with_egress_client(ActionID, ResourceId, Fun, Args) -> ). trace_render_result(RenderResult, #{trace_ctx := LogMetaData, action_id := ActionID}) -> - OldMetaData = logger:get_process_metadata(), + OldMetaData = + case logger:get_process_metadata() of + undefined -> #{}; + M -> M + end, try logger:set_process_metadata(LogMetaData), emqx_trace:rendered_action_template( From a2dd8f5aee920bc7bb5878d1b705cf17243a1105 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 23 Apr 2024 18:25:53 +0200 Subject: [PATCH 09/27] feat: add stop after render and after render trace to cassandra action --- apps/emqx/src/emqx_trace/emqx_trace.erl | 2 +- .../src/emqx_bridge_cassandra_connector.erl | 13 +++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index 4152fdbaa..329e5f696 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -87,7 +87,7 @@ unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) -> unsubscribe(Topic, SubOpts) -> ?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}). -rendered_action_template(ActionID, RenderResult) when is_binary(ActionID) -> +rendered_action_template(<<"action:", _/binary>> = ActionID, RenderResult) -> TraceResult = ?TRACE( "QUERY_RENDER", "action_template_rendered", diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl index eb12cbaae..ef79f78fe 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl @@ -223,6 +223,11 @@ do_single_query(InstId, Request, Async, #{pool_name := PoolName} = State) -> } ), {PreparedKeyOrCQL1, Data} = proc_cql_params(Type, PreparedKeyOrCQL, Params, State), + emqx_trace:rendered_action_template(PreparedKeyOrCQL, #{ + type => Type, + key_or_cql => PreparedKeyOrCQL1, + data => Data + }), Res = exec_cql_query(InstId, PoolName, Type, Async, PreparedKeyOrCQL1, Data), handle_result(Res). @@ -261,6 +266,14 @@ do_batch_query(InstId, Requests, Async, #{pool_name := PoolName} = State) -> state => State } ), + ChannelID = + case Requests of + [{CID, _} | _] -> CID; + _ -> none + end, + emqx_trace:rendered_action_template(ChannelID, #{ + cqls => CQLs + }), Res = exec_cql_batch_query(InstId, PoolName, Async, CQLs), handle_result(Res). From 7922d5d4220e5cd379633724a1c3b9169dca7ad6 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 24 Apr 2024 10:36:24 +0200 Subject: [PATCH 10/27] feat: add stop after render and after render trace to gcp action --- .../src/emqx_bridge_gcp_pubsub_impl_producer.erl | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl index 13040dccf..12d5d1f2f 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl @@ -284,6 +284,13 @@ do_send_requests_sync(ConnectorState, Requests, InstanceId) -> Method = post, ReqOpts = #{request_ttl => RequestTTL}, Request = {prepared_request, {Method, Path, Body}, ReqOpts}, + emqx_trace:rendered_action_template(MessageTag, #{ + method => Method, + path => Path, + body => Body, + options => ReqOpts, + is_async => false + }), Result = emqx_bridge_gcp_pubsub_client:query_sync(Request, Client), QueryMode = sync, handle_result(Result, Request, QueryMode, InstanceId). @@ -312,6 +319,13 @@ do_send_requests_async(ConnectorState, Requests, ReplyFunAndArgs0) -> ReqOpts = #{request_ttl => RequestTTL}, Request = {prepared_request, {Method, Path, Body}, ReqOpts}, ReplyFunAndArgs = {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs0]}, + emqx_trace:rendered_action_template(MessageTag, #{ + method => Method, + path => Path, + body => Body, + options => ReqOpts, + is_async => true + }), emqx_bridge_gcp_pubsub_client:query_async( Request, ReplyFunAndArgs, Client ). From 7c7590fbc8a9f780e09c0eb633a7f1acad3146dd Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 24 Apr 2024 10:37:52 +0200 Subject: [PATCH 11/27] feat: add stop after render and after render trace to greptimedb action --- .../src/emqx_bridge_greptimedb_connector.erl | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl index 6bdd8a4cd..97eedf3f6 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl @@ -128,7 +128,7 @@ on_query(InstId, {Channel, Message}, State) -> greptimedb_connector_send_query, #{points => Points, batch => false, mode => sync} ), - do_query(InstId, Client, Points); + do_query(InstId, Channel, Client, Points); {error, ErrorPoints} -> ?tp( greptimedb_connector_send_query_error, @@ -152,7 +152,7 @@ on_batch_query(InstId, [{Channel, _} | _] = BatchData, State) -> greptimedb_connector_send_query, #{points => Points, batch => true, mode => sync} ), - do_query(InstId, Client, Points); + do_query(InstId, Channel, Client, Points); {error, Reason} -> ?tp( greptimedb_connector_send_query_error, @@ -173,7 +173,7 @@ on_query_async(InstId, {Channel, Message}, {ReplyFun, Args}, State) -> greptimedb_connector_send_query, #{points => Points, batch => false, mode => async} ), - do_async_query(InstId, Client, Points, {ReplyFun, Args}); + do_async_query(InstId, Channel, Client, Points, {ReplyFun, Args}); {error, ErrorPoints} = Err -> ?tp( greptimedb_connector_send_query_error, @@ -195,7 +195,7 @@ on_batch_query_async(InstId, [{Channel, _} | _] = BatchData, {ReplyFun, Args}, S greptimedb_connector_send_query, #{points => Points, batch => true, mode => async} ), - do_async_query(InstId, Client, Points, {ReplyFun, Args}); + do_async_query(InstId, Channel, Client, Points, {ReplyFun, Args}); {error, Reason} -> ?tp( greptimedb_connector_send_query_error, @@ -420,7 +420,8 @@ is_auth_key(_) -> %% ------------------------------------------------------------------------------------------------- %% Query -do_query(InstId, Client, Points) -> +do_query(InstId, Channel, Client, Points) -> + emqx_trace:rendered_action_template(Channel, #{points => Points, is_async => false}), case greptimedb:write_batch(Client, Points) of {ok, #{response := {affected_rows, #{value := Rows}}}} -> ?SLOG(debug, #{ @@ -452,12 +453,13 @@ do_query(InstId, Client, Points) -> end end. -do_async_query(InstId, Client, Points, ReplyFunAndArgs) -> +do_async_query(InstId, Channel, Client, Points, ReplyFunAndArgs) -> ?SLOG(info, #{ msg => "greptimedb_write_point_async", connector => InstId, points => Points }), + emqx_trace:rendered_action_template(Channel, #{points => Points, is_async => true}), WrappedReplyFunAndArgs = {fun ?MODULE:reply_callback/2, [ReplyFunAndArgs]}, ok = greptimedb:async_write_batch(Client, Points, WrappedReplyFunAndArgs). From 9d6655bc30639bec7a3d554b17650d5b851aa2ef Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 24 Apr 2024 13:53:01 +0200 Subject: [PATCH 12/27] feat: add stop after render and after render trace to hstreamdb action --- .../src/emqx_bridge_hstreamdb_connector.erl | 30 ++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl index cf195fc9f..cf53291b2 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl @@ -134,8 +134,11 @@ on_query( #{ producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate } = maps:get(ChannelID, Channels), - try to_record(PartitionKey, HRecordTemplate, Data) of - Record -> append_record(InstId, Producer, Record, false) + try + KeyAndRawRecord = to_key_and_raw_record(PartitionKey, HRecordTemplate, Data), + emqx_trace:rendered_action_template(ChannelID, #{record => KeyAndRawRecord}), + Record = to_record(KeyAndRawRecord), + append_record(InstId, Producer, Record, false) catch _:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE end. @@ -148,8 +151,13 @@ on_batch_query( #{ producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate } = maps:get(ChannelID, Channels), - try to_multi_part_records(PartitionKey, HRecordTemplate, BatchList) of - Records -> append_record(InstId, Producer, Records, true) + try + KeyAndRawRecordList = to_multi_part_key_and_partition_key( + PartitionKey, HRecordTemplate, BatchList + ), + emqx_trace:rendered_action_template(ChannelID, #{records => KeyAndRawRecordList}), + Records = [to_record(Item) || Item <- KeyAndRawRecordList], + append_record(InstId, Producer, Records, true) catch _:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE end. @@ -348,20 +356,20 @@ ensure_start_producer(ProducerName, ProducerOptions) -> produce_name(ActionId) -> list_to_binary("backend_hstream_producer:" ++ to_string(ActionId)). -to_record(PartitionKeyTmpl, HRecordTmpl, Data) -> +to_key_and_raw_record(PartitionKeyTmpl, HRecordTmpl, Data) -> PartitionKey = emqx_placeholder:proc_tmpl(PartitionKeyTmpl, Data), RawRecord = emqx_placeholder:proc_tmpl(HRecordTmpl, Data), - to_record(PartitionKey, RawRecord). + #{partition_key => PartitionKey, raw_record => RawRecord}. -to_record(PartitionKey, RawRecord) when is_binary(PartitionKey) -> - to_record(binary_to_list(PartitionKey), RawRecord); -to_record(PartitionKey, RawRecord) -> +to_record(#{partition_key := PartitionKey, raw_record := RawRecord}) when is_binary(PartitionKey) -> + to_record(#{partition_key => binary_to_list(PartitionKey), raw_record => RawRecord}); +to_record(#{partition_key := PartitionKey, raw_record := RawRecord}) -> hstreamdb:to_record(PartitionKey, raw, RawRecord). -to_multi_part_records(PartitionKeyTmpl, HRecordTmpl, BatchList) -> +to_multi_part_key_and_partition_key(PartitionKeyTmpl, HRecordTmpl, BatchList) -> lists:map( fun({_, Data}) -> - to_record(PartitionKeyTmpl, HRecordTmpl, Data) + to_key_and_raw_record(PartitionKeyTmpl, HRecordTmpl, Data) end, BatchList ). From e2b35ea2425d3d39575d4a4c232960987f68807b Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 24 Apr 2024 13:53:58 +0200 Subject: [PATCH 13/27] feat: add stop after render and after render trace to opents action --- .../emqx_bridge_opents/src/emqx_bridge_opents_connector.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl index abef958ff..509d53284 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl @@ -167,9 +167,10 @@ on_batch_query( BatchReq, #{channels := Channels} = State ) -> + [{ChannelId, _} | _] = BatchReq, case try_render_messages(BatchReq, Channels) of {ok, Datas} -> - do_query(InstanceId, Datas, State); + do_query(InstanceId, ChannelId, Datas, State); Error -> Error end. @@ -222,12 +223,13 @@ on_get_channel_status(InstanceId, ChannelId, #{channels := Channels} = State) -> %% Helper fns %%======================================================================================== -do_query(InstanceId, Query, #{pool_name := PoolName} = State) -> +do_query(InstanceId, ChannelID, Query, #{pool_name := PoolName} = State) -> ?TRACE( "QUERY", "opents_connector_received", #{connector => InstanceId, query => Query, state => State} ), + emqx_trace:rendered_action_template(ChannelID, #{query => Query}), ?tp(opents_bridge_on_query, #{instance_id => InstanceId}), From beedc72be47d5cdee9b28871f79608c20120cdcb Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 24 Apr 2024 13:54:52 +0200 Subject: [PATCH 14/27] feat: add stop after render and after render trace to mongodb action --- .../src/emqx_bridge_mongodb_connector.erl | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl index a0d53d454..69c2242e4 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl @@ -66,10 +66,15 @@ on_query(InstanceId, {Channel, Message0}, #{channels := Channels, connector_stat payload_template := PayloadTemplate, collection_template := CollectionTemplate } = ChannelState0 = maps:get(Channel, Channels), + Collection = emqx_placeholder:proc_tmpl(CollectionTemplate, Message0), ChannelState = ChannelState0#{ - collection => emqx_placeholder:proc_tmpl(CollectionTemplate, Message0) + collection => Collection }, Message = render_message(PayloadTemplate, Message0), + emqx_trace:rendered_action_template(Channel, #{ + collection => Collection, + data => Message + }), Res = emqx_mongodb:on_query( InstanceId, {Channel, Message}, From 279ad186f796bfb899a2b377049b653f77be9b87 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 24 Apr 2024 13:55:43 +0200 Subject: [PATCH 15/27] feat: add stop after render and after render trace to kinesis action --- .../src/emqx_bridge_kinesis_impl_producer.erl | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl index c8a522e01..8744dfd71 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl @@ -261,6 +261,11 @@ do_send_requests_sync( stream_name := StreamName } = maps:get(ChannelId, InstalledChannels), Records = render_records(Requests, Templates), + StructuredRecords = [ + #{data => Data, partition_key => PartitionKey} + || {Data, PartitionKey} <- Records + ], + emqx_trace:rendered_action_template(ChannelId, StructuredRecords), Result = ecpool:pick_and_do( PoolName, {emqx_bridge_kinesis_connector_client, query, [Records, StreamName]}, From d27f05fa604e131ba1e3ba4400775b4bd0b91ce0 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 24 Apr 2024 13:56:41 +0200 Subject: [PATCH 16/27] feat: add stop after render and after render trace to influxdb action --- .../src/emqx_bridge_influxdb_connector.erl | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index 94419c7d9..f239d3735 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -130,7 +130,7 @@ on_query(InstId, {Channel, Message}, #{channels := ChannelConf}) -> influxdb_connector_send_query, #{points => Points, batch => false, mode => sync} ), - do_query(InstId, Client, Points); + do_query(InstId, Channel, Client, Points); {error, ErrorPoints} -> ?tp( influxdb_connector_send_query_error, @@ -152,7 +152,7 @@ on_batch_query(InstId, BatchData, #{channels := ChannelConf}) -> influxdb_connector_send_query, #{points => Points, batch => true, mode => sync} ), - do_query(InstId, Client, Points); + do_query(InstId, Channel, Client, Points); {error, Reason} -> ?tp( influxdb_connector_send_query_error, @@ -175,7 +175,7 @@ on_query_async( influxdb_connector_send_query, #{points => Points, batch => false, mode => async} ), - do_async_query(InstId, Client, Points, {ReplyFun, Args}); + do_async_query(InstId, Channel, Client, Points, {ReplyFun, Args}); {error, ErrorPoints} = Err -> ?tp( influxdb_connector_send_query_error, @@ -200,7 +200,7 @@ on_batch_query_async( influxdb_connector_send_query, #{points => Points, batch => true, mode => async} ), - do_async_query(InstId, Client, Points, {ReplyFun, Args}); + do_async_query(InstId, Channel, Client, Points, {ReplyFun, Args}); {error, Reason} -> ?tp( influxdb_connector_send_query_error, @@ -496,7 +496,8 @@ is_auth_key(_) -> %% ------------------------------------------------------------------------------------------------- %% Query -do_query(InstId, Client, Points) -> +do_query(InstId, Channel, Client, Points) -> + emqx_trace:rendered_action_template(Channel, #{points => Points, is_async => false}), case influxdb:write(Client, Points) of ok -> ?SLOG(debug, #{ @@ -527,12 +528,13 @@ do_query(InstId, Client, Points) -> end end. -do_async_query(InstId, Client, Points, ReplyFunAndArgs) -> +do_async_query(InstId, Channel, Client, Points, ReplyFunAndArgs) -> ?SLOG(info, #{ msg => "influxdb_write_point_async", connector => InstId, points => Points }), + emqx_trace:rendered_action_template(Channel, #{points => Points, is_async => true}), WrappedReplyFunAndArgs = {fun ?MODULE:reply_callback/2, [ReplyFunAndArgs]}, {ok, _WorkerPid} = influxdb:write_async(Client, Points, WrappedReplyFunAndArgs). From 9c37c99b623d4562eeec4cb37d8f1308906f1191 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 24 Apr 2024 14:11:04 +0200 Subject: [PATCH 17/27] feat: add stop after render and after render trace to oracle action --- apps/emqx_oracle/src/emqx_oracle.erl | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/apps/emqx_oracle/src/emqx_oracle.erl b/apps/emqx_oracle/src/emqx_oracle.erl index 4e88a3d96..e90665cc4 100644 --- a/apps/emqx_oracle/src/emqx_oracle.erl +++ b/apps/emqx_oracle/src/emqx_oracle.erl @@ -210,7 +210,7 @@ on_query( }), Type = query, {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State), - Res = on_sql_query(InstId, PoolName, Type, ?SYNC_QUERY_MODE, NameOrSQL2, Data), + Res = on_sql_query(InstId, TypeOrKey, PoolName, Type, ?SYNC_QUERY_MODE, NameOrSQL2, Data), handle_result(Res). on_batch_query( @@ -244,7 +244,9 @@ on_batch_query( Datas2 = [emqx_placeholder:proc_sql(TokenList, Data) || Data <- Datas], St = maps:get(BinKey, Sts), case - on_sql_query(InstId, PoolName, execute_batch, ?SYNC_QUERY_MODE, St, Datas2) + on_sql_query( + InstId, BinKey, PoolName, execute_batch, ?SYNC_QUERY_MODE, St, Datas2 + ) of {ok, Results} -> handle_batch_result(Results, 0); @@ -281,7 +283,13 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{ end end. -on_sql_query(InstId, PoolName, Type, ApplyMode, NameOrSQL, Data) -> +on_sql_query(InstId, ChannelID, PoolName, Type, ApplyMode, NameOrSQL, Data) -> + emqx_trace:rendered_action_template(ChannelID, #{ + type => Type, + apply_mode => ApplyMode, + name_or_sql => NameOrSQL, + data => Data + }), case ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, ApplyMode) of {error, Reason} = Result -> ?tp( From 74fac80e7e056cafa30a4161b7ce3f3dd815b8fe Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 24 Apr 2024 14:24:52 +0200 Subject: [PATCH 18/27] feat: add stop after render and after render trace to pulsar action --- .../src/emqx_bridge_pulsar_connector.erl | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl index 8c39c3671..0cddfab66 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl @@ -196,6 +196,11 @@ on_query(_InstanceId, {ChannelId, Message}, State) -> {error, channel_not_found}; {ok, #{message := MessageTmpl, sync_timeout := SyncTimeout, producers := Producers}} -> PulsarMessage = render_message(Message, MessageTmpl), + emqx_trace:rendered_action_template(ChannelId, #{ + message => PulsarMessage, + sync_timeout => SyncTimeout, + is_async => false + }), try pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout) catch @@ -217,12 +222,16 @@ on_query_async(_InstanceId, {ChannelId, Message}, AsyncReplyFn, State) -> ?tp_span( pulsar_producer_on_query_async, #{instance_id => _InstanceId, message => Message}, - on_query_async2(Producers, Message, MessageTmpl, AsyncReplyFn) + on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn) ) end. -on_query_async2(Producers, Message, MessageTmpl, AsyncReplyFn) -> +on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn) -> PulsarMessage = render_message(Message, MessageTmpl), + emqx_trace:rendered_action_template(ChannelId, #{ + message => PulsarMessage, + is_async => true + }), pulsar:send(Producers, [PulsarMessage], #{callback_fn => AsyncReplyFn}). %%------------------------------------------------------------------------------------- From 2abc1b1141fb7f480333954be1c166e5aa9ee8bf Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 24 Apr 2024 14:48:23 +0200 Subject: [PATCH 19/27] feat: add stop after render and after render trace to redis action --- .../src/emqx_bridge_redis_connector.erl | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl index b7477b385..3f7c4897c 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl @@ -107,7 +107,7 @@ on_query(InstId, {cmd, Cmd}, #{conn_st := RedisConnSt}) -> Result; on_query( InstId, - {_MessageTag, _Data} = Msg, + {MessageTag, _Data} = Msg, #{channels := Channels, conn_st := RedisConnSt} ) -> case try_render_message([Msg], Channels) of @@ -116,6 +116,10 @@ on_query( redis_bridge_connector_cmd, #{cmd => Cmd, batch => false, mode => sync} ), + emqx_trace:rendered_action_template( + MessageTag, + #{command => Cmd, batch => false, mode => sync} + ), Result = query(InstId, {cmd, Cmd}, RedisConnSt), ?tp( redis_bridge_connector_send_done, @@ -135,6 +139,11 @@ on_batch_query( redis_bridge_connector_send, #{batch_data => BatchData, batch => true, mode => sync} ), + [{ChannelID, _} | _] = BatchData, + emqx_trace:rendered_action_template( + ChannelID, + #{commands => Cmds, batch => ture, mode => sync} + ), Result = query(InstId, {cmds, Cmds}, RedisConnSt), ?tp( redis_bridge_connector_send_done, From 22c7224267edf838efb6f78950fd67571e52facc Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 24 Apr 2024 15:09:05 +0200 Subject: [PATCH 20/27] feat: add stop after render and after render trace to rocketmq action --- .../src/emqx_bridge_rocketmq_connector.erl | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index f9b4ec5d4..314afb350 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -264,7 +264,11 @@ do_query( TopicKey = get_topic_key(Query, TopicTks), Data = apply_template(Query, Templates, DispatchStrategy), - + emqx_trace:rendered_action_template(ChannelId, #{ + topic_key => TopicKey, + data => Data, + request_timeout => RequestTimeout + }), Result = safe_do_produce( ChannelId, InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout ), From 03e3ac19a98ebe2a3f7c8c38506ec2fbfe7cfb27 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 24 Apr 2024 15:21:33 +0200 Subject: [PATCH 21/27] feat: add stop after render and after render trace to s3 action --- apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index 4407222d5..5d3ed19f8 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -168,13 +168,14 @@ init_channel_state(#{parameters := Parameters}) -> on_query(InstId, {Tag, Data}, #{client_config := Config, channels := Channels}) -> case maps:get(Tag, Channels, undefined) of ChannelState = #{} -> - run_simple_upload(InstId, Data, ChannelState, Config); + run_simple_upload(InstId, Tag, Data, ChannelState, Config); undefined -> {error, {unrecoverable_error, {invalid_message_tag, Tag}}} end. run_simple_upload( InstId, + ChannelID, Data, #{ bucket := BucketTemplate, @@ -188,6 +189,11 @@ run_simple_upload( Client = emqx_s3_client:create(Bucket, Config), Key = render_key(KeyTemplate, Data), Content = render_content(ContentTemplate, Data), + emqx_trace:rendered_action_template(ChannelID, #{ + bucket => Bucket, + key => Key, + content => Content + }), case emqx_s3_client:put_object(Client, Key, UploadOpts, Content) of ok -> ?tp(s3_bridge_connector_upload_ok, #{ From a0b2357abbf82fa956ce454c2ccff51d227147c3 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 24 Apr 2024 15:30:15 +0200 Subject: [PATCH 22/27] feat: add stop after render and after render trace to sqlserver action --- .../src/emqx_bridge_sqlserver_connector.erl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index 1eb9746dc..683551316 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -413,6 +413,9 @@ do_query( %% only insert sql statement for single query and batch query case apply_template(QueryTuple, Templates) of {?ACTION_SEND_MESSAGE, SQL} -> + emqx_trace:rendered_action_template(ChannelId, #{ + sql => SQL + }), Result = ecpool:pick_and_do( PoolName, {?MODULE, worker_do_insert, [SQL, State]}, From 11d9d30fc00a1058276fb328042375420d245842 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 24 Apr 2024 15:51:52 +0200 Subject: [PATCH 23/27] feat: add stop after render and after render trace to syskeeper action --- .../src/emqx_bridge_syskeeper_connector.erl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl index 045af348f..a6d47229c 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl @@ -273,6 +273,8 @@ do_query( Result = case try_render_message(Query, Channels) of {ok, Msg} -> + [{ChannelID, _} | _] = Query, + emqx_trace:rendered_action_template(ChannelID, #{message => Msg}), ecpool:pick_and_do( PoolName, {emqx_bridge_syskeeper_client, forward, [Msg, AckTimeout + ?EXTRA_CALL_TIMEOUT]}, From d6ceeb3b30f099378525cd6dca3d813dde4b4c55 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 24 Apr 2024 16:38:35 +0200 Subject: [PATCH 24/27] feat: add stop after render and after render trace to rabbitmq action --- .../src/emqx_bridge_rabbitmq_connector.erl | 65 ++++++++++++++++--- 1 file changed, 55 insertions(+), 10 deletions(-) diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index 1ef1c6617..1637743b5 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -41,7 +41,7 @@ -export([connect/1]). %% Internal callbacks --export([publish_messages/4]). +-export([publish_messages/5]). namespace() -> "rabbitmq". @@ -214,9 +214,12 @@ on_query(ResourceID, {ChannelId, Data} = MsgReq, State) -> #{channels := Channels} = State, case maps:find(ChannelId, Channels) of {ok, #{param := ProcParam, rabbitmq := RabbitMQ}} -> + LogMetaData = logger:get_process_metadata(), + TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ChannelId}, + TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext}, Res = ecpool:pick_and_do( ResourceID, - {?MODULE, publish_messages, [RabbitMQ, ProcParam, [MsgReq]]}, + {?MODULE, publish_messages, [RabbitMQ, ProcParam, [MsgReq], TraceRenderedFunc]}, no_handover ), handle_result(Res); @@ -234,9 +237,12 @@ on_batch_query(ResourceID, [{ChannelId, _Data} | _] = Batch, State) -> #{channels := Channels} = State, case maps:find(ChannelId, Channels) of {ok, #{param := ProcParam, rabbitmq := RabbitMQ}} -> + LogMetaData = logger:get_process_metadata(), + TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ChannelId}, + TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext}, Res = ecpool:pick_and_do( ResourceID, - {?MODULE, publish_messages, [RabbitMQ, ProcParam, Batch]}, + {?MODULE, publish_messages, [RabbitMQ, ProcParam, Batch, TraceRenderedFunc]}, no_handover ), handle_result(Res); @@ -244,6 +250,22 @@ on_batch_query(ResourceID, [{ChannelId, _Data} | _] = Batch, State) -> {error, {unrecoverable_error, {invalid_message_tag, ChannelId}}} end. +trace_render_result(RenderResult, #{trace_ctx := LogMetaData, action_id := ActionID}) -> + OldMetaData = + case logger:get_process_metadata() of + undefined -> #{}; + M -> M + end, + try + logger:set_process_metadata(LogMetaData), + emqx_trace:rendered_action_template( + ActionID, + RenderResult + ) + after + logger:set_process_metadata(OldMetaData) + end. + publish_messages( Conn, RabbitMQ, @@ -255,7 +277,8 @@ publish_messages( wait_for_publish_confirmations := WaitForPublishConfirmations, publish_confirmation_timeout := PublishConfirmationTimeout }, - Messages + Messages, + TraceRenderedFunc ) -> try publish_messages( @@ -267,15 +290,18 @@ publish_messages( PayloadTmpl, Messages, WaitForPublishConfirmations, - PublishConfirmationTimeout + PublishConfirmationTimeout, + TraceRenderedFunc ) catch + error:{unrecoverable_error, {action_stopped_after_template_rendering, _}} = Reason -> + {error, Reason}; %% if send a message to a non-existent exchange, RabbitMQ client will crash %% {shutdown,{server_initiated_close,404,<<"NOT_FOUND - no exchange 'xyz' in vhost '/'">>} %% so we catch and return {recoverable_error, Reason} to increase metrics _Type:Reason -> Msg = iolist_to_binary(io_lib:format("RabbitMQ: publish_failed: ~p", [Reason])), - erlang:error({recoverable_error, Msg}) + {error, {recoverable_error, Msg}} end. publish_messages( @@ -287,7 +313,8 @@ publish_messages( PayloadTmpl, Messages, WaitForPublishConfirmations, - PublishConfirmationTimeout + PublishConfirmationTimeout, + {TraceRenderedFun, TraceRenderedFuncCTX} ) -> case maps:find(Conn, RabbitMQ) of {ok, Channel} -> @@ -299,18 +326,36 @@ publish_messages( exchange = Exchange, routing_key = RoutingKey }, + FormattedMsgs = [ + format_data(PayloadTmpl, M) + || {_, M} <- Messages + ], + TraceRenderedFun( + #{ + messages => FormattedMsgs, + properties => #{ + headers => [], + delivery_mode => DeliveryMode + }, + method => #{ + exchange => Exchange, + routing_key => RoutingKey + } + }, + TraceRenderedFuncCTX + ), lists:foreach( - fun({_, MsgRaw}) -> + fun(Msg) -> amqp_channel:cast( Channel, Method, #amqp_msg{ - payload = format_data(PayloadTmpl, MsgRaw), + payload = Msg, props = MessageProperties } ) end, - Messages + FormattedMsgs ), case WaitForPublishConfirmations of true -> From 0dbaef431671ada97cc4a9030a8a3e42e791a7fa Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 24 Apr 2024 17:03:04 +0200 Subject: [PATCH 25/27] feat: add stop after render and after render trace to tdengine action --- .../src/emqx_bridge_tdengine_connector.erl | 34 ++++++++++++++++--- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 7bb342ed1..383ceabf7 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -32,7 +32,7 @@ -export([connector_examples/1]). --export([connect/1, do_get_status/1, execute/3, do_batch_insert/4]). +-export([connect/1, do_get_status/1, execute/3, do_batch_insert/5]). -import(hoconsc, [mk/2, enum/1, ref/2]). @@ -186,6 +186,7 @@ on_query(InstanceId, {ChannelId, Data}, #{channels := Channels} = State) -> case maps:find(ChannelId, Channels) of {ok, #{insert := Tokens, opts := Opts}} -> Query = emqx_placeholder:proc_tmpl(Tokens, Data), + emqx_trace:rendered_action_template(ChannelId, #{query => Query}), do_query_job(InstanceId, {?MODULE, execute, [Query, Opts]}, State); _ -> {error, {unrecoverable_error, {invalid_channel_id, InstanceId}}} @@ -199,9 +200,12 @@ on_batch_query( ) -> case maps:find(ChannelId, Channels) of {ok, #{batch := Tokens, opts := Opts}} -> + LogMetaData = logger:get_process_metadata(), + TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ChannelId}, + TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext}, do_query_job( InstanceId, - {?MODULE, do_batch_insert, [Tokens, BatchReq, Opts]}, + {?MODULE, do_batch_insert, [Tokens, BatchReq, Opts, TraceRenderedFunc]}, State ); _ -> @@ -212,6 +216,22 @@ on_batch_query(InstanceId, BatchReq, State) -> ?SLOG(error, LogMeta#{msg => "invalid_request"}), {error, {unrecoverable_error, invalid_request}}. +trace_render_result(RenderResult, #{trace_ctx := LogMetaData, action_id := ActionID}) -> + OldMetaData = + case logger:get_process_metadata() of + undefined -> #{}; + M -> M + end, + try + logger:set_process_metadata(LogMetaData), + emqx_trace:rendered_action_template( + ActionID, + RenderResult + ) + after + logger:set_process_metadata(OldMetaData) + end. + on_get_status(_InstanceId, #{pool_name := PoolName} = State) -> case emqx_resource_pool:health_check_workers( @@ -338,9 +358,15 @@ do_query_job(InstanceId, Job, #{pool_name := PoolName} = State) -> execute(Conn, Query, Opts) -> tdengine:insert(Conn, Query, Opts). -do_batch_insert(Conn, Tokens, BatchReqs, Opts) -> +do_batch_insert(Conn, Tokens, BatchReqs, Opts, {TraceRenderedFun, TraceRenderedFunCTX}) -> SQL = aggregate_query(Tokens, BatchReqs, <<"INSERT INTO">>), - execute(Conn, SQL, Opts). + try + TraceRenderedFun(#{query => SQL}, TraceRenderedFunCTX), + execute(Conn, SQL, Opts) + catch + error:{unrecoverable_error, {action_stopped_after_template_rendering, _}} = Reason -> + {error, Reason} + end. aggregate_query(BatchTks, BatchReqs, Acc) -> lists:foldl( From ef9884cf47727ef334af65f469779f6345a325d2 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 25 Apr 2024 11:39:20 +0200 Subject: [PATCH 26/27] refactor(rule trace): templates rendered trace to increase code reuse * The code for passing the trace context to a sub process has been improved to increase code reuse. This code is used when the action templates are rendered in a sub process. * A macro has also been added for the error term that is thrown when the action shall be stopped after the templates has been rendered. This is also done to reduce code duplication and to reduce the risk of introducing bugs due to typos. * Fix incorrect type spec Thanks to @zmstone for suggesting these improvements in comments to a PR (https://github.com/emqx/emqx/pull/12916). --- apps/emqx/include/emqx_trace.hrl | 6 ++ apps/emqx/src/emqx_trace/emqx_trace.erl | 55 ++++++++++++++++- .../src/emqx_bridge_dynamo_connector.erl | 26 ++------ .../emqx_bridge_dynamo_connector_client.erl | 23 +++++-- .../src/emqx_bridge_mqtt_connector.erl | 28 ++------- .../src/emqx_bridge_mqtt_egress.erl | 53 ++++++++-------- .../src/emqx_bridge_rabbitmq_connector.erl | 60 ++++++------------- .../src/emqx_bridge_tdengine_connector.erl | 32 +++------- .../src/emqx_rule_runtime.erl | 3 +- 9 files changed, 140 insertions(+), 146 deletions(-) diff --git a/apps/emqx/include/emqx_trace.hrl b/apps/emqx/include/emqx_trace.hrl index d1e70b184..27dd8b6c8 100644 --- a/apps/emqx/include/emqx_trace.hrl +++ b/apps/emqx/include/emqx_trace.hrl @@ -38,4 +38,10 @@ -define(SHARD, ?COMMON_SHARD). -define(MAX_SIZE, 30). +-define(EMQX_TRACE_STOP_ACTION(REASON), + {unrecoverable_error, {action_stopped_after_template_rendering, REASON}} +). + +-define(EMQX_TRACE_STOP_ACTION_MATCH, ?EMQX_TRACE_STOP_ACTION(_)). + -endif. diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index 329e5f696..91de65b39 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -29,7 +29,9 @@ unsubscribe/2, log/3, log/4, - rendered_action_template/2 + rendered_action_template/2, + make_rendered_action_template_trace_context/1, + rendered_action_template_with_ctx/2 ]). -export([ @@ -70,6 +72,12 @@ -export_type([ruleid/0]). -type ruleid() :: binary(). +-export_type([rendered_action_template_ctx/0]). +-opaque rendered_action_template_ctx() :: #{ + trace_ctx := map(), + action_id := any() +}. + publish(#message{topic = <<"$SYS/", _/binary>>}) -> ignore; publish(#message{from = From, topic = Topic, payload = Payload}) when @@ -107,15 +115,56 @@ rendered_action_template(<<"action:", _/binary>> = ActionID, RenderResult) -> ) ), MsgBin = unicode:characters_to_binary(StopMsg), - error({unrecoverable_error, {action_stopped_after_template_rendering, MsgBin}}); + error(?EMQX_TRACE_STOP_ACTION(MsgBin)); _ -> ok end, TraceResult; rendered_action_template(_ActionID, _RenderResult) -> - %% We do nothing if we don't get a valid Action ID + %% We do nothing if we don't get a valid Action ID. This can happen when + %% called from connectors that are used for actions as well as authz and + %% authn. ok. +%% The following two functions are used for connectors that don't do the +%% rendering in the main process (the one that called on_*query). In this case +%% we need to pass the trace context to the sub process that do the rendering +%% so that the result of the rendering can be traced correctly. It is also +%% important to ensure that the error that can be thrown from +%% rendered_action_template_with_ctx is handled in the appropriate way in the +%% sub process. +-spec make_rendered_action_template_trace_context(any()) -> rendered_action_template_ctx(). +make_rendered_action_template_trace_context(ActionID) -> + MetaData = + case logger:get_process_metadata() of + undefined -> #{}; + M -> M + end, + #{trace_ctx => MetaData, action_id => ActionID}. + +-spec rendered_action_template_with_ctx(rendered_action_template_ctx(), Result :: term()) -> term(). +rendered_action_template_with_ctx( + #{ + trace_ctx := LogMetaData, + action_id := ActionID + }, + RenderResult +) -> + OldMetaData = + case logger:get_process_metadata() of + undefined -> #{}; + M -> M + end, + try + logger:set_process_metadata(LogMetaData), + emqx_trace:rendered_action_template( + ActionID, + RenderResult + ) + after + logger:set_process_metadata(OldMetaData) + end. + log(List, Msg, Meta) -> log(debug, List, Msg, Meta). diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl index f9a87ccf7..598b3342d 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl @@ -9,6 +9,7 @@ -include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_trace.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("hocon/include/hoconsc.hrl"). @@ -246,16 +247,15 @@ do_query( table := Table, templates := Templates } = ChannelState, - LogMetaData = logger:get_process_metadata(), - TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ChannelId}, - TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext}, + TraceRenderedCTX = + emqx_trace:make_rendered_action_template_trace_context(ChannelId), Result = case ensuare_dynamo_keys(Query, ChannelState) of true -> ecpool:pick_and_do( PoolName, {emqx_bridge_dynamo_connector_client, query, [ - Table, QueryTuple, Templates, TraceRenderedFunc + Table, QueryTuple, Templates, TraceRenderedCTX ]}, no_handover ); @@ -264,7 +264,7 @@ do_query( end, case Result of - {error, {unrecoverable_error, {action_stopped_after_template_rendering, _}}} = Error -> + {error, ?EMQX_TRACE_STOP_ACTION(_)} = Error -> Error; {error, Reason} -> ?tp( @@ -298,22 +298,6 @@ do_query( Result end. -trace_render_result(RenderResult, #{trace_ctx := LogMetaData, action_id := ActionID}) -> - OldMetaData = - case logger:get_process_metadata() of - undefined -> #{}; - M -> M - end, - try - logger:set_process_metadata(LogMetaData), - emqx_trace:rendered_action_template( - ActionID, - RenderResult - ) - after - logger:set_process_metadata(OldMetaData) - end. - get_channel_id([{ChannelId, _Req} | _]) -> ChannelId; get_channel_id({ChannelId, _Req}) -> diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl index 1d1ad3760..f257ae389 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl @@ -40,8 +40,8 @@ is_connected(Pid, Timeout) -> {false, Error} end. -query(Pid, Table, Query, Templates, TraceRenderedFunc) -> - gen_server:call(Pid, {query, Table, Query, Templates, TraceRenderedFunc}, infinity). +query(Pid, Table, Query, Templates, TraceRenderedCTX) -> + gen_server:call(Pid, {query, Table, Query, Templates, TraceRenderedCTX}, infinity). %%-------------------------------------------------------------------- %% @doc @@ -77,8 +77,8 @@ handle_call(is_connected, _From, State) -> {false, Error} end, {reply, IsConnected, State}; -handle_call({query, Table, Query, Templates, TraceRenderedFunc}, _From, State) -> - Result = do_query(Table, Query, Templates, TraceRenderedFunc), +handle_call({query, Table, Query, Templates, TraceRenderedCTX}, _From, State) -> + Result = do_query(Table, Query, Templates, TraceRenderedCTX), {reply, Result, State}; handle_call(_Request, _From, State) -> {reply, ok, State}. @@ -102,10 +102,13 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== -do_query(Table, Query0, Templates, {TraceRenderedFun, TraceRenderedCTX}) -> +do_query(Table, Query0, Templates, TraceRenderedCTX) -> try Query = apply_template(Query0, Templates), - TraceRenderedFun(#{table => Table, query => Query}, TraceRenderedCTX), + emqx_trace:rendered_action_template_with_ctx(TraceRenderedCTX, #{ + table => Table, + query => {fun trace_format_query/1, Query} + }), execute(Query, Table) catch error:{unrecoverable_error, Reason} -> @@ -114,6 +117,14 @@ do_query(Table, Query0, Templates, {TraceRenderedFun, TraceRenderedCTX}) -> {error, {unrecoverable_error, {invalid_request, Reason}}} end. +trace_format_query({Type, Data}) -> + #{type => Type, data => Data}; +trace_format_query([_ | _] = Batch) -> + BatchData = [trace_format_query(Q) || Q <- Batch], + #{type => batch, data => BatchData}; +trace_format_query(Query) -> + Query. + %% some simple query commands for authn/authz or test execute({insert_item, Msg}, Table) -> Item = convert_to_item(Msg), diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index d3ffd6c92..f133bf334 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -301,29 +301,11 @@ on_query_async(ResourceId, {_ChannelId, Msg}, _Callback, #{}) -> }). with_egress_client(ActionID, ResourceId, Fun, Args) -> - LogMetaData = logger:get_process_metadata(), - TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ActionID}, - TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext}, + TraceRenderedCTX = emqx_trace:make_rendered_action_template_trace_context(ActionID), ecpool:pick_and_do( - ResourceId, {emqx_bridge_mqtt_egress, Fun, [TraceRenderedFunc | Args]}, no_handover + ResourceId, {emqx_bridge_mqtt_egress, Fun, [TraceRenderedCTX | Args]}, no_handover ). -trace_render_result(RenderResult, #{trace_ctx := LogMetaData, action_id := ActionID}) -> - OldMetaData = - case logger:get_process_metadata() of - undefined -> #{}; - M -> M - end, - try - logger:set_process_metadata(LogMetaData), - emqx_trace:rendered_action_template( - ActionID, - RenderResult - ) - after - logger:set_process_metadata(OldMetaData) - end. - on_async_result(Callback, Result) -> apply_callback_function(Callback, handle_send_result(Result)). @@ -343,9 +325,7 @@ handle_send_result({ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}}) -> handle_send_result({ok, Reply}) -> {error, classify_reply(Reply)}; handle_send_result({error, Reason}) -> - {error, classify_error(Reason)}; -handle_send_result({unrecoverable_error, Reason}) -> - {error, {unrecoverable_error, Reason}}. + {error, classify_error(Reason)}. classify_reply(Reply = #{reason_code := _}) -> {unrecoverable_error, Reply}. @@ -360,6 +340,8 @@ classify_error({shutdown, _} = Reason) -> {recoverable_error, Reason}; classify_error(shutdown = Reason) -> {recoverable_error, Reason}; +classify_error({unrecoverable_error, _Reason} = Error) -> + Error; classify_error(Reason) -> {unrecoverable_error, Reason}. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl index 80d38bc78..a4a0b0d37 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl @@ -29,9 +29,6 @@ -type message() :: emqx_types:message() | map(). -type callback() :: {function(), [_Arg]} | {module(), atom(), [_Arg]}. -type remote_message() :: #mqtt_msg{}. --type trace_rendered_func() :: { - fun((RenderResult :: any(), CTX :: map()) -> any()), TraceCTX :: map() -}. -type egress() :: #{ local => #{ @@ -45,37 +42,40 @@ config(#{remote := RC = #{}} = Conf) -> Conf#{remote => emqx_bridge_mqtt_msg:parse(RC)}. --spec send(pid(), trace_rendered_func(), message(), egress()) -> ok. -send(Pid, TraceRenderedFunc, MsgIn, Egress) -> +-spec send(pid(), emqx_trace:rendered_action_template_ctx(), message(), egress()) -> + ok | {error, {unrecoverable_error, term()}}. +send(Pid, TraceRenderedCTX, MsgIn, Egress) -> try - emqtt:publish(Pid, export_msg(MsgIn, Egress, TraceRenderedFunc)) + emqtt:publish(Pid, export_msg(MsgIn, Egress, TraceRenderedCTX)) catch error:{unrecoverable_error, Reason} -> - {unrecoverable_error, Reason} + {error, {unrecoverable_error, Reason}} end. --spec send_async(pid(), trace_rendered_func(), message(), callback(), egress()) -> - ok | {ok, pid()}. -send_async(Pid, TraceRenderedFunc, MsgIn, Callback, Egress) -> +-spec send_async(pid(), emqx_trace:rendered_action_template_ctx(), message(), callback(), egress()) -> + {ok, pid()} | {error, {unrecoverable_error, term()}}. +send_async(Pid, TraceRenderedCTX, MsgIn, Callback, Egress) -> try ok = emqtt:publish_async( - Pid, export_msg(MsgIn, Egress, TraceRenderedFunc), _Timeout = infinity, Callback + Pid, export_msg(MsgIn, Egress, TraceRenderedCTX), _Timeout = infinity, Callback ), {ok, Pid} catch error:{unrecoverable_error, Reason} -> - {unrecoverable_error, Reason} + {error, {unrecoverable_error, Reason}} end. -export_msg(Msg, #{remote := Remote}, TraceRenderedFunc) -> - to_remote_msg(Msg, Remote, TraceRenderedFunc). +export_msg(Msg, #{remote := Remote}, TraceRenderedCTX) -> + to_remote_msg(Msg, Remote, TraceRenderedCTX). --spec to_remote_msg(message(), emqx_bridge_mqtt_msg:msgvars(), trace_rendered_func()) -> +-spec to_remote_msg( + message(), emqx_bridge_mqtt_msg:msgvars(), emqx_trace:rendered_action_template_ctx() +) -> remote_message(). -to_remote_msg(#message{flags = Flags} = Msg, Vars, TraceRenderedFunc) -> +to_remote_msg(#message{flags = Flags} = Msg, Vars, TraceRenderedCTX) -> {EventMsg, _} = emqx_rule_events:eventmsg_publish(Msg), - to_remote_msg(EventMsg#{retain => maps:get(retain, Flags, false)}, Vars, TraceRenderedFunc); -to_remote_msg(Msg = #{}, Remote, {TraceRenderedFun, TraceRenderedCTX}) -> + to_remote_msg(EventMsg#{retain => maps:get(retain, Flags, false)}, Vars, TraceRenderedCTX); +to_remote_msg(Msg = #{}, Remote, TraceRenderedCTX) -> #{ topic := Topic, payload := Payload, @@ -83,16 +83,13 @@ to_remote_msg(Msg = #{}, Remote, {TraceRenderedFun, TraceRenderedCTX}) -> retain := Retain } = emqx_bridge_mqtt_msg:render(Msg, Remote), PubProps = maps:get(pub_props, Msg, #{}), - TraceRenderedFun( - #{ - qos => QoS, - retain => Retain, - topic => Topic, - props => PubProps, - payload => Payload - }, - TraceRenderedCTX - ), + emqx_trace:rendered_action_template_with_ctx(TraceRenderedCTX, #{ + qos => QoS, + retain => Retain, + topic => Topic, + props => PubProps, + payload => Payload + }), #mqtt_msg{ qos = QoS, retain = Retain, diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index 1637743b5..dacb47a57 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -9,6 +9,7 @@ -include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_trace.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -214,12 +215,10 @@ on_query(ResourceID, {ChannelId, Data} = MsgReq, State) -> #{channels := Channels} = State, case maps:find(ChannelId, Channels) of {ok, #{param := ProcParam, rabbitmq := RabbitMQ}} -> - LogMetaData = logger:get_process_metadata(), - TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ChannelId}, - TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext}, + TraceRenderedCTX = emqx_trace:make_rendered_action_template_trace_context(ChannelId), Res = ecpool:pick_and_do( ResourceID, - {?MODULE, publish_messages, [RabbitMQ, ProcParam, [MsgReq], TraceRenderedFunc]}, + {?MODULE, publish_messages, [RabbitMQ, ProcParam, [MsgReq], TraceRenderedCTX]}, no_handover ), handle_result(Res); @@ -237,12 +236,10 @@ on_batch_query(ResourceID, [{ChannelId, _Data} | _] = Batch, State) -> #{channels := Channels} = State, case maps:find(ChannelId, Channels) of {ok, #{param := ProcParam, rabbitmq := RabbitMQ}} -> - LogMetaData = logger:get_process_metadata(), - TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ChannelId}, - TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext}, + TraceRenderedCTX = emqx_trace:make_rendered_action_template_trace_context(ChannelId), Res = ecpool:pick_and_do( ResourceID, - {?MODULE, publish_messages, [RabbitMQ, ProcParam, Batch, TraceRenderedFunc]}, + {?MODULE, publish_messages, [RabbitMQ, ProcParam, Batch, TraceRenderedCTX]}, no_handover ), handle_result(Res); @@ -250,22 +247,6 @@ on_batch_query(ResourceID, [{ChannelId, _Data} | _] = Batch, State) -> {error, {unrecoverable_error, {invalid_message_tag, ChannelId}}} end. -trace_render_result(RenderResult, #{trace_ctx := LogMetaData, action_id := ActionID}) -> - OldMetaData = - case logger:get_process_metadata() of - undefined -> #{}; - M -> M - end, - try - logger:set_process_metadata(LogMetaData), - emqx_trace:rendered_action_template( - ActionID, - RenderResult - ) - after - logger:set_process_metadata(OldMetaData) - end. - publish_messages( Conn, RabbitMQ, @@ -278,7 +259,7 @@ publish_messages( publish_confirmation_timeout := PublishConfirmationTimeout }, Messages, - TraceRenderedFunc + TraceRenderedCTX ) -> try publish_messages( @@ -291,10 +272,10 @@ publish_messages( Messages, WaitForPublishConfirmations, PublishConfirmationTimeout, - TraceRenderedFunc + TraceRenderedCTX ) catch - error:{unrecoverable_error, {action_stopped_after_template_rendering, _}} = Reason -> + error:?EMQX_TRACE_STOP_ACTION_MATCH = Reason -> {error, Reason}; %% if send a message to a non-existent exchange, RabbitMQ client will crash %% {shutdown,{server_initiated_close,404,<<"NOT_FOUND - no exchange 'xyz' in vhost '/'">>} @@ -314,7 +295,7 @@ publish_messages( Messages, WaitForPublishConfirmations, PublishConfirmationTimeout, - {TraceRenderedFun, TraceRenderedFuncCTX} + TraceRenderedCTX ) -> case maps:find(Conn, RabbitMQ) of {ok, Channel} -> @@ -330,20 +311,17 @@ publish_messages( format_data(PayloadTmpl, M) || {_, M} <- Messages ], - TraceRenderedFun( - #{ - messages => FormattedMsgs, - properties => #{ - headers => [], - delivery_mode => DeliveryMode - }, - method => #{ - exchange => Exchange, - routing_key => RoutingKey - } + emqx_trace:rendered_action_template_with_ctx(TraceRenderedCTX, #{ + messages => FormattedMsgs, + properties => #{ + headers => [], + delivery_mode => DeliveryMode }, - TraceRenderedFuncCTX - ), + method => #{ + exchange => Exchange, + routing_key => RoutingKey + } + }), lists:foreach( fun(Msg) -> amqp_channel:cast( diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 383ceabf7..67b0e77bc 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -10,6 +10,7 @@ -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_trace.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). @@ -200,12 +201,10 @@ on_batch_query( ) -> case maps:find(ChannelId, Channels) of {ok, #{batch := Tokens, opts := Opts}} -> - LogMetaData = logger:get_process_metadata(), - TraceRenderedFuncContext = #{trace_ctx => LogMetaData, action_id => ChannelId}, - TraceRenderedFunc = {fun trace_render_result/2, TraceRenderedFuncContext}, + TraceRenderedCTX = emqx_trace:make_rendered_action_template_trace_context(ChannelId), do_query_job( InstanceId, - {?MODULE, do_batch_insert, [Tokens, BatchReq, Opts, TraceRenderedFunc]}, + {?MODULE, do_batch_insert, [Tokens, BatchReq, Opts, TraceRenderedCTX]}, State ); _ -> @@ -216,22 +215,6 @@ on_batch_query(InstanceId, BatchReq, State) -> ?SLOG(error, LogMeta#{msg => "invalid_request"}), {error, {unrecoverable_error, invalid_request}}. -trace_render_result(RenderResult, #{trace_ctx := LogMetaData, action_id := ActionID}) -> - OldMetaData = - case logger:get_process_metadata() of - undefined -> #{}; - M -> M - end, - try - logger:set_process_metadata(LogMetaData), - emqx_trace:rendered_action_template( - ActionID, - RenderResult - ) - after - logger:set_process_metadata(OldMetaData) - end. - on_get_status(_InstanceId, #{pool_name := PoolName} = State) -> case emqx_resource_pool:health_check_workers( @@ -358,13 +341,16 @@ do_query_job(InstanceId, Job, #{pool_name := PoolName} = State) -> execute(Conn, Query, Opts) -> tdengine:insert(Conn, Query, Opts). -do_batch_insert(Conn, Tokens, BatchReqs, Opts, {TraceRenderedFun, TraceRenderedFunCTX}) -> +do_batch_insert(Conn, Tokens, BatchReqs, Opts, TraceRenderedCTX) -> SQL = aggregate_query(Tokens, BatchReqs, <<"INSERT INTO">>), try - TraceRenderedFun(#{query => SQL}, TraceRenderedFunCTX), + emqx_trace:rendered_action_template_with_ctx( + TraceRenderedCTX, + #{query => SQL} + ), execute(Conn, SQL, Opts) catch - error:{unrecoverable_error, {action_stopped_after_template_rendering, _}} = Reason -> + error:?EMQX_TRACE_STOP_ACTION_MATCH = Reason -> {error, Reason} end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index e2f01321a..5ec4bdc6e 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -18,6 +18,7 @@ -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_trace.hrl"). -include_lib("emqx_resource/include/emqx_resource_errors.hrl"). -export([ @@ -724,7 +725,7 @@ inc_action_metrics(TraceCtx, Result) -> do_inc_action_metrics( #{rule_id := RuleId, action_id := ActId} = TraceContext, - {error, {unrecoverable_error, {action_stopped_after_template_rendering, Explanation}} = _Reason} + {error, ?EMQX_TRACE_STOP_ACTION(Explanation) = _Reason} ) -> TraceContext1 = maps:remove(action_id, TraceContext), trace_action( From ff09f1419142e5dad089bcb8301710be8d836da6 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 25 Apr 2024 13:27:14 +0200 Subject: [PATCH 27/27] fix(http connector): remove sensitive info from headers lazily In production code we don't need to redact the headers for a trace that will never appear anywhere so we can improve performance by doing removal of sensitive information lazily. --- apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index 76f2686a1..88e4d9c36 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -512,7 +512,7 @@ trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout) -> host => Host, path => Path, method => Method, - headers => emqx_utils_redact:redact_headers(Headers), + headers => {fun emqx_utils_redact:redact_headers/1, Headers}, timeout => Timeout } ); @@ -523,7 +523,7 @@ trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout) -> host => Host, path => Path, method => Method, - headers => emqx_utils_redact:redact_headers(Headers), + headers => {fun emqx_utils_redact:redact_headers/1, Headers}, timeout => Timeout, body => {fun log_format_body/1, Body} }