Merge pull request #9398 from emqx/1121-rule-bridge-properties
1121 rule bridge properties
This commit is contained in:
commit
cded5fc6cf
|
@ -54,7 +54,8 @@
|
||||||
pmap/3,
|
pmap/3,
|
||||||
readable_error_msg/1,
|
readable_error_msg/1,
|
||||||
safe_to_existing_atom/1,
|
safe_to_existing_atom/1,
|
||||||
safe_to_existing_atom/2
|
safe_to_existing_atom/2,
|
||||||
|
pub_props_to_packet/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
|
@ -568,3 +569,17 @@ ipv6_probe_test() ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
|
pub_props_to_packet(Properties) ->
|
||||||
|
F = fun
|
||||||
|
('User-Property', M) ->
|
||||||
|
case is_map(M) andalso map_size(M) > 0 of
|
||||||
|
true -> {true, maps:to_list(M)};
|
||||||
|
false -> false
|
||||||
|
end;
|
||||||
|
('User-Property-Pairs', _) ->
|
||||||
|
false;
|
||||||
|
(_, _) ->
|
||||||
|
true
|
||||||
|
end,
|
||||||
|
maps:filtermap(F, Properties).
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_connector, [
|
{application, emqx_connector, [
|
||||||
{description, "An OTP application"},
|
{description, "An OTP application"},
|
||||||
{vsn, "0.1.7"},
|
{vsn, "0.1.8"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{mod, {emqx_connector_app, []}},
|
{mod, {emqx_connector_app, []}},
|
||||||
{applications, [
|
{applications, [
|
||||||
|
|
|
@ -77,17 +77,20 @@ to_remote_msg(MapMsg, #{
|
||||||
Payload = process_payload(PayloadToken, MapMsg),
|
Payload = process_payload(PayloadToken, MapMsg),
|
||||||
QoS = replace_simple_var(QoSToken, MapMsg),
|
QoS = replace_simple_var(QoSToken, MapMsg),
|
||||||
Retain = replace_simple_var(RetainToken, MapMsg),
|
Retain = replace_simple_var(RetainToken, MapMsg),
|
||||||
|
PubProps = maps:get(pub_props, MapMsg, #{}),
|
||||||
#mqtt_msg{
|
#mqtt_msg{
|
||||||
qos = QoS,
|
qos = QoS,
|
||||||
retain = Retain,
|
retain = Retain,
|
||||||
topic = topic(Mountpoint, Topic),
|
topic = topic(Mountpoint, Topic),
|
||||||
props = #{},
|
props = emqx_misc:pub_props_to_packet(PubProps),
|
||||||
payload = Payload
|
payload = Payload
|
||||||
};
|
};
|
||||||
to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
|
to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
|
||||||
Msg#message{topic = topic(Mountpoint, Topic)}.
|
Msg#message{topic = topic(Mountpoint, Topic)}.
|
||||||
|
|
||||||
%% published from remote node over a MQTT connection
|
%% published from remote node over a MQTT connection
|
||||||
|
to_broker_msg(Msg, Vars, undefined) ->
|
||||||
|
to_broker_msg(Msg, Vars, #{});
|
||||||
to_broker_msg(
|
to_broker_msg(
|
||||||
#{dup := Dup} = MapMsg,
|
#{dup := Dup} = MapMsg,
|
||||||
#{
|
#{
|
||||||
|
@ -103,8 +106,9 @@ to_broker_msg(
|
||||||
Payload = process_payload(PayloadToken, MapMsg),
|
Payload = process_payload(PayloadToken, MapMsg),
|
||||||
QoS = replace_simple_var(QoSToken, MapMsg),
|
QoS = replace_simple_var(QoSToken, MapMsg),
|
||||||
Retain = replace_simple_var(RetainToken, MapMsg),
|
Retain = replace_simple_var(RetainToken, MapMsg),
|
||||||
|
PubProps = maps:get(pub_props, MapMsg, #{}),
|
||||||
set_headers(
|
set_headers(
|
||||||
Props,
|
Props#{properties => emqx_misc:pub_props_to_packet(PubProps)},
|
||||||
emqx_message:set_flags(
|
emqx_message:set_flags(
|
||||||
#{dup => Dup, retain => Retain},
|
#{dup => Dup, retain => Retain},
|
||||||
emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload)
|
emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload)
|
||||||
|
@ -151,8 +155,6 @@ estimate_size(#{topic := Topic, payload := Payload}) ->
|
||||||
estimate_size(Term) ->
|
estimate_size(Term) ->
|
||||||
erlang:external_size(Term).
|
erlang:external_size(Term).
|
||||||
|
|
||||||
set_headers(undefined, Msg) ->
|
|
||||||
Msg;
|
|
||||||
set_headers(Val, Msg) ->
|
set_headers(Val, Msg) ->
|
||||||
emqx_message:set_headers(Val, Msg).
|
emqx_message:set_headers(Val, Msg).
|
||||||
topic(undefined, Topic) -> Topic;
|
topic(undefined, Topic) -> Topic;
|
||||||
|
|
|
@ -218,7 +218,7 @@ Defaults to ${payload}. If variable ${payload} is not found from the selected re
|
||||||
of the rule, then the string "undefined" is used.
|
of the rule, then the string "undefined" is used.
|
||||||
"""
|
"""
|
||||||
zh: """
|
zh: """
|
||||||
要重新发布的消息的有效负载。允许使用带有变量的模板,请参阅“republish_args”的描述。。
|
要重新发布的消息的有效负载。允许使用带有变量的模板,请参阅“republish_args”的描述。
|
||||||
默认为 ${payload}。 如果从所选结果中未找到变量 ${payload},则使用字符串 "undefined"。
|
默认为 ${payload}。 如果从所选结果中未找到变量 ${payload},则使用字符串 "undefined"。
|
||||||
"""
|
"""
|
||||||
}
|
}
|
||||||
|
@ -227,6 +227,30 @@ of the rule, then the string "undefined" is used.
|
||||||
zh: "消息负载"
|
zh: "消息负载"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
republish_args_user_properties {
|
||||||
|
desc {
|
||||||
|
en: """
|
||||||
|
From which variable should the MQTT message's User-Property pairs be taken from.
|
||||||
|
The value must be a map.
|
||||||
|
You may configure it to <code>${pub_props.'User-Property'}</code> or
|
||||||
|
use <code>SELECT *,pub_props.'User-Property' as user_properties</code>
|
||||||
|
to forward the original user properties to the republished message.
|
||||||
|
You may also call <code>map_put</code> function like
|
||||||
|
<code>map_put('my-prop-name', 'my-prop-value', user_properties) as user_properties</code>
|
||||||
|
to inject user properties.
|
||||||
|
NOTE: MQTT spec allows duplicated user property names, but EMQX Rule-Engine does not.
|
||||||
|
"""
|
||||||
|
zh: """
|
||||||
|
指定使用哪个变量来填充 MQTT 消息的 User-Property 列表。这个变量的值必须是一个 map 类型。
|
||||||
|
可以设置成 <code>${pub_props.'User-Property'}</code> 或者
|
||||||
|
使用 <code>SELECT *,pub_props.'User-Property' as user_properties</code> 来把源 MQTT 消息
|
||||||
|
的 User-Property 列表用于填充。
|
||||||
|
也可以使用 <code>map_put</code> 函数来添加新的 User-Property,
|
||||||
|
<code>map_put('my-prop-name', 'my-prop-value', user_properties) as user_properties</code>
|
||||||
|
注意:MQTT 协议允许一个消息中出现多次同一个 property 名,但是 EMQX 的规则引擎不允许。
|
||||||
|
"""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
rule_engine_ignore_sys_message {
|
rule_engine_ignore_sys_message {
|
||||||
desc {
|
desc {
|
||||||
|
|
|
@ -37,6 +37,8 @@
|
||||||
|
|
||||||
-callback pre_process_action_args(FuncName :: atom(), action_fun_args()) -> action_fun_args().
|
-callback pre_process_action_args(FuncName :: atom(), action_fun_args()) -> action_fun_args().
|
||||||
|
|
||||||
|
-define(ORIGINAL_USER_PROPERTIES, original).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% APIs
|
%% APIs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -57,7 +59,8 @@ pre_process_action_args(
|
||||||
topic := Topic,
|
topic := Topic,
|
||||||
qos := QoS,
|
qos := QoS,
|
||||||
retain := Retain,
|
retain := Retain,
|
||||||
payload := Payload
|
payload := Payload,
|
||||||
|
user_properties := UserProperties
|
||||||
} = Args
|
} = Args
|
||||||
) ->
|
) ->
|
||||||
Args#{
|
Args#{
|
||||||
|
@ -65,7 +68,8 @@ pre_process_action_args(
|
||||||
topic => emqx_plugin_libs_rule:preproc_tmpl(Topic),
|
topic => emqx_plugin_libs_rule:preproc_tmpl(Topic),
|
||||||
qos => preproc_vars(QoS),
|
qos => preproc_vars(QoS),
|
||||||
retain => preproc_vars(Retain),
|
retain => preproc_vars(Retain),
|
||||||
payload => emqx_plugin_libs_rule:preproc_tmpl(Payload)
|
payload => emqx_plugin_libs_rule:preproc_tmpl(Payload),
|
||||||
|
user_properties => preproc_user_properties(UserProperties)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
pre_process_action_args(_, Args) ->
|
pre_process_action_args(_, Args) ->
|
||||||
|
@ -93,16 +97,16 @@ republish(
|
||||||
_Args
|
_Args
|
||||||
) ->
|
) ->
|
||||||
?SLOG(error, #{msg => "recursive_republish_detected", topic => Topic});
|
?SLOG(error, #{msg => "recursive_republish_detected", topic => Topic});
|
||||||
%% republish a PUBLISH message
|
|
||||||
republish(
|
republish(
|
||||||
Selected,
|
Selected,
|
||||||
#{flags := Flags, metadata := #{rule_id := RuleId}},
|
#{metadata := #{rule_id := RuleId}} = Env,
|
||||||
#{
|
#{
|
||||||
preprocessed_tmpl := #{
|
preprocessed_tmpl := #{
|
||||||
qos := QoSTks,
|
qos := QoSTks,
|
||||||
retain := RetainTks,
|
retain := RetainTks,
|
||||||
topic := TopicTks,
|
topic := TopicTks,
|
||||||
payload := PayloadTks
|
payload := PayloadTks,
|
||||||
|
user_properties := UserPropertiesTks
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
) ->
|
) ->
|
||||||
|
@ -110,27 +114,22 @@ republish(
|
||||||
Payload = format_msg(PayloadTks, Selected),
|
Payload = format_msg(PayloadTks, Selected),
|
||||||
QoS = replace_simple_var(QoSTks, Selected, 0),
|
QoS = replace_simple_var(QoSTks, Selected, 0),
|
||||||
Retain = replace_simple_var(RetainTks, Selected, false),
|
Retain = replace_simple_var(RetainTks, Selected, false),
|
||||||
?TRACE("RULE", "republish_message", #{topic => Topic, payload => Payload}),
|
%% 'flags' is set for message re-publishes or message related
|
||||||
safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload);
|
%% events such as message.acked and message.dropped
|
||||||
%% in case this is a "$events/" event
|
Flags0 = maps:get(flags, Env, #{}),
|
||||||
republish(
|
Flags = Flags0#{retain => Retain},
|
||||||
Selected,
|
PubProps = format_pub_props(UserPropertiesTks, Selected, Env),
|
||||||
#{metadata := #{rule_id := RuleId}},
|
?TRACE(
|
||||||
|
"RULE",
|
||||||
|
"republish_message",
|
||||||
#{
|
#{
|
||||||
preprocessed_tmpl := #{
|
flags => Flags,
|
||||||
qos := QoSTks,
|
topic => Topic,
|
||||||
retain := RetainTks,
|
payload => Payload,
|
||||||
topic := TopicTks,
|
pub_props => PubProps
|
||||||
payload := PayloadTks
|
|
||||||
}
|
}
|
||||||
}
|
),
|
||||||
) ->
|
safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps).
|
||||||
Topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected),
|
|
||||||
Payload = format_msg(PayloadTks, Selected),
|
|
||||||
QoS = replace_simple_var(QoSTks, Selected, 0),
|
|
||||||
Retain = replace_simple_var(RetainTks, Selected, false),
|
|
||||||
?TRACE("RULE", "republish_message_with_flags", #{topic => Topic, payload => Payload}),
|
|
||||||
safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% internal functions
|
%% internal functions
|
||||||
|
@ -168,13 +167,16 @@ pre_process_args(Mod, Func, Args) ->
|
||||||
false -> Args
|
false -> Args
|
||||||
end.
|
end.
|
||||||
|
|
||||||
safe_publish(RuleId, Topic, QoS, Flags, Payload) ->
|
safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps) ->
|
||||||
Msg = #message{
|
Msg = #message{
|
||||||
id = emqx_guid:gen(),
|
id = emqx_guid:gen(),
|
||||||
qos = QoS,
|
qos = QoS,
|
||||||
from = RuleId,
|
from = RuleId,
|
||||||
flags = Flags,
|
flags = Flags,
|
||||||
headers = #{republish_by => RuleId},
|
headers = #{
|
||||||
|
republish_by => RuleId,
|
||||||
|
properties => emqx_misc:pub_props_to_packet(PubProps)
|
||||||
|
},
|
||||||
topic = Topic,
|
topic = Topic,
|
||||||
payload = Payload,
|
payload = Payload,
|
||||||
timestamp = erlang:system_time(millisecond)
|
timestamp = erlang:system_time(millisecond)
|
||||||
|
@ -187,6 +189,19 @@ preproc_vars(Data) when is_binary(Data) ->
|
||||||
preproc_vars(Data) ->
|
preproc_vars(Data) ->
|
||||||
Data.
|
Data.
|
||||||
|
|
||||||
|
preproc_user_properties(<<"${pub_props.'User-Property'}">>) ->
|
||||||
|
%% keep the original
|
||||||
|
%% avoid processing this special variable because
|
||||||
|
%% we do not want to force users to select the value
|
||||||
|
%% the value will be taken from Env.pub_props directly
|
||||||
|
?ORIGINAL_USER_PROPERTIES;
|
||||||
|
preproc_user_properties(<<"${", _/binary>> = V) ->
|
||||||
|
%% use a variable
|
||||||
|
emqx_plugin_libs_rule:preproc_tmpl(V);
|
||||||
|
preproc_user_properties(_) ->
|
||||||
|
%% invalid, discard
|
||||||
|
undefined.
|
||||||
|
|
||||||
replace_simple_var(Tokens, Data, Default) when is_list(Tokens) ->
|
replace_simple_var(Tokens, Data, Default) when is_list(Tokens) ->
|
||||||
[Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}),
|
[Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}),
|
||||||
case Var of
|
case Var of
|
||||||
|
@ -201,3 +216,15 @@ format_msg([], Selected) ->
|
||||||
emqx_json:encode(Selected);
|
emqx_json:encode(Selected);
|
||||||
format_msg(Tokens, Selected) ->
|
format_msg(Tokens, Selected) ->
|
||||||
emqx_plugin_libs_rule:proc_tmpl(Tokens, Selected).
|
emqx_plugin_libs_rule:proc_tmpl(Tokens, Selected).
|
||||||
|
|
||||||
|
format_pub_props(UserPropertiesTks, Selected, Env) ->
|
||||||
|
UserProperties =
|
||||||
|
case UserPropertiesTks of
|
||||||
|
?ORIGINAL_USER_PROPERTIES ->
|
||||||
|
maps:get('User-Property', maps:get(pub_props, Env, #{}), #{});
|
||||||
|
undefined ->
|
||||||
|
#{};
|
||||||
|
_ ->
|
||||||
|
replace_simple_var(UserPropertiesTks, Selected, #{})
|
||||||
|
end,
|
||||||
|
#{'User-Property' => UserProperties}.
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
{application, emqx_rule_engine, [
|
{application, emqx_rule_engine, [
|
||||||
{description, "EMQX Rule Engine"},
|
{description, "EMQX Rule Engine"},
|
||||||
% strict semver, bump manually!
|
% strict semver, bump manually!
|
||||||
{vsn, "5.0.3"},
|
{vsn, "5.0.4"},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_rule_engine_sup, emqx_rule_engine]},
|
{registered, [emqx_rule_engine_sup, emqx_rule_engine]},
|
||||||
{applications, [kernel, stdlib, rulesql, getopt]},
|
{applications, [kernel, stdlib, rulesql, getopt]},
|
||||||
|
|
|
@ -173,6 +173,15 @@ fields("republish_args") ->
|
||||||
default => <<"${payload}">>,
|
default => <<"${payload}">>,
|
||||||
example => <<"${payload}">>
|
example => <<"${payload}">>
|
||||||
}
|
}
|
||||||
|
)},
|
||||||
|
{user_properties,
|
||||||
|
?HOCON(
|
||||||
|
binary(),
|
||||||
|
#{
|
||||||
|
desc => ?DESC("republish_args_user_properties"),
|
||||||
|
default => <<"${user_properties}">>,
|
||||||
|
example => <<"${pub_props.'User-Property'}">>
|
||||||
|
}
|
||||||
)}
|
)}
|
||||||
].
|
].
|
||||||
|
|
||||||
|
|
|
@ -1060,7 +1060,7 @@ printable_maps(Headers) ->
|
||||||
(K, V, AccIn) ->
|
(K, V, AccIn) ->
|
||||||
AccIn#{K => V}
|
AccIn#{K => V}
|
||||||
end,
|
end,
|
||||||
#{},
|
#{'User-Property' => #{}},
|
||||||
Headers
|
Headers
|
||||||
).
|
).
|
||||||
|
|
||||||
|
|
|
@ -59,11 +59,14 @@ groups() ->
|
||||||
t_sqlselect_0,
|
t_sqlselect_0,
|
||||||
t_sqlselect_00,
|
t_sqlselect_00,
|
||||||
t_sqlselect_001,
|
t_sqlselect_001,
|
||||||
|
t_sqlselect_inject_props,
|
||||||
t_sqlselect_01,
|
t_sqlselect_01,
|
||||||
t_sqlselect_02,
|
t_sqlselect_02,
|
||||||
t_sqlselect_1,
|
t_sqlselect_1,
|
||||||
t_sqlselect_2,
|
t_sqlselect_2,
|
||||||
t_sqlselect_3,
|
t_sqlselect_3,
|
||||||
|
t_sqlselect_message_publish_event_keep_original_props_1,
|
||||||
|
t_sqlselect_message_publish_event_keep_original_props_2,
|
||||||
t_sqlparse_event_1,
|
t_sqlparse_event_1,
|
||||||
t_sqlparse_event_2,
|
t_sqlparse_event_2,
|
||||||
t_sqlparse_event_3,
|
t_sqlparse_event_3,
|
||||||
|
@ -936,9 +939,10 @@ t_sqlselect_001(_Config) ->
|
||||||
)
|
)
|
||||||
).
|
).
|
||||||
|
|
||||||
t_sqlselect_01(_Config) ->
|
t_sqlselect_inject_props(_Config) ->
|
||||||
SQL =
|
SQL =
|
||||||
"SELECT json_decode(payload) as p, payload "
|
"SELECT json_decode(payload) as p, payload, "
|
||||||
|
"map_put('inject_key', 'inject_val', user_properties) as user_properties "
|
||||||
"FROM \"t3/#\", \"t1\" "
|
"FROM \"t3/#\", \"t1\" "
|
||||||
"WHERE p.x = 1",
|
"WHERE p.x = 1",
|
||||||
Repub = republish_action(<<"t2">>),
|
Repub = republish_action(<<"t2">>),
|
||||||
|
@ -949,34 +953,64 @@ t_sqlselect_01(_Config) ->
|
||||||
actions => [Repub]
|
actions => [Repub]
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
Props = user_properties(#{<<"inject_key">> => <<"inject_val">>}),
|
||||||
|
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}, {proto_ver, v5}]),
|
||||||
{ok, _} = emqtt:connect(Client),
|
{ok, _} = emqtt:connect(Client),
|
||||||
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
||||||
emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, 0),
|
emqtt:publish(Client, <<"t1">>, #{}, <<"{\"x\":1}">>, [{qos, 0}]),
|
||||||
ct:sleep(100),
|
receive
|
||||||
|
{publish, #{topic := T, payload := Payload, properties := Props2}} ->
|
||||||
|
?assertEqual(Props, Props2),
|
||||||
|
?assertEqual(<<"t2">>, T),
|
||||||
|
?assertEqual(<<"{\"x\":1}">>, Payload)
|
||||||
|
after 2000 ->
|
||||||
|
ct:fail(wait_for_t2)
|
||||||
|
end,
|
||||||
|
emqtt:stop(Client),
|
||||||
|
delete_rule(TopicRule1).
|
||||||
|
|
||||||
|
t_sqlselect_01(_Config) ->
|
||||||
|
SQL =
|
||||||
|
"SELECT json_decode(payload) as p, payload "
|
||||||
|
"FROM \"t3/#\", \"t1\" "
|
||||||
|
"WHERE p.x = 1",
|
||||||
|
Repub = republish_action(<<"t2">>, <<"${payload}">>, <<"${pub_props.'User-Property'}">>),
|
||||||
|
{ok, TopicRule1} = emqx_rule_engine:create_rule(
|
||||||
|
#{
|
||||||
|
sql => SQL,
|
||||||
|
id => ?TMP_RULEID,
|
||||||
|
actions => [Repub]
|
||||||
|
}
|
||||||
|
),
|
||||||
|
Props = user_properties(#{<<"mykey">> => <<"myval">>}),
|
||||||
|
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}, {proto_ver, v5}]),
|
||||||
|
{ok, _} = emqtt:connect(Client),
|
||||||
|
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
||||||
|
emqtt:publish(Client, <<"t1">>, Props, <<"{\"x\":1}">>, [{qos, 0}]),
|
||||||
receive
|
receive
|
||||||
{publish, #{topic := T, payload := Payload}} ->
|
{publish, #{topic := T, payload := Payload}} ->
|
||||||
?assertEqual(<<"t2">>, T),
|
?assertEqual(<<"t2">>, T),
|
||||||
?assertEqual(<<"{\"x\":1}">>, Payload)
|
?assertEqual(<<"{\"x\":1}">>, Payload)
|
||||||
after 1000 ->
|
after 2000 ->
|
||||||
ct:fail(wait_for_t2)
|
ct:fail(wait_for_t2)
|
||||||
end,
|
end,
|
||||||
|
|
||||||
emqtt:publish(Client, <<"t1">>, <<"{\"x\":2}">>, 0),
|
emqtt:publish(Client, <<"t1">>, Props, <<"{\"x\":2}">>, [{qos, 0}]),
|
||||||
receive
|
receive
|
||||||
{publish, #{topic := <<"t2">>, payload := _}} ->
|
{publish, #{topic := <<"t2">>, payload := _}} ->
|
||||||
ct:fail(unexpected_t2)
|
ct:fail(unexpected_t2)
|
||||||
after 1000 ->
|
after 2000 ->
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
|
|
||||||
emqtt:publish(Client, <<"t3/a">>, <<"{\"x\":1}">>, 0),
|
emqtt:publish(Client, <<"t3/a">>, Props, <<"{\"x\":1}">>, [{qos, 0}]),
|
||||||
receive
|
receive
|
||||||
{publish, #{topic := T3, payload := Payload3}} ->
|
{publish, #{topic := T3, payload := Payload3, properties := Props2}} ->
|
||||||
|
?assertEqual(Props, Props2),
|
||||||
?assertEqual(<<"t2">>, T3),
|
?assertEqual(<<"t2">>, T3),
|
||||||
?assertEqual(<<"{\"x\":1}">>, Payload3)
|
?assertEqual(<<"{\"x\":1}">>, Payload3)
|
||||||
after 1000 ->
|
after 2000 ->
|
||||||
ct:fail(wait_for_t2)
|
ct:fail(wait_for_t3)
|
||||||
end,
|
end,
|
||||||
|
|
||||||
emqtt:stop(Client),
|
emqtt:stop(Client),
|
||||||
|
@ -1044,13 +1078,12 @@ t_sqlselect_1(_Config) ->
|
||||||
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
||||||
{ok, _} = emqtt:connect(Client),
|
{ok, _} = emqtt:connect(Client),
|
||||||
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
||||||
ct:sleep(200),
|
|
||||||
emqtt:publish(Client, <<"t1">>, <<"{\"x\":1,\"y\":2}">>, 0),
|
emqtt:publish(Client, <<"t1">>, <<"{\"x\":1,\"y\":2}">>, 0),
|
||||||
receive
|
receive
|
||||||
{publish, #{topic := T, payload := Payload}} ->
|
{publish, #{topic := T, payload := Payload}} ->
|
||||||
?assertEqual(<<"t2">>, T),
|
?assertEqual(<<"t2">>, T),
|
||||||
?assertEqual(<<"{\"x\":1,\"y\":2}">>, Payload)
|
?assertEqual(<<"{\"x\":1,\"y\":2}">>, Payload)
|
||||||
after 1000 ->
|
after 2000 ->
|
||||||
ct:fail(wait_for_t2)
|
ct:fail(wait_for_t2)
|
||||||
end,
|
end,
|
||||||
|
|
||||||
|
@ -1113,14 +1146,13 @@ t_sqlselect_3(_Config) ->
|
||||||
{ok, Client} = emqtt:start_link([{clientid, <<"emqx0">>}, {username, <<"emqx0">>}]),
|
{ok, Client} = emqtt:start_link([{clientid, <<"emqx0">>}, {username, <<"emqx0">>}]),
|
||||||
{ok, _} = emqtt:connect(Client),
|
{ok, _} = emqtt:connect(Client),
|
||||||
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
||||||
ct:sleep(200),
|
|
||||||
{ok, Client1} = emqtt:start_link([{clientid, <<"c_emqx1">>}, {username, <<"emqx1">>}]),
|
{ok, Client1} = emqtt:start_link([{clientid, <<"c_emqx1">>}, {username, <<"emqx1">>}]),
|
||||||
{ok, _} = emqtt:connect(Client1),
|
{ok, _} = emqtt:connect(Client1),
|
||||||
receive
|
receive
|
||||||
{publish, #{topic := T, payload := Payload}} ->
|
{publish, #{topic := T, payload := Payload}} ->
|
||||||
?assertEqual(<<"t2">>, T),
|
?assertEqual(<<"t2">>, T),
|
||||||
?assertEqual(<<"clientid=c_emqx1">>, Payload)
|
?assertEqual(<<"clientid=c_emqx1">>, Payload)
|
||||||
after 1000 ->
|
after 2000 ->
|
||||||
ct:fail(wait_for_t2)
|
ct:fail(wait_for_t2)
|
||||||
end,
|
end,
|
||||||
|
|
||||||
|
@ -1135,6 +1167,82 @@ t_sqlselect_3(_Config) ->
|
||||||
emqtt:stop(Client),
|
emqtt:stop(Client),
|
||||||
delete_rule(TopicRule).
|
delete_rule(TopicRule).
|
||||||
|
|
||||||
|
t_sqlselect_message_publish_event_keep_original_props_1(_Config) ->
|
||||||
|
%% republish the client.connected msg
|
||||||
|
Topic = <<"foo/bar/1">>,
|
||||||
|
SQL = <<
|
||||||
|
"SELECT clientid "
|
||||||
|
"FROM \"$events/message_dropped\" "
|
||||||
|
>>,
|
||||||
|
|
||||||
|
%"WHERE topic = \"", Topic/binary, "\"">>,
|
||||||
|
Repub = republish_action(
|
||||||
|
<<"t2">>,
|
||||||
|
<<"clientid=${clientid}">>,
|
||||||
|
<<"${pub_props.'User-Property'}">>
|
||||||
|
),
|
||||||
|
{ok, TopicRule} = emqx_rule_engine:create_rule(
|
||||||
|
#{
|
||||||
|
sql => SQL,
|
||||||
|
id => ?TMP_RULEID,
|
||||||
|
actions => [Repub]
|
||||||
|
}
|
||||||
|
),
|
||||||
|
{ok, Client1} = emqtt:start_link([{clientid, <<"sub-01">>}, {proto_ver, v5}]),
|
||||||
|
{ok, _} = emqtt:connect(Client1),
|
||||||
|
{ok, _, _} = emqtt:subscribe(Client1, <<"t2">>, 1),
|
||||||
|
{ok, Client2} = emqtt:start_link([{clientid, <<"pub-02">>}, {proto_ver, v5}]),
|
||||||
|
{ok, _} = emqtt:connect(Client2),
|
||||||
|
Props = user_properties(#{<<"mykey">> => <<"111111">>}),
|
||||||
|
emqtt:publish(Client2, Topic, Props, <<"{\"x\":1}">>, [{qos, 1}]),
|
||||||
|
receive
|
||||||
|
{publish, #{topic := T, payload := Payload, properties := Props1}} ->
|
||||||
|
?assertEqual(Props1, Props),
|
||||||
|
?assertEqual(<<"t2">>, T),
|
||||||
|
?assertEqual(<<"clientid=pub-02">>, Payload)
|
||||||
|
after 2000 ->
|
||||||
|
ct:fail(wait_for_t2)
|
||||||
|
end,
|
||||||
|
emqtt:stop(Client2),
|
||||||
|
emqtt:stop(Client1),
|
||||||
|
delete_rule(TopicRule).
|
||||||
|
|
||||||
|
t_sqlselect_message_publish_event_keep_original_props_2(_Config) ->
|
||||||
|
%% republish the client.connected msg
|
||||||
|
Topic = <<"foo/bar/1">>,
|
||||||
|
SQL = <<
|
||||||
|
"SELECT clientid, pub_props.'User-Property' as user_properties "
|
||||||
|
"FROM \"$events/message_dropped\" "
|
||||||
|
>>,
|
||||||
|
|
||||||
|
%"WHERE topic = \"", Topic/binary, "\"">>,
|
||||||
|
Repub = republish_action(<<"t2">>, <<"clientid=${clientid}">>),
|
||||||
|
{ok, TopicRule} = emqx_rule_engine:create_rule(
|
||||||
|
#{
|
||||||
|
sql => SQL,
|
||||||
|
id => ?TMP_RULEID,
|
||||||
|
actions => [Repub]
|
||||||
|
}
|
||||||
|
),
|
||||||
|
{ok, Client1} = emqtt:start_link([{clientid, <<"sub-01">>}, {proto_ver, v5}]),
|
||||||
|
{ok, _} = emqtt:connect(Client1),
|
||||||
|
{ok, _, _} = emqtt:subscribe(Client1, <<"t2">>, 1),
|
||||||
|
{ok, Client2} = emqtt:start_link([{clientid, <<"pub-02">>}, {proto_ver, v5}]),
|
||||||
|
{ok, _} = emqtt:connect(Client2),
|
||||||
|
Props = user_properties(#{<<"mykey">> => <<"222222222222">>}),
|
||||||
|
emqtt:publish(Client2, Topic, Props, <<"{\"x\":1}">>, [{qos, 1}]),
|
||||||
|
receive
|
||||||
|
{publish, #{topic := T, payload := Payload, properties := Props1}} ->
|
||||||
|
?assertEqual(Props1, Props),
|
||||||
|
?assertEqual(<<"t2">>, T),
|
||||||
|
?assertEqual(<<"clientid=pub-02">>, Payload)
|
||||||
|
after 2000 ->
|
||||||
|
ct:fail(wait_for_t2)
|
||||||
|
end,
|
||||||
|
emqtt:stop(Client2),
|
||||||
|
emqtt:stop(Client1),
|
||||||
|
delete_rule(TopicRule).
|
||||||
|
|
||||||
t_sqlparse_event_1(_Config) ->
|
t_sqlparse_event_1(_Config) ->
|
||||||
Sql =
|
Sql =
|
||||||
"select topic as tp "
|
"select topic as tp "
|
||||||
|
@ -2480,10 +2588,20 @@ t_get_basic_usage_info_1(_Config) ->
|
||||||
|
|
||||||
republish_action(Topic) ->
|
republish_action(Topic) ->
|
||||||
republish_action(Topic, <<"${payload}">>).
|
republish_action(Topic, <<"${payload}">>).
|
||||||
|
|
||||||
republish_action(Topic, Payload) ->
|
republish_action(Topic, Payload) ->
|
||||||
|
republish_action(Topic, Payload, <<"${user_properties}">>).
|
||||||
|
|
||||||
|
republish_action(Topic, Payload, UserProperties) ->
|
||||||
#{
|
#{
|
||||||
function => republish,
|
function => republish,
|
||||||
args => #{payload => Payload, topic => Topic, qos => 0, retain => false}
|
args => #{
|
||||||
|
payload => Payload,
|
||||||
|
topic => Topic,
|
||||||
|
qos => 0,
|
||||||
|
retain => false,
|
||||||
|
user_properties => UserProperties
|
||||||
|
}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
make_simple_rule_with_ts(RuleId, Ts) when is_binary(RuleId) ->
|
make_simple_rule_with_ts(RuleId, Ts) when is_binary(RuleId) ->
|
||||||
|
@ -2869,6 +2987,9 @@ verify_ipaddr(IPAddrS) ->
|
||||||
init_events_counters() ->
|
init_events_counters() ->
|
||||||
ets:new(events_record_tab, [named_table, bag, public]).
|
ets:new(events_record_tab, [named_table, bag, public]).
|
||||||
|
|
||||||
|
user_properties(PairsMap) ->
|
||||||
|
#{'User-Property' => maps:to_list(PairsMap)}.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Start Apps
|
%% Start Apps
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
|
@ -21,6 +21,8 @@
|
||||||
|
|
||||||
- Set the default value for the maximum level of a topic to 128 [#9406](https://github.com/emqx/emqx/pull/9406).
|
- Set the default value for the maximum level of a topic to 128 [#9406](https://github.com/emqx/emqx/pull/9406).
|
||||||
|
|
||||||
|
- Keep MQTT v5 User-Property pairs from bridge ingested MQTT messsages to bridge target [#9398](https://github.com/emqx/emqx/pull/9398).
|
||||||
|
|
||||||
## Bug fixes
|
## Bug fixes
|
||||||
|
|
||||||
- Fix `ssl.existingName` option of helm chart not working [#9307](https://github.com/emqx/emqx/issues/9307).
|
- Fix `ssl.existingName` option of helm chart not working [#9307](https://github.com/emqx/emqx/issues/9307).
|
||||||
|
|
|
@ -19,6 +19,8 @@
|
||||||
|
|
||||||
- 将主题的最大层级限制的默认值设置为128 [#9406](https://github.com/emqx/emqx/pull/9406)。
|
- 将主题的最大层级限制的默认值设置为128 [#9406](https://github.com/emqx/emqx/pull/9406)。
|
||||||
|
|
||||||
|
- 为桥接收到的 MQTT v5 消息再转发时保留 User-Property 列表 [#9398](https://github.com/emqx/emqx/pull/9398)。
|
||||||
|
|
||||||
## 修复
|
## 修复
|
||||||
|
|
||||||
- 修复 helm chart 的 `ssl.existingName` 选项不起作用 [#9307](https://github.com/emqx/emqx/issues/9307)。
|
- 修复 helm chart 的 `ssl.existingName` 选项不起作用 [#9307](https://github.com/emqx/emqx/issues/9307)。
|
||||||
|
|
Loading…
Reference in New Issue