feat(message transformation): add more read-only fields to context

Fixes https://emqx.atlassian.net/browse/EMQX-12583
This commit is contained in:
Thales Macedo Garitezi 2024-07-01 17:08:47 -03:00
parent 51a8d3b041
commit db07a1ebea
2 changed files with 109 additions and 13 deletions

View File

@ -3,6 +3,8 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_message_transformation). -module(emqx_message_transformation).
-feature(maybe_expr, enable).
-include_lib("snabbkaffe/include/trace.hrl"). -include_lib("snabbkaffe/include/trace.hrl").
-include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("emqx_utils/include/emqx_message.hrl").
-include_lib("emqx/include/emqx_hooks.hrl"). -include_lib("emqx/include/emqx_hooks.hrl").
@ -54,11 +56,18 @@
-type eval_context() :: #{ -type eval_context() :: #{
client_attrs := map(), client_attrs := map(),
clientid := _,
flags := _,
id := _,
node := _,
payload := _, payload := _,
peername := _,
publish_received_at := _,
qos := _, qos := _,
retain := _, retain := _,
topic := _, topic := _,
user_property := _, user_property := _,
username := _,
dirty := #{ dirty := #{
payload => true, payload => true,
qos => true, qos => true,
@ -323,20 +332,35 @@ message_to_context(#message{} = Message, Payload, Transformation) ->
true -> #{}; true -> #{};
false -> #{payload => true} false -> #{payload => true}
end, end,
UserProperties0 = maps:get( Flags = emqx_message:get_flags(Message),
'User-Property', Props = emqx_message:get_header(properties, Message, #{}),
emqx_message:get_header(properties, Message, #{}), UserProperties0 = maps:get('User-Property', Props, []),
[]
),
UserProperties = maps:from_list(UserProperties0), UserProperties = maps:from_list(UserProperties0),
Headers = Message#message.headers,
Peername =
case maps:get(peername, Headers, undefined) of
Peername0 when is_tuple(Peername0) ->
iolist_to_binary(emqx_utils:ntoa(Peername0));
_ ->
undefined
end,
Username = maps:get(username, Headers, undefined),
#{ #{
dirty => Dirty, dirty => Dirty,
client_attrs => emqx_message:get_header(client_attrs, Message, #{}), client_attrs => emqx_message:get_header(client_attrs, Message, #{}),
clientid => Message#message.from,
flags => Flags,
id => emqx_guid:to_hexstr(Message#message.id),
node => node(),
payload => Payload, payload => Payload,
peername => Peername,
publish_received_at => Message#message.timestamp,
qos => Message#message.qos, qos => Message#message.qos,
retain => emqx_message:get_flag(retain, Message, false), retain => emqx_message:get_flag(retain, Message, false),
topic => Message#message.topic, topic => Message#message.topic,
user_property => UserProperties user_property => UserProperties,
username => Username
}. }.
-spec context_to_message(emqx_types:message(), eval_context(), transformation()) -> -spec context_to_message(emqx_types:message(), eval_context(), transformation()) ->

View File

@ -285,14 +285,17 @@ connect(ClientId, IsPersistent) ->
connect(ClientId, IsPersistent, _Opts = #{}). connect(ClientId, IsPersistent, _Opts = #{}).
connect(ClientId, IsPersistent, Opts) -> connect(ClientId, IsPersistent, Opts) ->
StartProps = maps:get(start_props, Opts, #{}),
Properties0 = maps:get(properties, Opts, #{}), Properties0 = maps:get(properties, Opts, #{}),
Properties = emqx_utils_maps:put_if(Properties0, 'Session-Expiry-Interval', 30, IsPersistent), Properties = emqx_utils_maps:put_if(Properties0, 'Session-Expiry-Interval', 30, IsPersistent),
{ok, Client} = emqtt:start_link([ Defaults = #{
{clean_start, true}, clean_start => true,
{clientid, ClientId}, clientid => ClientId,
{properties, Properties}, properties => Properties,
{proto_ver, v5} proto_ver => v5
]), },
Props = emqx_utils_maps:deep_merge(Defaults, StartProps),
{ok, Client} = emqtt:start_link(Props),
{ok, _} = emqtt:connect(Client), {ok, _} = emqtt:connect(Client),
on_exit(fun() -> catch emqtt:stop(Client) end), on_exit(fun() -> catch emqtt:stop(Client) end),
Client. Client.
@ -496,11 +499,21 @@ assert_monitor_metrics() ->
), ),
ok. ok.
-define(assertReceiveReturn(PATTERN, TIMEOUT),
(fun() ->
receive
PATTERN = ____Msg0 -> ____Msg0
after TIMEOUT ->
error({message_not_received, ?LINE})
end
end)()
).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Testcases %% Testcases
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Smoke test where we have a single check and `all_pass' strategy. %% Smoke test where we have an example transfomration.
t_smoke_test(_Config) -> t_smoke_test(_Config) ->
Name1 = <<"foo">>, Name1 = <<"foo">>,
Operations = [ Operations = [
@ -588,6 +601,65 @@ t_smoke_test(_Config) ->
ok. ok.
%% A smoke test for a subset of read-only context fields.
%% * clientid
%% * id
%% * node
%% * peername
%% * publish_received_at
%% * username
t_smoke_test_2(_Config) ->
Name1 = <<"foo">>,
Operations = [
operation(<<"payload.clientid">>, <<"clientid">>),
operation(<<"payload.id">>, <<"id">>),
operation(<<"payload.node">>, <<"node">>),
operation(<<"payload.peername">>, <<"peername">>),
operation(<<"payload.publish_received_at">>, <<"publish_received_at">>),
operation(<<"payload.username">>, <<"username">>),
operation(<<"payload.flags">>, <<"flags">>)
],
Transformation1 = transformation(Name1, Operations),
{201, _} = insert(Transformation1),
ClientId = atom_to_binary(?FUNCTION_NAME),
C1 = connect(ClientId),
{ok, _, [_]} = emqtt:subscribe(C1, <<"t/#">>, [{qos, 2}]),
ok = publish(C1, <<"t/1">>, #{}),
{publish, #{payload := Payload0}} = ?assertReceiveReturn({publish, _}, 1_000),
NodeBin = atom_to_binary(node()),
?assertMatch(
#{
<<"clientid">> := ClientId,
<<"id">> := <<_/binary>>,
<<"node">> := NodeBin,
<<"peername">> := <<"127.0.0.1:", _/binary>>,
<<"publish_received_at">> := PRAt,
<<"username">> := <<"undefined">>,
<<"flags">> := #{<<"dup">> := false, <<"retain">> := false}
} when is_integer(PRAt),
emqx_utils_json:decode(Payload0, [return_maps])
),
%% Reconnect with an username.
emqtt:stop(C1),
Username = <<"myusername">>,
C2 = connect(ClientId, _IsPersistent = false, #{start_props => #{username => Username}}),
{ok, _, [_]} = emqtt:subscribe(C2, <<"t/#">>, [{qos, 2}]),
ok = publish(C2, <<"t/1">>, #{}),
{publish, #{payload := Payload1}} = ?assertReceiveReturn({publish, _}, 1_000),
?assertMatch(
#{
<<"clientid">> := ClientId,
<<"id">> := <<_/binary>>,
<<"node">> := NodeBin,
<<"peername">> := <<"127.0.0.1:", _/binary>>,
<<"publish_received_at">> := PRAt,
<<"username">> := Username,
<<"flags">> := #{<<"dup">> := false, <<"retain">> := false}
} when is_integer(PRAt),
emqx_utils_json:decode(Payload1, [return_maps])
),
ok.
t_crud(_Config) -> t_crud(_Config) ->
?assertMatch({200, []}, list()), ?assertMatch({200, []}, list()),