Merge branch 'master' into sync-m-r52-20230911

This commit is contained in:
Thales Macedo Garitezi 2023-09-11 16:04:28 -03:00
commit 223d47a6ac
17 changed files with 622 additions and 74 deletions

View File

@ -49,11 +49,7 @@
%% Awaiting PUBREL Timeout (Unit: millisecond)
await_rel_timeout :: timeout(),
%% Created at
created_at :: pos_integer(),
%% Topic filter to iterator ID mapping.
%% Note: we shouldn't serialize this when persisting sessions, as this information
%% also exists in the `?ITERATOR_REF_TAB' table.
iterators = #{} :: #{emqx_topic:topic() => emqx_ds:iterator_id()}
created_at :: pos_integer()
}).
-endif.

View File

@ -108,12 +108,6 @@ get_all_iterator_ids(Node) ->
emqx_ds_storage_layer:foldl_iterator_prefix(?DS_SHARD, <<>>, Fn, [])
end).
get_session_iterators(Node, ClientId) ->
erpc:call(Node, fun() ->
[ConnPid] = emqx_cm:lookup_channels(ClientId),
emqx_connection:info({channel, {session, iterators}}, sys:get_state(ConnPid))
end).
wait_nodeup(Node) ->
?retry(
_Sleep0 = 500,
@ -209,18 +203,14 @@ t_session_subscription_idempotency(Config) ->
{ok, _} = emqtt:connect(Client1),
ct:pal("subscribing 2"),
{ok, _, [2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2),
SessionIterators = get_session_iterators(Node1, ClientId),
ok = emqtt:stop(Client1),
#{session_iterators => SessionIterators}
ok
end,
fun(Res, Trace) ->
fun(Trace) ->
ct:pal("trace:\n ~p", [Trace]),
#{session_iterators := SessionIterators} = Res,
%% Exactly one iterator should have been opened.
?assertEqual(1, map_size(SessionIterators), #{iterators => SessionIterators}),
?assertMatch(#{SubTopicFilter := _}, SessionIterators),
SubTopicFilterWords = emqx_topic:words(SubTopicFilter),
?assertEqual([{ClientId, SubTopicFilterWords}], get_all_iterator_refs(Node1)),
?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)),
@ -321,17 +311,14 @@ t_session_unsubscription_idempotency(Config) ->
},
15_000
),
SessionIterators = get_session_iterators(Node1, ClientId),
ok = emqtt:stop(Client1),
#{session_iterators => SessionIterators}
ok
end,
fun(Res, Trace) ->
fun(Trace) ->
ct:pal("trace:\n ~p", [Trace]),
#{session_iterators := SessionIterators} = Res,
%% No iterators remaining
?assertEqual(#{}, SessionIterators),
?assertEqual([], get_all_iterator_refs(Node1)),
?assertEqual({ok, []}, get_all_iterator_ids(Node1)),
ok

View File

@ -24,7 +24,7 @@
persist_message/1,
open_session/1,
add_subscription/2,
del_subscription/3
del_subscription/2
]).
-export([
@ -139,21 +139,26 @@ do_open_iterator(TopicFilter, StartMS, IteratorID) ->
{ok, _It} = emqx_ds_storage_layer:ensure_iterator(?DS_SHARD, IteratorID, Replay),
ok.
-spec del_subscription(emqx_ds:iterator_id() | undefined, emqx_types:topic(), emqx_ds:session_id()) ->
-spec del_subscription(emqx_types:topic(), emqx_ds:session_id()) ->
ok | {skipped, disabled}.
del_subscription(IteratorID, TopicFilterBin, DSSessionID) ->
del_subscription(TopicFilterBin, DSSessionID) ->
?WHEN_ENABLED(
begin
TopicFilter = emqx_topic:words(TopicFilterBin),
Ctx = #{iterator_id => IteratorID},
?tp_span(
persistent_session_ds_close_iterators,
Ctx,
ok = ensure_iterator_closed_on_all_shards(IteratorID)
),
case emqx_ds:session_get_iterator_id(DSSessionID, TopicFilter) of
{error, not_found} ->
%% already gone
ok;
{ok, IteratorID} ->
?tp_span(
persistent_session_ds_close_iterators,
#{iterator_id => IteratorID},
ok = ensure_iterator_closed_on_all_shards(IteratorID)
)
end,
?tp_span(
persistent_session_ds_iterator_delete,
Ctx,
#{},
emqx_ds:session_del_iterator(DSSessionID, TopicFilter)
)
end

View File

@ -117,7 +117,8 @@ mnesia(boot) ->
{storage_properties, [
{ets, [
{read_concurrency, true},
{write_concurrency, auto}
{write_concurrency, true},
{decentralized_counters, true}
]}
]}
]).

View File

@ -269,9 +269,7 @@ info(awaiting_rel_max, #session{max_awaiting_rel = Max}) ->
info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
Timeout;
info(created_at, #session{created_at = CreatedAt}) ->
CreatedAt;
info(iterators, #session{iterators = Iterators}) ->
Iterators.
CreatedAt.
%% @doc Get stats of the session.
-spec stats(session()) -> emqx_types:stats().
@ -320,13 +318,8 @@ is_subscriptions_full(#session{
-spec add_persistent_subscription(emqx_types:topic(), emqx_types:clientid(), session()) ->
session().
add_persistent_subscription(TopicFilterBin, ClientId, Session) ->
case emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId) of
{ok, IteratorId, _IsNew} ->
Iterators = Session#session.iterators,
Session#session{iterators = Iterators#{TopicFilterBin => IteratorId}};
_ ->
Session
end.
_ = emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId),
Session.
%%--------------------------------------------------------------------
%% Client -> Broker: UNSUBSCRIBE
@ -356,15 +349,8 @@ unsubscribe(
-spec remove_persistent_subscription(session(), emqx_types:topic(), emqx_types:clientid()) ->
session().
remove_persistent_subscription(Session, TopicFilterBin, ClientId) ->
Iterators = Session#session.iterators,
case maps:get(TopicFilterBin, Iterators, undefined) of
undefined ->
ok;
IteratorId ->
_ = emqx_persistent_session_ds:del_subscription(IteratorId, TopicFilterBin, ClientId),
ok
end,
Session#session{iterators = maps:remove(TopicFilterBin, Iterators)}.
_ = emqx_persistent_session_ds:del_subscription(TopicFilterBin, ClientId),
Session.
%%--------------------------------------------------------------------
%% Client -> Broker: PUBLISH

View File

@ -640,7 +640,7 @@ t_ingress_mqtt_bridge_with_rules(_) ->
#{
<<"name">> => <<"A_rule_get_messages_from_a_source_mqtt_bridge">>,
<<"enable">> => true,
<<"actions">> => [#{<<"function">> => "emqx_bridge_mqtt_SUITE:inspect"}],
<<"actions">> => [#{<<"function">> => <<"emqx_bridge_mqtt_SUITE:inspect">>}],
<<"sql">> => <<"SELECT * from \"$bridges/", BridgeIDIngress/binary, "\"">>
}
),

View File

@ -2,7 +2,7 @@
{application, emqx_dashboard, [
{description, "EMQX Web Dashboard"},
% strict semver, bump manually!
{vsn, "5.0.26"},
{vsn, "5.0.27"},
{modules, []},
{registered, [emqx_dashboard_sup]},
{applications, [kernel, stdlib, mnesia, minirest, emqx, emqx_ctl, emqx_bridge_http]},

View File

@ -20,6 +20,7 @@
-include("rule_engine.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqtt/include/emqtt.hrl").
%% APIs
-export([parse_action/1]).
@ -60,16 +61,23 @@ pre_process_action_args(
qos := QoS,
retain := Retain,
payload := Payload,
user_properties := UserProperties
mqtt_properties := MQTTPropertiesTemplate0,
user_properties := UserPropertiesTemplate
} = Args
) ->
MQTTPropertiesTemplate =
maps:map(
fun(_Key, V) -> emqx_placeholder:preproc_tmpl(V) end,
MQTTPropertiesTemplate0
),
Args#{
preprocessed_tmpl => #{
topic => emqx_placeholder:preproc_tmpl(Topic),
qos => preproc_vars(QoS),
retain => preproc_vars(Retain),
payload => emqx_placeholder:preproc_tmpl(Payload),
user_properties => preproc_user_properties(UserProperties)
mqtt_properties => MQTTPropertiesTemplate,
user_properties => preproc_user_properties(UserPropertiesTemplate)
}
};
pre_process_action_args(_, Args) ->
@ -106,6 +114,7 @@ republish(
retain := RetainTks,
topic := TopicTks,
payload := PayloadTks,
mqtt_properties := MQTTPropertiesTemplate,
user_properties := UserPropertiesTks
}
}
@ -118,7 +127,9 @@ republish(
%% events such as message.acked and message.dropped
Flags0 = maps:get(flags, Env, #{}),
Flags = Flags0#{retain => Retain},
PubProps = format_pub_props(UserPropertiesTks, Selected, Env),
PubProps0 = format_pub_props(UserPropertiesTks, Selected, Env),
MQTTProps = format_mqtt_properties(MQTTPropertiesTemplate, Selected, Env),
PubProps = maps:merge(PubProps0, MQTTProps),
?TRACE(
"RULE",
"republish_message",
@ -232,3 +243,89 @@ format_pub_props(UserPropertiesTks, Selected, Env) ->
replace_simple_var(UserPropertiesTks, Selected, #{})
end,
#{'User-Property' => UserProperties}.
format_mqtt_properties(MQTTPropertiesTemplate, Selected, Env) ->
#{metadata := #{rule_id := RuleId}} = Env,
MQTTProperties0 =
maps:fold(
fun(K, Template, Acc) ->
try
V = emqx_placeholder:proc_tmpl(Template, Selected),
Acc#{K => V}
catch
Kind:Error ->
?SLOG(
debug,
#{
msg => "bad_mqtt_property_value_ignored",
rule_id => RuleId,
exception => Kind,
reason => Error,
property => K,
selected => Selected
}
),
Acc
end
end,
#{},
MQTTPropertiesTemplate
),
coerce_properties_values(MQTTProperties0, Env).
ensure_int(B) when is_binary(B) ->
try
binary_to_integer(B)
catch
error:badarg ->
throw(bad_integer)
end;
ensure_int(I) when is_integer(I) ->
I.
coerce_properties_values(MQTTProperties, #{metadata := #{rule_id := RuleId}}) ->
maps:fold(
fun(K, V0, Acc) ->
try
V = encode_mqtt_property(K, V0),
Acc#{K => V}
catch
throw:bad_integer ->
?SLOG(
debug,
#{
msg => "bad_mqtt_property_value_ignored",
rule_id => RuleId,
reason => bad_integer,
property => K,
value => V0
}
),
Acc;
Kind:Reason:Stacktrace ->
?SLOG(
debug,
#{
msg => "bad_mqtt_property_value_ignored",
rule_id => RuleId,
exception => Kind,
reason => Reason,
property => K,
value => V0,
stacktrace => Stacktrace
}
),
Acc
end
end,
#{},
MQTTProperties
).
%% Note: currently we do not support `Topic-Alias', which would need to be encoded as an
%% int.
encode_mqtt_property('Payload-Format-Indicator', V) -> ensure_int(V);
encode_mqtt_property('Message-Expiry-Interval', V) -> ensure_int(V);
encode_mqtt_property('Subscription-Identifier', V) -> ensure_int(V);
%% note: `emqx_placeholder:proc_tmpl/2' currently always return a binary.
encode_mqtt_property(_Prop, V) when is_binary(V) -> V.

View File

@ -63,7 +63,7 @@ fields("rules") ->
)},
{"actions",
?HOCON(
?ARRAY(?UNION(actions())),
?ARRAY(hoconsc:union(actions())),
#{
desc => ?DESC("rules_actions"),
default => [],
@ -161,6 +161,14 @@ fields("republish_args") ->
example => <<"${payload}">>
}
)},
{mqtt_properties,
?HOCON(
?R_REF("republish_mqtt_properties"),
#{
desc => ?DESC("republish_args_mqtt_properties"),
default => #{}
}
)},
{user_properties,
?HOCON(
binary(),
@ -170,6 +178,17 @@ fields("republish_args") ->
example => <<"${pub_props.'User-Property'}">>
}
)}
];
fields("republish_mqtt_properties") ->
[
{'Payload-Format-Indicator',
?HOCON(binary(), #{required => false, desc => ?DESC('Payload-Format-Indicator')})},
{'Message-Expiry-Interval',
?HOCON(binary(), #{required => false, desc => ?DESC('Message-Expiry-Interval')})},
{'Content-Type', ?HOCON(binary(), #{required => false, desc => ?DESC('Content-Type')})},
{'Response-Topic', ?HOCON(binary(), #{required => false, desc => ?DESC('Response-Topic')})},
{'Correlation-Data',
?HOCON(binary(), #{required => false, desc => ?DESC('Correlation-Data')})}
].
desc("rule_engine") ->
@ -200,12 +219,31 @@ rule_name() ->
)}.
actions() ->
[
binary(),
?R_REF("builtin_action_republish"),
?R_REF("builtin_action_console"),
?R_REF("user_provided_function")
].
fun
(all_union_members) ->
[
binary(),
?R_REF("builtin_action_republish"),
?R_REF("builtin_action_console"),
?R_REF("user_provided_function")
];
({value, V}) ->
case V of
#{<<"function">> := <<"console">>} ->
[?R_REF("builtin_action_console")];
#{<<"function">> := <<"republish">>} ->
[?R_REF("builtin_action_republish")];
#{<<"function">> := <<_/binary>>} ->
[?R_REF("user_provided_function")];
<<_/binary>> ->
[binary()];
_ ->
throw(#{
field_name => actions,
reason => <<"unknown action type">>
})
end
end.
qos() ->
?UNION([emqx_schema:qos(), binary()]).

View File

@ -23,6 +23,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/asserts.hrl").
-include_lib("emqx/include/emqx.hrl").
-import(emqx_common_test_helpers, [on_exit/1]).
@ -74,6 +75,7 @@ groups() ->
t_sqlselect_inject_props,
t_sqlselect_01,
t_sqlselect_02,
t_sqlselect_03,
t_sqlselect_1,
t_sqlselect_2,
t_sqlselect_3,
@ -580,13 +582,16 @@ t_get_rule_ids_by_action(_) ->
t_ensure_action_removed(_) ->
Id = <<"t_ensure_action_removed">>,
GetSelectedData = <<"emqx_rule_sqltester:get_selected_data">>,
emqx:update_config(
{ok, _} = emqx:update_config(
[rule_engine, rules, Id],
#{
<<"actions">> => [
#{<<"function">> => GetSelectedData},
#{<<"function">> => <<"console">>},
#{<<"function">> => <<"republish">>},
#{
<<"function">> => <<"republish">>,
<<"args">> => #{<<"topic">> => <<"some/topic">>}
},
<<"mysql:foo">>,
<<"mqtt:bar">>
],
@ -1467,6 +1472,260 @@ t_sqlselect_02(_Config) ->
emqtt:stop(Client),
delete_rule(TopicRule1).
t_sqlselect_03(_Config) ->
init_events_counters(),
SQL = "SELECT * FROM \"t/r\" ",
Repub = republish_action(
<<"t/republish">>,
<<"${.}">>,
<<"${pub_props.'User-Property'}">>,
#{
<<"Payload-Format-Indicator">> => <<"${.payload.pfi}">>,
<<"Message-Expiry-Interval">> => <<"${.payload.mei}">>,
<<"Content-Type">> => <<"${.payload.ct}">>,
<<"Response-Topic">> => <<"${.payload.rt}">>,
<<"Correlation-Data">> => <<"${.payload.cd}">>
}
),
RepubRaw = emqx_utils_maps:binary_key_map(Repub#{function => <<"republish">>}),
ct:pal("republish action raw:\n ~p", [RepubRaw]),
RuleRaw = #{
<<"sql">> => SQL,
<<"actions">> => [RepubRaw]
},
{ok, _} = emqx_conf:update([rule_engine, rules, ?TMP_RULEID], RuleRaw, #{}),
on_exit(fun() -> emqx_rule_engine:delete_rule(?TMP_RULEID) end),
%% to check what republish is actually producing without loss of information
SQL1 = "select * from \"t/republish\" ",
RuleId0 = ?TMP_RULEID,
RuleId1 = <<RuleId0/binary, "2">>,
{ok, _} = emqx_rule_engine:create_rule(
#{
sql => SQL1,
id => RuleId1,
actions => [
#{
function => <<"emqx_rule_engine_SUITE:action_record_triggered_events">>,
args => #{}
}
]
}
),
on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId1) end),
UserProps = maps:to_list(#{<<"mykey">> => <<"myval">>}),
Payload =
emqx_utils_json:encode(
#{
pfi => 1,
mei => 2,
ct => <<"3">>,
rt => <<"4">>,
cd => <<"5">>
}
),
{ok, Client} = emqtt:start_link([
{username, <<"emqx">>},
{proto_ver, v5},
{properties, #{'Topic-Alias-Maximum' => 100}}
]),
on_exit(fun() -> emqtt:stop(Client) end),
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, <<"t/republish">>, 0),
PubProps = #{'User-Property' => UserProps},
ExpectedMQTTProps0 = #{
'Payload-Format-Indicator' => 1,
'Message-Expiry-Interval' => 2,
'Content-Type' => <<"3">>,
'Response-Topic' => <<"4">>,
'Correlation-Data' => <<"5">>,
%% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props',
%% so the channel controls those aliases on its own, starting from 1.
'Topic-Alias' => 1,
'User-Property' => UserProps
},
emqtt:publish(Client, <<"t/r">>, PubProps, Payload, [{qos, 0}]),
receive
{publish, #{topic := <<"t/republish">>, properties := Props1}} ->
?assertEqual(ExpectedMQTTProps0, Props1),
ok
after 2000 ->
ct:pal("mailbox:\n ~p", [?drainMailbox()]),
ct:fail("message not republished (l. ~b)", [?LINE])
end,
ExpectedMQTTProps1 = #{
'Payload-Format-Indicator' => 1,
'Message-Expiry-Interval' => 2,
'Content-Type' => <<"3">>,
'Response-Topic' => <<"4">>,
'Correlation-Data' => <<"5">>,
'User-Property' => maps:from_list(UserProps),
'User-Property-Pairs' => [
#{key => K, value => V}
|| {K, V} <- UserProps
]
},
?assertMatch(
[
{'message.publish', #{
topic := <<"t/republish">>,
pub_props := ExpectedMQTTProps1
}}
],
ets:lookup(events_record_tab, 'message.publish'),
#{expected_props => ExpectedMQTTProps1}
),
ct:pal("testing payload that is not a json object"),
emqtt:publish(Client, <<"t/r">>, PubProps, <<"not-a-map">>, [{qos, 0}]),
ExpectedMQTTProps2 = #{
'Content-Type' => <<"undefined">>,
'Correlation-Data' => <<"undefined">>,
'Response-Topic' => <<"undefined">>,
%% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props',
%% so the channel controls those aliases on its own, starting from 1.
'Topic-Alias' => 1,
'User-Property' => UserProps
},
receive
{publish, #{topic := T1, properties := Props2}} ->
?assertEqual(ExpectedMQTTProps2, Props2),
%% empty this time, due to topic alias set before
?assertEqual(<<>>, T1),
ok
after 2000 ->
ct:pal("mailbox:\n ~p", [?drainMailbox()]),
ct:fail("message not republished (l. ~b)", [?LINE])
end,
ct:pal("testing payload with some uncoercible keys"),
ets:delete_all_objects(events_record_tab),
Payload1 =
emqx_utils_json:encode(#{
pfi => <<"bad_value1">>,
mei => <<"bad_value2">>,
ct => <<"some_value3">>,
rt => <<"some_value4">>,
cd => <<"some_value5">>
}),
emqtt:publish(Client, <<"t/r">>, PubProps, Payload1, [{qos, 0}]),
ExpectedMQTTProps3 = #{
%% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props',
%% so the channel controls those aliases on its own, starting from 1.
'Topic-Alias' => 1,
'Content-Type' => <<"some_value3">>,
'Response-Topic' => <<"some_value4">>,
'Correlation-Data' => <<"some_value5">>,
'User-Property' => UserProps
},
receive
{publish, #{topic := T2, properties := Props3}} ->
?assertEqual(ExpectedMQTTProps3, Props3),
%% empty this time, due to topic alias set before
?assertEqual(<<>>, T2),
ok
after 2000 ->
ct:pal("mailbox:\n ~p", [?drainMailbox()]),
ct:fail("message not republished (l. ~b)", [?LINE])
end,
ExpectedMQTTProps4 = #{
'Content-Type' => <<"some_value3">>,
'Response-Topic' => <<"some_value4">>,
'Correlation-Data' => <<"some_value5">>,
'User-Property' => maps:from_list(UserProps),
'User-Property-Pairs' => [
#{key => K, value => V}
|| {K, V} <- UserProps
]
},
?assertMatch(
[
{'message.publish', #{
topic := <<"t/republish">>,
pub_props := ExpectedMQTTProps4
}}
],
ets:lookup(events_record_tab, 'message.publish'),
#{expected_props => ExpectedMQTTProps4}
),
ct:pal("testing a payload with a more complex placeholder"),
Repub1 = republish_action(
<<"t/republish">>,
<<"${.}">>,
<<"${pub_props.'User-Property'}">>,
#{
%% Note: `Payload-Format-Indicator' is capped at 225.
<<"Payload-Format-Indicator">> => <<"1${.payload.pfi}3">>,
<<"Message-Expiry-Interval">> => <<"9${.payload.mei}6">>
}
),
RepubRaw1 = emqx_utils_maps:binary_key_map(Repub1#{function => <<"republish">>}),
ct:pal("republish action raw:\n ~p", [RepubRaw1]),
RuleRaw1 = #{
<<"sql">> => SQL,
<<"actions">> => [RepubRaw1]
},
{ok, _} = emqx_conf:update([rule_engine, rules, ?TMP_RULEID], RuleRaw1, #{}),
Payload2 =
emqx_utils_json:encode(#{
pfi => <<"2">>,
mei => <<"87">>
}),
emqtt:publish(Client, <<"t/r">>, PubProps, Payload2, [{qos, 0}]),
ExpectedMQTTProps5 = #{
%% Note: PFI should be 0 or 1 according to spec, but we don't validate this when
%% serializing nor parsing...
'Payload-Format-Indicator' => 123,
'Message-Expiry-Interval' => 9876,
%% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props',
%% so the channel controls those aliases on its own, starting from 1.
'Topic-Alias' => 1,
'User-Property' => UserProps
},
receive
{publish, #{topic := T3, properties := Props4}} ->
?assertEqual(ExpectedMQTTProps5, Props4),
%% empty this time, due to topic alias set before
?assertEqual(<<>>, T3),
ok
after 2000 ->
ct:pal("mailbox:\n ~p", [?drainMailbox()]),
ct:fail("message not republished (l. ~b)", [?LINE])
end,
ct:pal("testing payload-format-indicator cap"),
Payload3 =
emqx_utils_json:encode(#{
pfi => <<"999999">>,
mei => <<"87">>
}),
emqtt:publish(Client, <<"t/r">>, PubProps, Payload3, [{qos, 0}]),
ExpectedMQTTProps6 = #{
%% Note: PFI should be 0 or 1 according to spec, but we don't validate this when
%% serializing nor parsing...
%% Note: PFI is capped at 16#FF
'Payload-Format-Indicator' => 16#FF band 19999993,
'Message-Expiry-Interval' => 9876,
%% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props',
%% so the channel controls those aliases on its own, starting from 1.
'Topic-Alias' => 1,
'User-Property' => UserProps
},
receive
{publish, #{topic := T4, properties := Props5}} ->
?assertEqual(ExpectedMQTTProps6, Props5),
%% empty this time, due to topic alias set before
?assertEqual(<<>>, T4),
ok
after 2000 ->
ct:pal("mailbox:\n ~p", [?drainMailbox()]),
ct:fail("message not republished (l. ~b)", [?LINE])
end,
ok.
t_sqlselect_1(_Config) ->
SQL =
"SELECT json_decode(payload) as p, payload "
@ -3271,6 +3530,9 @@ republish_action(Topic, Payload) ->
republish_action(Topic, Payload, <<"${user_properties}">>).
republish_action(Topic, Payload, UserProperties) ->
republish_action(Topic, Payload, UserProperties, _MQTTProperties = #{}).
republish_action(Topic, Payload, UserProperties, MQTTProperties) ->
#{
function => republish,
args => #{
@ -3278,6 +3540,7 @@ republish_action(Topic, Payload, UserProperties) ->
topic => Topic,
qos => 0,
retain => false,
mqtt_properties => MQTTProperties,
user_properties => UserProperties
}
}.

