diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index d62803d7e..f047e2047 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -378,9 +378,9 @@ eval({Op, _} = Exp, Context) when is_list(Context) andalso (Op == path orelse Op end end; eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) -> - nested_get({path, Path}, may_decode_payload(Payload)); + nested_get({path, Path}, maybe_decode_payload(Payload)); eval({path, [{key, <<"payload">>} | Path]}, #{<<"payload">> := Payload}) -> - nested_get({path, Path}, may_decode_payload(Payload)); + nested_get({path, Path}, maybe_decode_payload(Payload)); eval({path, _} = Path, Columns) -> nested_get(Path, Columns); eval({range, {Begin, End}}, _Columns) -> @@ -410,6 +410,16 @@ eval({'case', CaseOn, CaseClauses, ElseClauses}, Columns) -> eval({'fun', {_, Name}, Args}, Columns) -> apply_func(Name, [eval(Arg, Columns) || Arg <- Args], Columns). +%% the payload maybe is JSON data, decode it to a `map` first for nested put +ensure_decoded_payload({path, [{key, payload} | _]}, #{payload := Payload} = Columns) -> + Columns#{payload => maybe_decode_payload(Payload)}; +ensure_decoded_payload( + {path, [{key, <<"payload">>} | _]}, #{<<"payload">> := Payload} = Columns +) -> + Columns#{<<"payload">> => maybe_decode_payload(Payload)}; +ensure_decoded_payload(_, Columns) -> + Columns. + alias({var, Var}, _Columns) -> {var, Var}; alias({const, Val}, _Columns) when is_binary(Val) -> @@ -497,12 +507,12 @@ add_metadata(Columns, Metadata) when is_map(Columns), is_map(Metadata) -> %%------------------------------------------------------------------------------ %% Internal Functions %%------------------------------------------------------------------------------ -may_decode_payload(Payload) when is_binary(Payload) -> +maybe_decode_payload(Payload) when is_binary(Payload) -> case get_cached_payload() of undefined -> safe_decode_and_cache(Payload); DecodedP -> DecodedP end; -may_decode_payload(Payload) -> +maybe_decode_payload(Payload) -> Payload. get_cached_payload() -> @@ -522,7 +532,8 @@ safe_decode_and_cache(MaybeJson) -> ensure_list(List) when is_list(List) -> List; ensure_list(_NotList) -> []. -nested_put(Alias, Val, Columns) -> +nested_put(Alias, Val, Columns0) -> + Columns = ensure_decoded_payload(Alias, Columns0), emqx_rule_maps:nested_put(Alias, Val, Columns). inc_action_metrics(RuleId, Result) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index c8bebab99..8c3bd0ebb 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -104,7 +104,8 @@ groups() -> t_sqlparse_true_false, t_sqlparse_undefined_variable, t_sqlparse_new_map, - t_sqlparse_invalid_json + t_sqlparse_invalid_json, + t_sqlselect_as_put ]}, {events, [], [ t_events, @@ -1587,6 +1588,45 @@ t_sqlselect_message_publish_event_keep_original_props_2(_Config) -> emqtt:stop(Client1), delete_rule(TopicRule). +t_sqlselect_as_put(_Config) -> + %% Verify SELECT with 'AS' to update the payload + Sql = + "select payload, " + "'STEVE' as payload.data[1].name " + "from \"t/#\" ", + PayloadMap = #{ + <<"f1">> => <<"f1">>, + <<"f2">> => <<"f2">>, + <<"data">> => [ + #{<<"name">> => <<"n1">>, <<"idx">> => 1}, + #{<<"name">> => <<"n2">>, <<"idx">> => 2} + ] + }, + PayloadBin = emqx_utils_json:encode(PayloadMap), + SqlResult = emqx_rule_sqltester:test( + #{ + sql => Sql, + context => + #{ + payload => PayloadBin, + topic => <<"t/a">> + } + } + ), + ?assertMatch({ok, #{<<"payload">> := _}}, SqlResult), + {ok, #{<<"payload">> := PayloadMap2}} = SqlResult, + ?assertMatch( + #{ + <<"f1">> := <<"f1">>, + <<"f2">> := <<"f2">>, + <<"data">> := [ + #{<<"name">> := <<"STEVE">>, <<"idx">> := 1}, + #{<<"name">> := <<"n2">>, <<"idx">> := 2} + ] + }, + PayloadMap2 + ). + t_sqlparse_event_1(_Config) -> Sql = "select topic as tp "