Merge remote-tracking branch 'origin/main-v4.3' into sync-v44-a

This commit is contained in:
Thales Macedo Garitezi 2022-10-24 16:28:11 -03:00
commit 0a8f3d4e16
45 changed files with 2380 additions and 1506 deletions

View File

@ -1,6 +1,6 @@
[
{
"name": "mongo",
"name": "mongo_single",
"listen": "0.0.0.0:27017",
"upstream": "mongo:27017",
"enabled": true

43
.github/CODEOWNERS vendored Normal file
View File

@ -0,0 +1,43 @@
## MQTT & Core
/src/ @qzhuyan
/include/ @qzhuyan
/etc/ @qzhuyan
/test/ @qzhuyan
## CI
/.github/ @id
/.ci/ @id
/scripts/ @id
/build @id
/deploy/ @id
## Authenticatio & ACL
/apps/emqx_auth_*/ @savonarola
/apps/emqx_psk_file/ @savonarola
/apps/emqx_retainer/ @savonarola
/apps/emqx_sasl/ @savonarola
## Gateway
/apps/emqx_coap/ @HJianBo
/apps/emqx_exhook/ @HJianBo
/apps/emqx_exproto/ @HJianBo
/apps/emqx_lua_hook/ @HJianBo
/apps/emqx_lwm2m/ @HJianBo
## OPs
/apps/emqx_management/ @zhongwencool
/apps/emqx_recon/ @zhongwencool
/apps/emqx_plugin_libs/ @zhongwencool
/apps/emqx_prometheus/ @zhongwencool
/apps/emqx_recon/ @zhongwencool
## Data integration
/apps/emqx_rule_engine/ @thalesmg
/apps/emqx_web_hook/ @thalesmg
## External Plugins
/lib-extra/ @zmstone
## Default
* @zmstone

View File

@ -35,6 +35,8 @@ runs:
with:
path: ~/.kerl/${{ inputs.otp }}
key: otp-install-${{ inputs.otp }}-${{ inputs.os }}-static-ssl-disable-hipe-disable-jit
restore-keys: |
otp-install-${{ inputs.otp }}-${{ inputs.os }}
- name: build erlang
if: steps.cache.outputs.cache-hit != 'true'
shell: bash
@ -93,4 +95,5 @@ runs:
exit 1
fi
cd ..
# test with a spaces in path
rm -rf "emqx home"

View File

@ -31,8 +31,7 @@ jobs:
path: source
fetch-depth: 0
- id: detect-profiles
working-directory: source
uses: ./.github/actions/detect-profiles
uses: ./source/.github/actions/detect-profiles
with:
ci_git_token: ${{ secrets.CI_GIT_TOKEN }}
- name: get_all_deps

View File

@ -18,8 +18,7 @@ jobs:
path: source
fetch-depth: 0
- id: detect-profiles
working-directory: source
uses: ./.github/actions/detect-profiles
uses: ./source/.github/actions/detect-profiles
with:
ci_git_token: ${{ secrets.CI_GIT_TOKEN }}

View File

@ -10,12 +10,15 @@ File format:
- One list item per change topic
Change log ends with a list of GitHub PRs
## v4.3.22
### Minor changes
## For 4.3.22 and later versions, please find details in `changes` dir
## v4.3.21
### Bug fixes
- Deny POST an existing resource id using HTTP API with error 400 "Already Exists". [#9079](https://github.com/emqx/emqx/pull/9079)
- Fix the issue that reseting rule metrics crashed under certain conditions. [#9079](https://github.com/emqx/emqx/pull/9079)
### Enhancements
- TLS listener memory usage optimization [#9005](https://github.com/emqx/emqx/pull/9005).
@ -49,7 +52,7 @@ File format:
- Fix HTTP client library to handle SSL socket passive signal. [#9145](https://github.com/emqx/emqx/pull/9145)
- Hide redis password in error logs [#9071](https://github.com/emqx/emqx/pull/9071)
In this change, it also included more changes in redis client:
More changes in redis client included in this release:
- Improve redis connection error logging [eredis #19](https://github.com/emqx/eredis/pull/19).
Also added support for eredis to accept an anonymous function as password instead of
passing around plaintext args which may get dumpped to crash logs (hard to predict where).

View File

@ -23,6 +23,7 @@
-logger_header("[JWT]").
-export([ check_auth/3
, check/3
, check_acl/5
, description/0
]).
@ -33,6 +34,10 @@
%% Authentication callbacks
%%--------------------------------------------------------------------
%% for compatibility with old versions
check(ClientInfo, AuthResult, State) ->
?MODULE:check_auth(ClientInfo, AuthResult, State).
check_auth(ClientInfo, AuthResult, #{from := From, checklists := Checklists}) ->
case maps:find(From, ClientInfo) of
error ->

View File

@ -472,3 +472,19 @@ t_check_jwt_acl_no_exp(_Config) ->
emqtt:subscribe(C, <<"a/b">>, 0)),
ok = emqtt:disconnect(C).
t_check_compatibility(init, _Config) -> ok.
t_check_compatibility(_Config) ->
%% We literary want emqx_auth_jwt:check call emqx_auth_jwt:check_auth, so check with meck
ok = meck:new(emqx_auth_jwt, [passthrough, no_history]),
ok = meck:expect(emqx_auth_jwt, check_auth, fun(a, b, c) -> ok end),
?assertEqual(
ok,
emqx_auth_jwt:check(a, b, c)
),
meck:validate(emqx_auth_jwt),
meck:unload(emqx_auth_jwt).

View File

@ -686,7 +686,7 @@ heal_failure(FailureType, ProxyHost, ProxyPort) ->
end.
switch_proxy(Switch, ProxyHost, ProxyPort) ->
Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo",
Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo_single",
Body = case Switch of
off -> <<"{\"enabled\":false}">>;
on -> <<"{\"enabled\":true}">>
@ -695,27 +695,27 @@ switch_proxy(Switch, ProxyHost, ProxyPort) ->
[{body_format, binary}]).
timeout_proxy(on, ProxyHost, ProxyPort) ->
Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo/toxics",
Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo_single/toxics",
Body = <<"{\"name\":\"timeout\",\"type\":\"timeout\","
"\"stream\":\"upstream\",\"toxicity\":1.0,"
"\"attributes\":{\"timeout\":0}}">>,
{ok, {{_, 200, _}, _, _}} = httpc:request(post, {Url, [], "application/json", Body}, [],
[{body_format, binary}]);
timeout_proxy(off, ProxyHost, ProxyPort) ->
Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo/toxics/timeout",
Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo_single/toxics/timeout",
Body = <<>>,
{ok, {{_, 204, _}, _, _}} = httpc:request(delete, {Url, [], "application/json", Body}, [],
[{body_format, binary}]).
latency_up_proxy(on, ProxyHost, ProxyPort) ->
Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo/toxics",
Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo_single/toxics",
Body = <<"{\"name\":\"latency_up\",\"type\":\"latency\","
"\"stream\":\"upstream\",\"toxicity\":1.0,"
"\"attributes\":{\"latency\":20000,\"jitter\":3000}}">>,
{ok, {{_, 200, _}, _, _}} = httpc:request(post, {Url, [], "application/json", Body}, [],
[{body_format, binary}]);
latency_up_proxy(off, ProxyHost, ProxyPort) ->
Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo/toxics/latency_up",
Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo_single/toxics/latency_up",
Body = <<>>,
{ok, {{_, 204, _}, _, _}} = httpc:request(delete, {Url, [], "application/json", Body}, [],
[{body_format, binary}]).

View File

@ -70,6 +70,9 @@
all() ->
emqx_ct:all(?MODULE).
suite() ->
[{timetrap, {seconds, 120}}].
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx_auth_pgsql]),
drop_acl(),

View File

@ -47,7 +47,7 @@
用户需要实现的方法,和数据类型的定义在 `priv/protos/exhook.proto` 文件中:
```protobuff
```protobuf
syntax = "proto3";
package emqx.exhook.v1;

View File

@ -44,7 +44,7 @@
详情参见:`priv/protos/exproto.proto`,例如接口的定义有:
```protobuff
```protobuf
syntax = "proto3";
package emqx.exproto.v1;

View File

@ -1,6 +1,6 @@
{application, emqx_exproto,
[{description, "EMQ X Extension for Protocol"},
{vsn, "4.3.12"}, %% 4.3.3 is used by ee
{vsn, "4.3.13"}, %% 4.3.3 is used by ee
{modules, []},
{registered, []},
{mod, {emqx_exproto_app, []}},

View File

@ -1,7 +1,8 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.3.11",[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}]},
[{"4.3.12",[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}]},
{"4.3.11",[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}]},
{"4.3.10",
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
@ -19,7 +20,8 @@
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.3.11",[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}]},
[{"4.3.12",[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}]},
{"4.3.11",[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}]},
{"4.3.10",
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},

View File

@ -643,7 +643,7 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
{ok, Limiter1} ->
State#state{limiter = Limiter1};
{pause, Time, Limiter1} ->
?LOG(warning, "Pause ~pms due to rate limit", [Time]),
?LOG(notice, "Pause ~pms due to rate limit", [Time]),
TRef = start_timer(Time, limit_timeout),
State#state{sockstate = blocked,
limiter = Limiter1,

View File

@ -115,7 +115,11 @@ handle_request(<<"GET">>, <<"/status">>, Req) ->
end,
Status = io_lib:format("Node ~s is ~s~nemqx is ~s",
[node(), InternalStatus, AppStatus]),
cowboy_req:reply(200, #{<<"content-type">> => <<"text/plain">>}, Status, Req);
StatusCode = case AppStatus of
running -> 200;
not_running -> 503
end,
cowboy_req:reply(StatusCode, #{<<"content-type">> => <<"text/plain">>}, Status, Req);
handle_request(_Method, _Path, Req) ->
cowboy_req:reply(400, #{<<"content-type">> => <<"text/plain">>}, <<"Not found.">>, Req).

View File

@ -25,6 +25,8 @@
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx_management/include/emqx_mgmt.hrl").
-define(HOST, "http://127.0.0.1:8081/").
-import(emqx_mgmt_api_test_helpers,
[request_api/3,
request_api/4,
@ -44,9 +46,31 @@ end_per_suite(Config) ->
emqx_ct_helpers:stop_apps([emqx_management]),
Config.
init_per_testcase(t_status_ok, Config) ->
ok = emqx_rule_registry:mnesia(boot),
ok = emqx_dashboard_admin:mnesia(boot),
application:ensure_all_started(emqx_rule_engine),
application:ensure_all_started(emqx_dashboard),
Config;
init_per_testcase(t_status_not_ok, Config) ->
ok = emqx_rule_registry:mnesia(boot),
ok = emqx_dashboard_admin:mnesia(boot),
application:ensure_all_started(emqx_rule_engine),
application:ensure_all_started(emqx_dashboard),
application:stop(emqx),
Config;
init_per_testcase(_, Config) ->
Config.
end_per_testcase(t_status_ok, _Config) ->
application:stop(emqx_rule_engine),
application:stop(emqx_dashboard),
ok;
end_per_testcase(t_status_not_ok, _Config) ->
application:stop(emqx_rule_engine),
application:stop(emqx_dashboard),
application:ensure_all_started(emqx),
ok;
end_per_testcase(_, Config) ->
Config.
@ -787,6 +811,51 @@ t_keepalive(_Config) ->
application:stop(emqx_dashboard),
ok.
t_status_ok(_Config) ->
{ok, #{ body := Resp
, status_code := StatusCode
}} = do_request(#{method => get, path => ["status"], headers => [],
body => no_body}),
?assertMatch(
{match, _},
re:run(Resp, <<"emqx is running$">>)),
?assertEqual(200, StatusCode),
ok.
t_status_not_ok(_Config) ->
{ok, #{ body := Resp
, status_code := StatusCode
}} = do_request(#{method => get, path => ["status"], headers => [],
body => no_body}),
?assertMatch(
{match, _},
re:run(Resp, <<"emqx is not_running$">>)),
?assertEqual(503, StatusCode),
ok.
do_request(Opts) ->
#{ path := Path
, method := Method
, headers := Headers
, body := Body0
} = Opts,
URL = ?HOST ++ filename:join(Path),
Request = case Body0 of
no_body -> {URL, Headers};
{Encoding, Body} -> {URL, Headers, Encoding, Body}
end,
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
case httpc:request(Method, Request, [], []) of
{error, socket_closed_remotely} ->
{error, socket_closed_remotely};
{ok, {{_, StatusCode, _}, Headers1, Body1}} ->
Body2 = case emqx_json:safe_decode(Body1, [return_maps]) of
{ok, Json} -> Json;
{error, _} -> Body1
end,
{ok, #{status_code => StatusCode, headers => Headers1, body => Body2}}
end.
filter(List, Key, Value) ->
lists:filter(fun(Item) ->
maps:get(Key, Item) == Value

View File

@ -36,13 +36,21 @@ request_api(Method, Url, QueryParams, Auth, []) ->
"" -> Url;
_ -> Url ++ "?" ++ QueryParams
end,
do_request_api(Method, {NewUrl, [Auth]});
Headers = case Auth of
no_auth -> [];
Header -> [Header]
end,
do_request_api(Method, {NewUrl, Headers});
request_api(Method, Url, QueryParams, Auth, Body) ->
NewUrl = case QueryParams of
"" -> Url;
_ -> Url ++ "?" ++ QueryParams
end,
do_request_api(Method, {NewUrl, [Auth], "application/json", emqx_json:encode(Body)}).
Headers = case Auth of
no_auth -> [];
Header -> [Header]
end,
do_request_api(Method, {NewUrl, Headers, "application/json", emqx_json:encode(Body)}).
do_request_api(Method, Request)->
ct:pal("Method: ~p, Request: ~p", [Method, Request]),

View File

@ -131,7 +131,7 @@
-record(resource_params,
{ id :: resource_id()
, params :: #{} %% the params got after initializing the resource
, params :: map() %% the params got after initializing the resource
, status = #{is_alive => false} :: #{is_alive := boolean(), atom() => term()}
}).

View File

@ -1,5 +1,7 @@
%% -*- mode: erlang -*-
{deps, []}.
%% Comple Opts
{erl_opts, [warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
@ -18,6 +20,14 @@
warnings_as_errors, deprecated_functions
]}.
%% {erl_opts, [...]}, but for CT runs
%% NOT WORKING!!!
%% %% == Common Test ==
%% {ct_compile_opts, [ export_all
%% , nowarn_export_all
%% ]}.
%% {ct_opts, []}.
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.

View File

@ -306,7 +306,7 @@ show_action(#{name := Name}, _Params) ->
create_resource(#{}, Params) ->
case parse_resource_params(Params) of
{ok, ParsedParams} ->
if_test(fun() -> do_create_resource(test_resource, ParsedParams) end,
if_test(fun() -> do_create_resource(test_resource, maps:without([id], ParsedParams)) end,
fun() -> do_create_resource(create_resource, ParsedParams) end,
Params);
{error, Reason} ->
@ -315,6 +315,16 @@ create_resource(#{}, Params) ->
end.
do_create_resource(Create, ParsedParams) ->
case maps:find(id, ParsedParams) of
{ok, ResId} ->
case emqx_rule_registry:find_resource(ResId) of
{ok, _} -> return({error, 400, <<"Already Exists">>});
not_found -> do_create_resource2(Create, ParsedParams)
end;
error -> do_create_resource2(Create, ParsedParams)
end.
do_create_resource2(Create, ParsedParams) ->
case emqx_rule_engine:Create(ParsedParams) of
ok ->
return(ok);

View File

@ -335,28 +335,46 @@ null() ->
%% Arithmetic Funcs
%%------------------------------------------------------------------------------
-define(OPERATOR_TYPE_ERROR, unsupported_function_operator_type).
%% plus 2 numbers
'+'(X, Y) when is_number(X), is_number(Y) ->
X + Y;
%% concat 2 strings
'+'(X, Y) when is_binary(X), is_binary(Y) ->
concat(X, Y).
concat(X, Y);
%% unsupported type implicit conversion
'+'(X, Y)
when (is_number(X) andalso is_binary(Y)) orelse
(is_binary(X) andalso is_number(Y)) ->
error(unsupported_type_implicit_conversion);
'+'(_, _) ->
error(?OPERATOR_TYPE_ERROR).
'-'(X, Y) when is_number(X), is_number(Y) ->
X - Y.
X - Y;
'-'(_, _) ->
error(?OPERATOR_TYPE_ERROR).
'*'(X, Y) when is_number(X), is_number(Y) ->
X * Y.
X * Y;
'*'(_, _) ->
error(?OPERATOR_TYPE_ERROR).
'/'(X, Y) when is_number(X), is_number(Y) ->
X / Y.
X / Y;
'/'(_, _) ->
error(?OPERATOR_TYPE_ERROR).
'div'(X, Y) when is_integer(X), is_integer(Y) ->
X div Y.
X div Y;
'div'(_, _) ->
error(?OPERATOR_TYPE_ERROR).
mod(X, Y) when is_integer(X), is_integer(Y) ->
X rem Y.
X rem Y;
mod(_, _) ->
error(?OPERATOR_TYPE_ERROR).
eq(X, Y) ->
X == Y.

View File

@ -377,7 +377,7 @@ handle_info(_Info, State) ->
{noreply, State}.
code_change({down, _Vsn}, State = #state{metric_ids = MIDs}, [Vsn]) ->
case string:tokens(Vsn, ".") of
case string:tokens(Vsn, ".") of
["4", "4", SVal] ->
{Val, []} = string:to_integer(SVal),
case Val == 0 of
@ -504,11 +504,10 @@ calculate_speed(CurrVal, #rule_speed{max = MaxSpeed0, last_v = LastVal,
last5m_smpl = Last5MinSamples, tick = Tick + 1}.
format_rule_speed(#rule_speed{max = Max, current = Current, last5m = Last5Min}) ->
#{max => precision(Max, 2), current => precision(Current, 2), last5m => precision(Last5Min, 2)}.
#{max => round2(Max), current => round2(Current), last5m => round2(Last5Min)}.
precision(Float, N) ->
Base = math:pow(10, N),
round(Float * Base) / Base.
round2(Float) ->
round(Float * 100) / 100.
%%------------------------------------------------------------------------------
%% Metrics Definitions

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,29 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% Test Suite funcs
-import(emqx_rule_test_lib,
[ stop_apps/0
, start_apps/0
]).
%% RULE helper funcs
-import(emqx_rule_test_lib,
[ create_simple_repub_rule/2
, create_simple_repub_rule/3
, make_simple_debug_resource_type/0
, init_events_counters/0
]).

View File

@ -0,0 +1,141 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2018-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_rule_test_lib).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
%%------------------------------------------------------------------------------
%% Start Apps
%%------------------------------------------------------------------------------
stop_apps() ->
stopped = mnesia:stop(),
[application:stop(App) || App <- [emqx_rule_engine, emqx]].
start_apps() ->
[start_apps(App, SchemaFile, ConfigFile) ||
{App, SchemaFile, ConfigFile}
<- [{emqx, deps_path(emqx, "priv/emqx.schema"),
deps_path(emqx, "etc/emqx.conf")},
{emqx_rule_engine, local_path("priv/emqx_rule_engine.schema"),
local_path("etc/emqx_rule_engine.conf")}]].
%%--------------------------------------
%% start apps helper funcs
start_apps(App, SchemaFile, ConfigFile) ->
read_schema_configs(App, SchemaFile, ConfigFile),
set_special_configs(App),
{ok, _} = application:ensure_all_started(App).
read_schema_configs(App, SchemaFile, ConfigFile) ->
ct:pal("Read configs - SchemaFile: ~p, ConfigFile: ~p", [SchemaFile, ConfigFile]),
Schema = cuttlefish_schema:files([SchemaFile]),
Conf = conf_parse:file(ConfigFile),
NewConfig = cuttlefish_generator:map(Schema, Conf),
Vals = proplists:get_value(App, NewConfig, []),
[application:set_env(App, Par, Value) || {Par, Value} <- Vals].
deps_path(App, RelativePath) ->
%% Note: not lib_dir because etc dir is not sym-link-ed to _build dir
%% but priv dir is
Path0 = code:priv_dir(App),
Path = case file:read_link(Path0) of
{ok, Resolved} -> Resolved;
{error, _} -> Path0
end,
filename:join([Path, "..", RelativePath]).
local_path(RelativePath) ->
deps_path(emqx_rule_engine, RelativePath).
set_special_configs(emqx_rule_engine) ->
application:set_env(emqx_rule_engine, ignore_sys_message, true),
application:set_env(emqx_rule_engine, events,
[{'client.connected',on,1},
{'client.disconnected',on,1},
{'session.subscribed',on,1},
{'session.unsubscribed',on,1},
{'message.acked',on,1},
{'message.dropped',on,1},
{'message.delivered',on,1}
]),
ok;
set_special_configs(_App) ->
ok.
%%------------------------------------------------------------------------------
%% rule test helper funcs
%%------------------------------------------------------------------------------
create_simple_repub_rule(TargetTopic, SQL) ->
create_simple_repub_rule(TargetTopic, SQL, <<"${payload}">>).
create_simple_repub_rule(TargetTopic, SQL, Template) ->
{ok, Rule} = emqx_rule_engine:create_rule(
#{rawsql => SQL,
actions => [#{name => 'republish',
args => #{<<"target_topic">> => TargetTopic,
<<"target_qos">> => -1,
<<"payload_tmpl">> => Template}
}],
description => <<"simple repub rule">>}),
Rule.
make_simple_debug_resource_type() ->
#resource_type{
name = built_in,
provider = ?APP,
params_spec = #{},
on_create = {?MODULE, on_resource_create},
on_destroy = {?MODULE, on_resource_destroy},
on_status = {?MODULE, on_get_resource_status},
title = #{en => <<"Built-In Resource Type (debug)">>},
description = #{en => <<"The built in resource type for debug purpose">>}}.
make_simple_resource_type(ResTypeName) ->
#resource_type{
name = ResTypeName,
provider = ?APP,
params_spec = #{},
on_create = {?MODULE, on_simple_resource_type_create},
on_destroy = {?MODULE, on_simple_resource_type_destroy},
on_status = {?MODULE, on_simple_resource_type_status},
title = #{en => <<"Simple Resource Type">>},
description = #{en => <<"Simple Resource Type">>}}.
init_events_counters() ->
ets:new(events_record_tab, [named_table, bag, public]).
%%------------------------------------------------------------------------------
%% Internal helper funcs
%%------------------------------------------------------------------------------
on_resource_create(_id, _) -> #{}.
on_resource_destroy(_id, _) -> ok.
on_get_resource_status(_id, _) -> #{is_alive => true}.
on_simple_resource_type_create(_Id, #{}) -> #{}.
on_simple_resource_type_destroy(_Id, #{}) -> ok.
on_simple_resource_type_status(_Id, #{}, #{}) -> #{is_alive => true}.

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
{application, emqx_sn,
[{description, "EMQ X MQTT-SN Plugin"},
{vsn, "4.3.7"}, % strict semver, bump manually!
{vsn, "4.3.8"}, % strict semver, bump manually!
{modules, []},
{registered, []},
{applications, [kernel,stdlib,esockd]},

View File

@ -1,6 +1,9 @@
%% -*- mode: erlang -*-
{VSN,
[
{"4.3.7",[
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
]},
{"4.3.6",[
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
]},
@ -29,6 +32,9 @@
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}
],
[
{"4.3.7",[
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
]},
{"4.3.6",[
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
]},

View File

@ -208,7 +208,7 @@ idle(cast, {incoming, ?SN_DISCONNECT_MSG(_Duration)}, State) ->
idle(cast, {incoming, ?SN_PUBLISH_MSG(_Flag, _TopicId, _MsgId, _Data)}, State = #state{enable_qos3 = false}) ->
?LOG(debug, "The enable_qos3 is false, ignore the received publish with QoS=-1 in idle mode!"),
{keep_state, State#state.idle_timeout};
{keep_state_and_data, State#state.idle_timeout};
idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1,
topic_id_type = TopicIdType
@ -226,7 +226,7 @@ idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1,
ok
end,
?LOG(debug, "Client id=~p receives a publish with QoS=-1 in idle mode!", [ClientId]),
{keep_state, State#state.idle_timeout};
{keep_state_and_data, State#state.idle_timeout};
idle(cast, {incoming, PingReq = ?SN_PINGREQ_MSG(_ClientId)}, State) ->
handle_ping(PingReq, State);

View File

@ -21,6 +21,7 @@
-include_lib("emqx_sn/include/emqx_sn.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(SHOW(X), ??X).
-import(emqx_sn_frame, [ parse/1
, serialize/1
@ -67,6 +68,14 @@ t_willtopic(_) ->
Wt = #mqtt_sn_message{type = ?SN_WILLTOPIC, variable = {Flags, <<"WillTopic">>}},
?assertEqual({ok, Wt}, parse(serialize(Wt))).
t_undefined_willtopic(_) ->
Wt = #mqtt_sn_message{type = ?SN_WILLTOPIC},
?assertEqual({ok, Wt}, parse(serialize(Wt))).
t_willtopic_resp(_) ->
Wt = #mqtt_sn_message{type = ?SN_WILLTOPICRESP, variable = 0},
?assertEqual({ok, Wt}, parse(serialize(Wt))).
t_willmsgreq(_) ->
WmReq = #mqtt_sn_message{type = ?SN_WILLMSGREQ},
?assertEqual({ok, WmReq}, parse(serialize(WmReq))).
@ -88,6 +97,12 @@ t_publish(_) ->
PubMsg = #mqtt_sn_message{type = ?SN_PUBLISH, variable = {Flags, 1, 2, <<"Payload">>}},
?assertEqual({ok, PubMsg}, parse(serialize(PubMsg))).
t_publish_long_msg(_) ->
Flags = #mqtt_sn_flags{dup = false, qos = 1, retain = false, topic_id_type = 2#01},
Payload = generate_random_binary(256 + rand:uniform(256)),
PubMsg = #mqtt_sn_message{type = ?SN_PUBLISH, variable = {Flags, 1, 2, Payload}},
?assertEqual({ok, PubMsg}, parse(serialize(PubMsg))).
t_puback(_) ->
PubAck = #mqtt_sn_message{type = ?SN_PUBACK, variable = {1, 2, 0}},
?assertEqual({ok, PubAck}, parse(serialize(PubAck))).
@ -105,9 +120,21 @@ t_pubcomp(_) ->
?assertEqual({ok, PubComp}, parse(serialize(PubComp))).
t_subscribe(_) ->
Flags = #mqtt_sn_flags{dup = false, qos = 1, topic_id_type = 16#01},
Flags = #mqtt_sn_flags{dup = false, qos = 1, topic_id_type = ?SN_PREDEFINED_TOPIC},
SubMsg = #mqtt_sn_message{type = ?SN_SUBSCRIBE, variable = {Flags, 16#4321, 16}},
?assertEqual({ok, SubMsg}, parse(serialize(SubMsg))).
?assertEqual({ok, SubMsg}, parse(serialize(SubMsg))),
Flags1 = #mqtt_sn_flags{dup = false, qos = 1, topic_id_type = ?SN_NORMAL_TOPIC},
SubMsg1 = #mqtt_sn_message{type = ?SN_SUBSCRIBE, variable = {Flags1, 16#4321, <<"t/+">>}},
?assertEqual({ok, SubMsg1}, parse(serialize(SubMsg1))),
Flags2 = #mqtt_sn_flags{dup = false, qos = 1, topic_id_type = ?SN_SHORT_TOPIC},
SubMsg2 = #mqtt_sn_message{type = ?SN_SUBSCRIBE, variable = {Flags2, 16#4321, <<"t/+">>}},
?assertEqual({ok, SubMsg2}, parse(serialize(SubMsg2))),
Flags3 = #mqtt_sn_flags{dup = false, qos = 1, topic_id_type = ?SN_RESERVED_TOPIC},
SubMsg3 = #mqtt_sn_message{type = ?SN_SUBSCRIBE, variable = {Flags3, 16#4321, <<"t/+">>}},
?assertEqual({ok, SubMsg3}, parse(serialize(SubMsg3))).
t_suback(_) ->
Flags = #mqtt_sn_flags{qos = 1},
@ -137,6 +164,10 @@ t_disconnect(_) ->
Disconn = #mqtt_sn_message{type = ?SN_DISCONNECT},
?assertEqual({ok, Disconn}, parse(serialize(Disconn))).
t_disconnect_duration(_) ->
Disconn = #mqtt_sn_message{type = ?SN_DISCONNECT, variable = 120},
?assertEqual({ok, Disconn}, parse(serialize(Disconn))).
t_willtopicupd(_) ->
Flags = #mqtt_sn_flags{qos = 1, retain = true},
WtUpd = #mqtt_sn_message{type = ?SN_WILLTOPICUPD, variable = {Flags, <<"Topic">>}},
@ -150,6 +181,43 @@ t_willmsgresp(_) ->
UpdResp = #mqtt_sn_message{type = ?SN_WILLMSGRESP, variable = 0},
?assertEqual({ok, UpdResp}, parse(serialize(UpdResp))).
t_invalid_inpacket(_) ->
Bin = <<2:8/big-integer, 16#F0:8/big-integer>>,
?assertMatch({'EXIT', {unkown_message_type, _Stack}}, catch parse(Bin)).
t_message_type(_) ->
TypeNames = [ {?SN_ADVERTISE, ?SHOW(SN_ADVERTISE)}
, {?SN_SEARCHGW, ?SHOW(SN_SEARCHGW)}
, {?SN_GWINFO, ?SHOW(SN_GWINFO)}
, {?SN_CONNECT, ?SHOW(SN_CONNECT)}
, {?SN_CONNACK, ?SHOW(SN_CONNACK)}
, {?SN_WILLTOPICREQ, ?SHOW(SN_WILLTOPICREQ)}
, {?SN_WILLTOPIC, ?SHOW(SN_WILLTOPIC)}
, {?SN_WILLMSGREQ, ?SHOW(SN_WILLMSGREQ)}
, {?SN_WILLMSG, ?SHOW(SN_WILLMSG)}
, {?SN_REGISTER, ?SHOW(SN_REGISTER)}
, {?SN_REGACK, ?SHOW(SN_REGACK)}
, {?SN_PUBLISH, ?SHOW(SN_PUBLISH)}
, {?SN_PUBACK, ?SHOW(SN_PUBACK)}
, {?SN_PUBCOMP, ?SHOW(SN_PUBCOMP)}
, {?SN_PUBREC, ?SHOW(SN_PUBREC)}
, {?SN_PUBREL, ?SHOW(SN_PUBREL)}
, {?SN_SUBSCRIBE, ?SHOW(SN_SUBSCRIBE)}
, {?SN_SUBACK, ?SHOW(SN_SUBACK)}
, {?SN_UNSUBSCRIBE, ?SHOW(SN_UNSUBSCRIBE)}
, {?SN_UNSUBACK, ?SHOW(SN_UNSUBACK)}
, {?SN_PINGREQ, ?SHOW(SN_PINGREQ)}
, {?SN_PINGRESP, ?SHOW(SN_PINGRESP)}
, {?SN_DISCONNECT, ?SHOW(SN_DISCONNECT)}
, {?SN_WILLTOPICUPD, ?SHOW(SN_WILLTOPICUPD)}
, {?SN_WILLTOPICRESP, ?SHOW(SN_WILLTOPICRESP)}
, {?SN_WILLMSGUPD, ?SHOW(SN_WILLMSGUPD)}
, {?SN_WILLMSGRESP, ?SHOW(SN_WILLMSGRESP)}
],
{Types, Names} = lists:unzip(TypeNames),
?assertEqual(Names, [emqx_sn_frame:message_type(Type) || Type <- Types]),
ok.
t_random_test(_) ->
random_test_body(),
random_test_body(),
@ -171,6 +239,9 @@ random_test_body() ->
generate_random_binary() ->
% The min packet length is 2
Len = rand:uniform(299) + 1,
generate_random_binary(Len).
generate_random_binary(Len) ->
gen_next(Len, <<>>).
gen_next(0, Acc) ->
@ -178,4 +249,3 @@ gen_next(0, Acc) ->
gen_next(N, Acc) ->
Byte = rand:uniform(256) - 1,
gen_next(N-1, <<Acc/binary, Byte:8>>).

View File

@ -0,0 +1,62 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_sn_misc_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
all() ->
emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx_sn]),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx_sn]).
init_per_testcase(_TestCase, Config) ->
Config.
end_per_testcase(_TestCase, Config) ->
Config.
%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------
t_sn_app(_) ->
?assertMatch({'EXIT', {_, _}}, catch emqx_sn_app:start_listeners()),
?assertMatch({error, _}, emqx_sn_app:stop_listener({udp, 9999, []})),
?assertMatch({error, _}, emqx_sn_app:stop_listener({udp, {{0,0,0,0}, 9999}, []})),
ok.
t_sn_broadcast(_) ->
?assertEqual(ignored, gen_server:call(emqx_sn_broadcast, ignored)),
?assertEqual(ok, gen_server:cast(emqx_sn_broadcast, ignored)),
?assertEqual(ignored, erlang:send(emqx_sn_broadcast, ignored)),
?assertEqual(broadcast_advertise, erlang:send(emqx_sn_broadcast, broadcast_advertise)),
?assertEqual(ok, emqx_sn_broadcast:stop()).
%%--------------------------------------------------------------------
%% Helper funcs
%%--------------------------------------------------------------------

View File

@ -90,6 +90,15 @@ restart_emqx_sn(#{subs_resume := Bool}) ->
_ = application:ensure_all_started(emqx_sn),
ok.
recoverable_restart_emqx_sn(Setup) ->
AppEnvs = application:get_all_env(emqx_sn),
Setup(),
_ = application:stop(emqx_sn),
_ = application:ensure_all_started(emqx_sn),
fun() ->
application:set_env([{emqx_sn, AppEnvs}])
end.
%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------
@ -105,6 +114,17 @@ t_connect(_) ->
send_connect_msg(Socket, <<"client_id_test1">>),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
%% unexpected advertise
Adv = ?SN_ADVERTISE_MSG(1, 100),
AdvPacket = emqx_sn_frame:serialize(Adv),
send_packet(Socket, AdvPacket),
timer:sleep(200),
%% unexpected connect
ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
timer:sleep(200),
send_disconnect_msg(Socket, undefined),
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket).
@ -426,6 +446,38 @@ t_subscribe_case08(_) ->
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket).
t_subscribe_case09(_) ->
Dup = 0,
QoS = 0,
Retain = 0,
CleanSession = 0,
ReturnCode = 0,
{ok, Socket} = gen_udp:open(0, [binary]),
ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
TopicName1 = <<"t/+">>,
MsgId1 = 25,
TopicId0 = 0,
WillBit = 0,
send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId1),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId1:16, ReturnCode>>,
receive_response(Socket)),
{ok, C} = emqtt:start_link(),
{ok, _} = emqtt:connect(C),
ok = emqtt:publish(C, <<"t/1">>, <<"Hello">>, 0),
timer:sleep(100),
ok = emqtt:disconnect(C),
timer:sleep(50),
?assertError(_, receive_publish(Socket)),
send_disconnect_msg(Socket, undefined),
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket).
t_publish_negqos_case09(_) ->
Dup = 0,
QoS = 0,
@ -442,7 +494,6 @@ t_publish_negqos_case09(_) ->
Topic = <<"abc">>,
send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
@ -466,6 +517,16 @@ t_publish_negqos_case09(_) ->
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket).
t_publish_negqos_case10(_) ->
QoS = ?QOS_NEG1,
MsgId = 1,
TopicId1 = ?PREDEF_TOPIC_ID1,
{ok, Socket} = gen_udp:open(0, [binary]),
Payload1 = <<20, 21, 22, 23>>,
send_publish_msg_predefined_topic(Socket, QoS, MsgId, TopicId1, Payload1),
timer:sleep(100),
gen_udp:close(Socket).
t_publish_qos0_case01(_) ->
Dup = 0,
QoS = 0,
@ -1199,6 +1260,60 @@ t_will_case06(_) ->
gen_udp:close(Socket).
t_will_case07(_) ->
QoS = 1,
Duration = 1,
WillMsg = <<10, 11, 12, 13, 14>>,
WillTopic = <<"abc">>,
{ok, Socket} = gen_udp:open(0, [binary]),
ClientId = ?CLIENTID,
ok = emqx_broker:subscribe(WillTopic),
send_connect_msg_with_will(Socket, Duration, ClientId),
?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
%% unexpected advertise
Adv = ?SN_ADVERTISE_MSG(1, 100),
AdvPacket = emqx_sn_frame:serialize(Adv),
send_packet(Socket, AdvPacket),
timer:sleep(200),
%% unexpected connect
send_connect_msg(Socket, ClientId),
timer:sleep(200),
send_willtopic_msg(Socket, WillTopic, QoS),
?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)),
%% unexpected advertise
send_packet(Socket, AdvPacket),
timer:sleep(200),
%% unexpected connect
send_connect_msg(Socket, ClientId),
timer:sleep(200),
send_willmsg_msg(Socket, WillMsg),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_pingreq_msg(Socket, undefined),
?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
% wait udp client keepalive timeout
timer:sleep(2000),
receive
{deliver, WillTopic, #message{payload = WillMsg}} -> ok;
Msg -> ct:print("recevived --- unex: ~p", [Msg])
after
1000 -> ct:fail(wait_willmsg_timeout)
end,
send_disconnect_msg(Socket, undefined),
?assertEqual(udp_receive_timeout, receive_response(Socket)),
gen_udp:close(Socket).
t_asleep_test01_timeout(_) ->
QoS = 1,
Duration = 1,
@ -1392,6 +1507,7 @@ t_asleep_test04_to_awake_qos1_dl_msg(_) ->
?assertError(_, receive_publish(Socket)),
send_regack_msg(Socket, TopicIdNew, MsgId3),
send_regack_msg(Socket, TopicIdNew, MsgId3, ?SN_RC_INVALID_TOPIC_ID),
UdpData2 = receive_response(Socket),
MsgId_udp2 = check_publish_msg_on_udp(
@ -1754,6 +1870,33 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) ->
gen_udp:close(Socket).
t_asleep_unexpected(_) ->
SleepDuration = 3,
{ok, Socket} = gen_udp:open(0, [binary]),
ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
timer:sleep(200),
% goto asleep state
send_disconnect_msg(Socket, SleepDuration),
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
timer:sleep(100),
send_pingreq_msg(Socket, undefined),
?assertEqual(udp_receive_timeout, receive_response(Socket)),
send_puback_msg(Socket, 5, 5, ?SN_RC_INVALID_TOPIC_ID),
send_pubrec_msg(Socket, 5),
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
send_disconnect_msg(Socket, undefined),
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
timer:sleep(100),
gen_udp:close(Socket).
t_awake_test01_to_connected(_) ->
QoS = 1,
Keepalive_Duration = 3,
@ -2127,6 +2270,7 @@ t_register_enqueue_delivering_messages(_) ->
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)),
send_regack_msg(NSocket, TopicIdA, RegMsgIdA, ?SN_RC_ACCEPTED),
send_regack_msg(NSocket, TopicIdA, RegMsgIdA, ?SN_RC_INVALID_TOPIC_ID),
%% receive the queued messages
@ -2150,6 +2294,88 @@ t_register_enqueue_delivering_messages(_) ->
gen_udp:close(NSocket1),
restart_emqx_sn(#{subs_resume => false}).
t_code_change(_) ->
Old = [state, gwid, socket, socketpid, socketstate, socketname, peername,
channel, clientid, username, password, will_msg, keepalive_interval,
connpkt, asleep_timer, enable_stats, stats_timer, enable_qos3,
has_pending_pingresp, pending_topic_ids],
New = Old ++ [false, [], undefined],
OldTulpe = erlang:list_to_tuple(Old),
NewTulpe = erlang:list_to_tuple(New),
?assertEqual({ok, name, NewTulpe},
emqx_sn_gateway:code_change(1, name, OldTulpe, ["4.3.2"])),
?assertEqual({ok, name, NewTulpe},
emqx_sn_gateway:code_change(1, name, NewTulpe, ["4.3.6"])),
?assertEqual({ok, name, NewTulpe},
emqx_sn_gateway:code_change({down, 1}, name, NewTulpe, ["4.3.6"])),
?assertEqual({ok, name, OldTulpe},
emqx_sn_gateway:code_change({down, 1}, name, NewTulpe, ["4.3.2"])).
t_topic_id_to_large(_) ->
Dup = 0,
QoS = 0,
Retain = 0,
Will = 0,
CleanSession = 0,
MsgId = 1,
{ok, Socket} = gen_udp:open(0, [binary]),
ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
mnesia:dirty_write(emqx_sn_registry, {emqx_sn_registry, {ClientId, next_topic_id}, 16#FFFF}),
TopicName1 = <<"abcD">>,
send_register_msg(Socket, TopicName1, MsgId),
?assertEqual(<<7, ?SN_REGACK, 0:16, MsgId:16, 3:8>>, receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1,
CleanSession:1, ?SN_NORMAL_TOPIC:2, 0:16,
MsgId:16, 2>>, receive_response(Socket)),
send_disconnect_msg(Socket, undefined),
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket).
t_idle_timeout(_) ->
Backup = recoverable_restart_emqx_sn(fun() ->
application:set_env(emqx_sn, idle_timeout, 500),
application:set_env(emqx_sn, enable_qos3, false)
end),
timer:sleep(200),
QoS = ?QOS_NEG1,
MsgId = 1,
TopicId1 = ?PREDEF_TOPIC_ID1,
{ok, Socket} = gen_udp:open(0, [binary]),
Payload1 = <<20, 21, 22, 23>>,
send_publish_msg_predefined_topic(Socket, QoS, MsgId, TopicId1, Payload1),
timer:sleep(1500),
send_disconnect_msg(Socket, undefined),
?assertEqual(udp_receive_timeout, receive_response(Socket)),
gen_udp:close(Socket),
_ = recoverable_restart_emqx_sn(Backup),
timer:sleep(200),
ok.
t_invalid_packet(_) ->
{ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"client_id_test1">>),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_packet(Socket, emqx_sn_frame_SUITE:generate_random_binary()),
send_disconnect_msg(Socket, undefined),
?assertEqual(udp_receive_timeout, receive_response(Socket)),
gen_udp:close(Socket).
%%--------------------------------------------------------------------
%% Helper funcs
%%--------------------------------------------------------------------
@ -2434,6 +2660,9 @@ send_disconnect_msg(Socket, Duration) ->
?LOG("send_disconnect_msg Duration=~p", [Duration]),
ok = gen_udp:send(Socket, ?HOST, ?PORT, DisConnectPacket).
send_packet(Socket, Packet) ->
ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet).
mid(Id) -> Id.
tid(Id) -> Id.

View File

@ -108,6 +108,11 @@ t_deny_wildcard_topic(_Config) ->
?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(<<"ClientId">>, <<"/TopicA/#">>)),
?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(<<"ClientId">>, <<"/+/TopicB">>)).
t_gen_server(_) ->
?assertEqual(ignored, gen_server:call(emqx_sn_registry, ignored)),
?assertEqual(ok, gen_server:cast(emqx_sn_registry, ignored)),
?assertEqual(ignored, erlang:send(emqx_sn_registry, ignored)).
%%--------------------------------------------------------------------
%% Helper funcs
%%--------------------------------------------------------------------

View File

@ -1,6 +1,6 @@
{application, emqx_stomp,
[{description, "EMQ X Stomp Protocol Plugin"},
{vsn, "4.3.6"}, % strict semver, bump manually!
{vsn, "4.3.7"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_stomp_sup]},
{applications, [kernel,stdlib]},

View File

@ -1,7 +1,8 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.3.5",
[{"4.3.6",[{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
{"4.3.5",
[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
{"4.3.4",
@ -23,7 +24,8 @@
[{restart_application,emqx_stomp},
{apply,{emqx_stomp,force_clear_after_app_stoped,[]}}]},
{<<".*">>,[]}],
[{"4.3.5",
[{"4.3.6",[{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
{"4.3.5",
[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
{"4.3.4",

View File

@ -469,7 +469,7 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
{ok, Limiter1} ->
State#state{limiter = Limiter1};
{pause, Time, Limiter1} ->
?LOG(warning, "Pause ~pms due to rate limit", [Time]),
?LOG(notice, "Pause ~pms due to rate limit", [Time]),
TRef = start_timer(Time, limit_timeout),
State#state{sockstate = blocked,
limiter = Limiter1,

20
changes/v4.3.22-en.md Normal file
View File

@ -0,0 +1,20 @@
### Enhancements
- Add a warning log if the ACL check failed for subscription [#9124](https://github.com/emqx/emqx/pull/9124).
This is to make the ACL deny logging for subscription behave the same as for publish.
### Bug fixes
- Improve the display of rule's 'Maximum Speed' counter to only reserve 2 decimal places. [#9185](https://github.com/emqx/emqx/pull/9185)
This is to avoid displaying floats like `0.30000000000000004` on the dashboard.
- Fix the issue that emqx prints too many error logs when connecting to mongodb but auth failed. [#9184](https://github.com/emqx/emqx/pull/9184)
- Fix that after receiving publish in `idle mode` the emqx-sn gateway may panic [#9024](https://github.com/emqx/emqx/pull/9024).
- "Pause due to rate limit" log level demoted from warning to notice [#9134](https://github.com/emqx/emqx/pull/9134).
- Restore old `emqx_auth_jwt` module API, so the hook callback functions registered in older version will not be invalidated after hot-upgrade [#9144](https://github.com/emqx/emqx/pull/9144).
- Fixed the response status code for the `/status` endpoint [#9210](https://github.com/emqx/emqx/pull/9210).
Before the fix, it always returned `200` even if the EMQX application was not running. Now it returns `503` in that case.

20
changes/v4.3.22-zh.md Normal file
View File

@ -0,0 +1,20 @@
### 增强
- 订阅时,如果 ACL 检查不通过,打印一个警告日志 [#9124](https://github.com/emqx/emqx/pull/9124)。
该行为的改变主要是为了跟发布失败时的行为保持一致。
### 修复
- 改进规则的 "最大执行速度" 的计数,只保留小数点之后 2 位 [#9185](https://github.com/emqx/emqx/pull/9185)
避免在 dashboard 上展示类似这样的浮点数:`0.30000000000000004`。
- 修复在尝试连接 MongoDB 数据库过程中,如果认证失败会不停打印错误日志的问题。[#9184](https://github.com/emqx/emqx/pull/9184)
- 修复 emqx-sn 插件在“空闲”状态下收到消息发布请求时可能崩溃的情况 [#9024](https://github.com/emqx/emqx/pull/9024)。
- 限速 “Pause due to rate limit” 的日志级别从原先的 `warning` 降级到 `notice` [#9134](https://github.com/emqx/emqx/pull/9134)。
- 保留老的 `emqx_auth_jwt` 模块的接口函数,保障热升级之前添加的回调函数在热升级之后也不会失效 [#9144](https://github.com/emqx/emqx/pull/9144)。
- 修正了 `/status` API 的响应状态代码 [#9210](https://github.com/emqx/emqx/pull/9210)。
在修复之前,它总是返回 `200`,即使 EMQX 应用程序没有运行。 现在它在这种情况下返回 `503`

View File

@ -60,8 +60,8 @@
, {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1
, {getopt, "1.0.1"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.1"}}}
, {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.1"}}}
, {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.13"}}}
, {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}}
, {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.14"}}}
, {epgsql, {git, "https://github.com/emqx/epgsql.git", {tag, "4.6.0"}}}
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}}
]}.

View File

@ -40,10 +40,10 @@ EOF
}
logerr() {
echo -e "\e[31mERROR: $1\e[39m"
echo "$(tput setaf 1)ERROR: $1$(tput sgr0)"
}
logmsg() {
echo -e "\e[33mINFO: $1\e[39m"
echo "INFO: $1"
}
REL_BRANCH_CE="${REL_BRANCH_CE:-release-v43}"

View File

@ -45,11 +45,10 @@ EOF
}
logerr() {
echo -e "\e[31mERROR: $1\e[39m"
echo "$(tput setaf 1)ERROR: $1$(tput sgr0)"
}
logwarn() {
echo -e "\e[33mINFO: $1\e[39m"
echo "$(tput setaf 3)WARNING: $1$(tput sgr0)"
}
logmsg() {

View File

@ -1543,12 +1543,15 @@ check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS,
check_sub_acls(TopicFilters, Channel) ->
check_sub_acls(TopicFilters, Channel, []).
check_sub_acls([ TopicFilter = {Topic, _} | More] , Channel, Acc) ->
check_sub_acls([ TopicFilter = {Topic, SubOpts} | More] , Channel, Acc) ->
case check_sub_acl(Topic, Channel) of
allow ->
check_sub_acls(More, Channel, [ {TopicFilter, 0} | Acc]);
deny ->
check_sub_acls(More, Channel, [ {TopicFilter, ?RC_NOT_AUTHORIZED} | Acc])
ReasonCode = ?RC_NOT_AUTHORIZED,
?LOG(warning, "Cannot subscribe ~s with options ~p due to ~s.",
[Topic, SubOpts, emqx_reason_codes:text(ReasonCode)]),
check_sub_acls(More, Channel, [ {TopicFilter, ReasonCode} | Acc])
end;
check_sub_acls([], _Channel, Acc) ->
lists:reverse(Acc).

View File

@ -789,7 +789,7 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
{ok, Limiter1} ->
State#state{limiter = Limiter1};
{pause, Time, Limiter1} ->
?LOG(warning, "Pause ~pms due to rate limit", [Time]),
?LOG(notice, "Pause ~pms due to rate limit", [Time]),
TRef = start_timer(Time, limit_timeout),
State#state{sockstate = blocked,
limiter = Limiter1,

View File

@ -510,7 +510,7 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
{ok, Limiter1} ->
State#state{limiter = Limiter1};
{pause, Time, Limiter1} ->
?LOG(warning, "Pause ~pms due to rate limit", [Time]),
?LOG(notice, "Pause ~pms due to rate limit", [Time]),
TRef = start_timer(Time, limit_timeout),
NState = State#state{sockstate = blocked,
limiter = Limiter1,