View File

@ -0,0 +1,132 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_rule_engine_schema_tests).
-include_lib("eunit/include/eunit.hrl").
%%===========================================================================
%% Data Section
%%===========================================================================
%% erlfmt-ignore
republish_hocon0() ->
"""
rule_engine.rules.my_rule {
description = \"some desc\"
metadata = {created_at = 1693918992079}
sql = \"select * from \\\"t/topic\\\" \"
actions = [
{function = console}
{ function = republish
args = {
payload = \"${.}\"
qos = 0
retain = false
topic = \"t/repu\"
mqtt_properties {
\"Payload-Format-Indicator\" = \"${.payload.pfi}\"
\"Message-Expiry-Interval\" = \"${.payload.mei}\"
\"Content-Type\" = \"${.payload.ct}\"
\"Response-Topic\" = \"${.payload.rt}\"
\"Correlation-Data\" = \"${.payload.cd}\"
}
user_properties = \"${pub_props.'User-Property'}\"
}
},
\"bridges:kafka:kprodu\",
{ function = custom_fn
args = {
actually = not_republish
}
}
]
}
""".
%%===========================================================================
%% Helper functions
%%===========================================================================
parse(Hocon) ->
{ok, Conf} = hocon:binary(Hocon),
Conf.
check(Conf) when is_map(Conf) ->
hocon_tconf:check_plain(emqx_rule_engine_schema, Conf).
-define(validation_error(Reason, Value),
{emqx_rule_engine_schema, [
#{
kind := validation_error,
reason := Reason,
value := Value
}
]}
).
-define(ok_config(Cfg), #{
<<"rule_engine">> :=
#{
<<"rules">> :=
#{
<<"my_rule">> :=
Cfg
}
}
}).
%%===========================================================================
%% Test cases
%%===========================================================================
republish_test_() ->
BaseConf = parse(republish_hocon0()),
[
{"base config",
?_assertMatch(
?ok_config(
#{
<<"actions">> := [
#{<<"function">> := console},
#{
<<"function">> := republish,
<<"args">> :=
#{
<<"mqtt_properties">> :=
#{
<<"Payload-Format-Indicator">> := <<_/binary>>,
<<"Message-Expiry-Interval">> := <<_/binary>>,
<<"Content-Type">> := <<_/binary>>,
<<"Response-Topic">> := <<_/binary>>,
<<"Correlation-Data">> := <<_/binary>>
}
}
},
<<"bridges:kafka:kprodu">>,
#{
<<"function">> := <<"custom_fn">>,
<<"args">> :=
#{
<<"actually">> := <<"not_republish">>
}
}
]
}
),
check(BaseConf)
)}
].

