diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl index 43de01593..454c8ac6e 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl @@ -45,7 +45,7 @@ init([]) -> shutdown => 5000, type => worker, modules => [emqx_rule_metrics]}, - {ok, {{one_for_all, 10, 100}, [Registry, Metrics]}}. + {ok, {{one_for_one, 10, 10}, [Registry, Metrics]}}. start_locker() -> Locker = #{id => emqx_rule_locker, diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 2ad383475..0e0f01c5d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -360,6 +360,12 @@ printable_maps(Headers) -> fun (K, V0, AccIn) when K =:= peerhost; K =:= peername; K =:= sockname -> AccIn#{K => ntoa(V0)}; ('User-Property', V0, AccIn) when is_list(V0) -> - AccIn#{'User-Property' => maps:from_list(V0)}; + AccIn#{ + 'User-Property' => maps:from_list(V0), + 'User-Property-Pairs' => [#{ + key => Key, + value => Value + } || {Key, Value} <- V0] + }; (K, V0, AccIn) -> AccIn#{K => V0} end, #{}, Headers). diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 7b40a12fc..ef6bdea12 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -234,20 +234,26 @@ take_action(#action_instance{id = Id, name = ActName, fallbacks = Fallbacks} = A = emqx_rule_registry:get_action_instance_params(Id), emqx_rule_metrics:inc_actions_taken(Id), apply_action_func(Selected, Envs, Apply, ActName) + of + {badact, Reason} -> + handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, Reason); + Result -> Result catch error:{badfun, _Func}:_ST -> %?LOG(warning, "Action ~p maybe outdated, refresh it and try again." % "Func: ~p~nST:~0p", [Id, Func, ST]), trans_action_on(Id, fun() -> - emqx_rule_engine:refresh_actions([ActInst]) + emqx_rule_engine:refresh_actions([ActInst]) end, 5000), emqx_rule_metrics:inc_actions_retry(Id), take_action(ActInst, Selected, Envs, OnFailed, RetryN-1); Error:Reason:Stack -> + emqx_rule_metrics:inc_actions_exception(Id), handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, {Error, Reason, Stack}) end; take_action(#action_instance{id = Id, fallbacks = Fallbacks}, Selected, Envs, OnFailed, _RetryN) -> + emqx_rule_metrics:inc_actions_error(Id), handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, {max_try_reached, ?ActionMaxRetry}). apply_action_func(Data, Envs, #{mod := Mod, bindings := Bindings}, Name) -> @@ -284,12 +290,10 @@ wait_action_on(Id, RetryN) -> end. handle_action_failure(continue, Id, Fallbacks, Selected, Envs, Reason) -> - emqx_rule_metrics:inc_actions_exception(Id), ?LOG(error, "Take action ~p failed, continue next action, reason: ~0p", [Id, Reason]), take_actions(Fallbacks, Selected, Envs, continue), failed; handle_action_failure(stop, Id, Fallbacks, Selected, Envs, Reason) -> - emqx_rule_metrics:inc_actions_exception(Id), ?LOG(error, "Take action ~p failed, skip all actions, reason: ~0p", [Id, Reason]), take_actions(Fallbacks, Selected, Envs, continue), error({take_action_failed, {Id, Reason}}). @@ -409,11 +413,13 @@ add_metadata(Input, Metadata) when is_map(Input), is_map(Metadata) -> %%------------------------------------------------------------------------------ %% Internal Functions %%------------------------------------------------------------------------------ -may_decode_payload(Payload) -> +may_decode_payload(Payload) when is_binary(Payload) -> case get_cached_payload() of - undefined -> ensure_decoded(Payload); + undefined -> safe_decode_and_cache(Payload); DecodedP -> DecodedP - end. + end; +may_decode_payload(Payload) -> + Payload. get_cached_payload() -> erlang:get(rule_payload). @@ -422,9 +428,7 @@ cache_payload(DecodedP) -> erlang:put(rule_payload, DecodedP), DecodedP. -ensure_decoded(Json) when is_map(Json); is_list(Json) -> - Json; -ensure_decoded(MaybeJson) -> +safe_decode_and_cache(MaybeJson) -> try cache_payload(emqx_json:decode(MaybeJson, [return_maps])) catch _:_ -> #{} end. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index e097e578b..8486ea16c 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -37,6 +37,7 @@ all() -> , {group, runtime} , {group, events} , {group, multi_actions} + , {group, bugs} ]. suite() -> @@ -123,11 +124,15 @@ groups() -> {events, [], [t_events ]}, + {bugs, [], + [t_sqlparse_payload_as + ]}, {multi_actions, [], [t_sqlselect_multi_actoins_1, t_sqlselect_multi_actoins_1_1, t_sqlselect_multi_actoins_2, t_sqlselect_multi_actoins_3, + t_sqlselect_multi_actoins_3_1, t_sqlselect_multi_actoins_4 ]} ]. @@ -200,6 +205,7 @@ init_per_testcase(Test, Config) ;Test =:= t_sqlselect_multi_actoins_1_1 ;Test =:= t_sqlselect_multi_actoins_2 ;Test =:= t_sqlselect_multi_actoins_3 + ;Test =:= t_sqlselect_multi_actoins_3_1 ;Test =:= t_sqlselect_multi_actoins_4 -> ok = emqx_rule_engine:load_providers(), @@ -209,6 +215,12 @@ init_per_testcase(Test, Config) types=[], params_spec = #{}, title = #{en => <<"Crash Action">>}, description = #{en => <<"This action will always fail!">>}}), + ok = emqx_rule_registry:add_action( + #action{name = 'failure_action', app = ?APP, + module = ?MODULE, on_create = failure_action, + types=[], params_spec = #{}, + title = #{en => <<"Crash Action">>}, + description = #{en => <<"This action will always fail!">>}}), ok = emqx_rule_registry:add_action( #action{name = 'plus_by_one', app = ?APP, module = ?MODULE, on_create = plus_by_one_action, @@ -1288,6 +1300,44 @@ t_sqlselect_multi_actoins_3(Config) -> emqx_rule_registry:remove_rule(Rule). +t_sqlselect_multi_actoins_3_1(Config) -> + %% We create 2 actions in the same rule (on_action_failed = continue): + %% The first will fail (with a 'badact' return) and we need to make sure the + %% fallback actions can be executed, and the next actoins + %% will be run without influence + {ok, Rule} = emqx_rule_engine:create_rule( + #{rawsql => ?config(connsql, Config), + on_action_failed => continue, + actions => [ + #{name => 'failure_action', args => #{}, fallbacks =>[ + #{name => 'plus_by_one', args => #{}, fallbacks =>[]}, + #{name => 'plus_by_one', args => #{}, fallbacks =>[]} + ]}, + #{name => 'republish', + args => #{<<"target_topic">> => <<"t2">>, + <<"target_qos">> => -1, + <<"payload_tmpl">> => <<"clientid=${clientid}">> + }, + fallbacks => []} + ] + }), + + (?config(conn_event, Config))(), + timer:sleep(100), + + %% verfiy the fallback actions has been run + ?assertEqual(2, ets:lookup_element(plus_by_one_action, num, 2)), + + %% verfiy the next actions can be run + receive {publish, #{topic := T, payload := Payload}} -> + ?assertEqual(<<"t2">>, T), + ?assertEqual(<<"clientid=c_emqx1">>, Payload) + after 1000 -> + ct:fail(wait_for_t2) + end, + + emqx_rule_registry:remove_rule(Rule). + t_sqlselect_multi_actoins_4(Config) -> %% We create 2 actions in the same rule (on_action_failed = continue): %% The first will fail and we need to make sure the @@ -1920,6 +1970,44 @@ t_sqlparse_new_map(_Config) -> <<"c">> := [#{}] }, Res00). +t_sqlparse_payload_as(_Config) -> + %% https://github.com/emqx/emqx/issues/3866 + Sql00 = "SELECT " + " payload, map_get('engineWorkTime', payload.params, -1) as payload.params.engineWorkTime, " + " map_get('hydOilTem', payload.params, -1) as payload.params.hydOilTem " + "FROM \"t/#\" ", + Payload1 = <<"{ \"msgId\": 1002, \"params\": { \"convertTemp\": 20, \"engineSpeed\": 42, \"hydOilTem\": 30 } }">>, + {ok, Res01} = emqx_rule_sqltester:test( + #{<<"rawsql">> => Sql00, + <<"ctx">> => #{<<"payload">> => Payload1, + <<"topic">> => <<"t/a">>}}), + ?assertMatch(#{ + <<"payload">> := #{ + <<"params">> := #{ + <<"convertTemp">> := 20, + <<"engineSpeed">> := 42, + <<"engineWorkTime">> := -1, + <<"hydOilTem">> := 30 + } + } + }, Res01), + + Payload2 = <<"{ \"msgId\": 1002, \"params\": { \"convertTemp\": 20, \"engineSpeed\": 42 } }">>, + {ok, Res02} = emqx_rule_sqltester:test( + #{<<"rawsql">> => Sql00, + <<"ctx">> => #{<<"payload">> => Payload2, + <<"topic">> => <<"t/a">>}}), + ?assertMatch(#{ + <<"payload">> := #{ + <<"params">> := #{ + <<"convertTemp">> := 20, + <<"engineSpeed">> := 42, + <<"engineWorkTime">> := -1, + <<"hydOilTem">> := -1 + } + } + }, Res02). + %%------------------------------------------------------------------------------ %% Internal helpers %%------------------------------------------------------------------------------ @@ -2006,6 +2094,12 @@ mfa_action(Id, _Params) -> mfa_action_do(_Data, _Envs, K) -> persistent_term:put(K, 1). +failure_action(_Id, _Params) -> + fun(Data, _Envs) -> + ct:pal("applying crash action, Data: ~p", [Data]), + {badact, intentional_failure} + end. + crash_action(_Id, _Params) -> fun(Data, _Envs) -> ct:pal("applying crash action, Data: ~p", [Data]),