diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 66600cebc..4e850b2cc 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -46,8 +46,7 @@ {meck, "0.9.2"}, {proper, "1.4.0"}, {bbmustache, "1.10.0"}, - {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.10.0"}}}, - {emqx_rule_engine, {path, "../emqx_rule_engine"}} + {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.10.0"}}} ]}, {extra_src_dirs, [ {"test", [recursive]}, @@ -59,8 +58,7 @@ {meck, "0.9.2"}, {proper, "1.4.0"}, {bbmustache, "1.10.0"}, - {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.9.7"}}}, - {emqx_rule_engine, {path, "../emqx_rule_engine"}} + {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.9.7"}}} ]}, {extra_src_dirs, [{"test", [recursive]}]} ]} diff --git a/apps/emqx/test/emqx_trace_handler_SUITE.erl b/apps/emqx/test/emqx_trace_handler_SUITE.erl index 85a9c056b..59a472f3e 100644 --- a/apps/emqx/test/emqx_trace_handler_SUITE.erl +++ b/apps/emqx/test/emqx_trace_handler_SUITE.erl @@ -20,7 +20,6 @@ -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). --include_lib("snabbkaffe/include/test_macros.hrl"). -include_lib("common_test/include/ct.hrl"). -define(CLIENT, [ @@ -30,12 +29,11 @@ {password, <<"pass">>} ]). -all() -> - [t_trace_clientid, t_trace_topic, t_trace_ip_address, t_trace_clientid_utf8, t_trace_rule_id]. +all() -> [t_trace_clientid, t_trace_topic, t_trace_ip_address, t_trace_clientid_utf8]. init_per_suite(Config) -> Apps = emqx_cth_suite:start( - [emqx, emqx_rule_engine], + [emqx], #{work_dir => emqx_cth_suite:work_dir(Config)} ), [{apps, Apps} | Config]. @@ -207,79 +205,6 @@ t_trace_topic(_Config) -> ?assertEqual([], emqx_trace_handler:running()), emqtt:disconnect(T). -create_rule(Name, SQL) -> - Rule = emqx_rule_engine_SUITE:make_simple_rule(Name, SQL), - {ok, _} = emqx_rule_engine:create_rule(Rule). - -t_trace_rule_id(_Config) -> - %% Start MQTT Client - {ok, T} = emqtt:start_link(?CLIENT), - emqtt:connect(T), - %% Create rules - create_rule( - <<"test_rule_id_1">>, - <<"select 1 as rule_number from \"rule_1_topic\"">> - ), - create_rule( - <<"test_rule_id_2">>, - <<"select 2 as rule_number from \"rule_2_topic\"">> - ), - %% Start tracing - ok = emqx_trace_handler:install( - "CLI-RULE-1", ruleid, <<"test_rule_id_1">>, all, "tmp/rule_trace_1.log" - ), - ok = emqx_trace_handler:install( - "CLI-RULE-2", ruleid, <<"test_rule_id_2">>, all, "tmp/rule_trace_2.log" - ), - emqx_trace:check(), - ok = filesync("CLI-RULE-1", ruleid), - ok = filesync("CLI-RULE-2", ruleid), - - %% Verify the tracing file exits - ?assert(filelib:is_regular("tmp/rule_trace_1.log")), - ?assert(filelib:is_regular("tmp/rule_trace_2.log")), - - %% Get current traces - ?assertMatch( - [ - #{ - type := ruleid, - filter := <<"test_rule_id_1">>, - level := debug, - dst := "tmp/rule_trace_1.log", - name := <<"CLI-RULE-1">> - }, - #{ - type := ruleid, - filter := <<"test_rule_id_2">>, - name := <<"CLI-RULE-2">>, - level := debug, - dst := "tmp/rule_trace_2.log" - } - ], - emqx_trace_handler:running() - ), - - %% Trigger rule - emqtt:publish(T, <<"rule_1_topic">>, <<"my_traced_message">>), - ?retry( - 100, - 5, - begin - ok = filesync("CLI-RULE-1", ruleid), - {ok, Bin} = file:read_file("tmp/rule_trace_1.log"), - ?assertNotEqual(nomatch, binary:match(Bin, [<<"my_traced_message">>])) - end - ), - ok = filesync("CLI-RULE-2", ruleid), - ?assert(filelib:file_size("tmp/rule_trace_2.log") =:= 0), - - %% Stop tracing - ok = emqx_trace_handler:uninstall(ruleid, <<"CLI-RULE-1">>), - ok = emqx_trace_handler:uninstall(ruleid, <<"CLI-RULE-2">>), - ?assertEqual([], emqx_trace_handler:running()), - emqtt:disconnect(T). - t_trace_ip_address(_Config) -> {ok, T} = emqtt:start_link(?CLIENT), emqtt:connect(T), @@ -347,11 +272,11 @@ t_trace_ip_address(_Config) -> filesync(Name, Type) -> ct:sleep(50), - filesync(Name, Type, 5). + filesync(Name, Type, 3). %% sometime the handler process is not started yet. -filesync(Name, Type, 0) -> - ct:fail("Handler process not started ~p ~p", [Name, Type]); +filesync(_Name, _Type, 0) -> + ok; filesync(Name0, Type, Retry) -> Name = case is_binary(Name0) of diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 76cc23c0d..5040d15b3 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -43,7 +43,8 @@ all() -> {group, metrics}, {group, metrics_simple}, {group, metrics_fail}, - {group, metrics_fail_simple} + {group, metrics_fail_simple}, + {group, tracing} ]. suite() -> @@ -142,6 +143,9 @@ groups() -> {metrics_fail_simple, [], [ t_rule_metrics_sync_fail, t_rule_metrics_async_fail + ]}, + {tracing, [], [ + t_trace_rule_id ]} ]. @@ -154,13 +158,13 @@ init_per_suite(Config) -> emqx_rule_funcs_demo:module_info(), application:load(emqx_conf), ok = emqx_common_test_helpers:start_apps( - [emqx_conf, emqx_rule_engine, emqx_auth, emqx_bridge], + [emqx, emqx_conf, emqx_rule_engine, emqx_auth, emqx_bridge], fun set_special_configs/1 ), Config. end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([emqx_conf, emqx_rule_engine]), + emqx_common_test_helpers:stop_apps([emqx, emqx_conf, emqx_rule_engine, emqx_auth, emqx_bridge]), ok. set_special_configs(emqx_auth) -> @@ -3632,6 +3636,111 @@ create_bridge(Type, Name, Config) -> {ok, _Bridge} = emqx_bridge:create(Type, Name, Config), emqx_bridge_resource:bridge_id(Type, Name). +create_rule(Name, SQL) -> + Rule = emqx_rule_engine_SUITE:make_simple_rule(Name, SQL), + {ok, _} = emqx_rule_engine:create_rule(Rule). + +emqtt_client_config() -> + [ + {host, "localhost"}, + {clientid, <<"client">>}, + {username, <<"testuser">>}, + {password, <<"pass">>} + ]. + +filesync(Name, Type) -> + ct:sleep(50), + filesync(Name, Type, 5). + +%% sometime the handler process is not started yet. +filesync(Name, Type, 0) -> + ct:fail("Handler process not started ~p ~p", [Name, Type]); +filesync(Name0, Type, Retry) -> + Name = + case is_binary(Name0) of + true -> Name0; + false -> list_to_binary(Name0) + end, + try + Handler = binary_to_atom(<<"trace_", (atom_to_binary(Type))/binary, "_", Name/binary>>), + ok = logger_disk_log_h:filesync(Handler) + catch + E:R -> + ct:pal("Filesync error:~p ~p~n", [{Name, Type, Retry}, {E, R}]), + ct:sleep(100), + filesync(Name, Type, Retry - 1) + end. + +t_trace_rule_id(_Config) -> + %% Start MQTT Client + emqx_trace_SUITE:reload(), + {ok, T} = emqtt:start_link(emqtt_client_config()), + emqtt:connect(T), + %% Create rules + create_rule( + <<"test_rule_id_1">>, + <<"select 1 as rule_number from \"rule_1_topic\"">> + ), + create_rule( + <<"test_rule_id_2">>, + <<"select 2 as rule_number from \"rule_2_topic\"">> + ), + %% Start tracing + ok = emqx_trace_handler:install( + "CLI-RULE-1", ruleid, <<"test_rule_id_1">>, all, "tmp/rule_trace_1.log" + ), + ok = emqx_trace_handler:install( + "CLI-RULE-2", ruleid, <<"test_rule_id_2">>, all, "tmp/rule_trace_2.log" + ), + emqx_trace:check(), + ok = filesync("CLI-RULE-1", ruleid), + ok = filesync("CLI-RULE-2", ruleid), + + %% Verify the tracing file exits + ?assert(filelib:is_regular("tmp/rule_trace_1.log")), + ?assert(filelib:is_regular("tmp/rule_trace_2.log")), + + %% Get current traces + ?assertMatch( + [ + #{ + type := ruleid, + filter := <<"test_rule_id_1">>, + level := debug, + dst := "tmp/rule_trace_1.log", + name := <<"CLI-RULE-1">> + }, + #{ + type := ruleid, + filter := <<"test_rule_id_2">>, + name := <<"CLI-RULE-2">>, + level := debug, + dst := "tmp/rule_trace_2.log" + } + ], + emqx_trace_handler:running() + ), + + %% Trigger rule + emqtt:publish(T, <<"rule_1_topic">>, <<"my_traced_message">>), + ?retry( + 100, + 5, + begin + ok = filesync("CLI-RULE-1", ruleid), + {ok, Bin} = file:read_file("tmp/rule_trace_1.log"), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"my_traced_message">>])) + end + ), + ok = filesync("CLI-RULE-2", ruleid), + ?assert(filelib:file_size("tmp/rule_trace_2.log") =:= 0), + + %% Stop tracing + ok = emqx_trace_handler:uninstall(ruleid, <<"CLI-RULE-1">>), + ok = emqx_trace_handler:uninstall(ruleid, <<"CLI-RULE-2">>), + ?assertEqual([], emqx_trace_handler:running()), + emqtt:disconnect(T). + %%------------------------------------------------------------------------------ %% Internal helpers %%------------------------------------------------------------------------------