feat(message transformation): add timestamp and pub_props fields to read context

Fixes https://emqx.atlassian.net/browse/EMQX-12684

Fixes https://emqx.atlassian.net/browse/EMQX-12678
This commit is contained in:
Thales Macedo Garitezi 2024-07-11 12:13:42 -03:00
parent 44e4b3616d
commit 01d89be743
2 changed files with 38 additions and 6 deletions

View File

@ -62,9 +62,11 @@
node := _,
payload := _,
peername := _,
pub_props := _,
publish_received_at := _,
qos := _,
retain := _,
timestamp := _,
topic := _,
user_property := _,
username := _,
@ -345,6 +347,7 @@ message_to_context(#message{} = Message, Payload, Transformation) ->
undefined
end,
Username = maps:get(username, Headers, undefined),
Timestamp = erlang:system_time(millisecond),
#{
dirty => Dirty,
@ -355,9 +358,11 @@ message_to_context(#message{} = Message, Payload, Transformation) ->
node => node(),
payload => Payload,
peername => Peername,
pub_props => Props,
publish_received_at => Message#message.timestamp,
qos => Message#message.qos,
retain => emqx_message:get_flag(retain, Message, false),
timestamp => Timestamp,
topic => Message#message.topic,
user_property => UserProperties,
username => Username

View File

@ -504,7 +504,7 @@ assert_monitor_metrics() ->
receive
PATTERN = ____Msg0 -> ____Msg0
after TIMEOUT ->
error({message_not_received, ?LINE})
error({message_not_received, {line, ?LINE}})
end
end)()
).
@ -608,6 +608,8 @@ t_smoke_test(_Config) ->
%% * peername
%% * publish_received_at
%% * username
%% * timestamp
%% * pub_props (and specific fields within containing hyphens)
t_smoke_test_2(_Config) ->
Name1 = <<"foo">>,
Operations = [
@ -617,14 +619,22 @@ t_smoke_test_2(_Config) ->
operation(<<"payload.peername">>, <<"peername">>),
operation(<<"payload.publish_received_at">>, <<"publish_received_at">>),
operation(<<"payload.username">>, <<"username">>),
operation(<<"payload.flags">>, <<"flags">>)
operation(<<"payload.flags">>, <<"flags">>),
operation(<<"payload.timestamp">>, <<"timestamp">>),
operation(<<"payload.pub_props">>, <<"pub_props">>),
operation(<<"payload.content_type">>, <<"pub_props.Content-Type">>)
],
Transformation1 = transformation(Name1, Operations),
{201, _} = insert(Transformation1),
ClientId = atom_to_binary(?FUNCTION_NAME),
C1 = connect(ClientId),
{ok, _, [_]} = emqtt:subscribe(C1, <<"t/#">>, [{qos, 2}]),
ok = publish(C1, <<"t/1">>, #{}),
ok = publish(C1, <<"t/1">>, #{}, _QoS = 0, #{
props => #{
'Content-Type' => <<"application/json">>,
'User-Property' => [{<<"a">>, <<"b">>}]
}
}),
{publish, #{payload := Payload0}} = ?assertReceiveReturn({publish, _}, 1_000),
NodeBin = atom_to_binary(node()),
?assertMatch(
@ -635,7 +645,13 @@ t_smoke_test_2(_Config) ->
<<"peername">> := <<"127.0.0.1:", _/binary>>,
<<"publish_received_at">> := PRAt,
<<"username">> := <<"undefined">>,
<<"flags">> := #{<<"dup">> := false, <<"retain">> := false}
<<"flags">> := #{<<"dup">> := false, <<"retain">> := false},
<<"timestamp">> := _,
<<"pub_props">> := #{
<<"Content-Type">> := <<"application/json">>,
<<"User-Property">> := #{<<"a">> := <<"b">>}
},
<<"content_type">> := <<"application/json">>
} when is_integer(PRAt),
emqx_utils_json:decode(Payload0, [return_maps])
),
@ -644,7 +660,12 @@ t_smoke_test_2(_Config) ->
Username = <<"myusername">>,
C2 = connect(ClientId, _IsPersistent = false, #{start_props => #{username => Username}}),
{ok, _, [_]} = emqtt:subscribe(C2, <<"t/#">>, [{qos, 2}]),
ok = publish(C2, <<"t/1">>, #{}),
ok = publish(C2, <<"t/1">>, #{}, _QoS = 0, #{
props => #{
'Content-Type' => <<"application/json">>,
'User-Property' => [{<<"a">>, <<"b">>}]
}
}),
{publish, #{payload := Payload1}} = ?assertReceiveReturn({publish, _}, 1_000),
?assertMatch(
#{
@ -654,7 +675,13 @@ t_smoke_test_2(_Config) ->
<<"peername">> := <<"127.0.0.1:", _/binary>>,
<<"publish_received_at">> := PRAt,
<<"username">> := Username,
<<"flags">> := #{<<"dup">> := false, <<"retain">> := false}
<<"flags">> := #{<<"dup">> := false, <<"retain">> := false},
<<"timestamp">> := _,
<<"pub_props">> := #{
<<"Content-Type">> := <<"application/json">>,
<<"User-Property">> := #{<<"a">> := <<"b">>}
},
<<"content_type">> := <<"application/json">>
} when is_integer(PRAt),
emqx_utils_json:decode(Payload1, [return_maps])
),