Merge remote-tracking branch 'origin/dev/v4.3.20' into dev/v4.4.9

This commit is contained in:
Zaiming (Stone) Shi 2022-09-12 09:25:07 +02:00
commit 1013f221e0
18 changed files with 279 additions and 52 deletions

View File

@ -15,6 +15,14 @@ File format:
### Bug fixes
- Fix rule-engine update behaviour which may initialize actions for disabled rules. [#8849](https://github.com/emqx/emqx/pull/8849)
- Fix JWT plugin don't support non-integer timestamp claims. [#8862](https://github.com/emqx/emqx/pull/8862)
- Fix a possible dead loop caused by shared subscriptions with `shared_dispatch_ack_enabled=true`. [#8918](https://github.com/emqx/emqx/pull/8918)
- Fix dashboard binding IP address not working. [#8916](https://github.com/emqx/emqx/pull/8916)
- Fix rule SQL topic matching to null values failed. [#8927](https://github.com/emqx/emqx/pull/8927)
The following SQL should not fail (crash) but return `{"r": false}`:
`SELECT topic =~ 't' as r FROM "$events/client_connected"`.
The topic is a null value as there's no such field in event `$events/client_connected`, so it
should return false if match it to a topic.
## v4.3.19
@ -22,7 +30,6 @@ File format:
- Improve error message for LwM2M plugin when object ID is not valid. [#8654](https://github.com/emqx/emqx/pull/8654).
- Add tzdata apk package to alpine docker image. [#8671](https://github.com/emqx/emqx/pull/8671)
- Add node evacuation and cluster rebalancing features. [#8597](https://github.com/emqx/emqx/pull/8597)
- Refine Rule Engine error log. RuleId will be logged when take action failed. [#8737](https://github.com/emqx/emqx/pull/8737)
- Increases the latency interval for MQTT Bridge test connections to improve compatibility in high-latency environments. [#8745](https://github.com/emqx/emqx/pull/8745)
- Close ExProto client process immediately if it's keepalive timeouted. [#8725](https://github.com/emqx/emqx/pull/8725)

View File

@ -86,7 +86,7 @@ is_expired(Exp) when is_binary(Exp) ->
?DEBUG("acl_deny_due_to_invalid_jwt_exp:~p", [Exp]),
true
end;
is_expired(Exp) when is_integer(Exp) ->
is_expired(Exp) when is_number(Exp) ->
Now = erlang:system_time(second),
Now > Exp;
is_expired(Exp) ->

View File

@ -201,19 +201,19 @@ do_verify(JwsCompacted, [Jwk|More]) ->
check_claims(Claims) ->
Now = os:system_time(seconds),
Checker = [{<<"exp">>, with_int_value(
Checker = [{<<"exp">>, with_num_value(
fun(ExpireTime) -> Now < ExpireTime end)},
{<<"iat">>, with_int_value(
{<<"iat">>, with_num_value(
fun(IssueAt) -> IssueAt =< Now end)},
{<<"nbf">>, with_int_value(
{<<"nbf">>, with_num_value(
fun(NotBefore) -> NotBefore =< Now end)}
],
do_check_claim(Checker, Claims).
with_int_value(Fun) ->
with_num_value(Fun) ->
fun(Value) ->
case Value of
Int when is_integer(Int) -> Fun(Int);
Num when is_number(Num) -> Fun(Num);
Bin when is_binary(Bin) ->
case emqx_auth_jwt:string_to_number(Bin) of
{ok, Num} -> Fun(Num);

View File

@ -177,6 +177,30 @@ t_check_auth_str_exp(_Config) ->
ct:pal("Auth result: ~p~n", [Result2]),
?assertMatch({ok, #{auth_result := success, jwt_claims := _}}, Result2).
t_check_auth_float_exp(init, _Config) ->
application:unset_env(emqx_auth_jwt, verify_claims).
t_check_auth_float_exp(_Config) ->
Plain = #{clientid => <<"client1">>, username => <<"plain">>, zone => external},
Exp = os:system_time(seconds) + 3.5,
Jwt0 = sign([{clientid, <<"client1">>},
{username, <<"plain">>},
{exp, Exp}], <<"HS256">>, <<"emqxsecret">>),
ct:pal("Jwt: ~p~n", [Jwt0]),
Result0 = emqx_access_control:authenticate(Plain#{password => Jwt0}),
ct:pal("Auth result: ~p~n", [Result0]),
?assertMatch({ok, #{auth_result := success, jwt_claims := _}}, Result0),
Jwt1 = sign([{clientid, <<"client1">>},
{username, <<"plain">>},
{exp, 1.5}], <<"HS256">>, <<"emqxsecret">>),
ct:pal("Jwt: ~p~n", [Jwt1]),
Result1 = emqx_access_control:authenticate(Plain#{password => Jwt1}),
ct:pal("Auth result: ~p~n", [Result1]),
?assertMatch({error, _}, Result1).
t_check_claims(init, _Config) ->
application:set_env(emqx_auth_jwt, verify_claims, [{sub, <<"value">>}]).
t_check_claims(_Config) ->

View File

@ -1,7 +1,8 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.3.9",
[{"4.3.10",[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{"4.3.9",
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[2-8]">>,
@ -14,7 +15,8 @@
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.3.9",
[{"4.3.10",[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{"4.3.9",
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[2-8]">>,

View File

@ -66,7 +66,7 @@
-opaque(channel() :: #channel{}).
-type(conn_state() :: idle | connecting | connected | disconnected).
-type(conn_state() :: idle | connecting | connected | disconnected | accepted).
-type(reply() :: {outgoing, binary()}
| {outgoing, [binary()]}
@ -159,7 +159,7 @@ init(ConnInfo = #{socktype := Socktype,
Channel = #channel{gcli = #{channel => GRpcChann},
conninfo = NConnInfo1,
clientinfo = ClientInfo,
conn_state = idle,
conn_state = accepted,
timers = #{}
},
case emqx_hooks:run_fold('client.connect', [NConnInfo], #{}) of

View File

@ -3,6 +3,7 @@
{VSN,
[{"4.4.8",
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{<<"4\\.4\\.[6-7]">>,
[{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
@ -86,6 +87,7 @@
{<<".*">>,[]}],
[{"4.4.8",
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{<<"4\\.4\\.[6-7]">>,
[{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},

View File

@ -242,7 +242,10 @@ do_compare('>=', L, R) ->
do_compare('=', L, R) orelse do_compare('>', L, R);
do_compare('<>', L, R) -> L /= R;
do_compare('!=', L, R) -> L /= R;
do_compare('=~', T, F) -> emqx_topic:match(T, F).
do_compare('=~', undefined, undefined) -> true;
do_compare('=~', T, F) when T == undefined; F == undefined -> false;
do_compare('=~', T, F) ->
emqx_topic:match(T, F).
number(Bin) ->
try binary_to_integer(Bin)

View File

@ -2533,6 +2533,13 @@ t_sqlparse_compare_undefined(_Config) ->
%% no match
?assertMatch({error, nomatch}, ?TEST_SQL(Sql00)),
Sql00_1 = "select "
" * "
"from \"t/#\" "
"where dev <> undefined ",
%% no match
?assertMatch({error, nomatch}, ?TEST_SQL(Sql00_1)),
Sql01 = "select "
" 'd' as dev "
"from \"t/#\" "
@ -2541,13 +2548,29 @@ t_sqlparse_compare_undefined(_Config) ->
%% pass
?assertMatch(#{}, Res01),
Sql01_1 = "select "
" 'd' as dev "
"from \"t/#\" "
"where dev <> undefined ",
{ok, Res01_1} = ?TEST_SQL(Sql01_1),
%% pass
?assertMatch(#{}, Res01_1),
Sql02 = "select "
" * "
"from \"t/#\" "
"where dev != 'undefined' ",
{ok, Res02} = ?TEST_SQL(Sql02),
%% pass
?assertMatch(#{}, Res02).
?assertMatch(#{}, Res02),
Sql03 = "select "
" * "
"from \"t/#\" "
"where dev =~ 'undefined' ",
Res03 = ?TEST_SQL(Sql03),
%% no match
?assertMatch({error, nomatch}, Res03).
t_sqlparse_compare_null_null(_Config) ->
%% test undefined == undefined
@ -2566,6 +2589,14 @@ t_sqlparse_compare_null_null(_Config) ->
?assertMatch(#{<<"c">> := false
}, Res01),
%% test undefined <> undefined
Sql01_1 = "select "
" a <> b as c "
"from \"t/#\" ",
{ok, Res01_1} = ?TEST_SQL(Sql01_1),
?assertMatch(#{<<"c">> := false
}, Res01_1),
%% test undefined > undefined
Sql02 = "select "
" a > b as c "
@ -2596,10 +2627,18 @@ t_sqlparse_compare_null_null(_Config) ->
"from \"t/#\" ",
{ok, Res05} = ?TEST_SQL(Sql05),
?assertMatch(#{<<"c">> := true
}, Res05).
}, Res05),
%% test undefined =~ undefined
Sql06 = "select "
" a =~ b as c "
"from \"t/#\" ",
{ok, Res06} = ?TEST_SQL(Sql06),
?assertMatch(#{<<"c">> := true
}, Res06).
t_sqlparse_compare_null_notnull(_Config) ->
%% test undefined == b
%% test undefined == 'b'
Sql00 = "select "
" 'b' as b, a = b as c "
"from \"t/#\" ",
@ -2607,7 +2646,7 @@ t_sqlparse_compare_null_notnull(_Config) ->
?assertMatch(#{<<"c">> := false
}, Res00),
%% test undefined != b
%% test undefined != 'b'
Sql01 = "select "
" 'b' as b, a != b as c "
"from \"t/#\" ",
@ -2615,7 +2654,15 @@ t_sqlparse_compare_null_notnull(_Config) ->
?assertMatch(#{<<"c">> := true
}, Res01),
%% test undefined > b
%% test undefined <> 'b'
Sql01_1 = "select "
" 'b' as b, a <> b as c "
"from \"t/#\" ",
{ok, Res01_1} = ?TEST_SQL(Sql01_1),
?assertMatch(#{<<"c">> := true
}, Res01_1),
%% test undefined > 'b'
Sql02 = "select "
" 'b' as b, a > b as c "
"from \"t/#\" ",
@ -2623,7 +2670,7 @@ t_sqlparse_compare_null_notnull(_Config) ->
?assertMatch(#{<<"c">> := false
}, Res02),
%% test undefined < b
%% test undefined < 'b'
Sql03 = "select "
" 'b' as b, a < b as c "
"from \"t/#\" ",
@ -2631,7 +2678,7 @@ t_sqlparse_compare_null_notnull(_Config) ->
?assertMatch(#{<<"c">> := false
}, Res03),
%% test undefined <= b
%% test undefined <= 'b'
Sql04 = "select "
" 'b' as b, a <= b as c "
"from \"t/#\" ",
@ -2639,13 +2686,21 @@ t_sqlparse_compare_null_notnull(_Config) ->
?assertMatch(#{<<"c">> := false
}, Res04),
%% test undefined >= b
%% test undefined >= 'b'
Sql05 = "select "
" 'b' as b, a >= b as c "
"from \"t/#\" ",
{ok, Res05} = ?TEST_SQL(Sql05),
?assertMatch(#{<<"c">> := false
}, Res05).
}, Res05),
%% test undefined =~ 'b'
Sql06 = "select "
" 'b' as b, a =~ b as c "
"from \"t/#\" ",
{ok, Res06} = ?TEST_SQL(Sql06),
?assertMatch(#{<<"c">> := false
}, Res06).
t_sqlparse_compare_notnull_null(_Config) ->
%% test 'a' == undefined
@ -2664,6 +2719,14 @@ t_sqlparse_compare_notnull_null(_Config) ->
?assertMatch(#{<<"c">> := true
}, Res01),
%% test 'a' <> undefined
Sql01_1 = "select "
" 'a' as a, a <> b as c "
"from \"t/#\" ",
{ok, Res01_1} = ?TEST_SQL(Sql01_1),
?assertMatch(#{<<"c">> := true
}, Res01_1),
%% test 'a' > undefined
Sql02 = "select "
" 'a' as a, a > b as c "
@ -2694,7 +2757,15 @@ t_sqlparse_compare_notnull_null(_Config) ->
"from \"t/#\" ",
{ok, Res05} = ?TEST_SQL(Sql05),
?assertMatch(#{<<"c">> := false
}, Res05).
}, Res05),
%% test 'a' =~ undefined
Sql06 = "select "
" 'a' as a, a =~ b as c "
"from \"t/#\" ",
{ok, Res06} = ?TEST_SQL(Sql06),
?assertMatch(#{<<"c">> := false
}, Res06).
t_sqlparse_compare(_Config) ->
Sql00 = "select "
@ -2704,6 +2775,13 @@ t_sqlparse_compare(_Config) ->
?assertMatch(#{<<"c">> := true
}, Res00),
Sql00_1 = "select "
" 'true' as a, true as b, a = b as c "
"from \"t/#\" ",
{ok, Res00_1} = ?TEST_SQL(Sql00_1),
?assertMatch(#{<<"c">> := true
}, Res00_1),
Sql01 = "select "
" is_null(a) as c "
"from \"t/#\" ",
@ -2732,7 +2810,21 @@ t_sqlparse_compare(_Config) ->
?assertMatch(#{<<"c">> := false
}, Res04),
%% test 'a' >= undefined
Sql04_0 = "select "
" 1 as a, 1 as b, a = b as c "
"from \"t/#\" ",
{ok, Res04_0} = ?TEST_SQL(Sql04_0),
?assertMatch(#{<<"c">> := true
}, Res04_0),
Sql04_1 = "select "
" 1 as a, '1' as b, a = b as c "
"from \"t/#\" ",
{ok, Res04_1} = ?TEST_SQL(Sql04_1),
?assertMatch(#{<<"c">> := true
}, Res04_1),
%% test 1 >= 2
Sql05 = "select "
" 1 as a, 2 as b, a >= b as c "
"from \"t/#\" ",
@ -2740,13 +2832,37 @@ t_sqlparse_compare(_Config) ->
?assertMatch(#{<<"c">> := false
}, Res05),
%% test 'a' >= undefined
%% test 1 <= 2
Sql06 = "select "
" 1 as a, 2 as b, a <= b as c "
"from \"t/#\" ",
{ok, Res06} = ?TEST_SQL(Sql06),
?assertMatch(#{<<"c">> := true
}, Res06).
}, Res06),
%% test 1 != 2
Sql07 = "select "
" 1 as a, 2 as b, a != b as c "
"from \"t/#\" ",
{ok, Res07} = ?TEST_SQL(Sql07),
?assertMatch(#{<<"c">> := true
}, Res07),
%% test 1 <> 2
Sql07_1 = "select "
" 1 as a, 2 as b, a <> b as c "
"from \"t/#\" ",
{ok, Res07_1} = ?TEST_SQL(Sql07_1),
?assertMatch(#{<<"c">> := true
}, Res07_1),
%% test 't' =~ 't'
Sql08 = "select "
" 't' as a, 't' as b, a =~ b as c "
"from \"t/#\" ",
{ok, Res08} = ?TEST_SQL(Sql08),
?assertMatch(#{<<"c">> := true
}, Res08).
t_sqlparse_new_map(_Config) ->
%% construct a range without 'as'

View File

@ -13,8 +13,8 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
version: 4.4.8
version: 4.4.9
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application.
appVersion: 4.4.8
appVersion: 4.4.9

View File

@ -11,7 +11,7 @@
]}.
{mapping, "dashboard.listener.http", "emqx_dashboard.listeners", [
{datatype, integer}
{datatype, [integer, ip]}
]}.
{mapping, "dashboard.listener.http.acceptors", "emqx_dashboard.listeners", [
@ -39,7 +39,7 @@
]}.
{mapping, "dashboard.listener.https", "emqx_dashboard.listeners", [
{datatype, integer}
{datatype, [integer, ip]}
]}.
{mapping, "dashboard.listener.https.acceptors", "emqx_dashboard.listeners", [

View File

@ -54,7 +54,8 @@ start_listener({Proto, Port, Options}) ->
https -> minirest:start_https(Server, RanchOpts, Dispatch)
end.
ranch_opts(Port, Options0) ->
ranch_opts(Bind, Options0) ->
IpPort = ip_port(Bind),
NumAcceptors = get_value(num_acceptors, Options0, 4),
MaxConnections = get_value(max_connections, Options0, 512),
Options = lists:foldl(fun({K, _V}, Acc) when K =:= max_connections orelse K =:= num_acceptors ->
@ -68,7 +69,13 @@ ranch_opts(Port, Options0) ->
end, [], Options0),
#{num_acceptors => NumAcceptors,
max_connections => MaxConnections,
socket_opts => [{port, Port} | Options]}.
socket_opts => IpPort ++ Options}.
ip_port({IpStr, Port}) ->
{ok, Ip} = inet:parse_address(IpStr),
[{ip, Ip}, {port, Port}];
ip_port(Port) when is_integer(Port) ->
[{port, Port}].
stop_listeners() ->
lists:foreach(fun(Listener) -> stop_listener(Listener) end, listeners()).

View File

@ -327,7 +327,12 @@ setup_node(Node, Apps) ->
application:set_env(emqx_management, listeners, []),
ok;
(emqx_dashboard) ->
application:set_env(emqx_dashboard, listeners, []),
Options = [{http,{"127.0.0.1",18184},
[{num_acceptors,4},
{max_connections,512},
{inet6,false},
{ipv6_v6only,false}]}],
application:set_env(emqx_dashboard, listeners, Options),
ok;
(_) ->
ok

View File

@ -50,7 +50,7 @@
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1.11"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.7.1"}}}
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.6"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.7"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.9"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}}
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.4"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}

View File

@ -374,12 +374,12 @@ ensure_version(Version, OldInstructions) ->
contains_version(Needle, Haystack) when is_list(Needle) ->
lists:any(
fun(<<"*">>) -> true; %% TODO: delete after we pass esockd 5.8.4
(Regex) when is_binary(Regex) ->
fun(Regex) when is_binary(Regex) ->
Length = length(Needle),
case re:run(Needle, Regex) of
{match, _} ->
{match, [{0, Length}]} ->
true;
nomatch ->
_ ->
false
end;
(Vsn) ->

View File

@ -760,8 +760,8 @@ handle_deliver(Delivers, Channel = #channel{
%% NOTE: Order is important here. While the takeover is in
%% progress, the session cannot enqueue messages, since it already
%% passed on the queue to the new connection in the session state.
NPendings = lists:append(Pendings,
ignore_local(ClientInfo, maybe_nack(Delivers), ClientId, Session)),
NDelivers = ignore_local(ClientInfo, maybe_discard_shared_delivers(Delivers), ClientId, Session),
NPendings = lists:append(Pendings, NDelivers),
{ok, Channel#channel{pendings = NPendings}};
handle_deliver(Delivers, Channel = #channel{
@ -769,8 +769,8 @@ handle_deliver(Delivers, Channel = #channel{
takeover = false,
session = Session,
clientinfo = #{clientid := ClientId} = ClientInfo}) ->
NSession = emqx_session:enqueue(ClientInfo,
ignore_local(ClientInfo, maybe_nack(Delivers), ClientId, Session), Session),
NDelivers = ignore_local(ClientInfo, maybe_discard_shared_delivers(Delivers), ClientId, Session),
NSession = emqx_session:enqueue(ClientInfo, NDelivers, Session),
{ok, Channel#channel{session = NSession}};
handle_deliver(Delivers, Channel = #channel{
@ -801,12 +801,23 @@ ignore_local(ClientInfo, Delivers, Subscriber, Session) ->
end, Delivers).
%% Nack delivers from shared subscription
maybe_nack(Delivers) ->
lists:filter(fun not_nacked/1, Delivers).
not_nacked({deliver, _Topic, Msg}) ->
not (emqx_shared_sub:is_ack_required(Msg)
andalso (ok == emqx_shared_sub:nack_no_connection(Msg))).
maybe_discard_shared_delivers(Delivers) ->
lists:filtermap(
fun({deliver, Topic, Msg}) ->
case emqx_shared_sub:is_ack_required(Msg) of
false ->
true;
true ->
case emqx_shared_sub:is_retry_dispatch(Msg) of
true ->
%% force enqueue the retried shared deliver
{true, {deliver, Topic, emqx_shared_sub:maybe_ack(Msg)}};
false ->
ok = emqx_shared_sub:nack_no_connection(Msg),
false
end
end
end, Delivers).
%%--------------------------------------------------------------------
%% Handle outgoing packet

View File

@ -46,15 +46,14 @@
, maybe_nack_dropped/1
, nack_no_connection/1
, is_ack_required/1
, is_retry_dispatch/1
, get_group/1
]).
%% for testing
-ifdef(TEST).
-export([ subscribers/2
, ack_enabled/0
, strategy/1
]).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
%% gen_server callbacks
@ -239,6 +238,13 @@ get_group(Msg) ->
-spec(is_ack_required(emqx_types:message()) -> boolean()).
is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg).
-spec(is_retry_dispatch(emqx_types:message()) -> boolean()).
is_retry_dispatch(Msg) ->
case get_group_ack(Msg) of
{_Sender, {retry, _Group, _Ref}} -> true;
_ -> false
end.
%% @doc Negative ack dropped message due to inflight window or message queue being full.
-spec(maybe_nack_dropped(emqx_types:message()) -> store | drop).
maybe_nack_dropped(Msg) ->
@ -280,10 +286,15 @@ maybe_ack(Msg) ->
Msg;
Ack ->
{Sender, Ref} = fetch_sender_ref(Ack),
Sender ! {Ref, ?ACK},
ack(Sender, Ref),
without_group_ack(Msg)
end.
-spec(ack(pid(), reference()) -> ok).
ack(Sender, Ref) ->
Sender ! {Ref, ?ACK},
ok.
fetch_sender_ref({Sender, {_Type, _Group, Ref}}) -> {Sender, Ref};
%% These clauses are for backward compatibility
fetch_sender_ref({Sender, Ref}) -> {Sender, Ref}.

View File

@ -483,6 +483,45 @@ t_handle_deliver_nl(_) ->
NMsg = emqx_message:set_flag(nl, Msg),
{ok, Channel} = emqx_channel:handle_deliver([{deliver, <<"t1">>, NMsg}], Channel).
t_handle_deliver_shared_in_no_connection(_) ->
Grp = <<"g">>,
Sender = self(),
Ref1 = make_ref(),
Ref2 = make_ref(),
Chann = emqx_channel:set_field(conn_state, disconnected, channel()),
Msg0 = emqx_shared_sub:with_group_ack(
emqx_message:make(test, ?QOS_1, <<"t">>, <<"qos1">>),
Grp,
fresh,
Sender,
Ref1
),
Msg1 = emqx_shared_sub:with_group_ack(
emqx_message:make(test, ?QOS_2, <<"t">>, <<"qos2">>),
Grp,
retry,
Sender,
Ref2
),
Delivers = [{deliver, <<"+">>, Msg0}, {deliver, <<"+">>, Msg1}],
%% all shared msgs should be queued if shared_dispatch_ack_enabled=false
meck:new(emqx_shared_sub, [passthrough, no_history]),
meck:expect(emqx_shared_sub, is_ack_required, fun(_) -> false end),
{ok, Chann1} = emqx_channel:handle_deliver(Delivers, Chann),
?assertEqual(2, proplists:get_value(mqueue_len, emqx_channel:stats(Chann1))),
meck:unload(emqx_shared_sub),
%% only fresh shared msgs should be queued if shared_dispatch_ack_enabled=true
meck:new(emqx_shared_sub, [passthrough, no_history]),
meck:expect(emqx_shared_sub, is_ack_required, fun(_) -> true end),
{ok, Chann2} = emqx_channel:handle_deliver(Delivers, Chann),
?assertEqual(1, proplists:get_value(mqueue_len, emqx_channel:stats(Chann2))),
receive {Ref1, {shared_sub_nack, no_connection}} -> ok after 0 -> ?assert(false) end,
receive {Ref2, shared_sub_ack} -> ok after 0 -> ?assert(false) end,
meck:unload(emqx_shared_sub).
%%--------------------------------------------------------------------
%% Test cases for handle_out
%%--------------------------------------------------------------------