fix(suite): add republish qos & retain test case
This commit is contained in:
parent
e1e2fd50fd
commit
c5f754c3b8
|
@ -43,14 +43,12 @@
|
||||||
title => #{en => <<"Target QoS">>,
|
title => #{en => <<"Target QoS">>,
|
||||||
zh => <<"目的 QoS"/utf8>>},
|
zh => <<"目的 QoS"/utf8>>},
|
||||||
description => #{en =>
|
description => #{en =>
|
||||||
<<"The QoS Level to be uses when republishing the message."
|
<<"The QoS Level to be used when republishing the message."
|
||||||
" Support placeholder variables."
|
" Support placeholder variables."
|
||||||
" Set to ${qos} to use the original QoS."
|
" Set to ${qos} to use the original QoS. Default is 0">>,
|
||||||
" Or other variable, value is 0 or 1 or 2">>,
|
|
||||||
zh =>
|
zh =>
|
||||||
<<"重新发布消息时用的 QoS 级别。"
|
<<"重新发布消息时用的 QoS 级别。"
|
||||||
"支持占位符变量,可以填写 ${qos} 来使用原消息的 QoS,"
|
"支持占位符变量,可以填写 ${qos} 来使用原消息的 QoS。默认 0"/utf8>>}
|
||||||
"或其他值为 0 或 1 或 2 的变量。"/utf8>>}
|
|
||||||
},
|
},
|
||||||
target_retain => #{
|
target_retain => #{
|
||||||
order => 3,
|
order => 3,
|
||||||
|
@ -61,7 +59,7 @@
|
||||||
default => false,
|
default => false,
|
||||||
title => #{en => <<"Target Retain">>,
|
title => #{en => <<"Target Retain">>,
|
||||||
zh => <<"目标保留消息标识"/utf8>>},
|
zh => <<"目标保留消息标识"/utf8>>},
|
||||||
description => #{en => <<"The Retain flag to be uses when republishing the message."
|
description => #{en => <<"The Retain flag to be used when republishing the message."
|
||||||
" Set to ${flags.retain} to use the original Retain."
|
" Set to ${flags.retain} to use the original Retain."
|
||||||
" Support placeholder variables. Default is false">>,
|
" Support placeholder variables. Default is false">>,
|
||||||
zh => <<"重新发布消息时用的保留消息标识。"
|
zh => <<"重新发布消息时用的保留消息标识。"
|
||||||
|
|
|
@ -405,16 +405,31 @@ t_reset_metrics(_Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_republish_action(_Config) ->
|
t_republish_action(_Config) ->
|
||||||
Qos0Received = emqx_metrics:val('messages.qos0.received'),
|
TargetQoSList = [-1, 0, 1, 2, <<"${qos}">>],
|
||||||
|
TargetRetainList = [true, false, <<"${flags.retain}">>],
|
||||||
|
[[republish_action_test(TargetQoS, TargetRetain) || TargetRetain <- TargetRetainList]
|
||||||
|
|| TargetQoS <- TargetQoSList],
|
||||||
|
ok.
|
||||||
|
|
||||||
|
republish_action_test(TargetQoS, TargetRetain) ->
|
||||||
|
{QoSReceivedMetricsName, PubQoS} =
|
||||||
|
case TargetQoS of
|
||||||
|
<<"${qos}">> -> {'messages.qos0.received', 0};
|
||||||
|
-1 -> {'messages.qos0.received', 0};
|
||||||
|
0 -> {'messages.qos0.received', 0};
|
||||||
|
1 -> {'messages.qos1.received', 1};
|
||||||
|
2 -> {'messages.qos2.received', 2}
|
||||||
|
end,
|
||||||
|
QosReceived = emqx_metrics:val(QoSReceivedMetricsName),
|
||||||
Received = emqx_metrics:val('messages.received'),
|
Received = emqx_metrics:val('messages.received'),
|
||||||
ok = emqx_rule_engine:load_providers(),
|
ok = emqx_rule_engine:load_providers(),
|
||||||
{ok, #rule{id = Id, for = [<<"t1">>]}} =
|
{ok, #rule{id = Id, for = [<<"t1">>]}} =
|
||||||
emqx_rule_engine:create_rule(
|
emqx_rule_engine:create_rule(
|
||||||
#{rawsql => <<"select topic, payload, qos from \"t1\"">>,
|
#{rawsql => <<"select * from \"t1\"">>,
|
||||||
actions => [#{name => 'republish',
|
actions => [#{name => 'republish',
|
||||||
args => #{<<"target_topic">> => <<"t2">>,
|
args => #{<<"target_topic">> => <<"t2">>,
|
||||||
<<"target_qos">> => <<"-1">>,
|
<<"target_qos">> => TargetQoS,
|
||||||
<<"target_retain">> => <<"false">>,
|
<<"target_retain">> => TargetRetain,
|
||||||
<<"payload_tmpl">> => <<"${payload}">>}}],
|
<<"payload_tmpl">> => <<"${payload}">>}}],
|
||||||
description => <<"builtin-republish-rule">>}),
|
description => <<"builtin-republish-rule">>}),
|
||||||
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
||||||
|
@ -422,7 +437,7 @@ t_republish_action(_Config) ->
|
||||||
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
||||||
|
|
||||||
Msg = <<"{\"id\": 1, \"name\": \"ha\"}">>,
|
Msg = <<"{\"id\": 1, \"name\": \"ha\"}">>,
|
||||||
emqtt:publish(Client, <<"t1">>, Msg, 0),
|
emqtt:publish(Client, <<"t1">>, Msg, PubQoS),
|
||||||
receive {publish, #{topic := <<"t2">>, payload := Payload}} ->
|
receive {publish, #{topic := <<"t2">>, payload := Payload}} ->
|
||||||
?assertEqual(Msg, Payload)
|
?assertEqual(Msg, Payload)
|
||||||
after 1000 ->
|
after 1000 ->
|
||||||
|
@ -430,7 +445,7 @@ t_republish_action(_Config) ->
|
||||||
end,
|
end,
|
||||||
emqtt:stop(Client),
|
emqtt:stop(Client),
|
||||||
emqx_rule_registry:remove_rule(Id),
|
emqx_rule_registry:remove_rule(Id),
|
||||||
?assertEqual(2, emqx_metrics:val('messages.qos0.received') - Qos0Received),
|
?assertEqual(2, emqx_metrics:val(QoSReceivedMetricsName) - QosReceived),
|
||||||
?assertEqual(2, emqx_metrics:val('messages.received') - Received),
|
?assertEqual(2, emqx_metrics:val('messages.received') - Received),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
@ -480,8 +495,7 @@ t_crud_rule_api(_Config) ->
|
||||||
{<<"params">>,[
|
{<<"params">>,[
|
||||||
{<<"arg1">>,1},
|
{<<"arg1">>,1},
|
||||||
{<<"target_topic">>, <<"t2">>},
|
{<<"target_topic">>, <<"t2">>},
|
||||||
{<<"target_qos">>, <<"0">>},
|
{<<"target_qos">>, 0},
|
||||||
{<<"target_retain">>, <<"false">>},
|
|
||||||
{<<"payload_tmpl">>, <<"${payload}">>}
|
{<<"payload_tmpl">>, <<"${payload}">>}
|
||||||
]}
|
]}
|
||||||
]]
|
]]
|
||||||
|
@ -1620,8 +1634,7 @@ t_sqlselect_multi_actoins_1(Config) ->
|
||||||
#{name => 'crash_action', args => #{}, fallbacks => []},
|
#{name => 'crash_action', args => #{}, fallbacks => []},
|
||||||
#{name => 'republish',
|
#{name => 'republish',
|
||||||
args => #{<<"target_topic">> => <<"t2">>,
|
args => #{<<"target_topic">> => <<"t2">>,
|
||||||
<<"target_qos">> => <<"-1">>,
|
<<"target_qos">> => -1,
|
||||||
<<"target_retain">> => <<"false">>,
|
|
||||||
<<"payload_tmpl">> => <<"clientid=${clientid}">>
|
<<"payload_tmpl">> => <<"clientid=${clientid}">>
|
||||||
},
|
},
|
||||||
fallbacks => []}
|
fallbacks => []}
|
||||||
|
@ -1647,8 +1660,7 @@ t_sqlselect_multi_actoins_1_1(Config) ->
|
||||||
#{name => 'crash_action', args => #{}, fallbacks => []},
|
#{name => 'crash_action', args => #{}, fallbacks => []},
|
||||||
#{name => 'republish',
|
#{name => 'republish',
|
||||||
args => #{<<"target_topic">> => <<"t2">>,
|
args => #{<<"target_topic">> => <<"t2">>,
|
||||||
<<"target_qos">> => <<"-1">>,
|
<<"target_qos">> => -1,
|
||||||
<<"target_retain">> => <<"false">>,
|
|
||||||
<<"payload_tmpl">> => <<"clientid=${clientid}">>
|
<<"payload_tmpl">> => <<"clientid=${clientid}">>
|
||||||
},
|
},
|
||||||
fallbacks => []}
|
fallbacks => []}
|
||||||
|
@ -1676,8 +1688,7 @@ t_sqlselect_multi_actoins_2(Config) ->
|
||||||
#{name => 'crash_action', args => #{}, fallbacks => []},
|
#{name => 'crash_action', args => #{}, fallbacks => []},
|
||||||
#{name => 'republish',
|
#{name => 'republish',
|
||||||
args => #{<<"target_topic">> => <<"t2">>,
|
args => #{<<"target_topic">> => <<"t2">>,
|
||||||
<<"target_qos">> => <<"-1">>,
|
<<"target_qos">> => -1,
|
||||||
<<"target_retain">> => <<"false">>,
|
|
||||||
<<"payload_tmpl">> => <<"clientid=${clientid}">>
|
<<"payload_tmpl">> => <<"clientid=${clientid}">>
|
||||||
},
|
},
|
||||||
fallbacks => []}
|
fallbacks => []}
|
||||||
|
@ -1708,8 +1719,7 @@ t_sqlselect_multi_actoins_3(Config) ->
|
||||||
]},
|
]},
|
||||||
#{name => 'republish',
|
#{name => 'republish',
|
||||||
args => #{<<"target_topic">> => <<"t2">>,
|
args => #{<<"target_topic">> => <<"t2">>,
|
||||||
<<"target_qos">> => <<"-1">>,
|
<<"target_qos">> => -1,
|
||||||
<<"target_retain">> => <<"false">>,
|
|
||||||
<<"payload_tmpl">> => <<"clientid=${clientid}">>
|
<<"payload_tmpl">> => <<"clientid=${clientid}">>
|
||||||
},
|
},
|
||||||
fallbacks => []}
|
fallbacks => []}
|
||||||
|
@ -1747,8 +1757,7 @@ t_sqlselect_multi_actoins_3_1(Config) ->
|
||||||
]},
|
]},
|
||||||
#{name => 'republish',
|
#{name => 'republish',
|
||||||
args => #{<<"target_topic">> => <<"t2">>,
|
args => #{<<"target_topic">> => <<"t2">>,
|
||||||
<<"target_qos">> => <<"-1">>,
|
<<"target_qos">> => -1,
|
||||||
<<"target_retain">> => <<"false">>,
|
|
||||||
<<"payload_tmpl">> => <<"clientid=${clientid}">>
|
<<"payload_tmpl">> => <<"clientid=${clientid}">>
|
||||||
},
|
},
|
||||||
fallbacks => []}
|
fallbacks => []}
|
||||||
|
@ -1787,8 +1796,7 @@ t_sqlselect_multi_actoins_4(Config) ->
|
||||||
]},
|
]},
|
||||||
#{name => 'republish',
|
#{name => 'republish',
|
||||||
args => #{<<"target_topic">> => <<"t2">>,
|
args => #{<<"target_topic">> => <<"t2">>,
|
||||||
<<"target_qos">> => <<"-1">>,
|
<<"target_qos">> => -1,
|
||||||
<<"target_retain">> => <<"false">>,
|
|
||||||
<<"payload_tmpl">> => <<"clientid=${clientid}">>
|
<<"payload_tmpl">> => <<"clientid=${clientid}">>
|
||||||
},
|
},
|
||||||
fallbacks => []}
|
fallbacks => []}
|
||||||
|
@ -2542,8 +2550,7 @@ create_simple_repub_rule(TargetTopic, SQL, Template) ->
|
||||||
#{rawsql => SQL,
|
#{rawsql => SQL,
|
||||||
actions => [#{name => 'republish',
|
actions => [#{name => 'republish',
|
||||||
args => #{<<"target_topic">> => TargetTopic,
|
args => #{<<"target_topic">> => TargetTopic,
|
||||||
<<"target_qos">> => <<"-1">>,
|
<<"target_qos">> => -1,
|
||||||
<<"target_retain">> => <<"false">>,
|
|
||||||
<<"payload_tmpl">> => Template}
|
<<"payload_tmpl">> => Template}
|
||||||
}],
|
}],
|
||||||
description => <<"simple repub rule">>}),
|
description => <<"simple repub rule">>}),
|
||||||
|
|
Loading…
Reference in New Issue