diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index fc18ed4f0..460224c37 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -1202,5 +1202,5 @@ convert_timestamp(MillisecondsTimestamp) -> MicroSecs = MicroTimestamp rem 1000_000, {MegaSecs, Secs, MicroSecs}. -uuid_str(UUID, DisplyOpt) -> - uuid:uuid_to_string(UUID, DisplyOpt). +uuid_str(UUID, DisplayOpt) -> + uuid:uuid_to_string(UUID, DisplayOpt). diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 4a1477a1c..f83aa2920 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -36,7 +36,7 @@ ] ). --compile({no_auto_import, [alias/1]}). +-compile({no_auto_import, [alias/2]}). -type columns() :: map(). -type alias() :: atom(). @@ -204,7 +204,7 @@ select_and_transform([{as, Field, Alias} | More], Columns, Action) -> ); select_and_transform([Field | More], Columns, Action) -> Val = eval(Field, Columns), - Key = alias(Field), + Key = alias(Field, Columns), select_and_transform( More, nested_put(Key, Val, Columns), @@ -228,11 +228,11 @@ select_and_collect([{as, Field, Alias} | More], Columns, {Action, LastKV}) -> ); select_and_collect([Field], Columns, {Action, _}) -> Val = eval(Field, Columns), - Key = alias(Field), + Key = alias(Field, Columns), {nested_put(Key, Val, Action), {'item', ensure_list(Val)}}; select_and_collect([Field | More], Columns, {Action, LastKV}) -> Val = eval(Field, Columns), - Key = alias(Field), + Key = alias(Field, Columns), select_and_collect( More, nested_put(Key, Val, Columns), @@ -401,38 +401,36 @@ eval({'case', CaseOn, CaseClauses, ElseClauses}, Columns) -> eval({'fun', {_, Name}, Args}, Columns) -> apply_func(Name, [eval(Arg, Columns) || Arg <- Args], Columns). -handle_alias({path, [{key, <<"payload">>} | _]}, #{payload := Payload} = Columns) -> - Columns#{payload => may_decode_payload(Payload)}; -handle_alias({path, [{key, <<"payload">>} | _]}, #{<<"payload">> := Payload} = Columns) -> - Columns#{<<"payload">> => may_decode_payload(Payload)}; -handle_alias(_, Columns) -> - Columns. - -alias({var, Var}) -> +alias({var, Var}, _Columns) -> {var, Var}; -alias({const, Val}) when is_binary(Val) -> +alias({const, Val}, _Columns) when is_binary(Val) -> {var, Val}; -alias({list, L}) -> +alias({list, L}, _Columns) -> {var, ?ephemeral_alias(list, length(L))}; -alias({range, R}) -> +alias({range, R}, _Columns) -> {var, ?ephemeral_alias(range, R)}; -alias({get_range, _, {var, Key}}) -> +alias({get_range, _, {var, Key}}, _Columns) -> {var, Key}; -alias({get_range, _, {path, Path}}) -> - {path, Path}; -alias({path, Path}) -> - {path, Path}; -alias({const, Val}) -> +alias({get_range, _, {path, _Path} = Path}, Columns) -> + handle_path_alias(Path, Columns); +alias({path, _Path} = Path, Columns) -> + handle_path_alias(Path, Columns); +alias({const, Val}, _Columns) -> {var, ?ephemeral_alias(const, Val)}; -alias({Op, _L, _R}) when ?is_arith(Op); ?is_comp(Op) -> +alias({Op, _L, _R}, _Columns) when ?is_arith(Op); ?is_comp(Op) -> {var, ?ephemeral_alias(op, Op)}; -alias({'case', On, _, _}) -> +alias({'case', On, _, _}, _Columns) -> {var, ?ephemeral_alias('case', On)}; -alias({'fun', Name, _}) -> +alias({'fun', Name, _}, _Columns) -> {var, ?ephemeral_alias('fun', Name)}; -alias(_) -> +alias(_, _Columns) -> ?ephemeral_alias(unknown, unknown). +handle_path_alias({path, [{key, <<"payload">>} | Rest]}, #{payload := _Payload} = _Columns) -> + {path, [{key, payload} | Rest]}; +handle_path_alias(Path, _Columns) -> + Path. + eval_case_clauses([], ElseClauses, Columns) -> case ElseClauses of {} -> undefined; @@ -515,8 +513,7 @@ safe_decode_and_cache(MaybeJson) -> ensure_list(List) when is_list(List) -> List; ensure_list(_NotList) -> []. -nested_put(Alias, Val, Columns0) -> - Columns = handle_alias(Alias, Columns0), +nested_put(Alias, Val, Columns) -> emqx_rule_maps:nested_put(Alias, Val, Columns). inc_action_metrics(RuleId, Result) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index f05159e30..822fac067 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -1580,6 +1580,73 @@ t_sqlparse_foreach_1(_Config) -> } } ), + + Sql5 = + "foreach payload.sensors " + "from \"t/#\" ", + {ok, [ + #{payload := #{<<"sensors">> := _}}, + #{payload := #{<<"sensors">> := _}} + ]} = + emqx_rule_sqltester:test( + #{ + sql => Sql5, + context => #{ + payload => <<"{\"sensors\": [1, 2]}">>, + topic => <<"t/a">> + } + } + ), + + try + meck:new(emqx_rule_runtime, [non_strict, passthrough]), + meck:expect( + emqx_rule_runtime, + apply_rule, + fun(Rule, #{payload := Payload} = Columns, Env) -> + Columns2 = maps:put(<<"payload">>, Payload, maps:without([payload], Columns)), + meck:passthrough([Rule, Columns2, Env]) + end + ), + + Sql6 = + "foreach payload.sensors " + "from \"t/#\" ", + {ok, [ + #{<<"payload">> := #{<<"sensors">> := _}}, + #{<<"payload">> := #{<<"sensors">> := _}} + ]} = + emqx_rule_sqltester:test( + #{ + sql => Sql6, + context => #{ + <<"payload">> => <<"{\"sensors\": [1, 2]}">>, + topic => <<"t/a">> + } + } + ), + + Sql7 = + "foreach payload.sensors " + "from \"t/#\" ", + ?assertNotMatch( + {ok, [ + #{<<"payload">> := _, payload := _}, + #{<<"payload">> := _, payload := _} + ]}, + emqx_rule_sqltester:test( + #{ + sql => Sql7, + context => #{ + <<"payload">> => <<"{\"sensors\": [1, 2]}">>, + topic => <<"t/a">> + } + } + ) + ) + after + meck:unload(emqx_rule_runtime) + end, ?assert(is_binary(TRuleId)). t_sqlparse_foreach_2(_Config) -> @@ -2168,7 +2235,7 @@ t_sqlparse_array_index_1(_Config) -> " payload.x[2] " "from \"t/#\" ", ?assertMatch( - {ok, #{<<"payload">> := #{<<"x">> := [3]}}}, + {ok, #{payload := #{<<"x">> := [3]}}}, emqx_rule_sqltester:test( #{ sql => Sql2, @@ -2185,7 +2252,7 @@ t_sqlparse_array_index_1(_Config) -> " payload.x[2].y " "from \"t/#\" ", ?assertMatch( - {ok, #{<<"payload">> := #{<<"x">> := [#{<<"y">> := 3}]}}}, + {ok, #{payload := #{<<"x">> := [#{<<"y">> := 3}]}}}, emqx_rule_sqltester:test( #{ sql => Sql3, @@ -2373,7 +2440,7 @@ t_sqlparse_array_index_4(_Config) -> "0 as payload.x[2].y " "from \"t/#\" ", ?assertMatch( - {ok, #{<<"payload">> := #{<<"x">> := [1, #{<<"y">> := 0}, 3]}}}, + {ok, #{payload := #{<<"x">> := [1, #{<<"y">> := 0}, 3]}}}, emqx_rule_sqltester:test( #{ sql => Sql1, @@ -2548,7 +2615,7 @@ t_sqlparse_array_range_2(_Config) -> " payload.a[1..4] " "from \"t/#\" ", ?assertMatch( - {ok, #{<<"payload">> := #{<<"a">> := [0, 1, 2, 3]}}}, + {ok, #{payload := #{<<"a">> := [0, 1, 2, 3]}}}, emqx_rule_sqltester:test( #{ sql => Sql02, diff --git a/changes/ce/fix-11172.en.md b/changes/ce/fix-11172.en.md new file mode 100644 index 000000000..5e8effabb --- /dev/null +++ b/changes/ce/fix-11172.en.md @@ -0,0 +1,11 @@ +Fix the `payload` will be duplicated in the below situations: +- Use a `foreach` sentence without the `as` sub-expression and select all fields(use the `*` or omitted the `do` sub-expression) + + For example: + + `FOREACH payload.sensors FROM "t/#"` +- Select the `payload` field and all fields + + For example: + + `SELECT payload.sensors, * FROM "t/#"`