Merge pull request #11331 from lafirest/fix/nested_put
fix(nested_put): fix a data loss bug introduced by #11172
This commit is contained in:
commit
a35df30b28
|
@ -378,9 +378,9 @@ eval({Op, _} = Exp, Context) when is_list(Context) andalso (Op == path orelse Op
|
||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) ->
|
eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) ->
|
||||||
nested_get({path, Path}, may_decode_payload(Payload));
|
nested_get({path, Path}, maybe_decode_payload(Payload));
|
||||||
eval({path, [{key, <<"payload">>} | Path]}, #{<<"payload">> := Payload}) ->
|
eval({path, [{key, <<"payload">>} | Path]}, #{<<"payload">> := Payload}) ->
|
||||||
nested_get({path, Path}, may_decode_payload(Payload));
|
nested_get({path, Path}, maybe_decode_payload(Payload));
|
||||||
eval({path, _} = Path, Columns) ->
|
eval({path, _} = Path, Columns) ->
|
||||||
nested_get(Path, Columns);
|
nested_get(Path, Columns);
|
||||||
eval({range, {Begin, End}}, _Columns) ->
|
eval({range, {Begin, End}}, _Columns) ->
|
||||||
|
@ -410,6 +410,16 @@ eval({'case', CaseOn, CaseClauses, ElseClauses}, Columns) ->
|
||||||
eval({'fun', {_, Name}, Args}, Columns) ->
|
eval({'fun', {_, Name}, Args}, Columns) ->
|
||||||
apply_func(Name, [eval(Arg, Columns) || Arg <- Args], Columns).
|
apply_func(Name, [eval(Arg, Columns) || Arg <- Args], Columns).
|
||||||
|
|
||||||
|
%% the payload maybe is JSON data, decode it to a `map` first for nested put
|
||||||
|
ensure_decoded_payload({path, [{key, payload} | _]}, #{payload := Payload} = Columns) ->
|
||||||
|
Columns#{payload => maybe_decode_payload(Payload)};
|
||||||
|
ensure_decoded_payload(
|
||||||
|
{path, [{key, <<"payload">>} | _]}, #{<<"payload">> := Payload} = Columns
|
||||||
|
) ->
|
||||||
|
Columns#{<<"payload">> => maybe_decode_payload(Payload)};
|
||||||
|
ensure_decoded_payload(_, Columns) ->
|
||||||
|
Columns.
|
||||||
|
|
||||||
alias({var, Var}, _Columns) ->
|
alias({var, Var}, _Columns) ->
|
||||||
{var, Var};
|
{var, Var};
|
||||||
alias({const, Val}, _Columns) when is_binary(Val) ->
|
alias({const, Val}, _Columns) when is_binary(Val) ->
|
||||||
|
@ -497,12 +507,12 @@ add_metadata(Columns, Metadata) when is_map(Columns), is_map(Metadata) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Internal Functions
|
%% Internal Functions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
may_decode_payload(Payload) when is_binary(Payload) ->
|
maybe_decode_payload(Payload) when is_binary(Payload) ->
|
||||||
case get_cached_payload() of
|
case get_cached_payload() of
|
||||||
undefined -> safe_decode_and_cache(Payload);
|
undefined -> safe_decode_and_cache(Payload);
|
||||||
DecodedP -> DecodedP
|
DecodedP -> DecodedP
|
||||||
end;
|
end;
|
||||||
may_decode_payload(Payload) ->
|
maybe_decode_payload(Payload) ->
|
||||||
Payload.
|
Payload.
|
||||||
|
|
||||||
get_cached_payload() ->
|
get_cached_payload() ->
|
||||||
|
@ -522,7 +532,8 @@ safe_decode_and_cache(MaybeJson) ->
|
||||||
ensure_list(List) when is_list(List) -> List;
|
ensure_list(List) when is_list(List) -> List;
|
||||||
ensure_list(_NotList) -> [].
|
ensure_list(_NotList) -> [].
|
||||||
|
|
||||||
nested_put(Alias, Val, Columns) ->
|
nested_put(Alias, Val, Columns0) ->
|
||||||
|
Columns = ensure_decoded_payload(Alias, Columns0),
|
||||||
emqx_rule_maps:nested_put(Alias, Val, Columns).
|
emqx_rule_maps:nested_put(Alias, Val, Columns).
|
||||||
|
|
||||||
inc_action_metrics(RuleId, Result) ->
|
inc_action_metrics(RuleId, Result) ->
|
||||||
|
|
|
@ -104,7 +104,8 @@ groups() ->
|
||||||
t_sqlparse_true_false,
|
t_sqlparse_true_false,
|
||||||
t_sqlparse_undefined_variable,
|
t_sqlparse_undefined_variable,
|
||||||
t_sqlparse_new_map,
|
t_sqlparse_new_map,
|
||||||
t_sqlparse_invalid_json
|
t_sqlparse_invalid_json,
|
||||||
|
t_sqlselect_as_put
|
||||||
]},
|
]},
|
||||||
{events, [], [
|
{events, [], [
|
||||||
t_events,
|
t_events,
|
||||||
|
@ -1587,6 +1588,45 @@ t_sqlselect_message_publish_event_keep_original_props_2(_Config) ->
|
||||||
emqtt:stop(Client1),
|
emqtt:stop(Client1),
|
||||||
delete_rule(TopicRule).
|
delete_rule(TopicRule).
|
||||||
|
|
||||||
|
t_sqlselect_as_put(_Config) ->
|
||||||
|
%% Verify SELECT with 'AS' to update the payload
|
||||||
|
Sql =
|
||||||
|
"select payload, "
|
||||||
|
"'STEVE' as payload.data[1].name "
|
||||||
|
"from \"t/#\" ",
|
||||||
|
PayloadMap = #{
|
||||||
|
<<"f1">> => <<"f1">>,
|
||||||
|
<<"f2">> => <<"f2">>,
|
||||||
|
<<"data">> => [
|
||||||
|
#{<<"name">> => <<"n1">>, <<"idx">> => 1},
|
||||||
|
#{<<"name">> => <<"n2">>, <<"idx">> => 2}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
PayloadBin = emqx_utils_json:encode(PayloadMap),
|
||||||
|
SqlResult = emqx_rule_sqltester:test(
|
||||||
|
#{
|
||||||
|
sql => Sql,
|
||||||
|
context =>
|
||||||
|
#{
|
||||||
|
payload => PayloadBin,
|
||||||
|
topic => <<"t/a">>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
),
|
||||||
|
?assertMatch({ok, #{<<"payload">> := _}}, SqlResult),
|
||||||
|
{ok, #{<<"payload">> := PayloadMap2}} = SqlResult,
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
<<"f1">> := <<"f1">>,
|
||||||
|
<<"f2">> := <<"f2">>,
|
||||||
|
<<"data">> := [
|
||||||
|
#{<<"name">> := <<"STEVE">>, <<"idx">> := 1},
|
||||||
|
#{<<"name">> := <<"n2">>, <<"idx">> := 2}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
PayloadMap2
|
||||||
|
).
|
||||||
|
|
||||||
t_sqlparse_event_1(_Config) ->
|
t_sqlparse_event_1(_Config) ->
|
||||||
Sql =
|
Sql =
|
||||||
"select topic as tp "
|
"select topic as tp "
|
||||||
|
|
Loading…
Reference in New Issue