From 01d89be74335dc6f7c4f6560c72b131ff42a3840 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 11 Jul 2024 12:13:42 -0300 Subject: [PATCH] feat(message transformation): add timestamp and pub_props fields to read context Fixes https://emqx.atlassian.net/browse/EMQX-12684 Fixes https://emqx.atlassian.net/browse/EMQX-12678 --- .../src/emqx_message_transformation.erl | 5 +++ ..._message_transformation_http_api_SUITE.erl | 39 ++++++++++++++++--- 2 files changed, 38 insertions(+), 6 deletions(-) diff --git a/apps/emqx_message_transformation/src/emqx_message_transformation.erl b/apps/emqx_message_transformation/src/emqx_message_transformation.erl index 94951ff4f..569f79846 100644 --- a/apps/emqx_message_transformation/src/emqx_message_transformation.erl +++ b/apps/emqx_message_transformation/src/emqx_message_transformation.erl @@ -62,9 +62,11 @@ node := _, payload := _, peername := _, + pub_props := _, publish_received_at := _, qos := _, retain := _, + timestamp := _, topic := _, user_property := _, username := _, @@ -345,6 +347,7 @@ message_to_context(#message{} = Message, Payload, Transformation) -> undefined end, Username = maps:get(username, Headers, undefined), + Timestamp = erlang:system_time(millisecond), #{ dirty => Dirty, @@ -355,9 +358,11 @@ message_to_context(#message{} = Message, Payload, Transformation) -> node => node(), payload => Payload, peername => Peername, + pub_props => Props, publish_received_at => Message#message.timestamp, qos => Message#message.qos, retain => emqx_message:get_flag(retain, Message, false), + timestamp => Timestamp, topic => Message#message.topic, user_property => UserProperties, username => Username diff --git a/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl b/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl index fa83c8024..8d08c4706 100644 --- a/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl +++ b/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl @@ -504,7 +504,7 @@ assert_monitor_metrics() -> receive PATTERN = ____Msg0 -> ____Msg0 after TIMEOUT -> - error({message_not_received, ?LINE}) + error({message_not_received, {line, ?LINE}}) end end)() ). @@ -608,6 +608,8 @@ t_smoke_test(_Config) -> %% * peername %% * publish_received_at %% * username +%% * timestamp +%% * pub_props (and specific fields within containing hyphens) t_smoke_test_2(_Config) -> Name1 = <<"foo">>, Operations = [ @@ -617,14 +619,22 @@ t_smoke_test_2(_Config) -> operation(<<"payload.peername">>, <<"peername">>), operation(<<"payload.publish_received_at">>, <<"publish_received_at">>), operation(<<"payload.username">>, <<"username">>), - operation(<<"payload.flags">>, <<"flags">>) + operation(<<"payload.flags">>, <<"flags">>), + operation(<<"payload.timestamp">>, <<"timestamp">>), + operation(<<"payload.pub_props">>, <<"pub_props">>), + operation(<<"payload.content_type">>, <<"pub_props.Content-Type">>) ], Transformation1 = transformation(Name1, Operations), {201, _} = insert(Transformation1), ClientId = atom_to_binary(?FUNCTION_NAME), C1 = connect(ClientId), {ok, _, [_]} = emqtt:subscribe(C1, <<"t/#">>, [{qos, 2}]), - ok = publish(C1, <<"t/1">>, #{}), + ok = publish(C1, <<"t/1">>, #{}, _QoS = 0, #{ + props => #{ + 'Content-Type' => <<"application/json">>, + 'User-Property' => [{<<"a">>, <<"b">>}] + } + }), {publish, #{payload := Payload0}} = ?assertReceiveReturn({publish, _}, 1_000), NodeBin = atom_to_binary(node()), ?assertMatch( @@ -635,7 +645,13 @@ t_smoke_test_2(_Config) -> <<"peername">> := <<"127.0.0.1:", _/binary>>, <<"publish_received_at">> := PRAt, <<"username">> := <<"undefined">>, - <<"flags">> := #{<<"dup">> := false, <<"retain">> := false} + <<"flags">> := #{<<"dup">> := false, <<"retain">> := false}, + <<"timestamp">> := _, + <<"pub_props">> := #{ + <<"Content-Type">> := <<"application/json">>, + <<"User-Property">> := #{<<"a">> := <<"b">>} + }, + <<"content_type">> := <<"application/json">> } when is_integer(PRAt), emqx_utils_json:decode(Payload0, [return_maps]) ), @@ -644,7 +660,12 @@ t_smoke_test_2(_Config) -> Username = <<"myusername">>, C2 = connect(ClientId, _IsPersistent = false, #{start_props => #{username => Username}}), {ok, _, [_]} = emqtt:subscribe(C2, <<"t/#">>, [{qos, 2}]), - ok = publish(C2, <<"t/1">>, #{}), + ok = publish(C2, <<"t/1">>, #{}, _QoS = 0, #{ + props => #{ + 'Content-Type' => <<"application/json">>, + 'User-Property' => [{<<"a">>, <<"b">>}] + } + }), {publish, #{payload := Payload1}} = ?assertReceiveReturn({publish, _}, 1_000), ?assertMatch( #{ @@ -654,7 +675,13 @@ t_smoke_test_2(_Config) -> <<"peername">> := <<"127.0.0.1:", _/binary>>, <<"publish_received_at">> := PRAt, <<"username">> := Username, - <<"flags">> := #{<<"dup">> := false, <<"retain">> := false} + <<"flags">> := #{<<"dup">> := false, <<"retain">> := false}, + <<"timestamp">> := _, + <<"pub_props">> := #{ + <<"Content-Type">> := <<"application/json">>, + <<"User-Property">> := #{<<"a">> := <<"b">>} + }, + <<"content_type">> := <<"application/json">> } when is_integer(PRAt), emqx_utils_json:decode(Payload1, [return_maps]) ),