View File

@ -277,20 +277,29 @@ lookup_var([Prop | Rest], Data0) ->
end.
lookup(Prop, Data) when is_binary(Prop) ->
case maps:get(Prop, Data, undefined) of
undefined ->
try
{ok, maps:get(binary_to_existing_atom(Prop, utf8), Data)}
case do_one_lookup(Prop, Data) of
{error, undefined} ->
try binary_to_existing_atom(Prop, utf8) of
AtomKey ->
do_one_lookup(AtomKey, Data)
catch
error:{badkey, _} ->
{error, undefined};
error:badarg ->
{error, undefined}
end;
Value ->
{ok, Value} ->
{ok, Value}
end.
do_one_lookup(Key, Data) ->
try
{ok, maps:get(Key, Data)}
catch
error:{badkey, _} ->
{error, undefined};
error:{badmap, _} ->
{error, undefined}
end.
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------

View File

@ -2,7 +2,7 @@
{application, emqx_utils, [
{description, "Miscellaneous utilities for EMQX apps"},
% strict semver, bump manually!
{vsn, "5.0.7"},
{vsn, "5.0.8"},
{modules, [
emqx_utils,
emqx_utils_api,

View File

@ -256,3 +256,25 @@ t_proc_tmpl_arbitrary_var_name_double_quote(_) ->
<<"a:1,a:1-1,b:1,b:2,c:1.0,d:oo,d1:hi}">>,
emqx_placeholder:proc_tmpl(Tks, Selected)
).
t_proc_tmpl_badmap(_Config) ->
ThisTks = emqx_placeholder:preproc_tmpl(<<"${.}">>),
Tks = emqx_placeholder:preproc_tmpl(<<"${.a.b.c}">>),
BadMap = <<"not-a-map">>,
?assertEqual(
<<"not-a-map">>,
emqx_placeholder:proc_tmpl(ThisTks, BadMap)
),
?assertEqual(
<<"undefined">>,
emqx_placeholder:proc_tmpl(Tks, #{<<"a">> => #{<<"b">> => BadMap}})
),
?assertEqual(
<<"undefined">>,
emqx_placeholder:proc_tmpl(Tks, #{<<"a">> => BadMap})
),
?assertEqual(
<<"undefined">>,
emqx_placeholder:proc_tmpl(Tks, BadMap)
),
ok.

View File

@ -0,0 +1 @@
Added support for defining templates for MQTT publish properties in Republish rule action.

View File

@ -0,0 +1 @@
Fixed an issue where an ill-defined builtin rule action config could be interpreted as a custom user function.

View File

@ -103,6 +103,16 @@ You may also call <code>map_put</code> function like
to inject user properties.
NOTE: MQTT spec allows duplicated user property names, but EMQX Rule-Engine does not."""
republish_args_user_properties.label:
"""User Properties"""
republish_args_mqtt_properties.desc:
"""From which variable should the MQTT Publish Properties of the message be taken.
Placeholders like <code>${.payload.content_type}</code> may be used."""
republish_args_mqtt_properties.label:
"""MQTT Properties"""
republish_function.desc:
"""Republish the message as a new MQTT message"""