Merge pull request #8943 from zmstone/merge-v43-to-v44
Merge v43 to v44
This commit is contained in:
commit
cbcde75da2
|
@ -15,6 +15,14 @@ File format:
|
|||
### Bug fixes
|
||||
|
||||
- Fix rule-engine update behaviour which may initialize actions for disabled rules. [#8849](https://github.com/emqx/emqx/pull/8849)
|
||||
- Fix JWT plugin don't support non-integer timestamp claims. [#8862](https://github.com/emqx/emqx/pull/8862)
|
||||
- Fix a possible dead loop caused by shared subscriptions with `shared_dispatch_ack_enabled=true`. [#8918](https://github.com/emqx/emqx/pull/8918)
|
||||
- Fix dashboard binding IP address not working. [#8916](https://github.com/emqx/emqx/pull/8916)
|
||||
- Fix rule SQL topic matching to null values failed. [#8927](https://github.com/emqx/emqx/pull/8927)
|
||||
The following SQL should not fail (crash) but return `{"r": false}`:
|
||||
`SELECT topic =~ 't' as r FROM "$events/client_connected"`.
|
||||
The topic is a null value as there's no such field in event `$events/client_connected`, so it
|
||||
should return false if match it to a topic.
|
||||
|
||||
## v4.3.19
|
||||
|
||||
|
@ -22,7 +30,6 @@ File format:
|
|||
|
||||
- Improve error message for LwM2M plugin when object ID is not valid. [#8654](https://github.com/emqx/emqx/pull/8654).
|
||||
- Add tzdata apk package to alpine docker image. [#8671](https://github.com/emqx/emqx/pull/8671)
|
||||
- Add node evacuation and cluster rebalancing features. [#8597](https://github.com/emqx/emqx/pull/8597)
|
||||
- Refine Rule Engine error log. RuleId will be logged when take action failed. [#8737](https://github.com/emqx/emqx/pull/8737)
|
||||
- Increases the latency interval for MQTT Bridge test connections to improve compatibility in high-latency environments. [#8745](https://github.com/emqx/emqx/pull/8745)
|
||||
- Close ExProto client process immediately if it's keepalive timeouted. [#8725](https://github.com/emqx/emqx/pull/8725)
|
||||
|
|
|
@ -86,7 +86,7 @@ is_expired(Exp) when is_binary(Exp) ->
|
|||
?DEBUG("acl_deny_due_to_invalid_jwt_exp:~p", [Exp]),
|
||||
true
|
||||
end;
|
||||
is_expired(Exp) when is_integer(Exp) ->
|
||||
is_expired(Exp) when is_number(Exp) ->
|
||||
Now = erlang:system_time(second),
|
||||
Now > Exp;
|
||||
is_expired(Exp) ->
|
||||
|
|
|
@ -201,19 +201,19 @@ do_verify(JwsCompacted, [Jwk|More]) ->
|
|||
|
||||
check_claims(Claims) ->
|
||||
Now = os:system_time(seconds),
|
||||
Checker = [{<<"exp">>, with_int_value(
|
||||
Checker = [{<<"exp">>, with_num_value(
|
||||
fun(ExpireTime) -> Now < ExpireTime end)},
|
||||
{<<"iat">>, with_int_value(
|
||||
{<<"iat">>, with_num_value(
|
||||
fun(IssueAt) -> IssueAt =< Now end)},
|
||||
{<<"nbf">>, with_int_value(
|
||||
{<<"nbf">>, with_num_value(
|
||||
fun(NotBefore) -> NotBefore =< Now end)}
|
||||
],
|
||||
do_check_claim(Checker, Claims).
|
||||
|
||||
with_int_value(Fun) ->
|
||||
with_num_value(Fun) ->
|
||||
fun(Value) ->
|
||||
case Value of
|
||||
Int when is_integer(Int) -> Fun(Int);
|
||||
Num when is_number(Num) -> Fun(Num);
|
||||
Bin when is_binary(Bin) ->
|
||||
case emqx_auth_jwt:string_to_number(Bin) of
|
||||
{ok, Num} -> Fun(Num);
|
||||
|
|
|
@ -177,6 +177,30 @@ t_check_auth_str_exp(_Config) ->
|
|||
ct:pal("Auth result: ~p~n", [Result2]),
|
||||
?assertMatch({ok, #{auth_result := success, jwt_claims := _}}, Result2).
|
||||
|
||||
t_check_auth_float_exp(init, _Config) ->
|
||||
application:unset_env(emqx_auth_jwt, verify_claims).
|
||||
t_check_auth_float_exp(_Config) ->
|
||||
Plain = #{clientid => <<"client1">>, username => <<"plain">>, zone => external},
|
||||
Exp = os:system_time(seconds) + 3.5,
|
||||
|
||||
Jwt0 = sign([{clientid, <<"client1">>},
|
||||
{username, <<"plain">>},
|
||||
{exp, Exp}], <<"HS256">>, <<"emqxsecret">>),
|
||||
ct:pal("Jwt: ~p~n", [Jwt0]),
|
||||
|
||||
Result0 = emqx_access_control:authenticate(Plain#{password => Jwt0}),
|
||||
ct:pal("Auth result: ~p~n", [Result0]),
|
||||
?assertMatch({ok, #{auth_result := success, jwt_claims := _}}, Result0),
|
||||
|
||||
Jwt1 = sign([{clientid, <<"client1">>},
|
||||
{username, <<"plain">>},
|
||||
{exp, 1.5}], <<"HS256">>, <<"emqxsecret">>),
|
||||
ct:pal("Jwt: ~p~n", [Jwt1]),
|
||||
|
||||
Result1 = emqx_access_control:authenticate(Plain#{password => Jwt1}),
|
||||
ct:pal("Auth result: ~p~n", [Result1]),
|
||||
?assertMatch({error, _}, Result1).
|
||||
|
||||
t_check_claims(init, _Config) ->
|
||||
application:set_env(emqx_auth_jwt, verify_claims, [{sub, <<"value">>}]).
|
||||
t_check_claims(_Config) ->
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
%% -*- mode: erlang -*-
|
||||
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||
{VSN,
|
||||
[{"4.3.7",
|
||||
[{"4.3.8",[{load_module,emqx_acl_mnesia_cli,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.7",
|
||||
[{load_module,emqx_auth_mnesia_api,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_acl_mnesia_cli,brutal_purge,soft_purge,[]}]},
|
||||
{<<"4\\.3\\.[5-6]">>,
|
||||
|
@ -33,7 +34,8 @@
|
|||
{load_module,emqx_acl_mnesia_cli,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_auth_mnesia_app,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}],
|
||||
[{"4.3.7",
|
||||
[{"4.3.8",[{load_module,emqx_acl_mnesia_cli,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.7",
|
||||
[{load_module,emqx_auth_mnesia_api,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_acl_mnesia_cli,brutal_purge,soft_purge,[]}]},
|
||||
{<<"4\\.3\\.[5-6]">>,
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
%% -*- mode: erlang -*-
|
||||
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||
{VSN,
|
||||
[{"4.3.9",
|
||||
[{"4.3.10",[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.9",
|
||||
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}]},
|
||||
{<<"4\\.3\\.[2-8]">>,
|
||||
|
@ -14,7 +15,8 @@
|
|||
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}],
|
||||
[{"4.3.9",
|
||||
[{"4.3.10",[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.9",
|
||||
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}]},
|
||||
{<<"4\\.3\\.[2-8]">>,
|
||||
|
|
|
@ -66,7 +66,7 @@
|
|||
|
||||
-opaque(channel() :: #channel{}).
|
||||
|
||||
-type(conn_state() :: idle | connecting | connected | disconnected).
|
||||
-type(conn_state() :: idle | connecting | connected | disconnected | accepted).
|
||||
|
||||
-type(reply() :: {outgoing, binary()}
|
||||
| {outgoing, [binary()]}
|
||||
|
@ -159,7 +159,7 @@ init(ConnInfo = #{socktype := Socktype,
|
|||
Channel = #channel{gcli = #{channel => GRpcChann},
|
||||
conninfo = NConnInfo1,
|
||||
clientinfo = ClientInfo,
|
||||
conn_state = idle,
|
||||
conn_state = accepted,
|
||||
timers = #{}
|
||||
},
|
||||
case emqx_hooks:run_fold('client.connect', [NConnInfo], #{}) of
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
{VSN,
|
||||
[{"4.4.8",
|
||||
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
||||
{<<"4\\.4\\.[6-7]">>,
|
||||
[{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
|
@ -86,6 +87,7 @@
|
|||
{<<".*">>,[]}],
|
||||
[{"4.4.8",
|
||||
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
||||
{<<"4\\.4\\.[6-7]">>,
|
||||
[{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
|
|
|
@ -242,7 +242,10 @@ do_compare('>=', L, R) ->
|
|||
do_compare('=', L, R) orelse do_compare('>', L, R);
|
||||
do_compare('<>', L, R) -> L /= R;
|
||||
do_compare('!=', L, R) -> L /= R;
|
||||
do_compare('=~', T, F) -> emqx_topic:match(T, F).
|
||||
do_compare('=~', undefined, undefined) -> true;
|
||||
do_compare('=~', T, F) when T == undefined; F == undefined -> false;
|
||||
do_compare('=~', T, F) ->
|
||||
emqx_topic:match(T, F).
|
||||
|
||||
number(Bin) ->
|
||||
try binary_to_integer(Bin)
|
||||
|
|
|
@ -2533,6 +2533,13 @@ t_sqlparse_compare_undefined(_Config) ->
|
|||
%% no match
|
||||
?assertMatch({error, nomatch}, ?TEST_SQL(Sql00)),
|
||||
|
||||
Sql00_1 = "select "
|
||||
" * "
|
||||
"from \"t/#\" "
|
||||
"where dev <> undefined ",
|
||||
%% no match
|
||||
?assertMatch({error, nomatch}, ?TEST_SQL(Sql00_1)),
|
||||
|
||||
Sql01 = "select "
|
||||
" 'd' as dev "
|
||||
"from \"t/#\" "
|
||||
|
@ -2541,13 +2548,29 @@ t_sqlparse_compare_undefined(_Config) ->
|
|||
%% pass
|
||||
?assertMatch(#{}, Res01),
|
||||
|
||||
Sql01_1 = "select "
|
||||
" 'd' as dev "
|
||||
"from \"t/#\" "
|
||||
"where dev <> undefined ",
|
||||
{ok, Res01_1} = ?TEST_SQL(Sql01_1),
|
||||
%% pass
|
||||
?assertMatch(#{}, Res01_1),
|
||||
|
||||
Sql02 = "select "
|
||||
" * "
|
||||
"from \"t/#\" "
|
||||
"where dev != 'undefined' ",
|
||||
{ok, Res02} = ?TEST_SQL(Sql02),
|
||||
%% pass
|
||||
?assertMatch(#{}, Res02).
|
||||
?assertMatch(#{}, Res02),
|
||||
|
||||
Sql03 = "select "
|
||||
" * "
|
||||
"from \"t/#\" "
|
||||
"where dev =~ 'undefined' ",
|
||||
Res03 = ?TEST_SQL(Sql03),
|
||||
%% no match
|
||||
?assertMatch({error, nomatch}, Res03).
|
||||
|
||||
t_sqlparse_compare_null_null(_Config) ->
|
||||
%% test undefined == undefined
|
||||
|
@ -2566,6 +2589,14 @@ t_sqlparse_compare_null_null(_Config) ->
|
|||
?assertMatch(#{<<"c">> := false
|
||||
}, Res01),
|
||||
|
||||
%% test undefined <> undefined
|
||||
Sql01_1 = "select "
|
||||
" a <> b as c "
|
||||
"from \"t/#\" ",
|
||||
{ok, Res01_1} = ?TEST_SQL(Sql01_1),
|
||||
?assertMatch(#{<<"c">> := false
|
||||
}, Res01_1),
|
||||
|
||||
%% test undefined > undefined
|
||||
Sql02 = "select "
|
||||
" a > b as c "
|
||||
|
@ -2596,10 +2627,18 @@ t_sqlparse_compare_null_null(_Config) ->
|
|||
"from \"t/#\" ",
|
||||
{ok, Res05} = ?TEST_SQL(Sql05),
|
||||
?assertMatch(#{<<"c">> := true
|
||||
}, Res05).
|
||||
}, Res05),
|
||||
|
||||
%% test undefined =~ undefined
|
||||
Sql06 = "select "
|
||||
" a =~ b as c "
|
||||
"from \"t/#\" ",
|
||||
{ok, Res06} = ?TEST_SQL(Sql06),
|
||||
?assertMatch(#{<<"c">> := true
|
||||
}, Res06).
|
||||
|
||||
t_sqlparse_compare_null_notnull(_Config) ->
|
||||
%% test undefined == b
|
||||
%% test undefined == 'b'
|
||||
Sql00 = "select "
|
||||
" 'b' as b, a = b as c "
|
||||
"from \"t/#\" ",
|
||||
|
@ -2607,7 +2646,7 @@ t_sqlparse_compare_null_notnull(_Config) ->
|
|||
?assertMatch(#{<<"c">> := false
|
||||
}, Res00),
|
||||
|
||||
%% test undefined != b
|
||||
%% test undefined != 'b'
|
||||
Sql01 = "select "
|
||||
" 'b' as b, a != b as c "
|
||||
"from \"t/#\" ",
|
||||
|
@ -2615,7 +2654,15 @@ t_sqlparse_compare_null_notnull(_Config) ->
|
|||
?assertMatch(#{<<"c">> := true
|
||||
}, Res01),
|
||||
|
||||
%% test undefined > b
|
||||
%% test undefined <> 'b'
|
||||
Sql01_1 = "select "
|
||||
" 'b' as b, a <> b as c "
|
||||
"from \"t/#\" ",
|
||||
{ok, Res01_1} = ?TEST_SQL(Sql01_1),
|
||||
?assertMatch(#{<<"c">> := true
|
||||
}, Res01_1),
|
||||
|
||||
%% test undefined > 'b'
|
||||
Sql02 = "select "
|
||||
" 'b' as b, a > b as c "
|
||||
"from \"t/#\" ",
|
||||
|
@ -2623,7 +2670,7 @@ t_sqlparse_compare_null_notnull(_Config) ->
|
|||
?assertMatch(#{<<"c">> := false
|
||||
}, Res02),
|
||||
|
||||
%% test undefined < b
|
||||
%% test undefined < 'b'
|
||||
Sql03 = "select "
|
||||
" 'b' as b, a < b as c "
|
||||
"from \"t/#\" ",
|
||||
|
@ -2631,7 +2678,7 @@ t_sqlparse_compare_null_notnull(_Config) ->
|
|||
?assertMatch(#{<<"c">> := false
|
||||
}, Res03),
|
||||
|
||||
%% test undefined <= b
|
||||
%% test undefined <= 'b'
|
||||
Sql04 = "select "
|
||||
" 'b' as b, a <= b as c "
|
||||
"from \"t/#\" ",
|
||||
|
@ -2639,13 +2686,21 @@ t_sqlparse_compare_null_notnull(_Config) ->
|
|||
?assertMatch(#{<<"c">> := false
|
||||
}, Res04),
|
||||
|
||||
%% test undefined >= b
|
||||
%% test undefined >= 'b'
|
||||
Sql05 = "select "
|
||||
" 'b' as b, a >= b as c "
|
||||
"from \"t/#\" ",
|
||||
{ok, Res05} = ?TEST_SQL(Sql05),
|
||||
?assertMatch(#{<<"c">> := false
|
||||
}, Res05).
|
||||
}, Res05),
|
||||
|
||||
%% test undefined =~ 'b'
|
||||
Sql06 = "select "
|
||||
" 'b' as b, a =~ b as c "
|
||||
"from \"t/#\" ",
|
||||
{ok, Res06} = ?TEST_SQL(Sql06),
|
||||
?assertMatch(#{<<"c">> := false
|
||||
}, Res06).
|
||||
|
||||
t_sqlparse_compare_notnull_null(_Config) ->
|
||||
%% test 'a' == undefined
|
||||
|
@ -2664,6 +2719,14 @@ t_sqlparse_compare_notnull_null(_Config) ->
|
|||
?assertMatch(#{<<"c">> := true
|
||||
}, Res01),
|
||||
|
||||
%% test 'a' <> undefined
|
||||
Sql01_1 = "select "
|
||||
" 'a' as a, a <> b as c "
|
||||
"from \"t/#\" ",
|
||||
{ok, Res01_1} = ?TEST_SQL(Sql01_1),
|
||||
?assertMatch(#{<<"c">> := true
|
||||
}, Res01_1),
|
||||
|
||||
%% test 'a' > undefined
|
||||
Sql02 = "select "
|
||||
" 'a' as a, a > b as c "
|
||||
|
@ -2694,7 +2757,15 @@ t_sqlparse_compare_notnull_null(_Config) ->
|
|||
"from \"t/#\" ",
|
||||
{ok, Res05} = ?TEST_SQL(Sql05),
|
||||
?assertMatch(#{<<"c">> := false
|
||||
}, Res05).
|
||||
}, Res05),
|
||||
|
||||
%% test 'a' =~ undefined
|
||||
Sql06 = "select "
|
||||
" 'a' as a, a =~ b as c "
|
||||
"from \"t/#\" ",
|
||||
{ok, Res06} = ?TEST_SQL(Sql06),
|
||||
?assertMatch(#{<<"c">> := false
|
||||
}, Res06).
|
||||
|
||||
t_sqlparse_compare(_Config) ->
|
||||
Sql00 = "select "
|
||||
|
@ -2704,6 +2775,13 @@ t_sqlparse_compare(_Config) ->
|
|||
?assertMatch(#{<<"c">> := true
|
||||
}, Res00),
|
||||
|
||||
Sql00_1 = "select "
|
||||
" 'true' as a, true as b, a = b as c "
|
||||
"from \"t/#\" ",
|
||||
{ok, Res00_1} = ?TEST_SQL(Sql00_1),
|
||||
?assertMatch(#{<<"c">> := true
|
||||
}, Res00_1),
|
||||
|
||||
Sql01 = "select "
|
||||
" is_null(a) as c "
|
||||
"from \"t/#\" ",
|
||||
|
@ -2732,7 +2810,21 @@ t_sqlparse_compare(_Config) ->
|
|||
?assertMatch(#{<<"c">> := false
|
||||
}, Res04),
|
||||
|
||||
%% test 'a' >= undefined
|
||||
Sql04_0 = "select "
|
||||
" 1 as a, 1 as b, a = b as c "
|
||||
"from \"t/#\" ",
|
||||
{ok, Res04_0} = ?TEST_SQL(Sql04_0),
|
||||
?assertMatch(#{<<"c">> := true
|
||||
}, Res04_0),
|
||||
|
||||
Sql04_1 = "select "
|
||||
" 1 as a, '1' as b, a = b as c "
|
||||
"from \"t/#\" ",
|
||||
{ok, Res04_1} = ?TEST_SQL(Sql04_1),
|
||||
?assertMatch(#{<<"c">> := true
|
||||
}, Res04_1),
|
||||
|
||||
%% test 1 >= 2
|
||||
Sql05 = "select "
|
||||
" 1 as a, 2 as b, a >= b as c "
|
||||
"from \"t/#\" ",
|
||||
|
@ -2740,13 +2832,37 @@ t_sqlparse_compare(_Config) ->
|
|||
?assertMatch(#{<<"c">> := false
|
||||
}, Res05),
|
||||
|
||||
%% test 'a' >= undefined
|
||||
%% test 1 <= 2
|
||||
Sql06 = "select "
|
||||
" 1 as a, 2 as b, a <= b as c "
|
||||
"from \"t/#\" ",
|
||||
{ok, Res06} = ?TEST_SQL(Sql06),
|
||||
?assertMatch(#{<<"c">> := true
|
||||
}, Res06).
|
||||
}, Res06),
|
||||
|
||||
%% test 1 != 2
|
||||
Sql07 = "select "
|
||||
" 1 as a, 2 as b, a != b as c "
|
||||
"from \"t/#\" ",
|
||||
{ok, Res07} = ?TEST_SQL(Sql07),
|
||||
?assertMatch(#{<<"c">> := true
|
||||
}, Res07),
|
||||
|
||||
%% test 1 <> 2
|
||||
Sql07_1 = "select "
|
||||
" 1 as a, 2 as b, a <> b as c "
|
||||
"from \"t/#\" ",
|
||||
{ok, Res07_1} = ?TEST_SQL(Sql07_1),
|
||||
?assertMatch(#{<<"c">> := true
|
||||
}, Res07_1),
|
||||
|
||||
%% test 't' =~ 't'
|
||||
Sql08 = "select "
|
||||
" 't' as a, 't' as b, a =~ b as c "
|
||||
"from \"t/#\" ",
|
||||
{ok, Res08} = ?TEST_SQL(Sql08),
|
||||
?assertMatch(#{<<"c">> := true
|
||||
}, Res08).
|
||||
|
||||
t_sqlparse_new_map(_Config) ->
|
||||
%% construct a range without 'as'
|
||||
|
|
|
@ -13,8 +13,8 @@ type: application
|
|||
|
||||
# This is the chart version. This version number should be incremented each time you make changes
|
||||
# to the chart and its templates, including the app version.
|
||||
version: 4.4.8
|
||||
version: 4.4.9
|
||||
|
||||
# This is the version number of the application being deployed. This version number should be
|
||||
# incremented each time you make changes to the application.
|
||||
appVersion: 4.4.8
|
||||
appVersion: 4.4.9
|
||||
|
|
|
@ -11,7 +11,7 @@
|
|||
]}.
|
||||
|
||||
{mapping, "dashboard.listener.http", "emqx_dashboard.listeners", [
|
||||
{datatype, integer}
|
||||
{datatype, [integer, ip]}
|
||||
]}.
|
||||
|
||||
{mapping, "dashboard.listener.http.acceptors", "emqx_dashboard.listeners", [
|
||||
|
@ -39,7 +39,7 @@
|
|||
]}.
|
||||
|
||||
{mapping, "dashboard.listener.https", "emqx_dashboard.listeners", [
|
||||
{datatype, integer}
|
||||
{datatype, [integer, ip]}
|
||||
]}.
|
||||
|
||||
{mapping, "dashboard.listener.https.acceptors", "emqx_dashboard.listeners", [
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_dashboard,
|
||||
[{description, "EMQX Web Dashboard"},
|
||||
{vsn, "4.4.7"}, % strict semver, bump manually!
|
||||
{vsn, "4.4.8"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, [emqx_dashboard_sup]},
|
||||
{applications, [kernel,stdlib,mnesia,minirest]},
|
||||
|
|
|
@ -54,7 +54,8 @@ start_listener({Proto, Port, Options}) ->
|
|||
https -> minirest:start_https(Server, RanchOpts, Dispatch)
|
||||
end.
|
||||
|
||||
ranch_opts(Port, Options0) ->
|
||||
ranch_opts(Bind, Options0) ->
|
||||
IpPort = ip_port(Bind),
|
||||
NumAcceptors = get_value(num_acceptors, Options0, 4),
|
||||
MaxConnections = get_value(max_connections, Options0, 512),
|
||||
Options = lists:foldl(fun({K, _V}, Acc) when K =:= max_connections orelse K =:= num_acceptors ->
|
||||
|
@ -68,7 +69,13 @@ ranch_opts(Port, Options0) ->
|
|||
end, [], Options0),
|
||||
#{num_acceptors => NumAcceptors,
|
||||
max_connections => MaxConnections,
|
||||
socket_opts => [{port, Port} | Options]}.
|
||||
socket_opts => IpPort ++ Options}.
|
||||
|
||||
ip_port({IpStr, Port}) ->
|
||||
{ok, Ip} = inet:parse_address(IpStr),
|
||||
[{ip, Ip}, {port, Port}];
|
||||
ip_port(Port) when is_integer(Port) ->
|
||||
[{port, Port}].
|
||||
|
||||
stop_listeners() ->
|
||||
lists:foreach(fun(Listener) -> stop_listener(Listener) end, listeners()).
|
||||
|
|
|
@ -327,7 +327,12 @@ setup_node(Node, Apps) ->
|
|||
application:set_env(emqx_management, listeners, []),
|
||||
ok;
|
||||
(emqx_dashboard) ->
|
||||
application:set_env(emqx_dashboard, listeners, []),
|
||||
Options = [{http,{"127.0.0.1",18184},
|
||||
[{num_acceptors,4},
|
||||
{max_connections,512},
|
||||
{inet6,false},
|
||||
{ipv6_v6only,false}]}],
|
||||
application:set_env(emqx_dashboard, listeners, Options),
|
||||
ok;
|
||||
(_) ->
|
||||
ok
|
||||
|
|
|
@ -50,7 +50,7 @@
|
|||
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1.11"}}}
|
||||
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.7.1"}}}
|
||||
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.6"}}}
|
||||
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.7"}}}
|
||||
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.9"}}}
|
||||
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}}
|
||||
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.4"}}}
|
||||
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}
|
||||
|
|
|
@ -374,12 +374,12 @@ ensure_version(Version, OldInstructions) ->
|
|||
|
||||
contains_version(Needle, Haystack) when is_list(Needle) ->
|
||||
lists:any(
|
||||
fun(<<"*">>) -> true; %% TODO: delete after we pass esockd 5.8.4
|
||||
(Regex) when is_binary(Regex) ->
|
||||
fun(Regex) when is_binary(Regex) ->
|
||||
Length = length(Needle),
|
||||
case re:run(Needle, Regex) of
|
||||
{match, _} ->
|
||||
{match, [{0, Length}]} ->
|
||||
true;
|
||||
nomatch ->
|
||||
_ ->
|
||||
false
|
||||
end;
|
||||
(Vsn) ->
|
||||
|
|
|
@ -2,10 +2,13 @@
|
|||
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||
{VSN,
|
||||
[{"4.4.8",
|
||||
[{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_message,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.7",
|
||||
[{load_module,emqx_message,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_message,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
|
@ -14,7 +17,8 @@
|
|||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.6",
|
||||
[{load_module,emqx_message,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_message,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
|
@ -197,10 +201,13 @@
|
|||
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}],
|
||||
[{"4.4.8",
|
||||
[{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_message,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.7",
|
||||
[{load_module,emqx_message,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_message,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
|
@ -209,7 +216,8 @@
|
|||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.6",
|
||||
[{load_module,emqx_message,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_message,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
|
|
|
@ -760,8 +760,8 @@ handle_deliver(Delivers, Channel = #channel{
|
|||
%% NOTE: Order is important here. While the takeover is in
|
||||
%% progress, the session cannot enqueue messages, since it already
|
||||
%% passed on the queue to the new connection in the session state.
|
||||
NPendings = lists:append(Pendings,
|
||||
ignore_local(ClientInfo, maybe_nack(Delivers), ClientId, Session)),
|
||||
NDelivers = ignore_local(ClientInfo, maybe_discard_shared_delivers(Delivers), ClientId, Session),
|
||||
NPendings = lists:append(Pendings, NDelivers),
|
||||
{ok, Channel#channel{pendings = NPendings}};
|
||||
|
||||
handle_deliver(Delivers, Channel = #channel{
|
||||
|
@ -769,8 +769,8 @@ handle_deliver(Delivers, Channel = #channel{
|
|||
takeover = false,
|
||||
session = Session,
|
||||
clientinfo = #{clientid := ClientId} = ClientInfo}) ->
|
||||
NSession = emqx_session:enqueue(ClientInfo,
|
||||
ignore_local(ClientInfo, maybe_nack(Delivers), ClientId, Session), Session),
|
||||
NDelivers = ignore_local(ClientInfo, maybe_discard_shared_delivers(Delivers), ClientId, Session),
|
||||
NSession = emqx_session:enqueue(ClientInfo, NDelivers, Session),
|
||||
{ok, Channel#channel{session = NSession}};
|
||||
|
||||
handle_deliver(Delivers, Channel = #channel{
|
||||
|
@ -801,12 +801,23 @@ ignore_local(ClientInfo, Delivers, Subscriber, Session) ->
|
|||
end, Delivers).
|
||||
|
||||
%% Nack delivers from shared subscription
|
||||
maybe_nack(Delivers) ->
|
||||
lists:filter(fun not_nacked/1, Delivers).
|
||||
|
||||
not_nacked({deliver, _Topic, Msg}) ->
|
||||
not (emqx_shared_sub:is_ack_required(Msg)
|
||||
andalso (ok == emqx_shared_sub:nack_no_connection(Msg))).
|
||||
maybe_discard_shared_delivers(Delivers) ->
|
||||
lists:filtermap(
|
||||
fun({deliver, Topic, Msg}) ->
|
||||
case emqx_shared_sub:is_ack_required(Msg) of
|
||||
false ->
|
||||
true;
|
||||
true ->
|
||||
case emqx_shared_sub:is_retry_dispatch(Msg) of
|
||||
true ->
|
||||
%% force enqueue the retried shared deliver
|
||||
{true, {deliver, Topic, emqx_shared_sub:maybe_ack(Msg)}};
|
||||
false ->
|
||||
ok = emqx_shared_sub:nack_no_connection(Msg),
|
||||
false
|
||||
end
|
||||
end
|
||||
end, Delivers).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Handle outgoing packet
|
||||
|
|
|
@ -46,15 +46,14 @@
|
|||
, maybe_nack_dropped/1
|
||||
, nack_no_connection/1
|
||||
, is_ack_required/1
|
||||
, is_retry_dispatch/1
|
||||
, get_group/1
|
||||
]).
|
||||
|
||||
%% for testing
|
||||
-ifdef(TEST).
|
||||
-export([ subscribers/2
|
||||
, ack_enabled/0
|
||||
, strategy/1
|
||||
]).
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
-endif.
|
||||
|
||||
%% gen_server callbacks
|
||||
|
@ -239,6 +238,13 @@ get_group(Msg) ->
|
|||
-spec(is_ack_required(emqx_types:message()) -> boolean()).
|
||||
is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg).
|
||||
|
||||
-spec(is_retry_dispatch(emqx_types:message()) -> boolean()).
|
||||
is_retry_dispatch(Msg) ->
|
||||
case get_group_ack(Msg) of
|
||||
{_Sender, {retry, _Group, _Ref}} -> true;
|
||||
_ -> false
|
||||
end.
|
||||
|
||||
%% @doc Negative ack dropped message due to inflight window or message queue being full.
|
||||
-spec(maybe_nack_dropped(emqx_types:message()) -> store | drop).
|
||||
maybe_nack_dropped(Msg) ->
|
||||
|
@ -280,10 +286,15 @@ maybe_ack(Msg) ->
|
|||
Msg;
|
||||
Ack ->
|
||||
{Sender, Ref} = fetch_sender_ref(Ack),
|
||||
Sender ! {Ref, ?ACK},
|
||||
ack(Sender, Ref),
|
||||
without_group_ack(Msg)
|
||||
end.
|
||||
|
||||
-spec(ack(pid(), reference()) -> ok).
|
||||
ack(Sender, Ref) ->
|
||||
Sender ! {Ref, ?ACK},
|
||||
ok.
|
||||
|
||||
fetch_sender_ref({Sender, {_Type, _Group, Ref}}) -> {Sender, Ref};
|
||||
%% These clauses are for backward compatibility
|
||||
fetch_sender_ref({Sender, Ref}) -> {Sender, Ref}.
|
||||
|
|
|
@ -483,6 +483,45 @@ t_handle_deliver_nl(_) ->
|
|||
NMsg = emqx_message:set_flag(nl, Msg),
|
||||
{ok, Channel} = emqx_channel:handle_deliver([{deliver, <<"t1">>, NMsg}], Channel).
|
||||
|
||||
t_handle_deliver_shared_in_no_connection(_) ->
|
||||
Grp = <<"g">>,
|
||||
Sender = self(),
|
||||
Ref1 = make_ref(),
|
||||
Ref2 = make_ref(),
|
||||
Chann = emqx_channel:set_field(conn_state, disconnected, channel()),
|
||||
|
||||
Msg0 = emqx_shared_sub:with_group_ack(
|
||||
emqx_message:make(test, ?QOS_1, <<"t">>, <<"qos1">>),
|
||||
Grp,
|
||||
fresh,
|
||||
Sender,
|
||||
Ref1
|
||||
),
|
||||
Msg1 = emqx_shared_sub:with_group_ack(
|
||||
emqx_message:make(test, ?QOS_2, <<"t">>, <<"qos2">>),
|
||||
Grp,
|
||||
retry,
|
||||
Sender,
|
||||
Ref2
|
||||
),
|
||||
Delivers = [{deliver, <<"+">>, Msg0}, {deliver, <<"+">>, Msg1}],
|
||||
|
||||
%% all shared msgs should be queued if shared_dispatch_ack_enabled=false
|
||||
meck:new(emqx_shared_sub, [passthrough, no_history]),
|
||||
meck:expect(emqx_shared_sub, is_ack_required, fun(_) -> false end),
|
||||
{ok, Chann1} = emqx_channel:handle_deliver(Delivers, Chann),
|
||||
?assertEqual(2, proplists:get_value(mqueue_len, emqx_channel:stats(Chann1))),
|
||||
meck:unload(emqx_shared_sub),
|
||||
|
||||
%% only fresh shared msgs should be queued if shared_dispatch_ack_enabled=true
|
||||
meck:new(emqx_shared_sub, [passthrough, no_history]),
|
||||
meck:expect(emqx_shared_sub, is_ack_required, fun(_) -> true end),
|
||||
{ok, Chann2} = emqx_channel:handle_deliver(Delivers, Chann),
|
||||
?assertEqual(1, proplists:get_value(mqueue_len, emqx_channel:stats(Chann2))),
|
||||
receive {Ref1, {shared_sub_nack, no_connection}} -> ok after 0 -> ?assert(false) end,
|
||||
receive {Ref2, shared_sub_ack} -> ok after 0 -> ?assert(false) end,
|
||||
meck:unload(emqx_shared_sub).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Test cases for handle_out
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
Loading…
Reference in New Issue