From ef705c228503ba570e0276f62e45af283203ec19 Mon Sep 17 00:00:00 2001
From: Kjell Winblad
Date: Tue, 2 Apr 2024 18:00:27 +0200
Subject: [PATCH] feat: add apply rule API, clientid/ruleid tracing for rule
 and connector

This commit adds:

* Support for forwarding the rule id and client id to the connector so
  that connector events, such as a successfully rendered request template,
  can be traced back to the rule and the client.
* An HTTP API for applying/activating a rule with a given context (see the
  usage sketch at the end of this patch).

---
 .../src/emqx_trace/emqx_trace_handler.erl     |  14 +-
 .../src/emqx_bridge_http_connector.erl        |  13 +-
 .../test/emqx_bridge_http_SUITE.erl           | 248 +++++-----
 .../test/emqx_bridge_http_test_lib.erl        | 161 +++++++
 .../src/emqx_resource_buffer_worker.erl       | 185 +++++--
 .../src/emqx_rule_api_schema.erl              |  45 +-
 .../src/emqx_rule_engine_api.erl              |  38 ++
 .../src/emqx_rule_runtime.erl                 | 248 +++++---
 .../src/emqx_rule_sqltester.erl               |  70 ++-
 .../emqx_rule_engine_api_rule_apply_SUITE.erl | 451 ++++++++++++++++++
 10 files changed, 1232 insertions(+), 241 deletions(-)
 create mode 100644 apps/emqx_bridge_http/test/emqx_bridge_http_test_lib.erl
 create mode 100644 apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl

diff --git a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl
index 3af543013..c69809052 100644
--- a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl
+++ b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl
@@ -135,14 +135,22 @@ running() ->
     lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers(started)).
 
 -spec filter_ruleid(logger:log_event(), {binary(), atom()}) -> logger:log_event() | stop.
-filter_ruleid(#{meta := Meta = #{ruleid := RuleId}} = Log, {MatchId, _Name}) ->
-    filter_ret(RuleId =:= MatchId andalso is_trace(Meta), Log);
+filter_ruleid(#{meta := Meta = #{rule_id := RuleId}} = Log, {MatchId, _Name}) ->
+    RuleIDs = maps:get(rule_ids, Meta, #{}),
+    IsMatch = (RuleId =:= MatchId) orelse maps:get(MatchId, RuleIDs, false),
+    filter_ret(IsMatch andalso is_trace(Meta), Log);
+filter_ruleid(#{meta := Meta = #{rule_ids := RuleIDs}} = Log, {MatchId, _Name}) ->
+    filter_ret(maps:get(MatchId, RuleIDs, false) andalso is_trace(Meta), Log);
 filter_ruleid(_Log, _ExpectId) ->
     stop.
 
 -spec filter_clientid(logger:log_event(), {binary(), atom()}) -> logger:log_event() | stop.
 filter_clientid(#{meta := Meta = #{clientid := ClientId}} = Log, {MatchId, _Name}) ->
-    filter_ret(ClientId =:= MatchId andalso is_trace(Meta), Log);
+    ClientIDs = maps:get(client_ids, Meta, #{}),
+    IsMatch = (ClientId =:= MatchId) orelse maps:get(MatchId, ClientIDs, false),
+    filter_ret(IsMatch andalso is_trace(Meta), Log);
+filter_clientid(#{meta := Meta = #{client_ids := ClientIDs}} = Log, {MatchId, _Name}) ->
+    filter_ret(maps:get(MatchId, ClientIDs, false) andalso is_trace(Meta), Log);
 filter_clientid(_Log, _ExpectId) ->
     stop.
 
diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl
index ae1e727ca..99222aa00 100644
--- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl
+++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl
@@ -661,13 +661,22 @@ process_request_and_action(Request, ActionState, Msg) ->
     ),
     BodyTemplate = maps:get(body, ActionState),
     Body = render_request_body(BodyTemplate, RenderTmplFunc, Msg),
-    #{
+    RenderResult = #{
         method => Method,
         path => Path,
         body => Body,
         headers => Headers,
         request_timeout => maps:get(request_timeout, ActionState)
-    }.
+ }, + ?TRACE( + "QUERY_RENDER", + "http_connector_successfully_rendered_request", + #{ + request => Request, + render_result => RenderResult + } + ), + RenderResult. merge_proplist(Proplist1, Proplist2) -> lists:foldl( diff --git a/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl b/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl index 3da04012d..ab0d5bb55 100644 --- a/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl +++ b/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl @@ -30,8 +30,8 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx/include/asserts.hrl"). --define(BRIDGE_TYPE, <<"webhook">>). --define(BRIDGE_NAME, atom_to_binary(?MODULE)). +-define(BRIDGE_TYPE, emqx_bridge_http_test_lib:bridge_type()). +-define(BRIDGE_NAME, emqx_bridge_http_test_lib:bridge_name()). all() -> emqx_common_test_helpers:all(?MODULE). @@ -73,21 +73,10 @@ suite() -> init_per_testcase(t_bad_bridge_config, Config) -> Config; -init_per_testcase(t_send_async_connection_timeout, Config) -> - HTTPPath = <<"/path">>, - ServerSSLOpts = false, - {ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link( - _Port = random, HTTPPath, ServerSSLOpts - ), - ResponseDelayMS = 500, - ok = emqx_bridge_http_connector_test_server:set_handler( - success_http_handler(#{response_delay => ResponseDelayMS}) - ), - [ - {http_server, #{port => HTTPPort, path => HTTPPath}}, - {response_delay_ms, ResponseDelayMS} - | Config - ]; +init_per_testcase(Case, Config) when + Case =:= t_send_async_connection_timeout orelse Case =:= t_send_get_trace_messages +-> + emqx_bridge_http_test_lib:init_http_success_server(Config); init_per_testcase(t_path_not_found, Config) -> HTTPPath = <<"/nonexisting/path">>, ServerSSLOpts = false, @@ -115,7 +104,9 @@ init_per_testcase(t_bridge_probes_header_atoms, Config) -> {ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link( _Port = random, HTTPPath, ServerSSLOpts ), - ok = emqx_bridge_http_connector_test_server:set_handler(success_http_handler()), + ok = emqx_bridge_http_connector_test_server:set_handler( + emqx_bridge_http_test_lib:success_http_handler() + ), [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config]; init_per_testcase(_TestCase, Config) -> Server = start_http_server(#{response_delay_ms => 0}), @@ -126,7 +117,8 @@ end_per_testcase(TestCase, _Config) when TestCase =:= t_too_many_requests; TestCase =:= t_rule_action_expired; TestCase =:= t_bridge_probes_header_atoms; - TestCase =:= t_send_async_connection_timeout + TestCase =:= t_send_async_connection_timeout; + TestCase =:= t_send_get_trace_messages -> ok = emqx_bridge_http_connector_test_server:stop(), persistent_term:erase({?MODULE, times_called}), @@ -250,115 +242,8 @@ get_metrics(Name) -> Type = <<"http">>, emqx_bridge:get_metrics(Type, Name). 
-bridge_async_config(#{port := Port} = Config) -> - Type = maps:get(type, Config, ?BRIDGE_TYPE), - Name = maps:get(name, Config, ?BRIDGE_NAME), - Host = maps:get(host, Config, "localhost"), - Path = maps:get(path, Config, ""), - PoolSize = maps:get(pool_size, Config, 1), - QueryMode = maps:get(query_mode, Config, "async"), - ConnectTimeout = maps:get(connect_timeout, Config, "1s"), - RequestTimeout = maps:get(request_timeout, Config, "10s"), - ResumeInterval = maps:get(resume_interval, Config, "1s"), - HealthCheckInterval = maps:get(health_check_interval, Config, "200ms"), - ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"), - LocalTopic = - case maps:find(local_topic, Config) of - {ok, LT} -> - lists:flatten(["local_topic = \"", LT, "\""]); - error -> - "" - end, - ConfigString = io_lib:format( - "bridges.~s.~s {\n" - " url = \"http://~s:~p~s\"\n" - " connect_timeout = \"~p\"\n" - " enable = true\n" - %% local_topic - " ~s\n" - " enable_pipelining = 100\n" - " max_retries = 2\n" - " method = \"post\"\n" - " pool_size = ~p\n" - " pool_type = \"random\"\n" - " request_timeout = \"~s\"\n" - " body = \"${id}\"\n" - " resource_opts {\n" - " inflight_window = 100\n" - " health_check_interval = \"~s\"\n" - " max_buffer_bytes = \"1GB\"\n" - " query_mode = \"~s\"\n" - " request_ttl = \"~p\"\n" - " resume_interval = \"~s\"\n" - " start_after_created = \"true\"\n" - " start_timeout = \"5s\"\n" - " worker_pool_size = \"1\"\n" - " }\n" - " ssl {\n" - " enable = false\n" - " }\n" - "}\n", - [ - Type, - Name, - Host, - Port, - Path, - ConnectTimeout, - LocalTopic, - PoolSize, - RequestTimeout, - HealthCheckInterval, - QueryMode, - ResourceRequestTTL, - ResumeInterval - ] - ), - ct:pal(ConfigString), - parse_and_check(ConfigString, Type, Name). - -parse_and_check(ConfigString, BridgeType, Name) -> - {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), - hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), - #{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf, - RetConfig. - make_bridge(Config) -> - Type = ?BRIDGE_TYPE, - Name = ?BRIDGE_NAME, - BridgeConfig = bridge_async_config(Config#{ - name => Name, - type => Type - }), - {ok, _} = emqx_bridge:create( - Type, - Name, - BridgeConfig - ), - emqx_bridge_resource:bridge_id(Type, Name). - -success_http_handler() -> - success_http_handler(#{response_delay => 0}). - -success_http_handler(Opts) -> - ResponseDelay = maps:get(response_delay, Opts, 0), - TestPid = self(), - fun(Req0, State) -> - {ok, Body, Req} = cowboy_req:read_body(Req0), - Headers = cowboy_req:headers(Req), - ct:pal("http request received: ~p", [ - #{body => Body, headers => Headers, response_delay => ResponseDelay} - ]), - ResponseDelay > 0 andalso timer:sleep(ResponseDelay), - TestPid ! {http, Headers, Body}, - Rep = cowboy_req:reply( - 200, - #{<<"content-type">> => <<"text/plain">>}, - <<"hello">>, - Req - ), - {ok, Rep, State} - end. + emqx_bridge_http_test_lib:make_bridge(Config). not_found_http_handler() -> TestPid = self(), @@ -452,6 +337,103 @@ t_send_async_connection_timeout(Config) -> receive_request_notifications(MessageIDs, ResponseDelayMS, []), ok. 
+t_send_get_trace_messages(Config) -> + ResponseDelayMS = ?config(response_delay_ms, Config), + #{port := Port, path := Path} = ?config(http_server, Config), + BridgeID = make_bridge(#{ + port => Port, + path => Path, + pool_size => 1, + query_mode => "async", + connect_timeout => integer_to_list(ResponseDelayMS * 2) ++ "ms", + request_timeout => "10s", + resume_interval => "200ms", + health_check_interval => "200ms", + resource_request_ttl => "infinity" + }), + RuleTopic = iolist_to_binary([<<"my_rule_topic/">>, atom_to_binary(?FUNCTION_NAME)]), + SQL = <<"SELECT payload.id as id FROM \"", RuleTopic/binary, "\"">>, + {ok, #{<<"id">> := RuleId}} = + emqx_bridge_testlib:create_rule_and_action_http( + ?BRIDGE_TYPE, + RuleTopic, + Config, + #{sql => SQL} + ), + %% =================================== + %% Create trace for RuleId + %% =================================== + Now = erlang:system_time(second) - 10, + Start = Now, + End = Now + 60, + TraceName = atom_to_binary(?FUNCTION_NAME), + Trace = #{ + name => TraceName, + type => ruleid, + ruleid => RuleId, + start_at => Start, + end_at => End + }, + emqx_trace_SUITE:reload(), + ok = emqx_trace:clear(), + {ok, _} = emqx_trace:create(Trace), + %% =================================== + + ResourceId = emqx_bridge_resource:resource_id(BridgeID), + ?retry( + _Interval0 = 200, + _NAttempts0 = 20, + ?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + ?retry( + _Interval0 = 200, + _NAttempts0 = 20, + ?assertEqual(<<>>, read_rule_trace_file(TraceName, Now)) + ), + Msg = emqx_message:make(RuleTopic, <<"{\"id\": 1}">>), + emqx:publish(Msg), + ?retry( + _Interval = 500, + _NAttempts = 20, + ?assertMatch( + #{ + counters := #{ + 'matched' := 1, + 'actions.failed' := 0, + 'actions.failed.unknown' := 0, + 'actions.success' := 1, + 'actions.total' := 1 + } + }, + emqx_metrics_worker:get_metrics(rule_metrics, RuleId) + ) + ), + + ok = emqx_trace_handler_SUITE:filesync(TraceName, ruleid), + {ok, Bin} = file:read_file(emqx_trace:log_file(TraceName, Now)), + + ?retry( + _Interval0 = 200, + _NAttempts0 = 20, + begin + Bin = read_rule_trace_file(TraceName, Now), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"rule_activated">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"SELECT_yielded_result">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"bridge_action">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_activated">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"successfully_rendered_request">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"QUERY_ASYNC">>])) + end + ), + emqx_trace:delete(TraceName), + ok. + +read_rule_trace_file(TraceName, From) -> + emqx_trace:check(), + ok = emqx_trace_handler_SUITE:filesync(TraceName, ruleid), + {ok, Bin} = file:read_file(emqx_trace:log_file(TraceName, From)), + Bin. + t_async_free_retries(Config) -> #{port := Port} = ?config(http_server, Config), _BridgeID = make_bridge(#{ @@ -518,7 +500,7 @@ t_async_common_retries(Config) -> ok. 
t_bad_bridge_config(_Config) -> - BridgeConfig = bridge_async_config(#{port => 12345}), + BridgeConfig = emqx_bridge_http_test_lib:bridge_async_config(#{port => 12345}), ?assertMatch( {ok, {{_, 201, _}, _Headers, #{ @@ -540,7 +522,7 @@ t_bad_bridge_config(_Config) -> t_start_stop(Config) -> #{port := Port} = ?config(http_server, Config), - BridgeConfig = bridge_async_config(#{ + BridgeConfig = emqx_bridge_http_test_lib:bridge_async_config(#{ type => ?BRIDGE_TYPE, name => ?BRIDGE_NAME, port => Port @@ -554,7 +536,7 @@ t_path_not_found(Config) -> begin #{port := Port, path := Path} = ?config(http_server, Config), MQTTTopic = <<"t/webhook">>, - BridgeConfig = bridge_async_config(#{ + BridgeConfig = emqx_bridge_http_test_lib:bridge_async_config(#{ type => ?BRIDGE_TYPE, name => ?BRIDGE_NAME, local_topic => MQTTTopic, @@ -593,7 +575,7 @@ t_too_many_requests(Config) -> begin #{port := Port, path := Path} = ?config(http_server, Config), MQTTTopic = <<"t/webhook">>, - BridgeConfig = bridge_async_config(#{ + BridgeConfig = emqx_bridge_http_test_lib:bridge_async_config(#{ type => ?BRIDGE_TYPE, name => ?BRIDGE_NAME, local_topic => MQTTTopic, @@ -633,7 +615,7 @@ t_rule_action_expired(Config) -> ?check_trace( begin RuleTopic = <<"t/webhook/rule">>, - BridgeConfig = bridge_async_config(#{ + BridgeConfig = emqx_bridge_http_test_lib:bridge_async_config(#{ type => ?BRIDGE_TYPE, name => ?BRIDGE_NAME, host => "non.existent.host", @@ -689,7 +671,7 @@ t_bridge_probes_header_atoms(Config) -> ?check_trace( begin LocalTopic = <<"t/local/topic">>, - BridgeConfig0 = bridge_async_config(#{ + BridgeConfig0 = emqx_bridge_http_test_lib:bridge_async_config(#{ type => ?BRIDGE_TYPE, name => ?BRIDGE_NAME, port => Port, diff --git a/apps/emqx_bridge_http/test/emqx_bridge_http_test_lib.erl b/apps/emqx_bridge_http/test/emqx_bridge_http_test_lib.erl new file mode 100644 index 000000000..4959a24c3 --- /dev/null +++ b/apps/emqx_bridge_http/test/emqx_bridge_http_test_lib.erl @@ -0,0 +1,161 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_http_test_lib). + +-export([ + bridge_type/0, + bridge_name/0, + make_bridge/1, + bridge_async_config/1, + init_http_success_server/1, + success_http_handler/0 +]). + +-define(BRIDGE_TYPE, bridge_type()). +-define(BRIDGE_NAME, bridge_name()). + +bridge_type() -> + <<"webhook">>. + +bridge_name() -> + atom_to_binary(?MODULE). + +make_bridge(Config) -> + Type = ?BRIDGE_TYPE, + Name = ?BRIDGE_NAME, + BridgeConfig = bridge_async_config(Config#{ + name => Name, + type => Type + }), + {ok, _} = emqx_bridge:create( + Type, + Name, + BridgeConfig + ), + emqx_bridge_resource:bridge_id(Type, Name). 
+ +bridge_async_config(#{port := Port} = Config) -> + Type = maps:get(type, Config, ?BRIDGE_TYPE), + Name = maps:get(name, Config, ?BRIDGE_NAME), + Host = maps:get(host, Config, "localhost"), + Path = maps:get(path, Config, ""), + PoolSize = maps:get(pool_size, Config, 1), + QueryMode = maps:get(query_mode, Config, "async"), + ConnectTimeout = maps:get(connect_timeout, Config, "1s"), + RequestTimeout = maps:get(request_timeout, Config, "10s"), + ResumeInterval = maps:get(resume_interval, Config, "1s"), + HealthCheckInterval = maps:get(health_check_interval, Config, "200ms"), + ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"), + LocalTopic = + case maps:find(local_topic, Config) of + {ok, LT} -> + lists:flatten(["local_topic = \"", LT, "\""]); + error -> + "" + end, + ConfigString = io_lib:format( + "bridges.~s.~s {\n" + " url = \"http://~s:~p~s\"\n" + " connect_timeout = \"~p\"\n" + " enable = true\n" + %% local_topic + " ~s\n" + " enable_pipelining = 100\n" + " max_retries = 2\n" + " method = \"post\"\n" + " pool_size = ~p\n" + " pool_type = \"random\"\n" + " request_timeout = \"~s\"\n" + " body = \"${id}\"\n" + " resource_opts {\n" + " inflight_window = 100\n" + " health_check_interval = \"~s\"\n" + " max_buffer_bytes = \"1GB\"\n" + " query_mode = \"~s\"\n" + " request_ttl = \"~p\"\n" + " resume_interval = \"~s\"\n" + " start_after_created = \"true\"\n" + " start_timeout = \"5s\"\n" + " worker_pool_size = \"1\"\n" + " }\n" + " ssl {\n" + " enable = false\n" + " }\n" + "}\n", + [ + Type, + Name, + Host, + Port, + Path, + ConnectTimeout, + LocalTopic, + PoolSize, + RequestTimeout, + HealthCheckInterval, + QueryMode, + ResourceRequestTTL, + ResumeInterval + ] + ), + ct:pal(ConfigString), + parse_and_check(ConfigString, Type, Name). + +parse_and_check(ConfigString, BridgeType, Name) -> + {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), + hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), + #{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf, + RetConfig. + +success_http_handler() -> + success_http_handler(#{response_delay => 0}). + +success_http_handler(Opts) -> + ResponseDelay = maps:get(response_delay, Opts, 0), + TestPid = self(), + fun(Req0, State) -> + {ok, Body, Req} = cowboy_req:read_body(Req0), + Headers = cowboy_req:headers(Req), + ct:pal("http request received: ~p", [ + #{body => Body, headers => Headers, response_delay => ResponseDelay} + ]), + ResponseDelay > 0 andalso timer:sleep(ResponseDelay), + TestPid ! {http, Headers, Body}, + Rep = cowboy_req:reply( + 200, + #{<<"content-type">> => <<"text/plain">>}, + <<"hello">>, + Req + ), + {ok, Rep, State} + end. + +init_http_success_server(Config) -> + HTTPPath = <<"/path">>, + ServerSSLOpts = false, + {ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link( + _Port = random, HTTPPath, ServerSSLOpts + ), + ResponseDelayMS = 500, + ok = emqx_bridge_http_connector_test_server:set_handler( + success_http_handler(#{response_delay => ResponseDelayMS}) + ), + [ + {http_server, #{port => HTTPPort, path => HTTPPath}}, + {response_delay_ms, ResponseDelayMS}, + {bridge_name, ?BRIDGE_NAME} + | Config + ]. 
diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 37f9369ff..6dfcde88c 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -64,8 +64,10 @@ -define(COLLECT_REQ_LIMIT, 1000). -define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}). --define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}). --define(SIMPLE_QUERY(FROM, REQUEST), ?QUERY(FROM, REQUEST, false, infinity)). +-define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX), + {query, FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX} +). +-define(SIMPLE_QUERY(FROM, REQUEST, TRACE_CTX), ?QUERY(FROM, REQUEST, false, infinity, TRACE_CTX)). -define(REPLY(FROM, SENT, RESULT), {reply, FROM, SENT, RESULT}). -define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef), {Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef} @@ -77,7 +79,10 @@ -type id() :: binary(). -type index() :: pos_integer(). -type expire_at() :: infinity | integer(). --type queue_query() :: ?QUERY(reply_fun(), request(), HasBeenSent :: boolean(), expire_at()). +-type trace_context() :: map() | undefined. +-type queue_query() :: ?QUERY( + reply_fun(), request(), HasBeenSent :: boolean(), expire_at(), TraceCtx :: trace_context() +). -type request() :: term(). -type request_from() :: undefined | gen_statem:from(). -type timeout_ms() :: emqx_schema:timeout_duration_ms(). @@ -154,7 +159,10 @@ simple_sync_query(Id, Request, QueryOpts0) -> emqx_resource_metrics:matched_inc(Id), Ref = make_request_ref(), ReplyTo = maps:get(reply_to, QueryOpts0, undefined), - Result = call_query(force_sync, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request), QueryOpts), + TraceCtx = maps:get(trace_ctx, QueryOpts0, undefined), + Result = call_query( + force_sync, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request, TraceCtx), QueryOpts + ), _ = handle_query_result(Id, Result, _HasBeenSent = false), Result. @@ -167,8 +175,9 @@ simple_async_query(Id, Request, QueryOpts0) -> emqx_resource_metrics:matched_inc(Id), Ref = make_request_ref(), ReplyTo = maps:get(reply_to, QueryOpts0, undefined), + TraceCtx = maps:get(trace_ctx, QueryOpts0, undefined), Result = call_query( - async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request), QueryOpts + async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request, TraceCtx), QueryOpts ), _ = handle_query_result(Id, Result, _HasBeenSent = false), Result. 
@@ -439,10 +448,10 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> Result = call_query(force_sync, Id, Index, Ref, QueryOrBatch, QueryOpts), {ShouldAck, PostFn, DeltaCounters} = case QueryOrBatch of - ?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) -> + ?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt, _TraceCtx) -> Reply = ?REPLY(ReplyTo, HasBeenSent, Result), reply_caller_defer_metrics(Id, Reply, QueryOpts); - [?QUERY(_, _, _, _) | _] = Batch -> + [?QUERY(_, _, _, _, _) | _] = Batch -> batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts) end, Data1 = aggregate_counters(Data0, DeltaCounters), @@ -501,11 +510,13 @@ collect_and_enqueue_query_requests(Request0, Data0) -> ReplyFun = maps:get(async_reply_fun, Opts, undefined), HasBeenSent = false, ExpireAt = maps:get(expire_at, Opts), - ?QUERY(ReplyFun, Req, HasBeenSent, ExpireAt); + TraceCtx = maps:get(trace_ctx, Opts, undefined), + ?QUERY(ReplyFun, Req, HasBeenSent, ExpireAt, TraceCtx); (?SEND_REQ(ReplyTo, {query, Req, Opts})) -> HasBeenSent = false, ExpireAt = maps:get(expire_at, Opts), - ?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt) + TraceCtx = maps:get(trace_ctx, Opts, undefined), + ?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt, TraceCtx) end, Requests ), @@ -515,7 +526,7 @@ collect_and_enqueue_query_requests(Request0, Data0) -> reply_overflown([]) -> ok; -reply_overflown([?QUERY(ReplyTo, _Req, _HasBeenSent, _ExpireAt) | More]) -> +reply_overflown([?QUERY(ReplyTo, _Req, _HasBeenSent, _ExpireAt, _TraceCtx) | More]) -> do_reply_caller(ReplyTo, {error, buffer_overflow}), reply_overflown(More). @@ -630,7 +641,7 @@ do_flush( inflight_tid := InflightTID } = Data0, %% unwrap when not batching (i.e., batch size == 1) - [?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) = Request] = Batch, + [?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt, _TraceCtx) = Request] = Batch, QueryOpts = #{inflight_tid => InflightTID, simple_query => false}, Result = call_query(async_if_possible, Id, Index, Ref, Request, QueryOpts), Reply = ?REPLY(ReplyTo, HasBeenSent, Result), @@ -824,14 +835,14 @@ batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) -> expand_batch_reply(BatchResults, Batch) when is_list(BatchResults) -> lists:map( - fun({?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT), Result}) -> + fun({?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT, _TraceCtx), Result}) -> ?REPLY(FROM, SENT, Result) end, lists:zip(Batch, BatchResults) ); expand_batch_reply(BatchResult, Batch) -> lists:map( - fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT)) -> + fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT, _TraceCtx)) -> ?REPLY(FROM, SENT, BatchResult) end, Batch @@ -880,7 +891,7 @@ reply_dropped(_ReplyTo, _Result) -> -spec batch_reply_dropped([queue_query()], {error, late_reply | request_expired}) -> ok. 
 batch_reply_dropped(Batch, Result) ->
     lists:foreach(
-        fun(?QUERY(ReplyTo, _CoreReq, _HasBeenSent, _ExpireAt)) ->
+        fun(?QUERY(ReplyTo, _CoreReq, _HasBeenSent, _ExpireAt, _TraceCtx)) ->
             reply_dropped(ReplyTo, Result)
         end,
         Batch
@@ -1093,11 +1104,80 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
         {ok, _Group, #{status := ?status_connecting, error := unhealthy_target}} ->
             {error, {unrecoverable_error, unhealthy_target}};
         {ok, _Group, Resource} ->
-            do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource);
+            set_rule_id_trace_meta_data(Query),
+            QueryResult = do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource),
+            %% do_call_query does not throw an exception as the call to the
+            %% resource is wrapped in a try catch expression so we will always
+            %% unset the trace meta data
+            unset_rule_id_trace_meta_data(),
+            QueryResult;
         {error, not_found} ->
             ?RESOURCE_ERROR(not_found, "resource not found")
     end.
+
+set_rule_id_trace_meta_data(Requests) when is_list(Requests) ->
+    %% Get the rule ids from requests
+    RuleIDs = lists:foldl(fun collect_rule_id/2, #{}, Requests),
+    ClientIDs = lists:foldl(fun collect_client_id/2, #{}, Requests),
+    StopAfterRender = lists:foldl(fun collect_stop_after_render/2, no_info, Requests),
+    StopAfterRenderVal =
+        case StopAfterRender of
+            only_true ->
+                logger:update_process_metadata(#{stop_action_after_render => false}),
+                true;
+            only_false ->
+                false;
+            mixed ->
+                ?TRACE(
+                    warning,
+                    "ACTION",
+                    "mixed_stop_action_after_render_batch "
+                    "(A batch will be sent to the connector where some but "
+                    "not all requests have stop_action_after_render set. "
+                    "The batch will get assigned "
+                    "stop_action_after_render = false)",
+                    #{rule_ids => RuleIDs, client_ids => ClientIDs}
+                ),
+                false
+        end,
+    logger:update_process_metadata(#{
+        rule_ids => RuleIDs, client_ids => ClientIDs, stop_action_after_render => StopAfterRenderVal
+    }),
+    ok;
+set_rule_id_trace_meta_data(Request) ->
+    set_rule_id_trace_meta_data([Request]),
+    ok.
+
+collect_rule_id(?QUERY(_, _, _, _, #{rule_id := RuleId}), Acc) ->
+    Acc#{RuleId => true};
+collect_rule_id(?QUERY(_, _, _, _, _), Acc) ->
+    Acc.
+
+collect_client_id(?QUERY(_, _, _, _, #{clientid := ClientId}), Acc) ->
+    Acc#{ClientId => true};
+collect_client_id(?QUERY(_, _, _, _, _), Acc) ->
+    Acc.
+
+collect_stop_after_render(?QUERY(_, _, _, _, #{stop_action_after_render := true}), no_info) ->
+    only_true;
+collect_stop_after_render(?QUERY(_, _, _, _, #{stop_action_after_render := true}), only_true) ->
+    only_true;
+collect_stop_after_render(?QUERY(_, _, _, _, #{stop_action_after_render := true}), only_false) ->
+    mixed;
+collect_stop_after_render(?QUERY(_, _, _, _, _), no_info) ->
+    only_false;
+collect_stop_after_render(?QUERY(_, _, _, _, _), only_true) ->
+    mixed;
+collect_stop_after_render(?QUERY(_, _, _, _, _), only_false) ->
+    only_false;
+collect_stop_after_render(?QUERY(_, _, _, _, _), mixed) ->
+    mixed.
+
+unset_rule_id_trace_meta_data() ->
+    logger:update_process_metadata(#{
+        rule_ids => #{}, client_ids => #{}, stop_action_after_render => false
+    }).
+
 %% action:kafka_producer:myproducer1:connector:kafka_producer:mykakfaclient1
 extract_connector_id(Id) when is_binary(Id) ->
     case binary:split(Id, <<":">>, [global]) of
@@ -1208,7 +1288,15 @@ do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) ->
     ).
apply_query_fun( - sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, Channels, QueryOpts + sync, + Mod, + Id, + _Index, + _Ref, + ?QUERY(_, Request, _, _, _TraceCtx) = _Query, + ResSt, + Channels, + QueryOpts ) -> ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}), maybe_reply_to( @@ -1227,7 +1315,15 @@ apply_query_fun( QueryOpts ); apply_query_fun( - async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, ResSt, Channels, QueryOpts + async, + Mod, + Id, + Index, + Ref, + ?QUERY(_, Request, _, _, _TraceCtx) = Query, + ResSt, + Channels, + QueryOpts ) -> ?tp(call_query_async, #{ id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async @@ -1268,7 +1364,7 @@ apply_query_fun( Id, _Index, _Ref, - [?QUERY(_, FirstRequest, _, _) | _] = Batch, + [?QUERY(_, FirstRequest, _, _, _) | _] = Batch, ResSt, Channels, QueryOpts @@ -1276,7 +1372,9 @@ apply_query_fun( ?tp(call_batch_query, #{ id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync }), - Requests = lists:map(fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch), + Requests = lists:map( + fun(?QUERY(_ReplyTo, Request, _, _ExpireAt, _TraceCtx)) -> Request end, Batch + ), maybe_reply_to( ?APPLY_RESOURCE( call_batch_query, @@ -1298,7 +1396,7 @@ apply_query_fun( Id, Index, Ref, - [?QUERY(_, FirstRequest, _, _) | _] = Batch, + [?QUERY(_, FirstRequest, _, _, _) | _] = Batch, ResSt, Channels, QueryOpts @@ -1321,7 +1419,7 @@ apply_query_fun( min_batch => minimize(Batch) }, Requests = lists:map( - fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch + fun(?QUERY(_ReplyTo, Request, _, _ExpireAt, _TraceCtx)) -> Request end, Batch ), IsRetriable = false, AsyncWorkerMRef = undefined, @@ -1367,7 +1465,7 @@ handle_async_reply1( inflight_tid := InflightTID, resource_id := Id, buffer_worker := BufferWorkerPid, - min_query := ?QUERY(ReplyTo, _, _, ExpireAt) = _Query + min_query := ?QUERY(ReplyTo, _, _, ExpireAt, _TraceCtx) = _Query } = ReplyContext, Result ) -> @@ -1399,7 +1497,7 @@ do_handle_async_reply( request_ref := Ref, buffer_worker := BufferWorkerPid, inflight_tid := InflightTID, - min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query + min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt, _TraceCtx) = _Query }, Result ) -> @@ -1486,13 +1584,13 @@ handle_async_batch_reply2([Inflight], ReplyContext, Results0, Now) -> %% So we just take the original flag from the ReplyContext batch %% and put it back to the batch found in inflight table %% which must have already been set to `false` - [?QUERY(_ReplyTo, _, HasBeenSent, _ExpireAt) | _] = Batch, + [?QUERY(_ReplyTo, _, HasBeenSent, _ExpireAt, _TraceCtx) | _] = Batch, {RealNotExpired0, RealExpired, Results} = sieve_expired_requests_with_results(RealBatch, Now, Results0), RealNotExpired = lists:map( - fun(?QUERY(ReplyTo, CoreReq, _HasBeenSent, ExpireAt)) -> - ?QUERY(ReplyTo, CoreReq, HasBeenSent, ExpireAt) + fun(?QUERY(ReplyTo, CoreReq, _HasBeenSent, ExpireAt, TraceCtx)) -> + ?QUERY(ReplyTo, CoreReq, HasBeenSent, ExpireAt, TraceCtx) end, RealNotExpired0 ), @@ -1678,7 +1776,10 @@ inflight_get_first_retriable(InflightTID, Now) -> case ets:select(InflightTID, MatchSpec, _Limit = 1) of '$end_of_table' -> none; - {[{Ref, Query = ?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)}], _Continuation} -> + { + [{Ref, Query = ?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt, _TraceCtx)}], + _Continuation + } -> case is_expired(ExpireAt, Now) of true -> {expired, Ref, [Query]}; @@ -1714,7 
+1815,7 @@ inflight_append(undefined, _InflightItem) -> ok; inflight_append( InflightTID, - ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch0, IsRetriable, AsyncWorkerMRef) + ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _, _) | _] = Batch0, IsRetriable, AsyncWorkerMRef) ) -> Batch = mark_as_sent(Batch0), InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef), @@ -1726,7 +1827,10 @@ inflight_append( inflight_append( InflightTID, ?INFLIGHT_ITEM( - Ref, ?QUERY(_ReplyTo, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, AsyncWorkerMRef + Ref, + ?QUERY(_ReplyTo, _Req, _HasBeenSent, _ExpireAt, _TraceCtx) = Query0, + IsRetriable, + AsyncWorkerMRef ) ) -> Query = mark_as_sent(Query0), @@ -1790,9 +1894,13 @@ ack_inflight(undefined, _Ref, _BufferWorkerPid) -> ack_inflight(InflightTID, Ref, BufferWorkerPid) -> {Count, Removed} = case ets:take(InflightTID, Ref) of - [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _AsyncWorkerMRef)] -> + [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _, _), _IsRetriable, _AsyncWorkerMRef)] -> {1, true}; - [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _AsyncWorkerMRef)] -> + [ + ?INFLIGHT_ITEM( + Ref, [?QUERY(_, _, _, _, _) | _] = Batch, _IsRetriable, _AsyncWorkerMRef + ) + ] -> {length(Batch), true}; [] -> {0, false} @@ -1942,9 +2050,9 @@ do_collect_requests(Acc, Count, Limit) -> mark_as_sent(Batch) when is_list(Batch) -> lists:map(fun mark_as_sent/1, Batch); -mark_as_sent(?QUERY(ReplyTo, Req, _HasBeenSent, ExpireAt)) -> +mark_as_sent(?QUERY(ReplyTo, Req, _HasBeenSent, ExpireAt, TraceCtx)) -> HasBeenSent = true, - ?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt). + ?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt, TraceCtx). is_unrecoverable_error({error, {unrecoverable_error, _}}) -> true; @@ -1967,7 +2075,7 @@ is_async_return(_) -> sieve_expired_requests(Batch, Now) -> lists:partition( - fun(?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)) -> + fun(?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt, _TraceCtx)) -> not is_expired(ExpireAt, Now) end, Batch @@ -1978,7 +2086,7 @@ sieve_expired_requests_with_results(Batch, Now, Results) when is_list(Results) - {RevNotExpiredBatch, RevNotExpiredResults, ExpiredBatch} = lists:foldl( fun( - {?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt) = Query, Result}, + {?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt, _TraceCtx) = Query, Result}, {NotExpAcc, ResAcc, ExpAcc} ) -> case not is_expired(ExpireAt, Now) of @@ -2026,15 +2134,16 @@ ensure_expire_at(#{timeout := TimeoutMS} = Opts) -> Opts#{expire_at => ExpireAt}. %% no need to keep the request for async reply handler -minimize(?QUERY(_, _, _, _) = Q) -> +minimize(?QUERY(_, _, _, _, _) = Q) -> do_minimize(Q); minimize(L) when is_list(L) -> lists:map(fun do_minimize/1, L). -ifdef(TEST). -do_minimize(?QUERY(_ReplyTo, _Req, _Sent, _ExpireAt) = Query) -> Query. +do_minimize(?QUERY(_ReplyTo, _Req, _Sent, _ExpireAt, _TraceCtx) = Query) -> Query. -else. -do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent, ExpireAt). +do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt, TraceCtx)) -> + ?QUERY(ReplyTo, [], Sent, ExpireAt, TraceCtx). -endif. 
%% To avoid message loss due to misconfigurations, we adjust diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl index d82951124..a24ef5bd0 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl @@ -54,7 +54,8 @@ roots() -> {"rule_creation", sc(ref("rule_creation"), #{desc => ?DESC("root_rule_creation")})}, {"rule_info", sc(ref("rule_info"), #{desc => ?DESC("root_rule_info")})}, {"rule_events", sc(ref("rule_events"), #{desc => ?DESC("root_rule_events")})}, - {"rule_test", sc(ref("rule_test"), #{desc => ?DESC("root_rule_test")})} + {"rule_test", sc(ref("rule_test"), #{desc => ?DESC("root_rule_test")})}, + {"rule_apply_test", sc(ref("rule_apply_test"), #{desc => ?DESC("root_apply_rule_test")})} ]. fields("rule_engine") -> @@ -124,6 +125,48 @@ fields("rule_test") -> )}, {"sql", sc(binary(), #{desc => ?DESC("test_sql"), required => true})} ]; +fields("rule_apply_test") -> + [ + {"context", + sc( + hoconsc:union([ + ref("ctx_pub"), + ref("ctx_sub"), + ref("ctx_unsub"), + ref("ctx_delivered"), + ref("ctx_acked"), + ref("ctx_dropped"), + ref("ctx_connected"), + ref("ctx_disconnected"), + ref("ctx_connack"), + ref("ctx_check_authz_complete"), + ref("ctx_bridge_mqtt"), + ref("ctx_delivery_dropped") + ]), + #{ + desc => ?DESC("test_context"), + default => #{} + } + )}, + {"environment", + sc( + typerefl:map(), + #{ + desc => + ?DESC("test_rule_environment"), + default => #{} + } + )}, + {"stop_action_after_template_render", + sc( + typerefl:boolean(), + #{ + desc => + ?DESC("stop_action_after_template_render"), + default => false + } + )} + ]; fields("metrics") -> [ {"matched", diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 354e40c5f..c0514b82b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -37,6 +37,7 @@ '/rule_test'/2, '/rules'/2, '/rules/:id'/2, + '/rules/:id/test'/2, '/rules/:id/metrics'/2, '/rules/:id/metrics/reset'/2 ]). @@ -145,6 +146,7 @@ paths() -> "/rule_test", "/rules", "/rules/:id", + "/rules/:id/test", "/rules/:id/metrics", "/rules/:id/metrics/reset" ]. @@ -161,6 +163,9 @@ rule_creation_schema() -> rule_test_schema() -> ref(emqx_rule_api_schema, "rule_test"). +rule_apply_test_schema() -> + ref(emqx_rule_api_schema, "rule_apply_test"). + rule_info_schema() -> ref(emqx_rule_api_schema, "rule_info"). @@ -258,6 +263,21 @@ schema("/rules/:id") -> } } }; +schema("/rules/:id/test") -> + #{ + 'operationId' => '/rules/:id/test', + post => #{ + tags => [<<"rules">>], + description => ?DESC("api8"), + summary => <<"Apply a rule with the given message and environment">>, + 'requestBody' => rule_apply_test_schema(), + responses => #{ + 400 => error_schema('BAD_REQUEST', "Invalid Parameters"), + 412 => error_schema('NOT_MATCH', "SQL Not Match"), + 200 => <<"Rule Applied">> + } + } + }; schema("/rules/:id/metrics") -> #{ 'operationId' => '/rules/:id/metrics', @@ -392,6 +412,24 @@ param_path_id() -> end ). 
+'/rules/:id/test'(post, #{body := Params, bindings := #{id := RuleId}}) -> + ?CHECK_PARAMS( + Params, + rule_apply_test, + begin + case emqx_rule_sqltester:apply_rule(RuleId, CheckedParams) of + {ok, Result} -> + {200, Result}; + {error, {parse_error, Reason}} -> + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; + {error, nomatch} -> + {412, #{code => 'NOT_MATCH', message => <<"SQL Not Match">>}}; + {error, Reason} -> + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} + end + end + ). + '/rules/:id'(get, #{bindings := #{id := Id}}) -> case emqx_rule_engine:get_rule(Id) of {ok, Rule} -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 9a307e2c3..1204ea5e5 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -69,9 +69,14 @@ apply_rule_discard_result(Rule, Columns, Envs) -> ok. apply_rule(Rule = #{id := RuleID}, Columns, Envs) -> - ?TRACE("APPLY_RULE", "rule_activated", #{ - ruleid => RuleID, input => Columns, environment => Envs - }), + set_process_trace_metadata(RuleID, Columns), + trace_rule_sql( + "rule_activated", + #{ + input => Columns, environment => Envs + }, + debug + ), ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'matched'), clear_rule_payload(), try @@ -80,48 +85,80 @@ apply_rule(Rule = #{id := RuleID}, Columns, Envs) -> %% ignore the errors if select or match failed _:Reason = {select_and_transform_error, Error} -> ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'), - ?SLOG(warning, #{ - msg => "SELECT_clause_exception", - rule_id => RuleID, - reason => Error - }), + trace_rule_sql( + "SELECT_clause_exception", + #{ + reason => Error + }, + warning + ), {error, Reason}; _:Reason = {match_conditions_error, Error} -> ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'), - ?SLOG(warning, #{ - msg => "WHERE_clause_exception", - rule_id => RuleID, - reason => Error - }), + trace_rule_sql( + "WHERE_clause_exception", + #{ + reason => Error + }, + warning + ), {error, Reason}; _:Reason = {select_and_collect_error, Error} -> ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'), - ?SLOG(warning, #{ - msg => "FOREACH_clause_exception", - rule_id => RuleID, - reason => Error - }), + trace_rule_sql( + "FOREACH_clause_exception", + #{ + reason => Error + }, + warning + ), {error, Reason}; _:Reason = {match_incase_error, Error} -> ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'), - ?SLOG(warning, #{ - msg => "INCASE_clause_exception", - rule_id => RuleID, - reason => Error - }), + trace_rule_sql( + "INCASE_clause_exception", + #{ + reason => Error + }, + warning + ), {error, Reason}; Class:Error:StkTrace -> ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'), - ?SLOG(error, #{ - msg => "apply_rule_failed", - rule_id => RuleID, - exception => Class, - reason => Error, - stacktrace => StkTrace - }), + trace_rule_sql( + "apply_rule_failed", + #{ + exception => Class, + reason => Error, + stacktrace => StkTrace + }, + warning + ), {error, {Error, StkTrace}} + after + reset_process_trace_metadata(Columns) end. +set_process_trace_metadata(RuleID, #{clientid := ClientID}) -> + logger:update_process_metadata(#{ + rule_id => RuleID, + clientid => ClientID + }); +set_process_trace_metadata(RuleID, _) -> + logger:update_process_metadata(#{ + rule_id => RuleID + }). 
+ +reset_process_trace_metadata(#{clientid := _ClientID}) -> + Meta = logger:get_process_metadata(), + Meta1 = maps:remove(clientid, Meta), + Meta2 = maps:remove(rule_id, Meta1), + logger:set_process_metadata(Meta2); +reset_process_trace_metadata(_) -> + Meta = logger:get_process_metadata(), + Meta1 = maps:remove(rule_id, Meta), + logger:set_process_metadata(Meta1). + do_apply_rule( #{ id := RuleId, @@ -139,13 +176,18 @@ do_apply_rule( {ok, ColumnsAndSelected, FinalCollection} -> case FinalCollection of [] -> + trace_rule_sql("FOREACH_yielded_no_result"), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'); _ -> + trace_rule_sql( + "FOREACH_yielded_result", #{result => FinalCollection}, debug + ), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed') end, NewEnvs = maps:merge(ColumnsAndSelected, Envs), {ok, [handle_action_list(RuleId, Actions, Coll, NewEnvs) || Coll <- FinalCollection]}; false -> + trace_rule_sql("FOREACH_yielded_no_result_no_match"), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'), {error, nomatch} end; @@ -162,9 +204,11 @@ do_apply_rule( ) -> case evaluate_select(Fields, Columns, Conditions) of {ok, Selected} -> + trace_rule_sql("SELECT_yielded_result", #{result => Selected}, debug), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed'), {ok, handle_action_list(RuleId, Actions, Selected, maps:merge(Columns, Envs))}; false -> + trace_rule_sql("SELECT_yielded_no_result_no_match"), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'), {error, nomatch} end. @@ -348,37 +392,42 @@ handle_action_list(RuleId, Actions, Selected, Envs) -> handle_action(RuleId, ActId, Selected, Envs) -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.total'), + trace_action(ActId, "activating_action"), try - do_handle_action(RuleId, ActId, Selected, Envs) + Result = do_handle_action(RuleId, ActId, Selected, Envs), + trace_action(ActId, "action_activated", #{result => Result}), + Result catch throw:out_of_service -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), ok = emqx_metrics_worker:inc( rule_metrics, RuleId, 'actions.failed.out_of_service' ), - ?SLOG(warning, #{msg => "out_of_service", action => ActId}); + trace_action(ActId, "out_of_service", #{}, warning); Err:Reason:ST -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'), - ?SLOG(error, #{ - msg => "action_failed", - action => ActId, - exception => Err, - reason => Reason, - stacktrace => ST - }) + trace_action( + ActId, + "action_failed", + #{ + exception => Err, + reason => Reason, + stacktrace => ST + }, + error + ) end. -define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found; R == unhealthy_target). 
-do_handle_action(RuleId, {bridge, BridgeType, BridgeName, ResId}, Selected, _Envs) -> - ?TRACE( - "BRIDGE", - "bridge_action", - #{bridge_id => emqx_bridge_resource:bridge_id(BridgeType, BridgeName)} - ), - ReplyTo = {fun ?MODULE:inc_action_metrics/2, [RuleId], #{reply_dropped => true}}, +do_handle_action(RuleId, {bridge, BridgeType, BridgeName, ResId} = Action, Selected, _Envs) -> + trace_action_bridge("BRIDGE", Action, "bridge_action", #{}, debug), + TraceCtx = do_handle_action_get_trace_context(Action), + ReplyTo = {fun ?MODULE:inc_action_metrics/2, [TraceCtx], #{reply_dropped => true}}, case - emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected, #{reply_to => ReplyTo}) + emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected, #{ + reply_to => ReplyTo, trace_ctx => maps:remove(action_id, TraceCtx) + }) of {error, Reason} when Reason == bridge_not_found; Reason == bridge_stopped -> throw(out_of_service); @@ -388,23 +437,20 @@ do_handle_action(RuleId, {bridge, BridgeType, BridgeName, ResId}, Selected, _Env Result end; do_handle_action( - RuleId, - {bridge_v2, BridgeType, BridgeName}, + _RuleId, + {bridge_v2, BridgeType, BridgeName} = Action, Selected, _Envs ) -> - ?TRACE( - "BRIDGE", - "bridge_action", - #{bridge_id => {bridge_v2, BridgeType, BridgeName}} - ), - ReplyTo = {fun ?MODULE:inc_action_metrics/2, [RuleId], #{reply_dropped => true}}, + trace_action_bridge("BRIDGE", Action, "bridge_action", #{}, debug), + TraceCtx = do_handle_action_get_trace_context(Action), + ReplyTo = {fun ?MODULE:inc_action_metrics/2, [TraceCtx], #{reply_dropped => true}}, case emqx_bridge_v2:send_message( BridgeType, BridgeName, Selected, - #{reply_to => ReplyTo} + #{reply_to => ReplyTo, trace_ctx => maps:remove(action_id, TraceCtx)} ) of {error, Reason} when Reason == bridge_not_found; Reason == bridge_stopped -> @@ -414,13 +460,43 @@ do_handle_action( Result -> Result end; -do_handle_action(RuleId, #{mod := Mod, func := Func} = Action, Selected, Envs) -> +do_handle_action(_RuleId, #{mod := Mod, func := Func} = Action, Selected, Envs) -> + trace_action(Action, "call_action_function"), %% the function can also throw 'out_of_service' Args = maps:get(args, Action, []), Result = Mod:Func(Selected, Envs, Args), - inc_action_metrics(RuleId, Result), + TraceCtx = do_handle_action_get_trace_context(Action), + inc_action_metrics(TraceCtx, Result), + trace_action(Action, "call_action_function_result", #{result => Result}, debug), Result. +do_handle_action_get_trace_context(Action) -> + case logger:get_process_metadata() of + #{ + rule_id := RuleID, + clientid := ClientID + } -> + #{ + rule_id => RuleID, + clientid => ClientID, + action_id => Action + }; + #{ + rule_id := RuleID + } -> + #{ + rule_id => RuleID, + action_id => Action + } + end. + +action_info({bridge, BridgeType, BridgeName, _ResId}) -> + #{type => BridgeType, name => BridgeName}; +action_info({bridge_v2, BridgeType, BridgeName}) -> + #{type => BridgeType, name => BridgeName}; +action_info(FuncInfoMap) -> + FuncInfoMap. + eval({Op, _} = Exp, Context) when is_list(Context) andalso (Op == path orelse Op == var) -> case Context of [Columns] -> @@ -599,21 +675,31 @@ nested_put(Alias, Val, Columns0) -> Columns = ensure_decoded_payload(Alias, Columns0), emqx_rule_maps:nested_put(Alias, Val, Columns). -inc_action_metrics(RuleId, Result) -> - _ = do_inc_action_metrics(RuleId, Result), +inc_action_metrics(TraceCtx, Result) -> + _ = do_inc_action_metrics(TraceCtx, Result), Result. 
-do_inc_action_metrics(RuleId, {error, {recoverable_error, _}}) -> +do_inc_action_metrics( + #{rule_id := RuleId, action_id := ActId} = TraceContext, + {error, {recoverable_error, _}} +) -> + trace_action(ActId, "out_of_service", TraceContext), emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'); -do_inc_action_metrics(RuleId, {error, {unrecoverable_error, _}}) -> +do_inc_action_metrics( + #{rule_id := RuleId, action_id := ActId} = TraceContext, + {error, {unrecoverable_error, _} = Reason} +) -> + trace_action(ActId, "action_failed", maps:merge(#{reason => Reason}, TraceContext)), emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'); -do_inc_action_metrics(RuleId, R) -> +do_inc_action_metrics(#{rule_id := RuleId, action_id := ActId} = TraceContext, R) -> case is_ok_result(R) of false -> + trace_action(ActId, "action_failed", maps:merge(#{reason => R}, TraceContext)), emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'); true -> + trace_action(ActId, "action_success", maps:merge(#{result => R}, TraceContext)), emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success') end. @@ -661,3 +747,39 @@ parse_function_name(Module, Name) when is_binary(Name) -> end; parse_function_name(_Module, Name) when is_atom(Name) -> Name. + +trace_action(ActId, Message) -> + trace_action_bridge("ACTION", ActId, Message). + +trace_action(ActId, Message, Extra) -> + trace_action_bridge("ACTION", ActId, Message, Extra, debug). + +trace_action(ActId, Message, Extra, Level) -> + trace_action_bridge("ACTION", ActId, Message, Extra, Level). + +trace_action_bridge(Tag, ActId, Message) -> + trace_action_bridge(Tag, ActId, Message, #{}, debug). + +trace_action_bridge(Tag, ActId, Message, Extra, Level) -> + ?TRACE( + Level, + Tag, + Message, + maps:merge( + #{ + action_info => action_info(ActId) + }, + Extra + ) + ). + +trace_rule_sql(Message) -> + trace_rule_sql(Message, #{}, debug). + +trace_rule_sql(Message, Extra, Level) -> + ?TRACE( + Level, + "RULE_SQL_EXEC", + Message, + Extra + ). diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 8212e3385..e72b0fcd0 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -20,9 +20,77 @@ test/1, get_selected_data/3, %% Some SQL functions return different results in the test environment - is_test_runtime_env/0 + is_test_runtime_env/0, + apply_rule/2 ]). +apply_rule( + RuleId, + #{ + context := Context, + environment := Env, + stop_action_after_template_render := StopAfterRender + } +) -> + {ok, Rule} = emqx_rule_engine:get_rule(RuleId), + InTopic = get_in_topic(Context), + EventTopics = maps:get(from, Rule, []), + case lists:all(fun is_publish_topic/1, EventTopics) of + true -> + %% test if the topic matches the topic filters in the rule + case emqx_topic:match_any(InTopic, EventTopics) of + true -> do_apply_matched_rule(Rule, Context, Env, StopAfterRender); + false -> {error, nomatch} + end; + false -> + case lists:member(InTopic, EventTopics) of + true -> + %% the rule is for both publish and events, test it directly + do_apply_matched_rule(Rule, Context, Env, StopAfterRender); + false -> + {error, nomatch} + end + end. 
+
+do_apply_matched_rule(Rule, Context, Env, StopAfterRender) ->
+    update_process_trace_metadata(StopAfterRender),
+    Env1 =
+        case Env of
+            M when map_size(M) =:= 0 ->
+                %% Use the default environment if no environment is provided
+                default_apply_rule_environment();
+            _ ->
+                Env
+        end,
+    ApplyRuleRes = emqx_rule_runtime:apply_rule(Rule, Context, Env1),
+    reset_trace_process_metadata(StopAfterRender),
+    ApplyRuleRes.
+
+update_process_trace_metadata(true = _StopAfterRender) ->
+    logger:update_process_metadata(#{
+        stop_action_after_render => true
+    });
+update_process_trace_metadata(false = _StopAfterRender) ->
+    ok.
+
+reset_trace_process_metadata(true = _StopAfterRender) ->
+    Meta = logger:get_process_metadata(),
+    NewMeta = maps:remove(stop_action_after_render, Meta),
+    logger:set_process_metadata(NewMeta);
+reset_trace_process_metadata(false = _StopAfterRender) ->
+    ok.
+
+default_apply_rule_environment() ->
+    #{
+        headers => #{
+            protocol => mqtt,
+            username => undefined,
+            peerhost => {127, 0, 0, 1},
+            proto_ver => 5,
+            properties => #{}
+        }
+    }.
+
 -spec test(#{sql := binary(), context := map()}) ->
     {ok, map() | list()} | {error, term()}.
 test(#{sql := Sql, context := Context}) ->
     case emqx_rule_sqlparser:parse(Sql) of
diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl
new file mode 100644
index 000000000..d9d15dba0
--- /dev/null
+++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl
@@ -0,0 +1,451 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_rule_engine_api_rule_apply_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-define(CONF_DEFAULT, <<"rule_engine {rules {}}">>).
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    application:load(emqx_conf),
+    % ok = emqx_common_test_helpers:load_config(emqx_rule_engine_schema, ?CONF_DEFAULT),
+    % ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx, emqx_rule_engine, emqx_bridge, emqx_bridge_http]),
+
+    Apps = emqx_cth_suite:start(
+        [
+            emqx,
+            emqx_conf,
+            emqx_connector,
+            emqx_bridge_http,
+            emqx_bridge,
+            emqx_rule_engine
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    emqx_mgmt_api_test_util:init_suite(),
+    [{apps, Apps} | Config].
+
+end_per_suite(Config) ->
+    Apps = ?config(apps, Config),
+    emqx_mgmt_api_test_util:end_suite(),
+    ok = emqx_cth_suite:stop(Apps),
+    ok.
+
+init_per_testcase(_Case, Config) ->
+    emqx_bridge_http_test_lib:init_http_success_server(Config).
+ +end_per_testcase(_TestCase, _Config) -> + ok = emqx_bridge_http_connector_test_server:stop(), + emqx_bridge_v2_testlib:delete_all_bridges(), + emqx_bridge_v2_testlib:delete_all_connectors(), + emqx_common_test_helpers:call_janitor(), + ok. + +t_basic_apply_rule_trace_ruleid(Config) -> + basic_apply_rule_test_helper(Config, ruleid). + +t_basic_apply_rule_trace_clientid(Config) -> + basic_apply_rule_test_helper(Config, clientid). + +basic_apply_rule_test_helper(Config, TraceType) -> + HTTPServerConfig = ?config(http_server, Config), + emqx_bridge_http_test_lib:make_bridge(HTTPServerConfig), + #{status := connected} = emqx_bridge_v2:health_check( + http, emqx_bridge_http_test_lib:bridge_name() + ), + %% Create Rule + RuleTopic = iolist_to_binary([<<"my_rule_topic/">>, atom_to_binary(?FUNCTION_NAME)]), + SQL = <<"SELECT payload.id as id FROM \"", RuleTopic/binary, "\"">>, + {ok, #{<<"id">> := RuleId}} = + emqx_bridge_testlib:create_rule_and_action_http( + http, + RuleTopic, + Config, + #{sql => SQL} + ), + ClientId = <<"c_emqx">>, + %% =================================== + %% Create trace for RuleId + %% =================================== + Now = erlang:system_time(second) - 10, + Start = Now, + End = Now + 60, + TraceName = atom_to_binary(?FUNCTION_NAME), + TraceValue = + case TraceType of + ruleid -> + RuleId; + clientid -> + ClientId + end, + Trace = #{ + name => TraceName, + type => TraceType, + TraceType => TraceValue, + start_at => Start, + end_at => End + }, + emqx_trace_SUITE:reload(), + ok = emqx_trace:clear(), + {ok, _} = emqx_trace:create(Trace), + %% =================================== + Context = #{ + clientid => ClientId, + event_type => message_publish, + payload => <<"{\"msg\": \"hello\"}">>, + qos => 1, + topic => RuleTopic, + username => <<"u_emqx">> + }, + Params = #{ + % body => #{ + <<"context">> => Context + % } + }, + emqx_trace:check(), + ok = emqx_trace_handler_SUITE:filesync(TraceName, TraceType), + {ok, _} = file:read_file(emqx_trace:log_file(TraceName, Now)), + ?assertMatch({ok, _}, call_apply_rule_api(RuleId, Params)), + ?retry( + _Interval0 = 200, + _NAttempts0 = 20, + begin + Bin = read_rule_trace_file(TraceName, TraceType, Now), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"rule_activated">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"SELECT_yielded_result">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"bridge_action">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_activated">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"successfully_rendered_request">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"QUERY_ASYNC">>])) + end + ), + emqx_trace:delete(TraceName), + ok. + +%% Helper Functions + +% t_ctx_pub(_) -> +% SQL = <<"SELECT payload.msg as msg, clientid, username, payload, topic, qos FROM \"t/#\"">>, +% Context = #{ +% clientid => <<"c_emqx">>, +% event_type => message_publish, +% payload => <<"{\"msg\": \"hello\"}">>, +% qos => 1, +% topic => <<"t/a">>, +% username => <<"u_emqx">> +% }, +% Expected = Context#{msg => <<"hello">>}, +% do_test(SQL, Context, Expected). + +% t_ctx_sub(_) -> +% SQL = <<"SELECT clientid, username, topic, qos FROM \"$events/session_subscribed\"">>, +% Context = #{ +% clientid => <<"c_emqx">>, +% event_type => session_subscribed, +% qos => 1, +% topic => <<"t/a">>, +% username => <<"u_emqx">> +% }, + +% do_test(SQL, Context, Context). 
+
+call_apply_rule_api(RuleId, Params) ->
+    Method = post,
+    Path = emqx_mgmt_api_test_util:api_path(["rules", RuleId, "test"]),
+    ct:pal("apply rule (http):\n ~p", [Params]),
+    Res = request(Method, Path, Params),
+    ct:pal("apply rule (http) result:\n ~p", [Res]),
+    Res.
+
+request(Method, Path, Params) ->
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    Opts = #{return_all => true},
+    case emqx_mgmt_api_test_util:request_api(Method, Path, "", AuthHeader, Params, Opts) of
+        {ok, {Status, Headers, Body0}} ->
+            Body = maybe_json_decode(Body0),
+            {ok, {Status, Headers, Body}};
+        {error, {Status, Headers, Body0}} ->
+            Body =
+                case emqx_utils_json:safe_decode(Body0, [return_maps]) of
+                    {ok, Decoded0 = #{<<"message">> := Msg0}} ->
+                        Msg = maybe_json_decode(Msg0),
+                        Decoded0#{<<"message">> := Msg};
+                    {ok, Decoded0} ->
+                        Decoded0;
+                    {error, _} ->
+                        Body0
+                end,
+            {error, {Status, Headers, Body}};
+        Error ->
+            Error
+    end.
+
+maybe_json_decode(X) ->
+    case emqx_utils_json:safe_decode(X, [return_maps]) of
+        {ok, Decoded} -> Decoded;
+        {error, _} -> X
+    end.
+
+%% Flush the trace handler and read back the contents of the trace log file
+read_rule_trace_file(TraceName, TraceType, From) ->
+    emqx_trace:check(),
+    ok = emqx_trace_handler_SUITE:filesync(TraceName, TraceType),
+    {ok, Bin} = file:read_file(emqx_trace:log_file(TraceName, From)),
+    ct:pal("MYTRACE:~n~s", [Bin]),
+    Bin.
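As a minimal sketch (the module and function names are hypothetical and not part of this patch), downstream code could consult the stop_action_after_render flag that do_apply_matched_rule/4 stores in the logger process metadata like this:

    -module(stop_after_render_example).
    -export([requested/0]).

    %% Sketch: returns true if the current process has been asked to stop
    %% actions right after template rendering, i.e. if the flag set by
    %% update_process_trace_metadata/1 is present in the process metadata.
    requested() ->
        case logger:get_process_metadata() of
            #{stop_action_after_render := true} -> true;
            _ -> false
        end.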