diff --git a/apps/emqx_message_transformation/src/emqx_message_transformation.erl b/apps/emqx_message_transformation/src/emqx_message_transformation.erl index cff7e3133..94951ff4f 100644 --- a/apps/emqx_message_transformation/src/emqx_message_transformation.erl +++ b/apps/emqx_message_transformation/src/emqx_message_transformation.erl @@ -3,6 +3,8 @@ %%-------------------------------------------------------------------- -module(emqx_message_transformation). +-feature(maybe_expr, enable). + -include_lib("snabbkaffe/include/trace.hrl"). -include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). @@ -54,11 +56,18 @@ -type eval_context() :: #{ client_attrs := map(), + clientid := _, + flags := _, + id := _, + node := _, payload := _, + peername := _, + publish_received_at := _, qos := _, retain := _, topic := _, user_property := _, + username := _, dirty := #{ payload => true, qos => true, @@ -323,20 +332,35 @@ message_to_context(#message{} = Message, Payload, Transformation) -> true -> #{}; false -> #{payload => true} end, - UserProperties0 = maps:get( - 'User-Property', - emqx_message:get_header(properties, Message, #{}), - [] - ), + Flags = emqx_message:get_flags(Message), + Props = emqx_message:get_header(properties, Message, #{}), + UserProperties0 = maps:get('User-Property', Props, []), UserProperties = maps:from_list(UserProperties0), + Headers = Message#message.headers, + Peername = + case maps:get(peername, Headers, undefined) of + Peername0 when is_tuple(Peername0) -> + iolist_to_binary(emqx_utils:ntoa(Peername0)); + _ -> + undefined + end, + Username = maps:get(username, Headers, undefined), #{ dirty => Dirty, + client_attrs => emqx_message:get_header(client_attrs, Message, #{}), + clientid => Message#message.from, + flags => Flags, + id => emqx_guid:to_hexstr(Message#message.id), + node => node(), payload => Payload, + peername => Peername, + publish_received_at => Message#message.timestamp, qos => Message#message.qos, retain => emqx_message:get_flag(retain, Message, false), topic => Message#message.topic, - user_property => UserProperties + user_property => UserProperties, + username => Username }. -spec context_to_message(emqx_types:message(), eval_context(), transformation()) -> diff --git a/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl b/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl index 9728ca71d..fa83c8024 100644 --- a/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl +++ b/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl @@ -285,14 +285,17 @@ connect(ClientId, IsPersistent) -> connect(ClientId, IsPersistent, _Opts = #{}). connect(ClientId, IsPersistent, Opts) -> + StartProps = maps:get(start_props, Opts, #{}), Properties0 = maps:get(properties, Opts, #{}), Properties = emqx_utils_maps:put_if(Properties0, 'Session-Expiry-Interval', 30, IsPersistent), - {ok, Client} = emqtt:start_link([ - {clean_start, true}, - {clientid, ClientId}, - {properties, Properties}, - {proto_ver, v5} - ]), + Defaults = #{ + clean_start => true, + clientid => ClientId, + properties => Properties, + proto_ver => v5 + }, + Props = emqx_utils_maps:deep_merge(Defaults, StartProps), + {ok, Client} = emqtt:start_link(Props), {ok, _} = emqtt:connect(Client), on_exit(fun() -> catch emqtt:stop(Client) end), Client. @@ -496,11 +499,21 @@ assert_monitor_metrics() -> ), ok. +-define(assertReceiveReturn(PATTERN, TIMEOUT), + (fun() -> + receive + PATTERN = ____Msg0 -> ____Msg0 + after TIMEOUT -> + error({message_not_received, ?LINE}) + end + end)() +). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ -%% Smoke test where we have a single check and `all_pass' strategy. +%% Smoke test where we have an example transfomration. t_smoke_test(_Config) -> Name1 = <<"foo">>, Operations = [ @@ -588,6 +601,65 @@ t_smoke_test(_Config) -> ok. +%% A smoke test for a subset of read-only context fields. +%% * clientid +%% * id +%% * node +%% * peername +%% * publish_received_at +%% * username +t_smoke_test_2(_Config) -> + Name1 = <<"foo">>, + Operations = [ + operation(<<"payload.clientid">>, <<"clientid">>), + operation(<<"payload.id">>, <<"id">>), + operation(<<"payload.node">>, <<"node">>), + operation(<<"payload.peername">>, <<"peername">>), + operation(<<"payload.publish_received_at">>, <<"publish_received_at">>), + operation(<<"payload.username">>, <<"username">>), + operation(<<"payload.flags">>, <<"flags">>) + ], + Transformation1 = transformation(Name1, Operations), + {201, _} = insert(Transformation1), + ClientId = atom_to_binary(?FUNCTION_NAME), + C1 = connect(ClientId), + {ok, _, [_]} = emqtt:subscribe(C1, <<"t/#">>, [{qos, 2}]), + ok = publish(C1, <<"t/1">>, #{}), + {publish, #{payload := Payload0}} = ?assertReceiveReturn({publish, _}, 1_000), + NodeBin = atom_to_binary(node()), + ?assertMatch( + #{ + <<"clientid">> := ClientId, + <<"id">> := <<_/binary>>, + <<"node">> := NodeBin, + <<"peername">> := <<"127.0.0.1:", _/binary>>, + <<"publish_received_at">> := PRAt, + <<"username">> := <<"undefined">>, + <<"flags">> := #{<<"dup">> := false, <<"retain">> := false} + } when is_integer(PRAt), + emqx_utils_json:decode(Payload0, [return_maps]) + ), + %% Reconnect with an username. + emqtt:stop(C1), + Username = <<"myusername">>, + C2 = connect(ClientId, _IsPersistent = false, #{start_props => #{username => Username}}), + {ok, _, [_]} = emqtt:subscribe(C2, <<"t/#">>, [{qos, 2}]), + ok = publish(C2, <<"t/1">>, #{}), + {publish, #{payload := Payload1}} = ?assertReceiveReturn({publish, _}, 1_000), + ?assertMatch( + #{ + <<"clientid">> := ClientId, + <<"id">> := <<_/binary>>, + <<"node">> := NodeBin, + <<"peername">> := <<"127.0.0.1:", _/binary>>, + <<"publish_received_at">> := PRAt, + <<"username">> := Username, + <<"flags">> := #{<<"dup">> := false, <<"retain">> := false} + } when is_integer(PRAt), + emqx_utils_json:decode(Payload1, [return_maps]) + ), + ok. + t_crud(_Config) -> ?assertMatch({200, []}, list()),