diff --git a/apps/emqx/include/emqx_trace.hrl b/apps/emqx/include/emqx_trace.hrl index 5c50fa706..476794223 100644 --- a/apps/emqx/include/emqx_trace.hrl +++ b/apps/emqx/include/emqx_trace.hrl @@ -20,9 +20,14 @@ -record(?TRACE, { name :: binary() | undefined | '_', - type :: clientid | topic | ip_address | undefined | '_', + type :: clientid | topic | ip_address | ruleid | undefined | '_', filter :: - emqx_types:topic() | emqx_types:clientid() | emqx_trace:ip_address() | undefined | '_', + emqx_types:topic() + | emqx_types:clientid() + | emqx_trace:ip_address() + | emqx_trace:ruleid() + | undefined + | '_', enable = true :: boolean() | '_', payload_encode = text :: hex | text | hidden | '_', extra = #{} :: map() | '_', diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index f3a5be084..4ae973722 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -28,7 +28,8 @@ subscribe/3, unsubscribe/2, log/3, - log/4 + log/4, + rendered_action_template/2 ]). -export([ @@ -66,6 +67,9 @@ -export_type([ip_address/0]). -type ip_address() :: string(). +-export_type([ruleid/0]). +-type ruleid() :: binary(). + publish(#message{topic = <<"$SYS/", _/binary>>}) -> ignore; publish(#message{from = From, topic = Topic, payload = Payload}) when @@ -83,6 +87,26 @@ unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) -> unsubscribe(Topic, SubOpts) -> ?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}). 
+rendered_action_template(ActionID, RenderResult) -> + Msg = lists:flatten(io_lib:format("action_template_rendered(~ts)", [ActionID])), + TraceResult = ?TRACE("QUERY_RENDER", Msg, RenderResult), + case logger:get_process_metadata() of + #{stop_action_after_render := true} -> + %% We throw an unrecoverable error to stop action before the + %% resource is called/modified + StopMsg = lists:flatten( + io_lib:format( + "Action ~ts stopped after template rendering due to test setting.", + [ActionID] + ) + ), + MsgBin = unicode:characters_to_binary(StopMsg), + error({unrecoverable_error, {action_stopped_after_template_rendering, MsgBin}}); + _ -> + ok + end, + TraceResult. + log(List, Msg, Meta) -> log(debug, List, Msg, Meta). @@ -517,6 +541,9 @@ to_trace(#{type := ip_address, ip_address := Filter} = Trace, Rec) -> Error -> Error end; +to_trace(#{type := ruleid, ruleid := Filter} = Trace, Rec) -> + Trace0 = maps:without([type, ruleid], Trace), + to_trace(Trace0, Rec#?TRACE{type = ruleid, filter = Filter}); to_trace(#{type := Type}, _Rec) -> {error, io_lib:format("required ~s field", [Type])}; to_trace(#{payload_encode := PayloadEncode} = Trace, Rec) -> diff --git a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl index 313826cde..c69809052 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl @@ -33,6 +33,7 @@ %% For logger handler filters callbacks -export([ + filter_ruleid/2, filter_clientid/2, filter_topic/2, filter_ip_address/2 @@ -133,9 +134,23 @@ uninstall(HandlerId) -> running() -> lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers(started)). +-spec filter_ruleid(logger:log_event(), {binary(), atom()}) -> logger:log_event() | stop. 
+filter_ruleid(#{meta := Meta = #{rule_id := RuleId}} = Log, {MatchId, _Name}) -> + RuleIDs = maps:get(rule_ids, Meta, #{}), + IsMatch = (RuleId =:= MatchId) orelse maps:get(MatchId, RuleIDs, false), + filter_ret(IsMatch andalso is_trace(Meta), Log); +filter_ruleid(#{meta := Meta = #{rule_ids := RuleIDs}} = Log, {MatchId, _Name}) -> + filter_ret(maps:get(MatchId, RuleIDs, false) andalso is_trace(Meta), Log); +filter_ruleid(_Log, _ExpectId) -> + stop. + -spec filter_clientid(logger:log_event(), {binary(), atom()}) -> logger:log_event() | stop. filter_clientid(#{meta := Meta = #{clientid := ClientId}} = Log, {MatchId, _Name}) -> - filter_ret(ClientId =:= MatchId andalso is_trace(Meta), Log); + ClientIDs = maps:get(client_ids, Meta, #{}), + IsMatch = (ClientId =:= MatchId) orelse maps:get(MatchId, ClientIDs, false), + filter_ret(IsMatch andalso is_trace(Meta), Log); +filter_clientid(#{meta := Meta = #{client_ids := ClientIDs}} = Log, {MatchId, _Name}) -> + filter_ret(maps:get(MatchId, ClientIDs, false) andalso is_trace(Meta), Log); filter_clientid(_Log, _ExpectId) -> stop. @@ -164,7 +179,9 @@ filters(#{type := clientid, filter := Filter, name := Name}) -> filters(#{type := topic, filter := Filter, name := Name}) -> [{topic, {fun ?MODULE:filter_topic/2, {ensure_bin(Filter), Name}}}]; filters(#{type := ip_address, filter := Filter, name := Name}) -> - [{ip_address, {fun ?MODULE:filter_ip_address/2, {ensure_list(Filter), Name}}}]. + [{ip_address, {fun ?MODULE:filter_ip_address/2, {ensure_list(Filter), Name}}}]; +filters(#{type := ruleid, filter := Filter, name := Name}) -> + [{ruleid, {fun ?MODULE:filter_ruleid/2, {ensure_bin(Filter), Name}}}]. 
formatter(#{type := _Type, payload_encode := PayloadEncode}) -> {emqx_trace_formatter, #{ @@ -184,7 +201,8 @@ filter_traces(#{id := Id, level := Level, dst := Dst, filters := Filters}, Acc) [{Type, {FilterFun, {Filter, Name}}}] when Type =:= topic orelse Type =:= clientid orelse - Type =:= ip_address + Type =:= ip_address orelse + Type =:= ruleid -> [Init#{type => Type, filter => Filter, name => Name, filter_fun => FilterFun} | Acc]; _ -> diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 464b2e429..a8aaf9fdd 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -41,6 +41,9 @@ ]). -export([clean_cache/0]). +%% For tests +-export([hard_coded_test_action_info_modules/0]). + -callback bridge_v1_type_name() -> atom() | { @@ -128,8 +131,13 @@ hard_coded_action_info_modules_common() -> emqx_bridge_mqtt_pubsub_action_info ]. +%% This exists so that it can be mocked for test cases +hard_coded_test_action_info_modules() -> []. + hard_coded_action_info_modules() -> - hard_coded_action_info_modules_common() ++ hard_coded_action_info_modules_ee(). + hard_coded_action_info_modules_common() ++ + hard_coded_action_info_modules_ee() ++ + ?MODULE:hard_coded_test_action_info_modules(). 
%% ==================================================================== %% API diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index ae1e727ca..ec75922a7 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -371,6 +371,7 @@ on_query( } ), NRequest = formalize_request(Method, BasePath, Request), + trace_rendered_action_template(InstId, Method, NRequest, Timeout), Worker = resolve_pool_worker(State, KeyOrNum), Result0 = ehttpc:request( Worker, @@ -480,6 +481,7 @@ on_query_async( } ), NRequest = formalize_request(Method, BasePath, Request), + trace_rendered_action_template(InstId, Method, NRequest, Timeout), MaxAttempts = maps:get(max_attempts, State, 3), Context = #{ attempt => 1, @@ -499,6 +501,31 @@ on_query_async( ), {ok, Worker}. +trace_rendered_action_template(InstId, Method, NRequest, Timeout) -> + case NRequest of + {Path, Headers} -> + emqx_trace:rendered_action_template( + InstId, + #{ + path => Path, + method => Method, + headers => emqx_utils_redact:redact_headers(Headers), + timeout => Timeout + } + ); + {Path, Headers, Body} -> + emqx_trace:rendered_action_template( + InstId, + #{ + path => Path, + method => Method, + headers => emqx_utils_redact:redact_headers(Headers), + timeout => Timeout, + body => Body + } + ) + end. + resolve_pool_worker(State, undefined) -> resolve_pool_worker(State, self()); resolve_pool_worker(#{pool_name := PoolName} = State, Key) -> diff --git a/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl b/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl index 3da04012d..73f6359ab 100644 --- a/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl +++ b/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl @@ -30,8 +30,8 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx/include/asserts.hrl"). --define(BRIDGE_TYPE, <<"webhook">>). 
--define(BRIDGE_NAME, atom_to_binary(?MODULE)). +-define(BRIDGE_TYPE, emqx_bridge_http_test_lib:bridge_type()). +-define(BRIDGE_NAME, emqx_bridge_http_test_lib:bridge_name()). all() -> emqx_common_test_helpers:all(?MODULE). @@ -73,21 +73,10 @@ suite() -> init_per_testcase(t_bad_bridge_config, Config) -> Config; -init_per_testcase(t_send_async_connection_timeout, Config) -> - HTTPPath = <<"/path">>, - ServerSSLOpts = false, - {ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link( - _Port = random, HTTPPath, ServerSSLOpts - ), - ResponseDelayMS = 500, - ok = emqx_bridge_http_connector_test_server:set_handler( - success_http_handler(#{response_delay => ResponseDelayMS}) - ), - [ - {http_server, #{port => HTTPPort, path => HTTPPath}}, - {response_delay_ms, ResponseDelayMS} - | Config - ]; +init_per_testcase(Case, Config) when + Case =:= t_send_async_connection_timeout orelse Case =:= t_send_get_trace_messages +-> + emqx_bridge_http_test_lib:init_http_success_server(Config); init_per_testcase(t_path_not_found, Config) -> HTTPPath = <<"/nonexisting/path">>, ServerSSLOpts = false, @@ -115,7 +104,9 @@ init_per_testcase(t_bridge_probes_header_atoms, Config) -> {ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link( _Port = random, HTTPPath, ServerSSLOpts ), - ok = emqx_bridge_http_connector_test_server:set_handler(success_http_handler()), + ok = emqx_bridge_http_connector_test_server:set_handler( + emqx_bridge_http_test_lib:success_http_handler() + ), [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config]; init_per_testcase(_TestCase, Config) -> Server = start_http_server(#{response_delay_ms => 0}), @@ -126,7 +117,8 @@ end_per_testcase(TestCase, _Config) when TestCase =:= t_too_many_requests; TestCase =:= t_rule_action_expired; TestCase =:= t_bridge_probes_header_atoms; - TestCase =:= t_send_async_connection_timeout + TestCase =:= t_send_async_connection_timeout; + TestCase =:= t_send_get_trace_messages -> ok = 
emqx_bridge_http_connector_test_server:stop(), persistent_term:erase({?MODULE, times_called}), @@ -250,115 +242,8 @@ get_metrics(Name) -> Type = <<"http">>, emqx_bridge:get_metrics(Type, Name). -bridge_async_config(#{port := Port} = Config) -> - Type = maps:get(type, Config, ?BRIDGE_TYPE), - Name = maps:get(name, Config, ?BRIDGE_NAME), - Host = maps:get(host, Config, "localhost"), - Path = maps:get(path, Config, ""), - PoolSize = maps:get(pool_size, Config, 1), - QueryMode = maps:get(query_mode, Config, "async"), - ConnectTimeout = maps:get(connect_timeout, Config, "1s"), - RequestTimeout = maps:get(request_timeout, Config, "10s"), - ResumeInterval = maps:get(resume_interval, Config, "1s"), - HealthCheckInterval = maps:get(health_check_interval, Config, "200ms"), - ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"), - LocalTopic = - case maps:find(local_topic, Config) of - {ok, LT} -> - lists:flatten(["local_topic = \"", LT, "\""]); - error -> - "" - end, - ConfigString = io_lib:format( - "bridges.~s.~s {\n" - " url = \"http://~s:~p~s\"\n" - " connect_timeout = \"~p\"\n" - " enable = true\n" - %% local_topic - " ~s\n" - " enable_pipelining = 100\n" - " max_retries = 2\n" - " method = \"post\"\n" - " pool_size = ~p\n" - " pool_type = \"random\"\n" - " request_timeout = \"~s\"\n" - " body = \"${id}\"\n" - " resource_opts {\n" - " inflight_window = 100\n" - " health_check_interval = \"~s\"\n" - " max_buffer_bytes = \"1GB\"\n" - " query_mode = \"~s\"\n" - " request_ttl = \"~p\"\n" - " resume_interval = \"~s\"\n" - " start_after_created = \"true\"\n" - " start_timeout = \"5s\"\n" - " worker_pool_size = \"1\"\n" - " }\n" - " ssl {\n" - " enable = false\n" - " }\n" - "}\n", - [ - Type, - Name, - Host, - Port, - Path, - ConnectTimeout, - LocalTopic, - PoolSize, - RequestTimeout, - HealthCheckInterval, - QueryMode, - ResourceRequestTTL, - ResumeInterval - ] - ), - ct:pal(ConfigString), - parse_and_check(ConfigString, Type, Name). 
- -parse_and_check(ConfigString, BridgeType, Name) -> - {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), - hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), - #{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf, - RetConfig. - make_bridge(Config) -> - Type = ?BRIDGE_TYPE, - Name = ?BRIDGE_NAME, - BridgeConfig = bridge_async_config(Config#{ - name => Name, - type => Type - }), - {ok, _} = emqx_bridge:create( - Type, - Name, - BridgeConfig - ), - emqx_bridge_resource:bridge_id(Type, Name). - -success_http_handler() -> - success_http_handler(#{response_delay => 0}). - -success_http_handler(Opts) -> - ResponseDelay = maps:get(response_delay, Opts, 0), - TestPid = self(), - fun(Req0, State) -> - {ok, Body, Req} = cowboy_req:read_body(Req0), - Headers = cowboy_req:headers(Req), - ct:pal("http request received: ~p", [ - #{body => Body, headers => Headers, response_delay => ResponseDelay} - ]), - ResponseDelay > 0 andalso timer:sleep(ResponseDelay), - TestPid ! {http, Headers, Body}, - Rep = cowboy_req:reply( - 200, - #{<<"content-type">> => <<"text/plain">>}, - <<"hello">>, - Req - ), - {ok, Rep, State} - end. + emqx_bridge_http_test_lib:make_bridge(Config). not_found_http_handler() -> TestPid = self(), @@ -452,6 +337,103 @@ t_send_async_connection_timeout(Config) -> receive_request_notifications(MessageIDs, ResponseDelayMS, []), ok. 
+t_send_get_trace_messages(Config) -> + ResponseDelayMS = ?config(response_delay_ms, Config), + #{port := Port, path := Path} = ?config(http_server, Config), + BridgeID = make_bridge(#{ + port => Port, + path => Path, + pool_size => 1, + query_mode => "async", + connect_timeout => integer_to_list(ResponseDelayMS * 2) ++ "ms", + request_timeout => "10s", + resume_interval => "200ms", + health_check_interval => "200ms", + resource_request_ttl => "infinity" + }), + RuleTopic = iolist_to_binary([<<"my_rule_topic/">>, atom_to_binary(?FUNCTION_NAME)]), + SQL = <<"SELECT payload.id as id FROM \"", RuleTopic/binary, "\"">>, + {ok, #{<<"id">> := RuleId}} = + emqx_bridge_testlib:create_rule_and_action_http( + ?BRIDGE_TYPE, + RuleTopic, + Config, + #{sql => SQL} + ), + %% =================================== + %% Create trace for RuleId + %% =================================== + Now = erlang:system_time(second) - 10, + Start = Now, + End = Now + 60, + TraceName = atom_to_binary(?FUNCTION_NAME), + Trace = #{ + name => TraceName, + type => ruleid, + ruleid => RuleId, + start_at => Start, + end_at => End + }, + emqx_trace_SUITE:reload(), + ok = emqx_trace:clear(), + {ok, _} = emqx_trace:create(Trace), + %% =================================== + + ResourceId = emqx_bridge_resource:resource_id(BridgeID), + ?retry( + _Interval0 = 200, + _NAttempts0 = 20, + ?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + ?retry( + _Interval0 = 200, + _NAttempts0 = 20, + ?assertEqual(<<>>, read_rule_trace_file(TraceName, Now)) + ), + Msg = emqx_message:make(RuleTopic, <<"{\"id\": 1}">>), + emqx:publish(Msg), + ?retry( + _Interval = 500, + _NAttempts = 20, + ?assertMatch( + #{ + counters := #{ + 'matched' := 1, + 'actions.failed' := 0, + 'actions.failed.unknown' := 0, + 'actions.success' := 1, + 'actions.total' := 1 + } + }, + emqx_metrics_worker:get_metrics(rule_metrics, RuleId) + ) + ), + + ok = emqx_trace_handler_SUITE:filesync(TraceName, ruleid), + {ok, Bin} = 
file:read_file(emqx_trace:log_file(TraceName, Now)), + + ?retry( + _Interval0 = 200, + _NAttempts0 = 20, + begin + Bin = read_rule_trace_file(TraceName, Now), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"rule_activated">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"SELECT_yielded_result">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"bridge_action">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_activated">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_template_rendered">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"QUERY_ASYNC">>])) + end + ), + emqx_trace:delete(TraceName), + ok. + +read_rule_trace_file(TraceName, From) -> + emqx_trace:check(), + ok = emqx_trace_handler_SUITE:filesync(TraceName, ruleid), + {ok, Bin} = file:read_file(emqx_trace:log_file(TraceName, From)), + Bin. + t_async_free_retries(Config) -> #{port := Port} = ?config(http_server, Config), _BridgeID = make_bridge(#{ @@ -518,7 +500,7 @@ t_async_common_retries(Config) -> ok. 
t_bad_bridge_config(_Config) -> - BridgeConfig = bridge_async_config(#{port => 12345}), + BridgeConfig = emqx_bridge_http_test_lib:bridge_async_config(#{port => 12345}), ?assertMatch( {ok, {{_, 201, _}, _Headers, #{ @@ -540,7 +522,7 @@ t_bad_bridge_config(_Config) -> t_start_stop(Config) -> #{port := Port} = ?config(http_server, Config), - BridgeConfig = bridge_async_config(#{ + BridgeConfig = emqx_bridge_http_test_lib:bridge_async_config(#{ type => ?BRIDGE_TYPE, name => ?BRIDGE_NAME, port => Port @@ -554,7 +536,7 @@ t_path_not_found(Config) -> begin #{port := Port, path := Path} = ?config(http_server, Config), MQTTTopic = <<"t/webhook">>, - BridgeConfig = bridge_async_config(#{ + BridgeConfig = emqx_bridge_http_test_lib:bridge_async_config(#{ type => ?BRIDGE_TYPE, name => ?BRIDGE_NAME, local_topic => MQTTTopic, @@ -593,7 +575,7 @@ t_too_many_requests(Config) -> begin #{port := Port, path := Path} = ?config(http_server, Config), MQTTTopic = <<"t/webhook">>, - BridgeConfig = bridge_async_config(#{ + BridgeConfig = emqx_bridge_http_test_lib:bridge_async_config(#{ type => ?BRIDGE_TYPE, name => ?BRIDGE_NAME, local_topic => MQTTTopic, @@ -633,7 +615,7 @@ t_rule_action_expired(Config) -> ?check_trace( begin RuleTopic = <<"t/webhook/rule">>, - BridgeConfig = bridge_async_config(#{ + BridgeConfig = emqx_bridge_http_test_lib:bridge_async_config(#{ type => ?BRIDGE_TYPE, name => ?BRIDGE_NAME, host => "non.existent.host", @@ -689,7 +671,7 @@ t_bridge_probes_header_atoms(Config) -> ?check_trace( begin LocalTopic = <<"t/local/topic">>, - BridgeConfig0 = bridge_async_config(#{ + BridgeConfig0 = emqx_bridge_http_test_lib:bridge_async_config(#{ type => ?BRIDGE_TYPE, name => ?BRIDGE_NAME, port => Port, diff --git a/apps/emqx_bridge_http/test/emqx_bridge_http_test_lib.erl b/apps/emqx_bridge_http/test/emqx_bridge_http_test_lib.erl new file mode 100644 index 000000000..4959a24c3 --- /dev/null +++ b/apps/emqx_bridge_http/test/emqx_bridge_http_test_lib.erl @@ -0,0 +1,161 @@ 
+%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_http_test_lib). + +-export([ + bridge_type/0, + bridge_name/0, + make_bridge/1, + bridge_async_config/1, + init_http_success_server/1, + success_http_handler/0 +]). + +-define(BRIDGE_TYPE, bridge_type()). +-define(BRIDGE_NAME, bridge_name()). + +bridge_type() -> + <<"webhook">>. + +bridge_name() -> + atom_to_binary(?MODULE). + +make_bridge(Config) -> + Type = ?BRIDGE_TYPE, + Name = ?BRIDGE_NAME, + BridgeConfig = bridge_async_config(Config#{ + name => Name, + type => Type + }), + {ok, _} = emqx_bridge:create( + Type, + Name, + BridgeConfig + ), + emqx_bridge_resource:bridge_id(Type, Name). 
+ +bridge_async_config(#{port := Port} = Config) -> + Type = maps:get(type, Config, ?BRIDGE_TYPE), + Name = maps:get(name, Config, ?BRIDGE_NAME), + Host = maps:get(host, Config, "localhost"), + Path = maps:get(path, Config, ""), + PoolSize = maps:get(pool_size, Config, 1), + QueryMode = maps:get(query_mode, Config, "async"), + ConnectTimeout = maps:get(connect_timeout, Config, "1s"), + RequestTimeout = maps:get(request_timeout, Config, "10s"), + ResumeInterval = maps:get(resume_interval, Config, "1s"), + HealthCheckInterval = maps:get(health_check_interval, Config, "200ms"), + ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"), + LocalTopic = + case maps:find(local_topic, Config) of + {ok, LT} -> + lists:flatten(["local_topic = \"", LT, "\""]); + error -> + "" + end, + ConfigString = io_lib:format( + "bridges.~s.~s {\n" + " url = \"http://~s:~p~s\"\n" + " connect_timeout = \"~p\"\n" + " enable = true\n" + %% local_topic + " ~s\n" + " enable_pipelining = 100\n" + " max_retries = 2\n" + " method = \"post\"\n" + " pool_size = ~p\n" + " pool_type = \"random\"\n" + " request_timeout = \"~s\"\n" + " body = \"${id}\"\n" + " resource_opts {\n" + " inflight_window = 100\n" + " health_check_interval = \"~s\"\n" + " max_buffer_bytes = \"1GB\"\n" + " query_mode = \"~s\"\n" + " request_ttl = \"~p\"\n" + " resume_interval = \"~s\"\n" + " start_after_created = \"true\"\n" + " start_timeout = \"5s\"\n" + " worker_pool_size = \"1\"\n" + " }\n" + " ssl {\n" + " enable = false\n" + " }\n" + "}\n", + [ + Type, + Name, + Host, + Port, + Path, + ConnectTimeout, + LocalTopic, + PoolSize, + RequestTimeout, + HealthCheckInterval, + QueryMode, + ResourceRequestTTL, + ResumeInterval + ] + ), + ct:pal(ConfigString), + parse_and_check(ConfigString, Type, Name). 
+ +parse_and_check(ConfigString, BridgeType, Name) -> + {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), + hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), + #{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf, + RetConfig. + +success_http_handler() -> + success_http_handler(#{response_delay => 0}). + +success_http_handler(Opts) -> + ResponseDelay = maps:get(response_delay, Opts, 0), + TestPid = self(), + fun(Req0, State) -> + {ok, Body, Req} = cowboy_req:read_body(Req0), + Headers = cowboy_req:headers(Req), + ct:pal("http request received: ~p", [ + #{body => Body, headers => Headers, response_delay => ResponseDelay} + ]), + ResponseDelay > 0 andalso timer:sleep(ResponseDelay), + TestPid ! {http, Headers, Body}, + Rep = cowboy_req:reply( + 200, + #{<<"content-type">> => <<"text/plain">>}, + <<"hello">>, + Req + ), + {ok, Rep, State} + end. + +init_http_success_server(Config) -> + HTTPPath = <<"/path">>, + ServerSSLOpts = false, + {ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link( + _Port = random, HTTPPath, ServerSSLOpts + ), + ResponseDelayMS = 500, + ok = emqx_bridge_http_connector_test_server:set_handler( + success_http_handler(#{response_delay => ResponseDelayMS}) + ), + [ + {http_server, #{port => HTTPPort, path => HTTPPath}}, + {response_delay_ms, ResponseDelayMS}, + {bridge_name, ?BRIDGE_NAME} + | Config + ]. 
diff --git a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_test_utils.erl b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_test_utils.erl index 47df47976..2110a0520 100644 --- a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_test_utils.erl +++ b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_test_utils.erl @@ -52,7 +52,7 @@ init_per_group(_Group, Config) -> common_init_per_group(Opts) -> emqx_common_test_helpers:render_and_load_app_config(emqx_conf), ok = emqx_common_test_helpers:start_apps([ - emqx_conf, emqx_bridge, emqx_bridge_rabbitmq, emqx_rule_engine + emqx_conf, emqx_bridge, emqx_bridge_rabbitmq, emqx_rule_engine, emqx_modules ]), ok = emqx_connector_test_helpers:start_apps([emqx_resource]), {ok, _} = application:ensure_all_started(emqx_connector), @@ -116,7 +116,9 @@ end_per_group(_Group, Config) -> } = get_channel_connection(Config), amqp_channel:call(Channel, #'queue.purge'{queue = rabbit_mq_queue()}), emqx_mgmt_api_test_util:end_suite(), - ok = emqx_common_test_helpers:stop_apps([emqx_conf, emqx_bridge_rabbitmq, emqx_rule_engine]), + ok = emqx_common_test_helpers:stop_apps([ + emqx_conf, emqx_bridge_rabbitmq, emqx_rule_engine, emqx_modules + ]), ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), _ = application:stop(emqx_connector), _ = application:stop(emqx_bridge), diff --git a/apps/emqx_connector/src/emqx_connector_info.erl b/apps/emqx_connector/src/emqx_connector_info.erl index 766f34168..e87c2ad7e 100644 --- a/apps/emqx_connector/src/emqx_connector_info.erl +++ b/apps/emqx_connector/src/emqx_connector_info.erl @@ -31,6 +31,9 @@ -export([clean_cache/0]). +%% For tests +-export([hard_coded_test_connector_info_modules/0]). + %% The type name for the conncector -callback type_name() -> atom(). @@ -117,8 +120,13 @@ hard_coded_connector_info_modules_common() -> emqx_bridge_mqtt_pubsub_connector_info ]. +%% This exists so that it can be mocked for test cases +hard_coded_test_connector_info_modules() -> []. 
+ hard_coded_connector_info_modules() -> - hard_coded_connector_info_modules_common() ++ hard_coded_connector_info_modules_ee(). + hard_coded_connector_info_modules_common() ++ + hard_coded_connector_info_modules_ee() ++ + ?MODULE:hard_coded_test_connector_info_modules(). %% -------------------------------------------------------------------- %% Atom macros to avoid typos diff --git a/apps/emqx_management/src/emqx_mgmt_api_trace.erl b/apps/emqx_management/src/emqx_mgmt_api_trace.erl index 5cdbc65ff..19edc229d 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_trace.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_trace.erl @@ -222,7 +222,7 @@ fields(trace) -> )}, {type, hoconsc:mk( - hoconsc:enum([clientid, topic, ip_address]), + hoconsc:enum([clientid, topic, ip_address, ruleid]), #{ description => ?DESC(filter_type), required => true, @@ -257,6 +257,15 @@ fields(trace) -> example => <<"127.0.0.1">> } )}, + {ruleid, + hoconsc:mk( + binary(), + #{ + description => ?DESC(ruleid_field), + required => false, + example => <<"my_rule">> + } + )}, {status, hoconsc:mk( hoconsc:enum([running, stopped, waiting]), diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 989d388fc..465989f3d 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -535,7 +535,9 @@ trace(_) -> {"trace stop topic ", "Stop tracing for a topic on local node"}, {"trace start ip_address [] ", "Traces for a client ip on local node"}, - {"trace stop ip_address ", "Stop tracing for a client ip on local node"} + {"trace stop ip_address ", "Stop tracing for a client ip on local node"}, + {"trace start ruleid [] ", "Traces for a rule ID on local node"}, + {"trace stop ruleid ", "Stop tracing for a rule ID on local node"} ]). trace_on(Name, Type, Filter, Level, LogFile) -> @@ -606,6 +608,7 @@ traces(_) -> "and will end after seconds. 
The default value for is " ?DEFAULT_TRACE_DURATION " seconds."}, + {"traces start ruleid []", "Traces for a rule ID in cluster"}, {"traces stop ", "Stop trace in cluster"}, {"traces delete ", "Delete trace in cluster"} ]). diff --git a/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl index cb93bc9d6..ef7b5a191 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl @@ -122,6 +122,56 @@ t_http_test(_Config) -> unload(), ok. +t_http_test_rule_trace(_Config) -> + emqx_trace:clear(), + load(), + %% create + Name = atom_to_binary(?FUNCTION_NAME), + Trace = [ + {<<"name">>, Name}, + {<<"type">>, <<"ruleid">>}, + {<<"ruleid">>, Name} + ], + + {ok, Create} = request_api(post, api_path("trace"), Trace), + ?assertMatch(#{<<"name">> := Name}, json(Create)), + + {ok, List} = request_api(get, api_path("trace")), + [Data] = json(List), + ?assertEqual(Name, maps:get(<<"name">>, Data)), + + %% update + {ok, Update} = request_api(put, api_path(iolist_to_binary(["trace/", Name, "/stop"])), #{}), + ?assertEqual( + #{ + <<"enable">> => false, + <<"name">> => Name + }, + json(Update) + ), + {ok, List1} = request_api(get, api_path("trace")), + [Data1] = json(List1), + Node = atom_to_binary(node()), + ?assertMatch( + #{ + <<"status">> := <<"stopped">>, + <<"name">> := Name, + <<"log_size">> := #{Node := _}, + <<"start_at">> := _, + <<"end_at">> := _, + <<"type">> := <<"ruleid">>, + <<"ruleid">> := Name + }, + Data1 + ), + + %% delete + {ok, Delete} = request_api(delete, api_path(["trace/", Name])), + ?assertEqual(<<>>, Delete), + + unload(), + ok. + t_create_failed(_Config) -> load(), Trace = [{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/z">>}], @@ -252,13 +302,16 @@ t_log_file(_Config) -> ok. create_trace(Name, ClientId, Start) -> + create_trace(Name, clientid, ClientId, Start). 
+ +create_trace(Name, Type, TypeValue, Start) -> ?check_trace( #{timetrap => 900}, begin {ok, _} = emqx_trace:create([ {<<"name">>, Name}, - {<<"type">>, clientid}, - {<<"clientid">>, ClientId}, + {<<"type">>, Type}, + {atom_to_binary(Type), TypeValue}, {<<"start_at">>, Start} ]), ?block_until(#{?snk_kind := update_trace_done}) @@ -268,6 +321,16 @@ create_trace(Name, ClientId, Start) -> end ). +create_rule_trace(RuleId) -> + Now = erlang:system_time(second), + emqx_mgmt_api_trace_SUITE:create_trace(atom_to_binary(?FUNCTION_NAME), ruleid, RuleId, Now - 2). + +t_create_rule_trace(_Config) -> + load(), + create_rule_trace(atom_to_binary(?FUNCTION_NAME)), + unload(), + ok. + t_stream_log(_Config) -> emqx_trace:clear(), load(), diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 37f9369ff..bc1aea734 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -64,8 +64,10 @@ -define(COLLECT_REQ_LIMIT, 1000). -define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}). --define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}). --define(SIMPLE_QUERY(FROM, REQUEST), ?QUERY(FROM, REQUEST, false, infinity)). +-define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX), + {query, FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX} +). +-define(SIMPLE_QUERY(FROM, REQUEST, TRACE_CTX), ?QUERY(FROM, REQUEST, false, infinity, TRACE_CTX)). -define(REPLY(FROM, SENT, RESULT), {reply, FROM, SENT, RESULT}). -define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef), {Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef} @@ -77,7 +79,10 @@ -type id() :: binary(). -type index() :: pos_integer(). -type expire_at() :: infinity | integer(). --type queue_query() :: ?QUERY(reply_fun(), request(), HasBeenSent :: boolean(), expire_at()). +-type trace_context() :: map() | undefined. 
+-type queue_query() :: ?QUERY( + reply_fun(), request(), HasBeenSent :: boolean(), expire_at(), TraceCtx :: trace_context() +). -type request() :: term(). -type request_from() :: undefined | gen_statem:from(). -type timeout_ms() :: emqx_schema:timeout_duration_ms(). @@ -154,7 +159,10 @@ simple_sync_query(Id, Request, QueryOpts0) -> emqx_resource_metrics:matched_inc(Id), Ref = make_request_ref(), ReplyTo = maps:get(reply_to, QueryOpts0, undefined), - Result = call_query(force_sync, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request), QueryOpts), + TraceCtx = maps:get(trace_ctx, QueryOpts0, undefined), + Result = call_query( + force_sync, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request, TraceCtx), QueryOpts + ), _ = handle_query_result(Id, Result, _HasBeenSent = false), Result. @@ -167,8 +175,9 @@ simple_async_query(Id, Request, QueryOpts0) -> emqx_resource_metrics:matched_inc(Id), Ref = make_request_ref(), ReplyTo = maps:get(reply_to, QueryOpts0, undefined), + TraceCtx = maps:get(trace_ctx, QueryOpts0, undefined), Result = call_query( - async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request), QueryOpts + async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request, TraceCtx), QueryOpts ), _ = handle_query_result(Id, Result, _HasBeenSent = false), Result. 
@@ -439,10 +448,10 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> Result = call_query(force_sync, Id, Index, Ref, QueryOrBatch, QueryOpts), {ShouldAck, PostFn, DeltaCounters} = case QueryOrBatch of - ?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) -> + ?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt, _TraceCtx) -> Reply = ?REPLY(ReplyTo, HasBeenSent, Result), reply_caller_defer_metrics(Id, Reply, QueryOpts); - [?QUERY(_, _, _, _) | _] = Batch -> + [?QUERY(_, _, _, _, _) | _] = Batch -> batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts) end, Data1 = aggregate_counters(Data0, DeltaCounters), @@ -501,11 +510,13 @@ collect_and_enqueue_query_requests(Request0, Data0) -> ReplyFun = maps:get(async_reply_fun, Opts, undefined), HasBeenSent = false, ExpireAt = maps:get(expire_at, Opts), - ?QUERY(ReplyFun, Req, HasBeenSent, ExpireAt); + TraceCtx = maps:get(trace_ctx, Opts, undefined), + ?QUERY(ReplyFun, Req, HasBeenSent, ExpireAt, TraceCtx); (?SEND_REQ(ReplyTo, {query, Req, Opts})) -> HasBeenSent = false, ExpireAt = maps:get(expire_at, Opts), - ?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt) + TraceCtx = maps:get(trace_ctx, Opts, undefined), + ?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt, TraceCtx) end, Requests ), @@ -515,7 +526,7 @@ collect_and_enqueue_query_requests(Request0, Data0) -> reply_overflown([]) -> ok; -reply_overflown([?QUERY(ReplyTo, _Req, _HasBeenSent, _ExpireAt) | More]) -> +reply_overflown([?QUERY(ReplyTo, _Req, _HasBeenSent, _ExpireAt, _TraceCtx) | More]) -> do_reply_caller(ReplyTo, {error, buffer_overflow}), reply_overflown(More). 
@@ -572,7 +583,11 @@ flush(Data0) -> {keep_state, Data1}; {_, false} -> ?tp(buffer_worker_flush_before_pop, #{}), - {Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}), + PopOpts = #{ + count_limit => BatchSize, + stop_before => {fun stop_batching/2, initial_state} + }, + {Q1, QAckRef, Batch} = replayq:pop(Q0, PopOpts), Data2 = Data1#{queue := Q1}, ?tp(buffer_worker_flush_before_sieve_expired, #{}), Now = now_(), @@ -608,6 +623,23 @@ flush(Data0) -> end end. +stop_batching(Query, initial_state) -> + get_stop_flag(Query); +stop_batching(Query, PrevStopFlag) -> + case get_stop_flag(Query) =:= PrevStopFlag of + true -> + PrevStopFlag; + false -> + %% We stop because we don't want a batch with mixed values for the + %% stop_action_after_render option + true + end. + +get_stop_flag(?QUERY(_, _, _, _, #{stop_action_after_render := true})) -> + stop_action_after_render; +get_stop_flag(_) -> + no_stop_action_after_render. + -spec do_flush(data(), #{ is_batch := boolean(), batch := [queue_query()], @@ -630,7 +662,7 @@ do_flush( inflight_tid := InflightTID } = Data0, %% unwrap when not batching (i.e., batch size == 1) - [?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) = Request] = Batch, + [?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt, _TraceCtx) = Request] = Batch, QueryOpts = #{inflight_tid => InflightTID, simple_query => false}, Result = call_query(async_if_possible, Id, Index, Ref, Request, QueryOpts), Reply = ?REPLY(ReplyTo, HasBeenSent, Result), @@ -824,14 +856,14 @@ batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) -> expand_batch_reply(BatchResults, Batch) when is_list(BatchResults) -> lists:map( - fun({?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT), Result}) -> + fun({?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT, _TraceCtx), Result}) -> ?REPLY(FROM, SENT, Result) end, lists:zip(Batch, BatchResults) ); expand_batch_reply(BatchResult, Batch) -> lists:map( - fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT)) -> + fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT, 
_TraceCtx)) -> ?REPLY(FROM, SENT, BatchResult) end, Batch @@ -880,7 +912,7 @@ reply_dropped(_ReplyTo, _Result) -> -spec batch_reply_dropped([queue_query()], {error, late_reply | request_expired}) -> ok. batch_reply_dropped(Batch, Result) -> lists:foreach( - fun(?QUERY(ReplyTo, _CoreReq, _HasBeenSent, _ExpireAt)) -> + fun(?QUERY(ReplyTo, _CoreReq, _HasBeenSent, _ExpireAt, _TraceCtx)) -> reply_dropped(ReplyTo, Result) end, Batch @@ -1093,11 +1125,53 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) -> {ok, _Group, #{status := ?status_connecting, error := unhealthy_target}} -> {error, {unrecoverable_error, unhealthy_target}}; {ok, _Group, Resource} -> - do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource); + set_rule_id_trace_meta_data(Query), + QueryResult = do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource), + %% do_call_query does not throw an exception as the call to the + %% resource is wrapped in a try catch expression so we will always + %% unset the trace meta data + unset_rule_id_trace_meta_data(), + QueryResult; {error, not_found} -> ?RESOURCE_ERROR(not_found, "resource not found") end. +set_rule_id_trace_meta_data(Requests) when is_list(Requests) -> + %% Get the rule ids from requests + RuleIDs = lists:foldl(fun collect_rule_id/2, #{}, Requests), + ClientIDs = lists:foldl(fun collect_client_id/2, #{}, Requests), + StopAfterRenderVal = + case Requests of + %% We know that the batch is not mixed since we prevent this by + %% using a stop_before function in the replayq:pop call + [?QUERY(_, _, _, _, #{stop_action_after_render := true}) | _] -> + true; + [?QUERY(_, _, _, _, _TraceCTX) | _] -> + false + end, + logger:update_process_metadata(#{ + rule_ids => RuleIDs, client_ids => ClientIDs, stop_action_after_render => StopAfterRenderVal + }), + ok; +set_rule_id_trace_meta_data(Request) -> + set_rule_id_trace_meta_data([Request]), + ok. 
+ +collect_rule_id(?QUERY(_, _, _, _, #{rule_id := RuleId}), Acc) -> + Acc#{RuleId => true}; +collect_rule_id(?QUERY(_, _, _, _, _), Acc) -> + Acc. + +collect_client_id(?QUERY(_, _, _, _, #{clientid := ClientId}), Acc) -> + Acc#{ClientId => true}; +collect_client_id(?QUERY(_, _, _, _, _), Acc) -> + Acc. + +unset_rule_id_trace_meta_data() -> + logger:update_process_metadata(#{ + rule_ids => #{}, client_ids => #{}, stop_action_after_render => false + }). + %% action:kafka_producer:myproducer1:connector:kafka_producer:mykakfaclient1 extract_connector_id(Id) when is_binary(Id) -> case binary:split(Id, <<":">>, [global]) of @@ -1208,7 +1282,15 @@ do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) -> ). apply_query_fun( - sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, Channels, QueryOpts + sync, + Mod, + Id, + _Index, + _Ref, + ?QUERY(_, Request, _, _, _TraceCtx) = _Query, + ResSt, + Channels, + QueryOpts ) -> ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}), maybe_reply_to( @@ -1227,7 +1309,15 @@ apply_query_fun( QueryOpts ); apply_query_fun( - async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, ResSt, Channels, QueryOpts + async, + Mod, + Id, + Index, + Ref, + ?QUERY(_, Request, _, _, _TraceCtx) = Query, + ResSt, + Channels, + QueryOpts ) -> ?tp(call_query_async, #{ id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async @@ -1268,7 +1358,7 @@ apply_query_fun( Id, _Index, _Ref, - [?QUERY(_, FirstRequest, _, _) | _] = Batch, + [?QUERY(_, FirstRequest, _, _, _) | _] = Batch, ResSt, Channels, QueryOpts @@ -1276,7 +1366,9 @@ apply_query_fun( ?tp(call_batch_query, #{ id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync }), - Requests = lists:map(fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch), + Requests = lists:map( + fun(?QUERY(_ReplyTo, Request, _, _ExpireAt, _TraceCtx)) -> Request end, Batch + ), maybe_reply_to( 
?APPLY_RESOURCE( call_batch_query, @@ -1298,7 +1390,7 @@ apply_query_fun( Id, Index, Ref, - [?QUERY(_, FirstRequest, _, _) | _] = Batch, + [?QUERY(_, FirstRequest, _, _, _) | _] = Batch, ResSt, Channels, QueryOpts @@ -1321,7 +1413,7 @@ apply_query_fun( min_batch => minimize(Batch) }, Requests = lists:map( - fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch + fun(?QUERY(_ReplyTo, Request, _, _ExpireAt, _TraceCtx)) -> Request end, Batch ), IsRetriable = false, AsyncWorkerMRef = undefined, @@ -1367,7 +1459,7 @@ handle_async_reply1( inflight_tid := InflightTID, resource_id := Id, buffer_worker := BufferWorkerPid, - min_query := ?QUERY(ReplyTo, _, _, ExpireAt) = _Query + min_query := ?QUERY(ReplyTo, _, _, ExpireAt, _TraceCtx) = _Query } = ReplyContext, Result ) -> @@ -1399,7 +1491,7 @@ do_handle_async_reply( request_ref := Ref, buffer_worker := BufferWorkerPid, inflight_tid := InflightTID, - min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query + min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt, _TraceCtx) = _Query }, Result ) -> @@ -1486,13 +1578,13 @@ handle_async_batch_reply2([Inflight], ReplyContext, Results0, Now) -> %% So we just take the original flag from the ReplyContext batch %% and put it back to the batch found in inflight table %% which must have already been set to `false` - [?QUERY(_ReplyTo, _, HasBeenSent, _ExpireAt) | _] = Batch, + [?QUERY(_ReplyTo, _, HasBeenSent, _ExpireAt, _TraceCtx) | _] = Batch, {RealNotExpired0, RealExpired, Results} = sieve_expired_requests_with_results(RealBatch, Now, Results0), RealNotExpired = lists:map( - fun(?QUERY(ReplyTo, CoreReq, _HasBeenSent, ExpireAt)) -> - ?QUERY(ReplyTo, CoreReq, HasBeenSent, ExpireAt) + fun(?QUERY(ReplyTo, CoreReq, _HasBeenSent, ExpireAt, TraceCtx)) -> + ?QUERY(ReplyTo, CoreReq, HasBeenSent, ExpireAt, TraceCtx) end, RealNotExpired0 ), @@ -1678,7 +1770,10 @@ inflight_get_first_retriable(InflightTID, Now) -> case ets:select(InflightTID, MatchSpec, _Limit = 1) of '$end_of_table' -> 
none; - {[{Ref, Query = ?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)}], _Continuation} -> + { + [{Ref, Query = ?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt, _TraceCtx)}], + _Continuation + } -> case is_expired(ExpireAt, Now) of true -> {expired, Ref, [Query]}; @@ -1714,7 +1809,7 @@ inflight_append(undefined, _InflightItem) -> ok; inflight_append( InflightTID, - ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch0, IsRetriable, AsyncWorkerMRef) + ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _, _) | _] = Batch0, IsRetriable, AsyncWorkerMRef) ) -> Batch = mark_as_sent(Batch0), InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef), @@ -1726,7 +1821,10 @@ inflight_append( inflight_append( InflightTID, ?INFLIGHT_ITEM( - Ref, ?QUERY(_ReplyTo, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, AsyncWorkerMRef + Ref, + ?QUERY(_ReplyTo, _Req, _HasBeenSent, _ExpireAt, _TraceCtx) = Query0, + IsRetriable, + AsyncWorkerMRef ) ) -> Query = mark_as_sent(Query0), @@ -1790,9 +1888,13 @@ ack_inflight(undefined, _Ref, _BufferWorkerPid) -> ack_inflight(InflightTID, Ref, BufferWorkerPid) -> {Count, Removed} = case ets:take(InflightTID, Ref) of - [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _AsyncWorkerMRef)] -> + [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _, _), _IsRetriable, _AsyncWorkerMRef)] -> {1, true}; - [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _AsyncWorkerMRef)] -> + [ + ?INFLIGHT_ITEM( + Ref, [?QUERY(_, _, _, _, _) | _] = Batch, _IsRetriable, _AsyncWorkerMRef + ) + ] -> {length(Batch), true}; [] -> {0, false} @@ -1942,9 +2044,9 @@ do_collect_requests(Acc, Count, Limit) -> mark_as_sent(Batch) when is_list(Batch) -> lists:map(fun mark_as_sent/1, Batch); -mark_as_sent(?QUERY(ReplyTo, Req, _HasBeenSent, ExpireAt)) -> +mark_as_sent(?QUERY(ReplyTo, Req, _HasBeenSent, ExpireAt, TraceCtx)) -> HasBeenSent = true, - ?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt). + ?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt, TraceCtx). 
is_unrecoverable_error({error, {unrecoverable_error, _}}) -> true; @@ -1967,7 +2069,7 @@ is_async_return(_) -> sieve_expired_requests(Batch, Now) -> lists:partition( - fun(?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)) -> + fun(?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt, _TraceCtx)) -> not is_expired(ExpireAt, Now) end, Batch @@ -1978,7 +2080,7 @@ sieve_expired_requests_with_results(Batch, Now, Results) when is_list(Results) - {RevNotExpiredBatch, RevNotExpiredResults, ExpiredBatch} = lists:foldl( fun( - {?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt) = Query, Result}, + {?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt, _TraceCtx) = Query, Result}, {NotExpAcc, ResAcc, ExpAcc} ) -> case not is_expired(ExpireAt, Now) of @@ -2026,15 +2128,16 @@ ensure_expire_at(#{timeout := TimeoutMS} = Opts) -> Opts#{expire_at => ExpireAt}. %% no need to keep the request for async reply handler -minimize(?QUERY(_, _, _, _) = Q) -> +minimize(?QUERY(_, _, _, _, _) = Q) -> do_minimize(Q); minimize(L) when is_list(L) -> lists:map(fun do_minimize/1, L). -ifdef(TEST). -do_minimize(?QUERY(_ReplyTo, _Req, _Sent, _ExpireAt) = Query) -> Query. +do_minimize(?QUERY(_ReplyTo, _Req, _Sent, _ExpireAt, _TraceCtx) = Query) -> Query. -else. -do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent, ExpireAt). +do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt, TraceCtx)) -> + ?QUERY(ReplyTo, [], Sent, ExpireAt, TraceCtx). -endif. 
%% To avoid message loss due to misconfigurations, we adjust diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 99e85424d..d018ce709 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -277,10 +277,9 @@ t_batch_query_counter(_) -> fun(Result, Trace) -> ?assertMatch({ok, 0}, Result), QueryTrace = ?of_kind(call_batch_query, Trace), - ?assertMatch([#{batch := [{query, _, get_counter, _, _}]}], QueryTrace) + ?assertMatch([#{batch := [{query, _, get_counter, _, _, _}]}], QueryTrace) end ), - NMsgs = 1_000, ?check_trace( ?TRACE_OPTS, @@ -340,7 +339,7 @@ t_query_counter_async_query(_) -> fun(Trace) -> %% the callback_mode of 'emqx_connector_demo' is 'always_sync'. QueryTrace = ?of_kind(call_query, Trace), - ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace) + ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _, _}} | _], QueryTrace) end ), %% simple query ignores the query_mode and batching settings in the resource_worker @@ -351,7 +350,7 @@ t_query_counter_async_query(_) -> ?assertMatch({ok, 1000}, Result), %% the callback_mode if 'emqx_connector_demo' is 'always_sync'. 
QueryTrace = ?of_kind(call_query, Trace), - ?assertMatch([#{query := {query, _, get_counter, _, _}}], QueryTrace) + ?assertMatch([#{query := {query, _, get_counter, _, _, _}}], QueryTrace) end ), #{counters := C} = emqx_resource:get_metrics(?ID), @@ -397,7 +396,7 @@ t_query_counter_async_callback(_) -> end, fun(Trace) -> QueryTrace = ?of_kind(call_query_async, Trace), - ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace) + ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _, _}} | _], QueryTrace) end ), @@ -408,7 +407,7 @@ t_query_counter_async_callback(_) -> fun(Result, Trace) -> ?assertMatch({ok, 1000}, Result), QueryTrace = ?of_kind(call_query, Trace), - ?assertMatch([#{query := {query, _, get_counter, _, _}}], QueryTrace) + ?assertMatch([#{query := {query, _, get_counter, _, _, _}}], QueryTrace) end ), #{counters := C} = emqx_resource:get_metrics(?ID), @@ -480,7 +479,7 @@ t_query_counter_async_inflight(_) -> ), fun(Trace) -> QueryTrace = ?of_kind(call_query_async, Trace), - ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace) + ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _, _}} | _], QueryTrace) end ), tap_metrics(?LINE), @@ -537,7 +536,7 @@ t_query_counter_async_inflight(_) -> end, fun(Trace) -> QueryTrace = ?of_kind(call_query_async, Trace), - ?assertMatch([#{query := {query, _, {inc_counter, _}, _, _}} | _], QueryTrace), + ?assertMatch([#{query := {query, _, {inc_counter, _}, _, _, _}} | _], QueryTrace), ?assertEqual(WindowSize + Num + 1, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), tap_metrics(?LINE), ok @@ -557,7 +556,7 @@ t_query_counter_async_inflight(_) -> ), fun(Trace) -> QueryTrace = ?of_kind(call_query_async, Trace), - ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace) + ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _, _}} | _], QueryTrace) end ), @@ -669,8 +668,8 @@ t_query_counter_async_inflight_batch(_) -> || Event = #{ ?snk_kind 
:= call_batch_query_async, batch := [ - {query, _, {inc_counter, 1}, _, _}, - {query, _, {inc_counter, 1}, _, _} + {query, _, {inc_counter, 1}, _, _, _}, + {query, _, {inc_counter, 1}, _, _, _} ] } <- Trace @@ -754,7 +753,7 @@ t_query_counter_async_inflight_batch(_) -> fun(Trace) -> QueryTrace = ?of_kind(call_batch_query_async, Trace), ?assertMatch( - [#{batch := [{query, _, {inc_counter, _}, _, _} | _]} | _], + [#{batch := [{query, _, {inc_counter, _}, _, _, _} | _]} | _], QueryTrace ) end @@ -779,7 +778,7 @@ t_query_counter_async_inflight_batch(_) -> fun(Trace) -> QueryTrace = ?of_kind(call_batch_query_async, Trace), ?assertMatch( - [#{batch := [{query, _, {inc_counter, _}, _, _} | _]} | _], + [#{batch := [{query, _, {inc_counter, _}, _, _, _} | _]} | _], QueryTrace ) end @@ -2051,7 +2050,7 @@ do_t_expiration_before_sending(QueryMode) -> end, fun(Trace) -> ?assertMatch( - [#{batch := [{query, _, {inc_counter, 99}, _, _}]}], + [#{batch := [{query, _, {inc_counter, 99}, _, _, _}]}], ?of_kind(buffer_worker_flush_all_expired, Trace) ), Metrics = tap_metrics(?LINE), @@ -2167,7 +2166,7 @@ do_t_expiration_before_sending_partial_batch(QueryMode) -> #{ ?snk_kind := handle_async_reply, action := ack, - batch_or_query := [{query, _, {inc_counter, 99}, _, _}] + batch_or_query := [{query, _, {inc_counter, 99}, _, _, _}] }, 10 * TimeoutMS ); @@ -2189,8 +2188,8 @@ do_t_expiration_before_sending_partial_batch(QueryMode) -> ?assertMatch( [ #{ - expired := [{query, _, {inc_counter, 199}, _, _}], - not_expired := [{query, _, {inc_counter, 99}, _, _}] + expired := [{query, _, {inc_counter, 199}, _, _, _}], + not_expired := [{query, _, {inc_counter, 99}, _, _, _}] } ], ?of_kind(buffer_worker_flush_potentially_partial, Trace) @@ -2303,7 +2302,7 @@ do_t_expiration_async_after_reply(IsBatch) -> #{?snk_kind := delay}, #{ ?snk_kind := handle_async_reply_enter, - batch_or_query := [{query, _, {inc_counter, 199}, _, _} | _] + batch_or_query := [{query, _, {inc_counter, 199}, _, _, _} | _] } 
), @@ -2346,8 +2345,8 @@ do_t_expiration_async_after_reply(IsBatch) -> [ #{ expired := [ - {query, _, {inc_counter, 199}, _, _}, - {query, _, {inc_counter, 299}, _, _} + {query, _, {inc_counter, 199}, _, _, _}, + {query, _, {inc_counter, 299}, _, _, _} ] } ], @@ -2365,8 +2364,8 @@ do_t_expiration_async_after_reply(IsBatch) -> single -> ?assertMatch( [ - #{expired := [{query, _, {inc_counter, 199}, _, _}]}, - #{expired := [{query, _, {inc_counter, 299}, _, _}]} + #{expired := [{query, _, {inc_counter, 199}, _, _, _}]}, + #{expired := [{query, _, {inc_counter, 299}, _, _, _}]} ], ?of_kind(handle_async_reply_expired, Trace) ) @@ -2417,7 +2416,7 @@ t_expiration_batch_all_expired_after_reply(_Config) -> #{?snk_kind := delay}, #{ ?snk_kind := handle_async_reply_enter, - batch_or_query := [{query, _, {inc_counter, 199}, _, _} | _] + batch_or_query := [{query, _, {inc_counter, 199}, _, _, _} | _] } ), @@ -2451,8 +2450,8 @@ t_expiration_batch_all_expired_after_reply(_Config) -> [ #{ expired := [ - {query, _, {inc_counter, 199}, _, _}, - {query, _, {inc_counter, 299}, _, _} + {query, _, {inc_counter, 199}, _, _, _}, + {query, _, {inc_counter, 299}, _, _, _} ] } ], @@ -2578,7 +2577,7 @@ do_t_expiration_retry() -> end, fun(Trace) -> ?assertMatch( - [#{expired := [{query, _, {inc_counter, 1}, _, _}]}], + [#{expired := [{query, _, {inc_counter, 1}, _, _, _}]}], ?of_kind(buffer_worker_retry_expired, Trace) ), Metrics = tap_metrics(?LINE), @@ -2655,8 +2654,8 @@ t_expiration_retry_batch_multiple_times(_Config) -> fun(Trace) -> ?assertMatch( [ - #{expired := [{query, _, {inc_counter, 1}, _, _}]}, - #{expired := [{query, _, {inc_counter, 2}, _, _}]} + #{expired := [{query, _, {inc_counter, 1}, _, _, _}]}, + #{expired := [{query, _, {inc_counter, 2}, _, _, _}]} ], ?of_kind(buffer_worker_retry_expired, Trace) ), diff --git a/apps/emqx_rule_engine/rebar.config b/apps/emqx_rule_engine/rebar.config index 07c53d3e3..0f00f15c6 100644 --- a/apps/emqx_rule_engine/rebar.config +++ 
b/apps/emqx_rule_engine/rebar.config @@ -2,7 +2,16 @@ {deps, [ {emqx, {path, "../emqx"}}, - {emqx_utils, {path, "../emqx_utils"}} + {emqx_utils, {path, "../emqx_utils"}}, + {emqx_modules, {path, "../emqx_modules"}} +]}. + +{profiles, [ + {test, [ + {deps, [ + {emqx_bridge_http, {path, "../emqx_bridge_http"}} + ]} + ]} ]}. {erl_opts, [ diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl index d82951124..f4685222c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl @@ -26,7 +26,7 @@ -export([namespace/0, roots/0, fields/1]). --type tag() :: rule_creation | rule_test | rule_engine. +-type tag() :: rule_creation | rule_test | rule_engine | rule_apply_test. -spec check_params(map(), tag()) -> {ok, map()} | {error, term()}. check_params(Params, Tag) -> @@ -54,7 +54,8 @@ roots() -> {"rule_creation", sc(ref("rule_creation"), #{desc => ?DESC("root_rule_creation")})}, {"rule_info", sc(ref("rule_info"), #{desc => ?DESC("root_rule_info")})}, {"rule_events", sc(ref("rule_events"), #{desc => ?DESC("root_rule_events")})}, - {"rule_test", sc(ref("rule_test"), #{desc => ?DESC("root_rule_test")})} + {"rule_test", sc(ref("rule_test"), #{desc => ?DESC("root_rule_test")})}, + {"rule_apply_test", sc(ref("rule_apply_test"), #{desc => ?DESC("root_apply_rule_test")})} ]. 
fields("rule_engine") -> @@ -101,29 +102,22 @@ fields("rule_events") -> ]; fields("rule_test") -> [ - {"context", - sc( - hoconsc:union([ - ref("ctx_pub"), - ref("ctx_sub"), - ref("ctx_unsub"), - ref("ctx_delivered"), - ref("ctx_acked"), - ref("ctx_dropped"), - ref("ctx_connected"), - ref("ctx_disconnected"), - ref("ctx_connack"), - ref("ctx_check_authz_complete"), - ref("ctx_bridge_mqtt"), - ref("ctx_delivery_dropped") - ]), - #{ - desc => ?DESC("test_context"), - default => #{} - } - )}, + rule_input_message_context(), {"sql", sc(binary(), #{desc => ?DESC("test_sql"), required => true})} ]; +fields("rule_apply_test") -> + [ + rule_input_message_context(), + {"stop_action_after_template_rendering", + sc( + typerefl:boolean(), + #{ + desc => + ?DESC("stop_action_after_template_render"), + default => true + } + )} + ]; fields("metrics") -> [ {"matched", @@ -315,6 +309,29 @@ fields("ctx_delivery_dropped") -> | msg_event_common_fields() ]. +rule_input_message_context() -> + {"context", + sc( + hoconsc:union([ + ref("ctx_pub"), + ref("ctx_sub"), + ref("ctx_unsub"), + ref("ctx_delivered"), + ref("ctx_acked"), + ref("ctx_dropped"), + ref("ctx_connected"), + ref("ctx_disconnected"), + ref("ctx_connack"), + ref("ctx_check_authz_complete"), + ref("ctx_bridge_mqtt"), + ref("ctx_delivery_dropped") + ]), + #{ + desc => ?DESC("test_context"), + default => #{} + } + )}. + qos() -> {"qos", sc(emqx_schema:qos(), #{desc => ?DESC("event_qos")})}. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index 1768141ae..1fed922dd 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -17,7 +17,9 @@ %% rule_engine should wait for bridge connector start, %% it's will check action/connector ref's exist. 
emqx_bridge, - emqx_connector + emqx_connector, + %% Needed to start the tracing functionality + emqx_modules ]}, {mod, {emqx_rule_engine_app, []}}, {env, []}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 354e40c5f..d203dd915 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -37,6 +37,7 @@ '/rule_test'/2, '/rules'/2, '/rules/:id'/2, + '/rules/:id/test'/2, '/rules/:id/metrics'/2, '/rules/:id/metrics/reset'/2 ]). @@ -145,6 +146,7 @@ paths() -> "/rule_test", "/rules", "/rules/:id", + "/rules/:id/test", "/rules/:id/metrics", "/rules/:id/metrics/reset" ]. @@ -161,6 +163,9 @@ rule_creation_schema() -> rule_test_schema() -> ref(emqx_rule_api_schema, "rule_test"). +rule_apply_test_schema() -> + ref(emqx_rule_api_schema, "rule_apply_test"). + rule_info_schema() -> ref(emqx_rule_api_schema, "rule_info"). @@ -258,6 +263,21 @@ schema("/rules/:id") -> } } }; +schema("/rules/:id/test") -> + #{ + 'operationId' => '/rules/:id/test', + post => #{ + tags => [<<"rules">>], + description => ?DESC("api11"), + summary => <<"Apply a rule for testing">>, + 'requestBody' => rule_apply_test_schema(), + responses => #{ + 400 => error_schema('BAD_REQUEST', "Invalid Parameters"), + 412 => error_schema('NOT_MATCH', "SQL Not Match"), + 200 => <<"Rule Applied">> + } + } + }; schema("/rules/:id/metrics") -> #{ 'operationId' => '/rules/:id/metrics', @@ -392,6 +412,24 @@ param_path_id() -> end ). 
+'/rules/:id/test'(post, #{body := Params, bindings := #{id := RuleId}}) -> + ?CHECK_PARAMS( + Params, + rule_apply_test, + begin + case emqx_rule_sqltester:apply_rule(RuleId, CheckedParams) of + {ok, Result} -> + {200, Result}; + {error, {parse_error, Reason}} -> + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; + {error, nomatch} -> + {412, #{code => 'NOT_MATCH', message => <<"SQL Not Match">>}}; + {error, Reason} -> + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} + end + end + ). + '/rules/:id'(get, #{bindings := #{id := Id}}) -> case emqx_rule_engine:get_rule(Id) of {ok, Rule} -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index f51908772..3dfc5f6c8 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -69,6 +69,14 @@ apply_rule_discard_result(Rule, Columns, Envs) -> ok. apply_rule(Rule = #{id := RuleID}, Columns, Envs) -> + set_process_trace_metadata(RuleID, Columns), + trace_rule_sql( + "rule_activated", + #{ + input => Columns, environment => Envs + }, + debug + ), ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'matched'), clear_rule_payload(), try @@ -77,48 +85,80 @@ apply_rule(Rule = #{id := RuleID}, Columns, Envs) -> %% ignore the errors if select or match failed _:Reason = {select_and_transform_error, Error} -> ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'), - ?SLOG(warning, #{ - msg => "SELECT_clause_exception", - rule_id => RuleID, - reason => Error - }), + trace_rule_sql( + "SELECT_clause_exception", + #{ + reason => Error + }, + warning + ), {error, Reason}; _:Reason = {match_conditions_error, Error} -> ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'), - ?SLOG(warning, #{ - msg => "WHERE_clause_exception", - rule_id => RuleID, - reason => Error - }), + trace_rule_sql( + "WHERE_clause_exception", + #{ + reason => Error + }, + warning + ), {error, 
Reason}; _:Reason = {select_and_collect_error, Error} -> ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'), - ?SLOG(warning, #{ - msg => "FOREACH_clause_exception", - rule_id => RuleID, - reason => Error - }), + trace_rule_sql( + "FOREACH_clause_exception", + #{ + reason => Error + }, + warning + ), {error, Reason}; _:Reason = {match_incase_error, Error} -> ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'), - ?SLOG(warning, #{ - msg => "INCASE_clause_exception", - rule_id => RuleID, - reason => Error - }), + trace_rule_sql( + "INCASE_clause_exception", + #{ + reason => Error + }, + warning + ), {error, Reason}; Class:Error:StkTrace -> ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'), - ?SLOG(error, #{ - msg => "apply_rule_failed", - rule_id => RuleID, - exception => Class, - reason => Error, - stacktrace => StkTrace - }), + trace_rule_sql( + "apply_rule_failed", + #{ + exception => Class, + reason => Error, + stacktrace => StkTrace + }, + warning + ), {error, {Error, StkTrace}} + after + reset_process_trace_metadata(Columns) end. +set_process_trace_metadata(RuleID, #{clientid := ClientID}) -> + logger:update_process_metadata(#{ + rule_id => RuleID, + clientid => ClientID + }); +set_process_trace_metadata(RuleID, _) -> + logger:update_process_metadata(#{ + rule_id => RuleID + }). + +reset_process_trace_metadata(#{clientid := _ClientID}) -> + Meta = logger:get_process_metadata(), + Meta1 = maps:remove(clientid, Meta), + Meta2 = maps:remove(rule_id, Meta1), + logger:set_process_metadata(Meta2); +reset_process_trace_metadata(_) -> + Meta = logger:get_process_metadata(), + Meta1 = maps:remove(rule_id, Meta), + logger:set_process_metadata(Meta1). 
+ do_apply_rule( #{ id := RuleId, @@ -136,13 +176,18 @@ do_apply_rule( {ok, ColumnsAndSelected, FinalCollection} -> case FinalCollection of [] -> + trace_rule_sql("FOREACH_yielded_no_result"), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'); _ -> + trace_rule_sql( + "FOREACH_yielded_result", #{result => FinalCollection}, debug + ), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed') end, NewEnvs = maps:merge(ColumnsAndSelected, Envs), {ok, [handle_action_list(RuleId, Actions, Coll, NewEnvs) || Coll <- FinalCollection]}; false -> + trace_rule_sql("FOREACH_yielded_no_result_no_match"), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'), {error, nomatch} end; @@ -159,9 +204,11 @@ do_apply_rule( ) -> case evaluate_select(Fields, Columns, Conditions) of {ok, Selected} -> + trace_rule_sql("SELECT_yielded_result", #{result => Selected}, debug), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed'), {ok, handle_action_list(RuleId, Actions, Selected, maps:merge(Columns, Envs))}; false -> + trace_rule_sql("SELECT_yielded_no_result_no_match"), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'), {error, nomatch} end. 
@@ -345,37 +392,42 @@ handle_action_list(RuleId, Actions, Selected, Envs) -> handle_action(RuleId, ActId, Selected, Envs) -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.total'), + trace_action(ActId, "activating_action"), try - do_handle_action(RuleId, ActId, Selected, Envs) + Result = do_handle_action(RuleId, ActId, Selected, Envs), + trace_action(ActId, "action_activated", #{result => Result}), + Result catch throw:out_of_service -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), ok = emqx_metrics_worker:inc( rule_metrics, RuleId, 'actions.failed.out_of_service' ), - ?SLOG(warning, #{msg => "out_of_service", action => ActId}); + trace_action(ActId, "out_of_service", #{}, warning); Err:Reason:ST -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'), - ?SLOG(error, #{ - msg => "action_failed", - action => ActId, - exception => Err, - reason => Reason, - stacktrace => ST - }) + trace_action( + ActId, + "action_failed", + #{ + exception => Err, + reason => Reason, + stacktrace => ST + }, + error + ) end. -define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found; R == unhealthy_target). 
-do_handle_action(RuleId, {bridge, BridgeType, BridgeName, ResId}, Selected, _Envs) -> - ?TRACE( - "BRIDGE", - "bridge_action", - #{bridge_id => emqx_bridge_resource:bridge_id(BridgeType, BridgeName)} - ), - ReplyTo = {fun ?MODULE:inc_action_metrics/2, [RuleId], #{reply_dropped => true}}, +do_handle_action(RuleId, {bridge, BridgeType, BridgeName, ResId} = Action, Selected, _Envs) -> + trace_action_bridge("BRIDGE", Action, "bridge_action", #{}, debug), + {TraceCtx, IncCtx} = do_handle_action_get_trace_inc_metrics_context(RuleId, Action), + ReplyTo = {fun ?MODULE:inc_action_metrics/2, [IncCtx], #{reply_dropped => true}}, case - emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected, #{reply_to => ReplyTo}) + emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected, #{ + reply_to => ReplyTo, trace_ctx => TraceCtx + }) of {error, Reason} when Reason == bridge_not_found; Reason == bridge_stopped -> throw(out_of_service); @@ -386,22 +438,19 @@ do_handle_action(RuleId, {bridge, BridgeType, BridgeName, ResId}, Selected, _Env end; do_handle_action( RuleId, - {bridge_v2, BridgeType, BridgeName}, + {bridge_v2, BridgeType, BridgeName} = Action, Selected, _Envs ) -> - ?TRACE( - "BRIDGE", - "bridge_action", - #{bridge_id => {bridge_v2, BridgeType, BridgeName}} - ), - ReplyTo = {fun ?MODULE:inc_action_metrics/2, [RuleId], #{reply_dropped => true}}, + trace_action_bridge("BRIDGE", Action, "bridge_action", #{}, debug), + {TraceCtx, IncCtx} = do_handle_action_get_trace_inc_metrics_context(RuleId, Action), + ReplyTo = {fun ?MODULE:inc_action_metrics/2, [IncCtx], #{reply_dropped => true}}, case emqx_bridge_v2:send_message( BridgeType, BridgeName, Selected, - #{reply_to => ReplyTo} + #{reply_to => ReplyTo, trace_ctx => TraceCtx} ) of {error, Reason} when Reason == bridge_not_found; Reason == bridge_stopped -> @@ -412,12 +461,72 @@ do_handle_action( Result end; do_handle_action(RuleId, #{mod := Mod, func := Func} = Action, Selected, Envs) -> + trace_action(Action, 
"call_action_function"), %% the function can also throw 'out_of_service' Args = maps:get(args, Action, []), Result = Mod:Func(Selected, Envs, Args), - inc_action_metrics(RuleId, Result), + {_, IncCtx} = do_handle_action_get_trace_inc_metrics_context(RuleId, Action), + inc_action_metrics(IncCtx, Result), + trace_action(Action, "call_action_function_result", #{result => Result}, debug), Result. +do_handle_action_get_trace_inc_metrics_context(RuleID, Action) -> + case {emqx_trace:list(), logger:get_process_metadata()} of + {[], #{stop_action_after_render := true}} -> + %% Even if there is no trace we still need to pass + %% stop_action_after_render in the trace meta data so that the + %% action will be stopped. + { + #{ + stop_action_after_render => true + }, + #{ + rule_id => RuleID, + action_id => Action + } + }; + {[], _} -> + %% As a performance/memory optimization, we don't create any trace + %% context if there are no trace patterns. + {undefined, #{ + rule_id => RuleID, + action_id => Action + }}; + {_List, TraceMeta} -> + Ctx = do_handle_action_get_trace_inc_metrics_context_unconditionally(Action, TraceMeta), + {maps:remove(action_id, Ctx), Ctx} + end. + +do_handle_action_get_trace_inc_metrics_context_unconditionally(Action, TraceMeta) -> + StopAfterRender = maps:get(stop_action_after_render, TraceMeta, false), + case TraceMeta of + #{ + rule_id := RuleID, + clientid := ClientID + } -> + #{ + rule_id => RuleID, + clientid => ClientID, + action_id => Action, + stop_action_after_render => StopAfterRender + }; + #{ + rule_id := RuleID + } -> + #{ + rule_id => RuleID, + action_id => Action, + stop_action_after_render => StopAfterRender + } + end. + +action_info({bridge, BridgeType, BridgeName, _ResId}) -> + #{type => BridgeType, name => BridgeName}; +action_info({bridge_v2, BridgeType, BridgeName}) -> + #{type => BridgeType, name => BridgeName}; +action_info(FuncInfoMap) -> + FuncInfoMap. 
+ eval({Op, _} = Exp, Context) when is_list(Context) andalso (Op == path orelse Op == var) -> case Context of [Columns] -> @@ -596,21 +705,46 @@ nested_put(Alias, Val, Columns0) -> Columns = ensure_decoded_payload(Alias, Columns0), emqx_rule_maps:nested_put(Alias, Val, Columns). -inc_action_metrics(RuleId, Result) -> - _ = do_inc_action_metrics(RuleId, Result), +inc_action_metrics(TraceCtx, Result) -> + _ = do_inc_action_metrics(TraceCtx, Result), Result. -do_inc_action_metrics(RuleId, {error, {recoverable_error, _}}) -> - emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'); -do_inc_action_metrics(RuleId, {error, {unrecoverable_error, _}}) -> +do_inc_action_metrics( + #{rule_id := RuleId, action_id := ActId} = TraceContext, + {error, {unrecoverable_error, {action_stopped_after_template_rendering, Explanation}} = _Reason} +) -> + TraceContext1 = maps:remove(action_id, TraceContext), + trace_action( + ActId, + "action_stopped_after_template_rendering", + maps:merge(#{reason => Explanation}, TraceContext1) + ), emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'); -do_inc_action_metrics(RuleId, R) -> +do_inc_action_metrics( + #{rule_id := RuleId, action_id := ActId} = TraceContext, + {error, {recoverable_error, _}} +) -> + TraceContext1 = maps:remove(action_id, TraceContext), + trace_action(ActId, "out_of_service", TraceContext1), + emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'); +do_inc_action_metrics( + #{rule_id := RuleId, action_id := ActId} = TraceContext, + {error, {unrecoverable_error, _} = Reason} +) -> + TraceContext1 = maps:remove(action_id, TraceContext), + trace_action(ActId, "action_failed", maps:merge(#{reason => Reason}, TraceContext1)), + emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), + emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'); +do_inc_action_metrics(#{rule_id := 
RuleId, action_id := ActId} = TraceContext, R) -> + TraceContext1 = maps:remove(action_id, TraceContext), case is_ok_result(R) of false -> + trace_action(ActId, "action_failed", maps:merge(#{reason => R}, TraceContext1)), emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'); true -> + trace_action(ActId, "action_success", maps:merge(#{result => R}, TraceContext1)), emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success') end. @@ -658,3 +792,39 @@ parse_function_name(Module, Name) when is_binary(Name) -> end; parse_function_name(_Module, Name) when is_atom(Name) -> Name. + +trace_action(ActId, Message) -> + trace_action_bridge("ACTION", ActId, Message). + +trace_action(ActId, Message, Extra) -> + trace_action_bridge("ACTION", ActId, Message, Extra, debug). + +trace_action(ActId, Message, Extra, Level) -> + trace_action_bridge("ACTION", ActId, Message, Extra, Level). + +trace_action_bridge(Tag, ActId, Message) -> + trace_action_bridge(Tag, ActId, Message, #{}, debug). + +trace_action_bridge(Tag, ActId, Message, Extra, Level) -> + ?TRACE( + Level, + Tag, + Message, + maps:merge( + #{ + action_info => action_info(ActId) + }, + Extra + ) + ). + +trace_rule_sql(Message) -> + trace_rule_sql(Message, #{}, debug). + +trace_rule_sql(Message, Extra, Level) -> + ?TRACE( + Level, + "RULE_SQL_EXEC", + Message, + Extra + ). diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 8212e3385..83f29eef3 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -20,9 +20,73 @@ test/1, get_selected_data/3, %% Some SQL functions return different results in the test environment - is_test_runtime_env/0 + is_test_runtime_env/0, + apply_rule/2 ]). 
+apply_rule( + RuleId, + #{ + context := Context, + stop_action_after_template_rendering := StopAfterRender + } +) -> + {ok, Rule} = emqx_rule_engine:get_rule(RuleId), + InTopic = get_in_topic(Context), + EventTopics = maps:get(from, Rule, []), + case lists:all(fun is_publish_topic/1, EventTopics) of + true -> + %% test if the topic matches the topic filters in the rule + case emqx_topic:match_any(InTopic, EventTopics) of + true -> + do_apply_matched_rule( + Rule, + Context, + StopAfterRender + ); + false -> + {error, nomatch} + end; + false -> + case lists:member(InTopic, EventTopics) of + true -> + %% the rule is for both publish and events, test it directly + do_apply_matched_rule(Rule, Context, StopAfterRender); + false -> + {error, nomatch} + end + end. + +do_apply_matched_rule(Rule, Context, StopAfterRender) -> + update_process_trace_metadata(StopAfterRender), + ApplyRuleRes = emqx_rule_runtime:apply_rule( + Rule, + Context, + apply_rule_environment() + ), + reset_trace_process_metadata(StopAfterRender), + ApplyRuleRes. + +update_process_trace_metadata(true = _StopAfterRender) -> + logger:update_process_metadata(#{ + stop_action_after_render => true + }); +update_process_trace_metadata(false = _StopAfterRender) -> + ok. + +reset_trace_process_metadata(true = _StopAfterRender) -> + Meta = logger:get_process_metadata(), + NewMeta = maps:remove(stop_action_after_render, Meta), + logger:set_process_metadata(NewMeta); +reset_trace_process_metadata(false = _StopAfterRender) -> + ok. + +%% At the time of writing the environment passed to the apply rule function is +%% not used at all for normal actions. When it is used for custom functions it +%% is first merged with the context so there does not seem to be any need to +%% set this to anything else than the empty map. +apply_rule_environment() -> #{}. + -spec test(#{sql := binary(), context := map()}) -> {ok, map() | list()} | {error, term()}. 
test(#{sql := Sql, context := Context}) -> case emqx_rule_sqlparser:parse(Sql) of diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 76cc23c0d..b0ca00a0e 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -43,7 +43,8 @@ all() -> {group, metrics}, {group, metrics_simple}, {group, metrics_fail}, - {group, metrics_fail_simple} + {group, metrics_fail_simple}, + {group, tracing} ]. suite() -> @@ -142,6 +143,9 @@ groups() -> {metrics_fail_simple, [], [ t_rule_metrics_sync_fail, t_rule_metrics_async_fail + ]}, + {tracing, [], [ + t_trace_rule_id ]} ]. @@ -160,7 +164,7 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([emqx_conf, emqx_rule_engine]), + emqx_common_test_helpers:stop_apps([emqx_conf, emqx_rule_engine, emqx_auth, emqx_bridge]), ok. set_special_configs(emqx_auth) -> @@ -3632,6 +3636,111 @@ create_bridge(Type, Name, Config) -> {ok, _Bridge} = emqx_bridge:create(Type, Name, Config), emqx_bridge_resource:bridge_id(Type, Name). +create_rule(Name, SQL) -> + Rule = emqx_rule_engine_SUITE:make_simple_rule(Name, SQL), + {ok, _} = emqx_rule_engine:create_rule(Rule). + +emqtt_client_config() -> + [ + {host, "localhost"}, + {clientid, <<"client">>}, + {username, <<"testuser">>}, + {password, <<"pass">>} + ]. + +filesync(Name, Type) -> + ct:sleep(50), + filesync(Name, Type, 5). + +%% sometime the handler process is not started yet. 
+filesync(Name, Type, 0) -> + ct:fail("Handler process not started ~p ~p", [Name, Type]); +filesync(Name0, Type, Retry) -> + Name = + case is_binary(Name0) of + true -> Name0; + false -> list_to_binary(Name0) + end, + try + Handler = binary_to_atom(<<"trace_", (atom_to_binary(Type))/binary, "_", Name/binary>>), + ok = logger_disk_log_h:filesync(Handler) + catch + E:R -> + ct:pal("Filesync error:~p ~p~n", [{Name, Type, Retry}, {E, R}]), + ct:sleep(100), + filesync(Name, Type, Retry - 1) + end. + +t_trace_rule_id(_Config) -> + %% Start MQTT Client + emqx_trace_SUITE:reload(), + {ok, T} = emqtt:start_link(emqtt_client_config()), + emqtt:connect(T), + %% Create rules + create_rule( + <<"test_rule_id_1">>, + <<"select 1 as rule_number from \"rule_1_topic\"">> + ), + create_rule( + <<"test_rule_id_2">>, + <<"select 2 as rule_number from \"rule_2_topic\"">> + ), + %% Start tracing + ok = emqx_trace_handler:install( + "CLI-RULE-1", ruleid, <<"test_rule_id_1">>, all, "tmp/rule_trace_1.log" + ), + ok = emqx_trace_handler:install( + "CLI-RULE-2", ruleid, <<"test_rule_id_2">>, all, "tmp/rule_trace_2.log" + ), + emqx_trace:check(), + ok = filesync("CLI-RULE-1", ruleid), + ok = filesync("CLI-RULE-2", ruleid), + + %% Verify the tracing files exist + ?assert(filelib:is_regular("tmp/rule_trace_1.log")), + ?assert(filelib:is_regular("tmp/rule_trace_2.log")), + + %% Get current traces + ?assertMatch( + [ + #{ + type := ruleid, + filter := <<"test_rule_id_1">>, + level := debug, + dst := "tmp/rule_trace_1.log", + name := <<"CLI-RULE-1">> + }, + #{ + type := ruleid, + filter := <<"test_rule_id_2">>, + name := <<"CLI-RULE-2">>, + level := debug, + dst := "tmp/rule_trace_2.log" + } + ], + emqx_trace_handler:running() + ), + + %% Trigger rule + emqtt:publish(T, <<"rule_1_topic">>, <<"my_traced_message">>), + ?retry( + 100, + 5, + begin + ok = filesync("CLI-RULE-1", ruleid), + {ok, Bin} = file:read_file("tmp/rule_trace_1.log"), + ?assertNotEqual(nomatch, binary:match(Bin, 
[<<"my_traced_message">>])) + end + ), + ok = filesync("CLI-RULE-2", ruleid), + ?assert(filelib:file_size("tmp/rule_trace_2.log") =:= 0), + + %% Stop tracing + ok = emqx_trace_handler:uninstall(ruleid, <<"CLI-RULE-1">>), + ok = emqx_trace_handler:uninstall(ruleid, <<"CLI-RULE-2">>), + ?assertEqual([], emqx_trace_handler:running()), + emqtt:disconnect(T). + %%------------------------------------------------------------------------------ %% Internal helpers %%------------------------------------------------------------------------------ diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl new file mode 100644 index 000000000..c875617ce --- /dev/null +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl @@ -0,0 +1,378 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_rule_engine_api_rule_apply_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-define(CONF_DEFAULT, <<"rule_engine {rules {}}">>). + +all() -> + emqx_common_test_helpers:all(?MODULE). 
+ +init_per_suite(Config) -> + application:load(emqx_conf), + AppsToStart = [ + emqx, + emqx_conf, + emqx_connector, + emqx_bridge, + emqx_bridge_http, + emqx_rule_engine + ], + %% I don't know why we need to stop the apps and then start them but if we + %% don't do this and other suites run before this suite the test cases will + %% fail as it seems like the connector silently refuses to start. + ok = emqx_cth_suite:stop(AppsToStart), + Apps = emqx_cth_suite:start( + AppsToStart, + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + emqx_mgmt_api_test_util:init_suite(), + [{apps, Apps} | Config]. + +end_per_suite(Config) -> + Apps = ?config(apps, Config), + emqx_mgmt_api_test_util:end_suite(), + ok = emqx_cth_suite:stop(Apps), + ok. + +init_per_testcase(_Case, Config) -> + emqx_bridge_http_test_lib:init_http_success_server(Config). + +end_per_testcase(_TestCase, _Config) -> + ok = emqx_bridge_http_connector_test_server:stop(), + emqx_bridge_v2_testlib:delete_all_bridges(), + emqx_bridge_v2_testlib:delete_all_connectors(), + emqx_common_test_helpers:call_janitor(), + ok. + +t_basic_apply_rule_trace_ruleid(Config) -> + basic_apply_rule_test_helper(Config, ruleid, false). + +t_basic_apply_rule_trace_clientid(Config) -> + basic_apply_rule_test_helper(Config, clientid, false). + +t_basic_apply_rule_trace_ruleid_stop_after_render(Config) -> + basic_apply_rule_test_helper(Config, ruleid, true). 
+ +basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) -> + HTTPServerConfig = ?config(http_server, Config), + emqx_bridge_http_test_lib:make_bridge(HTTPServerConfig), + #{status := connected} = emqx_bridge_v2:health_check( + http, emqx_bridge_http_test_lib:bridge_name() + ), + %% Create Rule + RuleTopic = iolist_to_binary([<<"my_rule_topic/">>, atom_to_binary(?FUNCTION_NAME)]), + SQL = <<"SELECT payload.id as id FROM \"", RuleTopic/binary, "\"">>, + {ok, #{<<"id">> := RuleId}} = + emqx_bridge_testlib:create_rule_and_action_http( + http, + RuleTopic, + Config, + #{sql => SQL} + ), + ClientId = <<"c_emqx">>, + %% =================================== + %% Create trace for RuleId + %% =================================== + TraceName = atom_to_binary(?FUNCTION_NAME), + TraceValue = + case TraceType of + ruleid -> + RuleId; + clientid -> + ClientId + end, + create_trace(TraceName, TraceType, TraceValue), + %% =================================== + Context = #{ + clientid => ClientId, + event_type => message_publish, + payload => <<"{\"msg\": \"hello\"}">>, + qos => 1, + topic => RuleTopic, + username => <<"u_emqx">> + }, + Params = #{ + <<"context">> => Context, + <<"stop_action_after_template_rendering">> => StopAfterRender + }, + emqx_trace:check(), + ok = emqx_trace_handler_SUITE:filesync(TraceName, TraceType), + Now = erlang:system_time(second) - 10, + {ok, _} = file:read_file(emqx_trace:log_file(TraceName, Now)), + ?assertMatch({ok, _}, call_apply_rule_api(RuleId, Params)), + ?retry( + _Interval0 = 200, + _NAttempts0 = 20, + begin + Bin = read_rule_trace_file(TraceName, TraceType, Now), + io:format("THELOG:~n~s", [Bin]), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"rule_activated">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"SELECT_yielded_result">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"bridge_action">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_activated">>])), + ?assertNotEqual(nomatch, binary:match(Bin, 
[<<"action_template_rendered">>])), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"QUERY_ASYNC">>])) + end + ), + case StopAfterRender of + true -> + ?retry( + _Interval0 = 200, + _NAttempts0 = 20, + begin + Bin = read_rule_trace_file(TraceName, TraceType, Now), + io:format("THELOG2:~n~s", [Bin]), + ?assertNotEqual( + nomatch, binary:match(Bin, [<<"action_stopped_after_template_rendering">>]) + ) + end + ); + false -> + ?retry( + _Interval0 = 200, + _NAttempts0 = 20, + begin + Bin = read_rule_trace_file(TraceName, TraceType, Now), + io:format("THELOG3:~n~s", [Bin]), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_success">>])) + end + ) + end, + emqx_trace:delete(TraceName), + ok. + +create_trace(TraceName, TraceType, TraceValue) -> + Now = erlang:system_time(second) - 10, + Start = Now, + End = Now + 60, + Trace = #{ + name => TraceName, + type => TraceType, + TraceType => TraceValue, + start_at => Start, + end_at => End + }, + emqx_trace_SUITE:reload(), + ok = emqx_trace:clear(), + {ok, _} = emqx_trace:create(Trace). 
+ +t_apply_rule_test_batch_separation_stop_after_render(_Config) -> + MeckOpts = [passthrough, no_link, no_history, non_strict], + catch meck:new(emqx_connector_info, MeckOpts), + meck:expect( + emqx_connector_info, + hard_coded_test_connector_info_modules, + 0, + [emqx_rule_engine_test_connector_info] + ), + emqx_connector_info:clean_cache(), + catch meck:new(emqx_action_info, MeckOpts), + meck:expect( + emqx_action_info, + hard_coded_test_action_info_modules, + 0, + [emqx_rule_engine_test_action_info] + ), + emqx_action_info:clean_cache(), + {ok, _} = emqx_connector:create(rule_engine_test, ?FUNCTION_NAME, #{}), + Name = atom_to_binary(?FUNCTION_NAME), + ActionConf = + #{ + <<"connector">> => Name, + <<"parameters">> => + #{ + <<"values">> => + #{ + <<"send_to_pid">> => emqx_utils:bin_to_hexstr( + term_to_binary(self()), upper + ) + } + }, + <<"resource_opts">> => #{ + <<"batch_size">> => 1000, + <<"batch_time">> => 500 + } + }, + {ok, _} = emqx_bridge_v2:create( + rule_engine_test, + ?FUNCTION_NAME, + ActionConf + ), + SQL = <<"SELECT payload.is_stop_after_render as stop_after_render FROM \"", Name/binary, "\"">>, + {ok, RuleID} = create_rule_with_action( + rule_engine_test, + ?FUNCTION_NAME, + SQL + ), + create_trace(Name, ruleid, RuleID), + emqx_trace:check(), + ok = emqx_trace_handler_SUITE:filesync(Name, ruleid), + Now = erlang:system_time(second) - 10, + %% Stop + ParmsStopAfterRender = apply_rule_parms(true, Name), + ParmsNoStopAfterRender = apply_rule_parms(false, Name), + %% Check that batching is working + Count = 200, + CountMsgFun = + fun + CountMsgFunRec(0 = _CurCount, GotBatchWithAtLeastTwo) -> + GotBatchWithAtLeastTwo; + CountMsgFunRec(CurCount, GotBatchWithAtLeastTwo) -> + receive + List -> + Len = length(List), + CountMsgFunRec(CurCount - Len, GotBatchWithAtLeastTwo orelse (Len > 1)) + end + end, + lists:foreach( + fun(_) -> + {ok, _} = call_apply_rule_api(RuleID, ParmsStopAfterRender) + end, + lists:seq(1, Count) + ), + %% We should get the 
messages and at least one batch with more than 1 + true = CountMsgFun(Count, false), + %% We should check that we don't get any mixed batch + CheckBatchesFun = + fun + CheckBatchesFunRec(0 = _CurCount) -> + ok; + CheckBatchesFunRec(CurCount) -> + receive + [{_, #{<<"stop_after_render">> := StopValue}} | _] = List -> + [ + ?assertMatch(#{<<"stop_after_render">> := StopValue}, Msg) + || {_, Msg} <- List + ], + Len = length(List), + CheckBatchesFunRec(CurCount - Len) + end + end, + lists:foreach( + fun(_) -> + case rand:normal() < 0 of + true -> + {ok, _} = call_apply_rule_api(RuleID, ParmsStopAfterRender); + false -> + {ok, _} = call_apply_rule_api(RuleID, ParmsNoStopAfterRender) + end + end, + lists:seq(1, Count) + ), + CheckBatchesFun(Count), + %% Just check that the log file is created as expected + ?retry( + _Interval0 = 200, + _NAttempts0 = 20, + begin + Bin = read_rule_trace_file(Name, ruleid, Now), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_success">>])), + ?assertNotEqual( + nomatch, binary:match(Bin, [<<"action_stopped_after_template_rendering">>]) + ) + end + ), + %% Cleanup + ok = emqx_trace:delete(Name), + ok = emqx_rule_engine:delete_rule(RuleID), + ok = emqx_bridge_v2:remove(rule_engine_test, ?FUNCTION_NAME), + ok = emqx_connector:remove(rule_engine_test, ?FUNCTION_NAME), + [_, _] = meck:unload(), + ok. + +apply_rule_parms(StopAfterRender, Name) -> + Payload = #{<<"is_stop_after_render">> => StopAfterRender}, + Context = #{ + clientid => Name, + event_type => message_publish, + payload => emqx_utils_json:encode(Payload), + qos => 1, + topic => Name, + username => <<"u_emqx">> + }, + #{ + <<"context">> => Context, + <<"stop_action_after_template_rendering">> => StopAfterRender + }. 
+ +create_rule_with_action(ActionType, ActionName, SQL) -> + BridgeId = emqx_bridge_resource:bridge_id(ActionType, ActionName), + Params = #{ + enable => true, + sql => SQL, + actions => [BridgeId] + }, + Path = emqx_mgmt_api_test_util:api_path(["rules"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + ct:pal("rule action params: ~p", [Params]), + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of + {ok, Res0} -> + #{<<"id">> := RuleId} = emqx_utils_json:decode(Res0, [return_maps]), + {ok, RuleId}; + Error -> + Error + end. + +%% Helper Functions + +call_apply_rule_api(RuleId, Params) -> + Method = post, + Path = emqx_mgmt_api_test_util:api_path(["rules", RuleId, "test"]), + Res = request(Method, Path, Params), + Res. + +request(Method, Path, Params) -> + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + case emqx_mgmt_api_test_util:request_api(Method, Path, "", AuthHeader, Params, Opts) of + {ok, {Status, Headers, Body0}} -> + Body = maybe_json_decode(Body0), + {ok, {Status, Headers, Body}}; + {error, {Status, Headers, Body0}} -> + Body = + case emqx_utils_json:safe_decode(Body0, [return_maps]) of + {ok, Decoded0 = #{<<"message">> := Msg0}} -> + Msg = maybe_json_decode(Msg0), + Decoded0#{<<"message">> := Msg}; + {ok, Decoded0} -> + Decoded0; + {error, _} -> + Body0 + end, + {error, {Status, Headers, Body}}; + Error -> + Error + end. + +maybe_json_decode(X) -> + case emqx_utils_json:safe_decode(X, [return_maps]) of + {ok, Decoded} -> Decoded; + {error, _} -> X + end. + +read_rule_trace_file(TraceName, TraceType, From) -> + emqx_trace:check(), + ok = emqx_trace_handler_SUITE:filesync(TraceName, TraceType), + {ok, Bin} = file:read_file(emqx_trace:log_file(TraceName, From)), + Bin. 
diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_test_action_info.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_test_action_info.erl new file mode 100644 index 000000000..91bbcb442 --- /dev/null +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_test_action_info.erl @@ -0,0 +1,101 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_rule_engine_test_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +-define(CONNECTOR_TYPE, rule_engine_test). +-define(ACTION_TYPE, ?CONNECTOR_TYPE). + +bridge_v1_type_name() -> ?ACTION_TYPE. + +action_type_name() -> ?ACTION_TYPE. + +connector_type_name() -> ?ACTION_TYPE. + +schema_module() -> emqx_rule_engine_test_action_info. + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions + +namespace() -> "bridge_test_action_info". + +roots() -> []. 
+ +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + Fields = + fields(connector_fields) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts), + emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields); +fields(Field) when + Field == "get_bridge_v2"; + Field == "post_bridge_v2"; + Field == "put_bridge_v2" +-> + emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(rule_engine_test_action)); +fields(action) -> + {?ACTION_TYPE, + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(?MODULE, rule_engine_test_action)), + #{ + desc => <<"Test Action Config">>, + required => false + } + )}; +fields(rule_engine_test_action) -> + emqx_bridge_v2_schema:make_producer_action_schema( + hoconsc:mk( + hoconsc:ref(?MODULE, action_parameters), + #{ + required => true, + desc => undefined + } + ) + ); +fields(action_parameters) -> + [ + {values, + hoconsc:mk( + typerefl:map(), + #{desc => undefined, default => #{}} + )} + ]; +fields("config_connector") -> + emqx_connector_schema:common_fields() ++ + fields(connector_fields) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); +fields("config") -> + emqx_resource_schema:fields("resource_opts") ++ + fields(connector_fields); +fields(connector_fields) -> + [ + {values, + hoconsc:mk( + typerefl:map(), + #{desc => undefined, default => #{}} + )} + ]. +desc(_) -> + undefined. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector.erl new file mode 100644 index 000000000..c22c5fbd5 --- /dev/null +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector.erl @@ -0,0 +1,100 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. 
+%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_rule_engine_test_connector). + +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-behaviour(emqx_resource). + +%% callbacks of behaviour emqx_resource +-export([ + callback_mode/0, + on_start/2, + on_stop/2, + on_query/3, + on_batch_query/3, + on_get_status/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 +]). + +%% =================================================================== +callback_mode() -> always_sync. + +on_start( + _InstId, + _Config +) -> + {ok, #{installed_channels => #{}}}. + +on_stop(_InstId, _State) -> + ok. + +on_add_channel( + _InstId, + #{ + installed_channels := InstalledChannels + } = OldState, + ChannelId, + ChannelConfig +) -> + NewInstalledChannels = maps:put(ChannelId, ChannelConfig, InstalledChannels), + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState}. + +on_remove_channel( + _InstId, + OldState, + _ChannelId +) -> + {ok, OldState}. + +on_get_channel_status( + _ResId, + _ChannelId, + _State +) -> + connected. + +on_get_channels(ResId) -> + emqx_bridge_v2:get_channels_for_connector(ResId). 
+ +on_query( + _InstId, + _Query, + _State +) -> + ok. + +on_batch_query( + _InstId, + [{ChannelId, _Req} | _] = Msg, + #{installed_channels := Channels} = _State +) -> + #{parameters := #{values := #{send_to_pid := PidBin}}} = maps:get(ChannelId, Channels), + Pid = binary_to_term(emqx_utils:hexstr_to_bin(PidBin)), + Pid ! Msg, + emqx_trace:rendered_action_template(ChannelId, #{nothing_to_render => ok}), + ok. + +on_get_status(_InstId, _State) -> + connected. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector_info.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector_info.erl new file mode 100644 index 000000000..1c300bff8 --- /dev/null +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector_info.erl @@ -0,0 +1,43 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_rule_engine_test_connector_info). + +-behaviour(emqx_connector_info). + +-export([ + type_name/0, + bridge_types/0, + resource_callback_module/0, + config_schema/0, + schema_module/0, + api_schema/1 +]). + +type_name() -> + rule_engine_test. + +bridge_types() -> + [rule_engine_test]. + +resource_callback_module() -> + emqx_rule_engine_test_connector. + +config_schema() -> + {rule_engine_test, + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(emqx_rule_engine_test_action_info, "config_connector")), + #{ + desc => <<"Test Connector Config">>, + required => false + } + )}. + +schema_module() -> + emqx_rule_engine_test_action_info. + +api_schema(Method) -> + emqx_connector_schema:api_ref( + ?MODULE, <<"rule_engine_test">>, Method ++ "_connector" + ). 
diff --git a/changes/ce/feat-12827.en.md b/changes/ce/feat-12827.en.md new file mode 100644 index 000000000..633a33d6b --- /dev/null +++ b/changes/ce/feat-12827.en.md @@ -0,0 +1 @@ +It is now possible to trace rules with a new Rule ID trace filter as well as with the Client ID filter. For testing purposes it is now also possible to use a new HTTP API endpoint (rules/:id/test) to artificially apply a rule and optionally stop its actions after they have been rendered. diff --git a/mix.exs b/mix.exs index 46aa00280..726fc13b0 100644 --- a/mix.exs +++ b/mix.exs @@ -60,7 +60,7 @@ defmodule EMQXUmbrella.MixProject do {:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true}, {:minirest, github: "emqx/minirest", tag: "1.4.0", override: true}, {:ecpool, github: "emqx/ecpool", tag: "0.5.7", override: true}, - {:replayq, github: "emqx/replayq", tag: "0.3.7", override: true}, + {:replayq, github: "emqx/replayq", tag: "0.3.8", override: true}, {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true}, # maybe forbid to fetch quicer {:emqtt, diff --git a/rebar.config b/rebar.config index ee28b32c7..14608ec4c 100644 --- a/rebar.config +++ b/rebar.config @@ -88,7 +88,7 @@ {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}}, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.4.0"}}}, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.7"}}}, - {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}}, + {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.8"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.10.1"}}}, {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.2.0"}}}, diff --git a/rel/i18n/emqx_mgmt_api_trace.hocon b/rel/i18n/emqx_mgmt_api_trace.hocon index 67462ab43..ba07d7d53 100644 --- a/rel/i18n/emqx_mgmt_api_trace.hocon +++ b/rel/i18n/emqx_mgmt_api_trace.hocon @@ -80,6 +80,11 @@ 
client_ip_addess.desc: client_ip_addess.label: """Client IP Address""" +ruleid.desc: +"""Specify the Rule ID if the trace type is 'ruleid'.""" +ruleid.label: +"""Rule ID""" + trace_status.desc: """trace status""" trace_status.label: diff --git a/rel/i18n/emqx_rule_api_schema.hocon b/rel/i18n/emqx_rule_api_schema.hocon index 0289f53ab..68c6a560d 100644 --- a/rel/i18n/emqx_rule_api_schema.hocon +++ b/rel/i18n/emqx_rule_api_schema.hocon @@ -66,6 +66,12 @@ test_context.desc: test_context.label: """Event Conetxt""" +stop_action_after_template_render.desc: +"""Set this to true if the action should be stopped after its template has been rendered (default is true).""" + +stop_action_after_template_render.label: +"""Stop Action After Template Rendering""" + node_node.desc: """The node name""" diff --git a/rel/i18n/emqx_rule_engine_api.hocon b/rel/i18n/emqx_rule_engine_api.hocon index 385b71ddc..0745a108d 100644 --- a/rel/i18n/emqx_rule_engine_api.hocon +++ b/rel/i18n/emqx_rule_engine_api.hocon @@ -90,4 +90,10 @@ api9.desc: api9.label: """Get configuration""" +api11.desc: +"""Apply a rule with the given message and environment""" + +api11.label: +"""Apply Rule""" + }