Merge pull request #13384 from thalesmg/20240701-r57-mt-new-fields
feat(message transformation): add more read-only fields to context
This commit is contained in:
commit
532c7831b2
|
@ -3,6 +3,8 @@
|
|||
%%--------------------------------------------------------------------
|
||||
-module(emqx_message_transformation).
|
||||
|
||||
-feature(maybe_expr, enable).
|
||||
|
||||
-include_lib("snabbkaffe/include/trace.hrl").
|
||||
-include_lib("emqx_utils/include/emqx_message.hrl").
|
||||
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||
|
@ -54,11 +56,18 @@
|
|||
|
||||
-type eval_context() :: #{
|
||||
client_attrs := map(),
|
||||
clientid := _,
|
||||
flags := _,
|
||||
id := _,
|
||||
node := _,
|
||||
payload := _,
|
||||
peername := _,
|
||||
publish_received_at := _,
|
||||
qos := _,
|
||||
retain := _,
|
||||
topic := _,
|
||||
user_property := _,
|
||||
username := _,
|
||||
dirty := #{
|
||||
payload => true,
|
||||
qos => true,
|
||||
|
@ -323,20 +332,35 @@ message_to_context(#message{} = Message, Payload, Transformation) ->
|
|||
true -> #{};
|
||||
false -> #{payload => true}
|
||||
end,
|
||||
UserProperties0 = maps:get(
|
||||
'User-Property',
|
||||
emqx_message:get_header(properties, Message, #{}),
|
||||
[]
|
||||
),
|
||||
Flags = emqx_message:get_flags(Message),
|
||||
Props = emqx_message:get_header(properties, Message, #{}),
|
||||
UserProperties0 = maps:get('User-Property', Props, []),
|
||||
UserProperties = maps:from_list(UserProperties0),
|
||||
Headers = Message#message.headers,
|
||||
Peername =
|
||||
case maps:get(peername, Headers, undefined) of
|
||||
Peername0 when is_tuple(Peername0) ->
|
||||
iolist_to_binary(emqx_utils:ntoa(Peername0));
|
||||
_ ->
|
||||
undefined
|
||||
end,
|
||||
Username = maps:get(username, Headers, undefined),
|
||||
#{
|
||||
dirty => Dirty,
|
||||
|
||||
client_attrs => emqx_message:get_header(client_attrs, Message, #{}),
|
||||
clientid => Message#message.from,
|
||||
flags => Flags,
|
||||
id => emqx_guid:to_hexstr(Message#message.id),
|
||||
node => node(),
|
||||
payload => Payload,
|
||||
peername => Peername,
|
||||
publish_received_at => Message#message.timestamp,
|
||||
qos => Message#message.qos,
|
||||
retain => emqx_message:get_flag(retain, Message, false),
|
||||
topic => Message#message.topic,
|
||||
user_property => UserProperties
|
||||
user_property => UserProperties,
|
||||
username => Username
|
||||
}.
|
||||
|
||||
-spec context_to_message(emqx_types:message(), eval_context(), transformation()) ->
|
||||
|
|
|
@ -285,14 +285,17 @@ connect(ClientId, IsPersistent) ->
|
|||
connect(ClientId, IsPersistent, _Opts = #{}).
|
||||
|
||||
connect(ClientId, IsPersistent, Opts) ->
|
||||
StartProps = maps:get(start_props, Opts, #{}),
|
||||
Properties0 = maps:get(properties, Opts, #{}),
|
||||
Properties = emqx_utils_maps:put_if(Properties0, 'Session-Expiry-Interval', 30, IsPersistent),
|
||||
{ok, Client} = emqtt:start_link([
|
||||
{clean_start, true},
|
||||
{clientid, ClientId},
|
||||
{properties, Properties},
|
||||
{proto_ver, v5}
|
||||
]),
|
||||
Defaults = #{
|
||||
clean_start => true,
|
||||
clientid => ClientId,
|
||||
properties => Properties,
|
||||
proto_ver => v5
|
||||
},
|
||||
Props = emqx_utils_maps:deep_merge(Defaults, StartProps),
|
||||
{ok, Client} = emqtt:start_link(Props),
|
||||
{ok, _} = emqtt:connect(Client),
|
||||
on_exit(fun() -> catch emqtt:stop(Client) end),
|
||||
Client.
|
||||
|
@ -496,11 +499,21 @@ assert_monitor_metrics() ->
|
|||
),
|
||||
ok.
|
||||
|
||||
-define(assertReceiveReturn(PATTERN, TIMEOUT),
|
||||
(fun() ->
|
||||
receive
|
||||
PATTERN = ____Msg0 -> ____Msg0
|
||||
after TIMEOUT ->
|
||||
error({message_not_received, ?LINE})
|
||||
end
|
||||
end)()
|
||||
).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Testcases
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
%% Smoke test where we have a single check and `all_pass' strategy.
|
||||
%% Smoke test where we have an example transfomration.
|
||||
t_smoke_test(_Config) ->
|
||||
Name1 = <<"foo">>,
|
||||
Operations = [
|
||||
|
@ -588,6 +601,65 @@ t_smoke_test(_Config) ->
|
|||
|
||||
ok.
|
||||
|
||||
%% A smoke test for a subset of read-only context fields.
|
||||
%% * clientid
|
||||
%% * id
|
||||
%% * node
|
||||
%% * peername
|
||||
%% * publish_received_at
|
||||
%% * username
|
||||
t_smoke_test_2(_Config) ->
|
||||
Name1 = <<"foo">>,
|
||||
Operations = [
|
||||
operation(<<"payload.clientid">>, <<"clientid">>),
|
||||
operation(<<"payload.id">>, <<"id">>),
|
||||
operation(<<"payload.node">>, <<"node">>),
|
||||
operation(<<"payload.peername">>, <<"peername">>),
|
||||
operation(<<"payload.publish_received_at">>, <<"publish_received_at">>),
|
||||
operation(<<"payload.username">>, <<"username">>),
|
||||
operation(<<"payload.flags">>, <<"flags">>)
|
||||
],
|
||||
Transformation1 = transformation(Name1, Operations),
|
||||
{201, _} = insert(Transformation1),
|
||||
ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||
C1 = connect(ClientId),
|
||||
{ok, _, [_]} = emqtt:subscribe(C1, <<"t/#">>, [{qos, 2}]),
|
||||
ok = publish(C1, <<"t/1">>, #{}),
|
||||
{publish, #{payload := Payload0}} = ?assertReceiveReturn({publish, _}, 1_000),
|
||||
NodeBin = atom_to_binary(node()),
|
||||
?assertMatch(
|
||||
#{
|
||||
<<"clientid">> := ClientId,
|
||||
<<"id">> := <<_/binary>>,
|
||||
<<"node">> := NodeBin,
|
||||
<<"peername">> := <<"127.0.0.1:", _/binary>>,
|
||||
<<"publish_received_at">> := PRAt,
|
||||
<<"username">> := <<"undefined">>,
|
||||
<<"flags">> := #{<<"dup">> := false, <<"retain">> := false}
|
||||
} when is_integer(PRAt),
|
||||
emqx_utils_json:decode(Payload0, [return_maps])
|
||||
),
|
||||
%% Reconnect with an username.
|
||||
emqtt:stop(C1),
|
||||
Username = <<"myusername">>,
|
||||
C2 = connect(ClientId, _IsPersistent = false, #{start_props => #{username => Username}}),
|
||||
{ok, _, [_]} = emqtt:subscribe(C2, <<"t/#">>, [{qos, 2}]),
|
||||
ok = publish(C2, <<"t/1">>, #{}),
|
||||
{publish, #{payload := Payload1}} = ?assertReceiveReturn({publish, _}, 1_000),
|
||||
?assertMatch(
|
||||
#{
|
||||
<<"clientid">> := ClientId,
|
||||
<<"id">> := <<_/binary>>,
|
||||
<<"node">> := NodeBin,
|
||||
<<"peername">> := <<"127.0.0.1:", _/binary>>,
|
||||
<<"publish_received_at">> := PRAt,
|
||||
<<"username">> := Username,
|
||||
<<"flags">> := #{<<"dup">> := false, <<"retain">> := false}
|
||||
} when is_integer(PRAt),
|
||||
emqx_utils_json:decode(Payload1, [return_maps])
|
||||
),
|
||||
ok.
|
||||
|
||||
t_crud(_Config) ->
|
||||
?assertMatch({200, []}, list()),
|
||||
|
||||
|
|
Loading…
Reference in New Issue