Merge branch 'master' into refine-changes-md

This commit is contained in:
JianBo He 2022-11-22 14:52:22 +08:00 committed by GitHub
commit 8dbf34ea94
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
49 changed files with 936 additions and 478 deletions

View File

@ -320,15 +320,9 @@ jobs:
run: sudo apt-get update && sudo apt install -y dos2unix
- name: get packages
run: |
DEFAULT_BEAM_PLATFORM='otp24.3.4.2-1'
set -e -u
cd packages/${{ matrix.profile }}
# Make a copy of the default OTP version package to a file without OTP version infix
while read -r fname; do
default_fname=$(echo "$fname" | sed "s/-${DEFAULT_BEAM_PLATFORM}//g")
echo "$fname -> $default_fname"
cp "$fname" "$default_fname"
done < <(find . -maxdepth 1 -type f | grep -E "emqx(-enterprise)?-5\.[0-9]+\.[0-9]+.*-${DEFAULT_BEAM_PLATFORM}" | grep -v elixir)
# fix the .sha256 file format
for var in $(ls | grep emqx | grep -v sha256); do
dos2unix $var.sha256
echo "$(cat $var.sha256) $var" | sha256sum -c || exit 1

View File

@ -3,6 +3,15 @@ on:
release:
types:
- published
workflow_dispatch:
inputs:
tag:
type: string
required: true
publish_release_artefacts:
type: boolean
required: true
default: false
jobs:
upload:
@ -15,22 +24,35 @@ jobs:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: ${{ secrets.AWS_DEFAULT_REGION }}
- name: Get packages
- uses: actions/checkout@v3
with:
ref: ${{ github.event.inputs.tag }}
- name: Detect profile
id: profile
run: |
REF=${{ github.ref_name }}
if git describe --tags --match '[v|e]*' --exact; then
REF=$(git describe --tags --match '[v|e]*' --exact)
else
echo "Only release tags matching '[v|e]*' are supported"
exit 1
fi
case "$REF" in
v*)
s3dir='emqx-ce'
echo "profile=emqx" >> $GITHUB_OUTPUT
echo "version=$(./pkg-vsn.sh emqx)" >> $GITHUB_OUTPUT
echo "s3dir=emqx-ce" >> $GITHUB_OUTPUT
;;
e*)
s3dir='emqx-ee'
;;
*)
echo "tag $REF is not supported"
exit 1
echo "profile=emqx-enterprise" >> $GITHUB_OUTPUT
echo "version=$(./pkg-vsn.sh emqx-enterprise)" >> $GITHUB_OUTPUT
echo "s3dir=emqx-ee" >> $GITHUB_OUTPUT
;;
esac
aws s3 cp --recursive s3://${{ secrets.AWS_S3_BUCKET }}/$s3dir/${{ github.ref_name }} packages
- name: Get packages
run: |
BUCKET=${{ secrets.AWS_S3_BUCKET }}
OUTPUT_DIR=${{ steps.profile.outputs.s3dir }}
aws s3 cp --recursive s3://$BUCKET/$OUTPUT_DIR/${{ github.ref_name }} packages
cd packages
DEFAULT_BEAM_PLATFORM='otp24.3.4.2-1'
# all packages including full-name and default-name are uploaded to s3
@ -47,7 +69,7 @@ jobs:
with:
asset_paths: '["packages/*"]'
- name: update to emqx.io
if: github.event_name == 'release'
if: github.event_name == 'release' || inputs.publish_release_artefacts
run: |
set -e -x -u
curl -w %{http_code} \
@ -58,18 +80,8 @@ jobs:
-d "{\"repo\":\"emqx/emqx\", \"tag\": \"${{ github.ref_name }}\" }" \
${{ secrets.EMQX_IO_RELEASE_API }}
- name: update homebrew packages
if: github.event_name == 'release'
if: steps.profile.outputs.profile == 'emqx' && (github.event_name == 'release' || inputs.publish_release_artefacts)
run: |
REF=${{ github.ref_name }}
case "$REF" in
v*)
BOOL_FLAG_NAME="emqx_ce"
;;
e*)
echo "Not updating homebrew for enterprise eidition"
exit 0
;;
esac
if [ -z $(echo $version | grep -oE "(alpha|beta|rc)\.[0-9]") ]; then
curl --silent --show-error \
-H "Authorization: token ${{ secrets.CI_GIT_TOKEN }}" \
@ -78,30 +90,11 @@ jobs:
-d "{\"ref\":\"v1.0.4\",\"inputs\":{\"version\": \"${{ github.ref_name }}\"}}" \
"https://api.github.com/repos/emqx/emqx-ci-helper/actions/workflows/update_emqx_homebrew.yaml/dispatches"
fi
upload-helm:
runs-on: ubuntu-20.04
if: github.event_name == 'release'
strategy:
fail-fast: false
steps:
- uses: actions/checkout@v3
with:
ref: ${{ github.ref }}
- uses: emqx/push-helm-action@v1
if: startsWith(github.ref_name, 'v')
if: github.event_name == 'release' || inputs.publish_release_artefacts
with:
charts_dir: "${{ github.workspace }}/deploy/charts/emqx"
version: ${{ github.ref_name }}
aws_access_key_id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws_secret_access_key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws_region: "us-west-2"
aws_bucket_name: "repos-emqx-io"
- uses: emqx/push-helm-action@v1
if: startsWith(github.ref_name, 'e')
with:
charts_dir: "${{ github.workspace }}/deploy/charts/emqx-enterprise"
version: ${{ github.ref_name }}
charts_dir: "${{ github.workspace }}/deploy/charts/${{ steps.profile.outputs.profile }}"
version: ${{ steps.profile.outputs.version }}
aws_access_key_id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws_secret_access_key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws_region: "us-west-2"

View File

@ -23,7 +23,7 @@ PKG_PROFILES := emqx-pkg emqx-enterprise-pkg
PROFILES := $(REL_PROFILES) $(PKG_PROFILES) default
CT_NODE_NAME ?= 'test@127.0.0.1'
CT_READABLE ?= false
CT_READABLE ?= true
export REBAR_GIT_CLONE_OPTIONS += --depth=1

View File

@ -3,7 +3,7 @@
{id, "emqx"},
{description, "EMQX Core"},
% strict semver, bump manually!
{vsn, "5.0.10"},
{vsn, "5.0.11"},
{modules, []},
{registered, []},
{applications, [

View File

@ -56,7 +56,7 @@ authorize(ClientInfo, PubSub, <<"$delayed/", Data/binary>> = RawTopic) ->
authorize(ClientInfo, PubSub, Topic);
_ ->
?SLOG(warning, #{
msg => "invalid_dealyed_topic_format",
msg => "invalid_delayed_topic_format",
expected_example => "$delayed/1/t/foo",
got => RawTopic
}),

View File

@ -21,6 +21,7 @@
-include("emqx.hrl").
-include("logger.hrl").
-include("types.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% Mnesia bootstrap
-export([mnesia/1]).
@ -180,7 +181,7 @@ create(#{
create(Banned = #banned{who = Who}) ->
case look_up(Who) of
[] ->
mria:dirty_write(?BANNED_TAB, Banned),
insert_banned(Banned),
{ok, Banned};
[OldBanned = #banned{until = Until}] ->
%% Don't support shorten or extend the until time by overwrite.
@ -190,7 +191,7 @@ create(Banned = #banned{who = Who}) ->
{error, {already_exist, OldBanned}};
%% overwrite expired one is ok.
false ->
mria:dirty_write(?BANNED_TAB, Banned),
insert_banned(Banned),
{ok, Banned}
end
end.
@ -266,3 +267,21 @@ expire_banned_items(Now) ->
ok,
?BANNED_TAB
).
insert_banned(Banned) ->
mria:dirty_write(?BANNED_TAB, Banned),
on_banned(Banned).
on_banned(#banned{who = {clientid, ClientId}}) ->
%% kick the session if the client is banned by clientid
?tp(
warning,
kick_session_due_to_banned,
#{
clientid => ClientId
}
),
emqx_cm:kick_session(ClientId),
ok;
on_banned(_) ->
ok.

View File

@ -2134,10 +2134,14 @@ will_delay_interval(WillMsg) ->
0
).
publish_will_msg(ClientInfo, Msg = #message{topic = Topic}) ->
publish_will_msg(
ClientInfo = #{mountpoint := MountPoint},
Msg = #message{topic = Topic}
) ->
case emqx_access_control:authorize(ClientInfo, publish, Topic) of
allow ->
_ = emqx_broker:publish(Msg),
NMsg = emqx_mountpoint:mount(MountPoint, Msg),
_ = emqx_broker:publish(NMsg),
ok;
deny ->
?tp(

View File

@ -38,7 +38,8 @@
delete/1,
clear/0,
update/2,
check/0
check/0,
now_second/0
]).
-export([
@ -287,7 +288,7 @@ insert_new_trace(Trace) ->
transaction(fun emqx_trace_dl:insert_new_trace/1, [Trace]).
update_trace(Traces) ->
Now = erlang:system_time(second),
Now = now_second(),
{_Waiting, Running, Finished} = classify_by_time(Traces, Now),
disable_finished(Finished),
Started = emqx_trace_handler:running(),
@ -455,7 +456,7 @@ ensure_map(Trace) when is_list(Trace) ->
).
fill_default(Trace = #?TRACE{start_at = undefined}) ->
fill_default(Trace#?TRACE{start_at = erlang:system_time(second)});
fill_default(Trace#?TRACE{start_at = now_second()});
fill_default(Trace = #?TRACE{end_at = undefined, start_at = StartAt}) ->
fill_default(Trace#?TRACE{end_at = StartAt + 10 * 60});
fill_default(Trace) ->
@ -493,7 +494,7 @@ to_trace(#{start_at := StartAt} = Trace, Rec) ->
{ok, Sec} = to_system_second(StartAt),
to_trace(maps:remove(start_at, Trace), Rec#?TRACE{start_at = Sec});
to_trace(#{end_at := EndAt} = Trace, Rec) ->
Now = erlang:system_time(second),
Now = now_second(),
case to_system_second(EndAt) of
{ok, Sec} when Sec > Now ->
to_trace(maps:remove(end_at, Trace), Rec#?TRACE{end_at = Sec});
@ -517,8 +518,7 @@ validate_ip_address(IP) ->
end.
to_system_second(Sec) ->
Now = erlang:system_time(second),
{ok, erlang:max(Now, Sec)}.
{ok, erlang:max(now_second(), Sec)}.
zip_dir() ->
filename:join([trace_dir(), "zip"]).
@ -570,3 +570,6 @@ filter_cli_handler(Names) ->
end,
Names
).
now_second() ->
os:system_time(second).

View File

@ -30,7 +30,7 @@
-include("emqx_trace.hrl").
%%================================================================================
%% API funcions
%% API functions
%%================================================================================
%% Introduced in 5.0
@ -43,7 +43,7 @@ update(Name, Enable) ->
[#?TRACE{enable = Enable}] ->
ok;
[Rec] ->
case erlang:system_time(second) >= Rec#?TRACE.end_at of
case emqx_trace:now_second() >= Rec#?TRACE.end_at of
false -> mnesia:write(?TRACE, Rec#?TRACE{enable = Enable}, write);
true -> mnesia:abort(finished)
end

View File

@ -20,6 +20,7 @@
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_hooks.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_common_test_helpers:all(?MODULE).
@ -32,12 +33,12 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]).
end_per_testcase(t_delayed_authorize, Config) ->
meck:unload(emqx_access_control),
Config;
end_per_testcase(_, Config) ->
init_per_testcase(_, Config) ->
Config.
end_per_testcase(_, _Config) ->
ok = emqx_hooks:del('client.authorize', {?MODULE, authz_stub}).
t_authenticate(_) ->
?assertMatch({ok, _}, emqx_access_control:authenticate(clientinfo())).
@ -46,31 +47,26 @@ t_authorize(_) ->
?assertEqual(allow, emqx_access_control:authorize(clientinfo(), Publish, <<"t">>)).
t_delayed_authorize(_) ->
RawTopic = "$dealyed/1/foo/2",
InvalidTopic = "$dealyed/1/foo/3",
Topic = "foo/2",
RawTopic = <<"$delayed/1/foo/2">>,
InvalidTopic = <<"$delayed/1/foo/3">>,
Topic = <<"foo/2">>,
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
ok = meck:expect(
emqx_access_control,
do_authorize,
fun
(_, _, Topic) -> allow;
(_, _, _) -> deny
end
),
ok = emqx_hooks:put('client.authorize', {?MODULE, authz_stub, [Topic]}, ?HP_AUTHZ),
Publish1 = ?PUBLISH_PACKET(?QOS_0, RawTopic, 1, <<"payload">>),
?assertEqual(allow, emqx_access_control:authorize(clientinfo(), Publish1, RawTopic)),
Publish2 = ?PUBLISH_PACKET(?QOS_0, InvalidTopic, 1, <<"payload">>),
?assertEqual(allow, emqx_access_control:authorize(clientinfo(), Publish2, InvalidTopic)),
?assertEqual(deny, emqx_access_control:authorize(clientinfo(), Publish2, InvalidTopic)),
ok.
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
authz_stub(_Client, _PubSub, ValidTopic, _DefaultResult, ValidTopic) -> {stop, #{result => allow}};
authz_stub(_Client, _PubSub, _Topic, _DefaultResult, _ValidTopic) -> {stop, #{result => deny}}.
clientinfo() -> clientinfo(#{}).
clientinfo(InitProps) ->
maps:merge(

View File

@ -21,18 +21,20 @@
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
application:load(emqx),
emqx_common_test_helpers:start_apps([]),
ok = ekka:start(),
Config.
end_per_suite(_Config) ->
ekka:stop(),
mria:stop(),
mria_mnesia:delete_schema().
mria_mnesia:delete_schema(),
emqx_common_test_helpers:stop_apps([]).
t_add_delete(_) ->
Banned = #banned{
@ -95,19 +97,47 @@ t_check(_) ->
?assertEqual(0, emqx_banned:info(size)).
t_unused(_) ->
catch emqx_banned:stop(),
{ok, Banned} = emqx_banned:start_link(),
{ok, _} = emqx_banned:create(#banned{
who = {clientid, <<"BannedClient1">>},
until = erlang:system_time(second)
}),
{ok, _} = emqx_banned:create(#banned{
who = {clientid, <<"BannedClient2">>},
until = erlang:system_time(second) - 1
}),
?assertEqual(ignored, gen_server:call(Banned, unexpected_req)),
?assertEqual(ok, gen_server:cast(Banned, unexpected_msg)),
?assertEqual(ok, Banned ! ok),
Who1 = {clientid, <<"BannedClient1">>},
Who2 = {clientid, <<"BannedClient2">>},
?assertMatch(
{ok, _},
emqx_banned:create(#banned{
who = Who1,
until = erlang:system_time(second)
})
),
?assertMatch(
{ok, _},
emqx_banned:create(#banned{
who = Who2,
until = erlang:system_time(second) - 1
})
),
?assertEqual(ignored, gen_server:call(emqx_banned, unexpected_req)),
?assertEqual(ok, gen_server:cast(emqx_banned, unexpected_msg)),
%% expiry timer
timer:sleep(500),
ok = emqx_banned:stop().
ok = emqx_banned:delete(Who1),
ok = emqx_banned:delete(Who2).
t_kick(_) ->
ClientId = <<"client">>,
snabbkaffe:start_trace(),
Now = erlang:system_time(second),
Who = {clientid, ClientId},
emqx_banned:create(#{
who => Who,
by => <<"test">>,
reason => <<"test">>,
at => Now,
until => Now + 120
}),
Trace = snabbkaffe:collect_trace(),
snabbkaffe:stop(),
emqx_banned:delete(Who),
?assertEqual(1, length(?of_kind(kick_session_due_to_banned, Trace))).

View File

@ -728,6 +728,22 @@ t_quota_qos2(_) ->
del_bucket(),
esockd_limiter:stop().
t_mount_will_msg(_) ->
Self = self(),
ClientInfo = clientinfo(#{mountpoint => <<"prefix/">>}),
Msg = emqx_message:make(test, <<"will_topic">>, <<"will_payload">>),
Channel = channel(#{clientinfo => ClientInfo, will_msg => Msg}),
ok = meck:expect(emqx_broker, publish, fun(M) -> Self ! {pub, M} end),
{shutdown, kicked, ok, ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), _} = emqx_channel:handle_call(
kick, Channel
),
receive
{pub, #message{topic = <<"prefix/will_topic">>}} -> ok
after 200 -> exit(will_message_not_published_or_not_correct)
end.
%%--------------------------------------------------------------------
%% Test cases for handle_deliver
%%--------------------------------------------------------------------

View File

@ -40,7 +40,7 @@ init_per_suite(Config) ->
?wait_async_action(
emqx_common_test_helpers:start_apps([]),
#{?snk_kind := listener_started, bind := 1883},
timer:seconds(10)
timer:seconds(100)
),
fun(Trace) ->
%% more than one listener

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_authn, [
{description, "EMQX Authentication"},
{vsn, "0.1.8"},
{vsn, "0.1.9"},
{modules, []},
{registered, [emqx_authn_sup, emqx_authn_registry]},
{applications, [kernel, stdlib, emqx_resource, ehttpc, epgsql, mysql, jose]},

View File

@ -30,6 +30,7 @@
-define(BAD_REQUEST, 'BAD_REQUEST').
-define(NOT_FOUND, 'NOT_FOUND').
-define(ALREADY_EXISTS, 'ALREADY_EXISTS').
-define(INTERNAL_ERROR, 'INTERNAL_ERROR').
% Swagger
@ -224,7 +225,8 @@ schema("/authentication/:id/status") ->
hoconsc:ref(emqx_authn_schema, "metrics_status_fields"),
status_metrics_example()
),
400 => error_codes([?BAD_REQUEST], <<"Bad Request">>)
404 => error_codes([?NOT_FOUND], <<"Not Found">>),
500 => error_codes([?INTERNAL_ERROR], <<"Internal Service Error">>)
}
}
};
@ -576,7 +578,11 @@ authenticator(delete, #{bindings := #{id := AuthenticatorID}}) ->
delete_authenticator([authentication], ?GLOBAL, AuthenticatorID).
authenticator_status(get, #{bindings := #{id := AuthenticatorID}}) ->
lookup_from_all_nodes(?GLOBAL, AuthenticatorID).
with_authenticator(
AuthenticatorID,
[authentication],
fun(_) -> lookup_from_all_nodes(?GLOBAL, AuthenticatorID) end
).
listener_authenticators(post, #{bindings := #{listener_id := ListenerID}, body := Config}) ->
with_listener(
@ -647,8 +653,12 @@ listener_authenticator_status(
) ->
with_listener(
ListenerID,
fun(_, _, ChainName) ->
lookup_from_all_nodes(ChainName, AuthenticatorID)
fun(Type, Name, ChainName) ->
with_authenticator(
AuthenticatorID,
[listeners, Type, Name, authentication],
fun(_) -> lookup_from_all_nodes(ChainName, AuthenticatorID) end
)
end
).
@ -774,6 +784,18 @@ listener_authenticator_user(delete, #{
%% Internal functions
%%------------------------------------------------------------------------------
with_authenticator(AuthenticatorID, ConfKeyPath, Fun) ->
case find_authenticator_config(AuthenticatorID, ConfKeyPath) of
{ok, AuthenticatorConfig} ->
Fun(AuthenticatorConfig);
{error, Reason} ->
serialize_error(Reason)
end.
find_authenticator_config(AuthenticatorID, ConfKeyPath) ->
AuthenticatorsConfig = get_raw_config_with_defaults(ConfKeyPath),
find_config(AuthenticatorID, AuthenticatorsConfig).
with_listener(ListenerID, Fun) ->
case find_listener(ListenerID) of
{ok, {BType, BName}} ->
@ -836,13 +858,13 @@ list_authenticators(ConfKeyPath) ->
{200, NAuthenticators}.
list_authenticator(_, ConfKeyPath, AuthenticatorID) ->
AuthenticatorsConfig = get_raw_config_with_defaults(ConfKeyPath),
case find_config(AuthenticatorID, AuthenticatorsConfig) of
{ok, AuthenticatorConfig} ->
{200, maps:put(id, AuthenticatorID, convert_certs(AuthenticatorConfig))};
{error, Reason} ->
serialize_error(Reason)
end.
with_authenticator(
AuthenticatorID,
ConfKeyPath,
fun(AuthenticatorConfig) ->
{200, maps:put(id, AuthenticatorID, convert_certs(AuthenticatorConfig))}
end
).
resource_provider() ->
[
@ -877,7 +899,8 @@ lookup_from_local_node(ChainName, AuthenticatorID) ->
lookup_from_all_nodes(ChainName, AuthenticatorID) ->
Nodes = mria_mnesia:running_nodes(),
case is_ok(emqx_authn_proto_v1:lookup_from_all_nodes(Nodes, ChainName, AuthenticatorID)) of
LookupResult = emqx_authn_proto_v1:lookup_from_all_nodes(Nodes, ChainName, AuthenticatorID),
case is_ok(LookupResult) of
{ok, ResList} ->
{StatusMap, MetricsMap, ResourceMetricsMap, ErrorMap} = make_result_map(ResList),
AggregateStatus = aggregate_status(maps:values(StatusMap)),
@ -901,7 +924,7 @@ lookup_from_all_nodes(ChainName, AuthenticatorID) ->
node_error => HelpFun(maps:map(Fun, ErrorMap), reason)
}};
{error, ErrL} ->
{400, #{
{500, #{
code => <<"INTERNAL_ERROR">>,
message => list_to_binary(io_lib:format("~p", [ErrL]))
}}

View File

@ -365,11 +365,11 @@ verify(JWT, JWKs, VerifyClaims, AclClaimName) ->
acl(Claims, AclClaimName) ->
Acl =
case Claims of
#{<<"exp">> := Expire, AclClaimName := Rules} ->
#{AclClaimName := Rules} ->
#{
acl => #{
rules => Rules,
expire => Expire
expire => maps:get(<<"exp">>, Claims, undefined)
}
};
_ ->

View File

@ -39,6 +39,9 @@ all() ->
groups() ->
[].
init_per_testcase(t_authenticator_fail, Config) ->
meck:expect(emqx_authn_proto_v1, lookup_from_all_nodes, 3, [{error, {exception, badarg}}]),
init_per_testcase(default, Config);
init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
emqx_authn_test_lib:delete_authenticators(
@ -54,6 +57,12 @@ init_per_testcase(_, Config) ->
{atomic, ok} = mria:clear_table(emqx_authn_mnesia),
Config.
end_per_testcase(t_authenticator_fail, Config) ->
meck:unload(emqx_authn_proto_v1),
Config;
end_per_testcase(_, Config) ->
Config.
init_per_suite(Config) ->
emqx_config:erase(?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY),
_ = application:load(emqx_conf),
@ -90,6 +99,21 @@ t_authenticators(_) ->
t_authenticator(_) ->
test_authenticator([]).
t_authenticator_fail(_) ->
ValidConfig0 = emqx_authn_test_lib:http_example(),
{ok, 200, _} = request(
post,
uri([?CONF_NS]),
ValidConfig0
),
?assertMatch(
{ok, 500, _},
request(
get,
uri([?CONF_NS, "password_based:http", "status"])
)
).
t_authenticator_users(_) ->
test_authenticator_users([]).
@ -247,6 +271,15 @@ test_authenticator(PathPrefix) ->
<<"connected">>,
LookFun([<<"status">>])
),
?assertMatch(
{ok, 404, _},
request(
get,
uri(PathPrefix ++ [?CONF_NS, "unknown_auth_chain", "status"])
)
),
{ok, 404, _} = request(
get,
uri(PathPrefix ++ [?CONF_NS, "password_based:redis"])

View File

@ -305,6 +305,50 @@ t_check_expire(_Config) ->
ok = emqtt:disconnect(C).
t_check_no_expire(_Config) ->
Payload = #{
<<"username">> => <<"username">>,
<<"acl">> => #{<<"sub">> => [<<"a/b">>]}
},
JWT = generate_jws(Payload),
{ok, C} = emqtt:start_link(
[
{clean_start, true},
{proto_ver, v5},
{clientid, <<"clientid">>},
{username, <<"username">>},
{password, JWT}
]
),
{ok, _} = emqtt:connect(C),
?assertMatch(
{ok, #{}, [0]},
emqtt:subscribe(C, <<"a/b">>, 0)
),
?assertMatch(
{ok, #{}, [0]},
emqtt:unsubscribe(C, <<"a/b">>)
),
ok = emqtt:disconnect(C).
t_check_undefined_expire(_Config) ->
Acl = #{expire => undefined, rules => #{<<"sub">> => [<<"a/b">>]}},
Client = #{acl => Acl},
?assertMatch(
{matched, allow},
emqx_authz_client_info:authorize(Client, subscribe, <<"a/b">>, undefined)
),
?assertMatch(
{matched, deny},
emqx_authz_client_info:authorize(Client, subscribe, <<"a/bar">>, undefined)
).
%%------------------------------------------------------------------------------
%% Helpers
%%------------------------------------------------------------------------------

View File

@ -57,6 +57,14 @@ It's enum with `stomp`, `mqttsn`, `coap`, `lwm2m`, `exproto`
}
}
gateway_enable_in_path {
desc {
en: """Whether or not gateway is enabled"""
zh: """是否开启此网关"""
}
}
gateway_status {
desc {
en: """Gateway status"""

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_gateway, [
{description, "The Gateway management application"},
{vsn, "0.1.7"},
{vsn, "0.1.8"},
{registered, []},
{mod, {emqx_gateway_app, []}},
{applications, [kernel, stdlib, grpc, emqx, emqx_authn]},

View File

@ -19,8 +19,6 @@
-include("emqx_gateway_http.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/emqx_placeholder.hrl").
-include_lib("emqx/include/emqx_authentication.hrl").
-behaviour(minirest_api).
@ -34,7 +32,7 @@
]
).
%% minirest/dashbaord_swagger behaviour callbacks
%% minirest/dashboard_swagger behaviour callbacks
-export([
api_spec/0,
paths/0,
@ -49,8 +47,9 @@
%% http handlers
-export([
gateways/2,
gateway/2,
gateway_insta/2
gateway_enable/2
]).
-define(KNOWN_GATEWAY_STATUSES, [<<"running">>, <<"stopped">>, <<"unloaded">>]).
@ -66,13 +65,14 @@ api_spec() ->
paths() ->
emqx_gateway_utils:make_deprecated_paths([
"/gateways",
"/gateways/:name"
"/gateways/:name",
"/gateways/:name/enable/:enable"
]).
%%--------------------------------------------------------------------
%% http handlers
gateway(get, Request) ->
gateways(get, Request) ->
Params = maps:get(query_string, Request, #{}),
Status = maps:get(<<"status">>, Params, <<"all">>),
case lists:member(Status, [<<"all">> | ?KNOWN_GATEWAY_STATUSES]) of
@ -89,84 +89,85 @@ gateway(get, Request) ->
lists:join(", ", ?KNOWN_GATEWAY_STATUSES)
]
)
end;
gateway(post, Request) ->
Body = maps:get(body, Request, #{}),
try
Name0 = maps:get(<<"name">>, Body),
GwName = binary_to_existing_atom(Name0),
case emqx_gateway_registry:lookup(GwName) of
undefined ->
error(badarg);
_ ->
GwConf = maps:without([<<"name">>], Body),
case emqx_gateway_conf:load_gateway(GwName, GwConf) of
{ok, NGwConf} ->
{201, NGwConf};
{error, Reason} ->
emqx_gateway_http:reason2resp(Reason)
end
end
catch
error:{badkey, K} ->
return_http_error(400, [K, " is required"]);
error:{badconf, _} = Reason1 ->
emqx_gateway_http:reason2resp(Reason1);
error:badarg ->
return_http_error(404, "Bad gateway name")
end.
gateway_insta(delete, #{bindings := #{name := Name0}}) ->
with_gateway(Name0, fun(GwName, _) ->
case emqx_gateway_conf:unload_gateway(GwName) of
ok ->
gateway(get, #{bindings := #{name := Name}}) ->
try
GwName = gw_name(Name),
case emqx_gateway:lookup(GwName) of
undefined ->
{200, #{name => GwName, status => unloaded}};
Gateway ->
GwConf = emqx_gateway_conf:gateway(Name),
GwInfo0 = emqx_gateway_utils:unix_ts_to_rfc3339(
[created_at, started_at, stopped_at],
Gateway
),
GwInfo1 = maps:with(
[
name,
status,
created_at,
started_at,
stopped_at
],
GwInfo0
),
{200, maps:merge(GwConf, GwInfo1)}
end
catch
throw:not_found ->
return_http_error(404, <<"NOT FOUND">>)
end;
gateway(put, #{
body := GwConf0,
bindings := #{name := Name}
}) ->
GwConf = maps:without([<<"name">>], GwConf0),
try
GwName = gw_name(Name),
LoadOrUpdateF =
case emqx_gateway:lookup(GwName) of
undefined ->
fun emqx_gateway_conf:load_gateway/2;
_ ->
fun emqx_gateway_conf:update_gateway/2
end,
case LoadOrUpdateF(GwName, GwConf) of
{ok, _} ->
{204};
{error, Reason} ->
emqx_gateway_http:reason2resp(Reason)
end
end);
gateway_insta(get, #{bindings := #{name := Name0}}) ->
try binary_to_existing_atom(Name0) of
GwName ->
case emqx_gateway:lookup(GwName) of
undefined ->
{200, #{name => GwName, status => unloaded}};
Gateway ->
GwConf = emqx_gateway_conf:gateway(Name0),
GwInfo0 = emqx_gateway_utils:unix_ts_to_rfc3339(
[created_at, started_at, stopped_at],
Gateway
),
GwInfo1 = maps:with(
[
name,
status,
created_at,
started_at,
stopped_at
],
GwInfo0
),
{200, maps:merge(GwConf, GwInfo1)}
end
catch
error:badarg ->
return_http_error(404, "Bad gateway name")
end;
gateway_insta(put, #{
body := GwConf0,
bindings := #{name := Name0}
}) ->
with_gateway(Name0, fun(GwName, _) ->
%% XXX: Clear the unused fields
GwConf = maps:without([<<"name">>], GwConf0),
case emqx_gateway_conf:update_gateway(GwName, GwConf) of
{ok, Gateway} ->
{200, Gateway};
{error, Reason} ->
emqx_gateway_http:reason2resp(Reason)
error:{badconf, _} = Reason1 ->
emqx_gateway_http:reason2resp(Reason1);
throw:not_found ->
return_http_error(404, <<"NOT FOUND">>)
end.
gateway_enable(put, #{bindings := #{name := Name, enable := Enable}}) ->
try
GwName = gw_name(Name),
case emqx_gateway:lookup(GwName) of
undefined ->
return_http_error(404, <<"NOT FOUND">>);
_Gateway ->
{ok, _} = emqx_gateway_conf:update_gateway(GwName, #{<<"enable">> => Enable}),
{204}
end
end).
catch
throw:not_found ->
return_http_error(404, <<"NOT FOUND">>)
end.
-spec gw_name(binary()) -> stomp | coap | lwm2m | mqttsn | exproto | no_return().
gw_name(<<"stomp">>) -> stomp;
gw_name(<<"coap">>) -> coap;
gw_name(<<"lwm2m">>) -> lwm2m;
gw_name(<<"mqttsn">>) -> mqttsn;
gw_name(<<"exproto">>) -> exproto;
gw_name(_Else) -> throw(not_found).
%%--------------------------------------------------------------------
%% Swagger defines
@ -174,7 +175,7 @@ gateway_insta(put, #{
schema("/gateways") ->
#{
'operationId' => gateway,
'operationId' => gateways,
get =>
#{
tags => ?TAGS,
@ -182,29 +183,20 @@ schema("/gateways") ->
summary => <<"List All Gateways">>,
parameters => params_gateway_status_in_qs(),
responses =>
?STANDARD_RESP(
#{
200 => emqx_dashboard_swagger:schema_with_example(
hoconsc:array(ref(gateway_overview)),
examples_gateway_overview()
)
}
)
},
post =>
#{
tags => ?TAGS,
desc => ?DESC(enable_gateway),
summary => <<"Enable a Gateway">>,
%% TODO: distinguish create & response swagger schema
'requestBody' => schema_gateways_conf(),
responses =>
?STANDARD_RESP(#{201 => schema_gateways_conf()})
#{
200 => emqx_dashboard_swagger:schema_with_example(
hoconsc:array(ref(gateway_overview)),
examples_gateway_overview()
),
400 => emqx_dashboard_swagger:error_codes(
[?BAD_REQUEST], <<"Bad request">>
)
}
}
};
schema("/gateways/:name") ->
#{
'operationId' => gateway_insta,
'operationId' => gateway,
get =>
#{
tags => ?TAGS,
@ -212,26 +204,41 @@ schema("/gateways/:name") ->
summary => <<"Get the Gateway">>,
parameters => params_gateway_name_in_path(),
responses =>
?STANDARD_RESP(#{200 => schema_gateways_conf()})
},
delete =>
#{
tags => ?TAGS,
desc => ?DESC(delete_gateway),
summary => <<"Unload the gateway">>,
parameters => params_gateway_name_in_path(),
responses =>
?STANDARD_RESP(#{204 => <<"Deleted">>})
#{
200 => schema_gateways_conf(),
404 => emqx_dashboard_swagger:error_codes(
[?NOT_FOUND, ?RESOURCE_NOT_FOUND], <<"Not Found">>
)
}
},
put =>
#{
tags => ?TAGS,
desc => ?DESC(update_gateway),
summary => <<"Update the gateway confs">>,
% [FIXME] add proper desc
summary => <<"Load or update the gateway confs">>,
parameters => params_gateway_name_in_path(),
'requestBody' => schema_update_gateways_conf(),
'requestBody' => schema_load_or_update_gateways_conf(),
responses =>
?STANDARD_RESP(#{200 => schema_gateways_conf()})
?STANDARD_RESP(#{204 => <<"Gateway configuration updated">>})
}
};
schema("/gateways/:name/enable/:enable") ->
#{
'operationId' => gateway_enable,
put =>
#{
tags => ?TAGS,
desc => ?DESC(update_gateway),
summary => <<"Enable or disable gateway">>,
parameters => params_gateway_name_in_path() ++ params_gateway_enable_in_path(),
responses =>
#{
204 => <<"Gateway configuration updated">>,
404 => emqx_dashboard_swagger:error_codes(
[?NOT_FOUND, ?RESOURCE_NOT_FOUND], <<"Not Found">>
)
}
}
};
schema(Path) ->
@ -268,6 +275,18 @@ params_gateway_status_in_qs() ->
)}
].
params_gateway_enable_in_path() ->
[
{enable,
mk(
boolean(),
#{
in => path,
desc => ?DESC(gateway_enable_in_path),
example => true
}
)}
].
%%--------------------------------------------------------------------
%% schemas
@ -377,8 +396,6 @@ fields(Gw) when
->
[{name, mk(Gw, #{desc => ?DESC(gateway_name)})}] ++
convert_listener_struct(emqx_gateway_schema:fields(Gw));
fields(update_disable_enable_only) ->
[{enable, mk(boolean(), #{desc => <<"Enable/Disable the gateway">>})}];
fields(Gw) when
Gw == update_stomp;
Gw == update_mqttsn;
@ -431,15 +448,19 @@ fields(Listener) when
fields(gateway_stats) ->
[{key, mk(binary(), #{})}].
schema_update_gateways_conf() ->
schema_load_or_update_gateways_conf() ->
emqx_dashboard_swagger:schema_with_examples(
hoconsc:union([
ref(?MODULE, stomp),
ref(?MODULE, mqttsn),
ref(?MODULE, coap),
ref(?MODULE, lwm2m),
ref(?MODULE, exproto),
ref(?MODULE, update_stomp),
ref(?MODULE, update_mqttsn),
ref(?MODULE, update_coap),
ref(?MODULE, update_lwm2m),
ref(?MODULE, update_exproto),
ref(?MODULE, update_disable_enable_only)
ref(?MODULE, update_exproto)
]),
examples_update_gateway_confs()
).

View File

@ -30,8 +30,7 @@
[
return_http_error/2,
with_gateway/2,
with_authn/2,
checks/2
with_authn/2
]
).

View File

@ -23,7 +23,7 @@
emqx_gateway_test_utils,
[
assert_confs/2,
assert_feilds_apperence/2,
assert_fields_exist/2,
request/2,
request/3,
ssl_server_opts/0,
@ -32,6 +32,7 @@
).
-include_lib("eunit/include/eunit.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% this parses to #{}, will not cause config cleanup
%% so we will need call emqx_config:erase
@ -55,32 +56,68 @@ end_per_suite(Conf) ->
emqx_mgmt_api_test_util:end_suite([emqx_gateway, emqx_authn, emqx_conf]),
Conf.
init_per_testcase(t_gateway_fail, Config) ->
meck:expect(
emqx_gateway_conf,
update_gateway,
fun
(stomp, V) -> {error, {badconf, #{key => gw, value => V, reason => test_error}}};
(coap, V) -> error({badconf, #{key => gw, value => V, reason => test_crash}})
end
),
Config;
init_per_testcase(_, Config) ->
Config.
end_per_testcase(TestCase, Config) ->
case TestCase of
t_gateway_fail -> meck:unload(emqx_gateway_conf);
_ -> ok
end,
[emqx_gateway_conf:unload_gateway(GwName) || GwName <- [stomp, mqttsn, coap, lwm2m, exproto]],
Config.
%%--------------------------------------------------------------------
%% Cases
%%--------------------------------------------------------------------
t_gateway(_) ->
t_gateways(_) ->
{200, Gateways} = request(get, "/gateways"),
lists:foreach(fun assert_gw_unloaded/1, Gateways),
{200, UnloadedGateways} = request(get, "/gateways?status=unloaded"),
lists:foreach(fun assert_gw_unloaded/1, UnloadedGateways),
{200, NoRunningGateways} = request(get, "/gateways?status=running"),
?assertEqual([], NoRunningGateways),
{404, GwNotFoundReq} = request(get, "/gateways/unknown_gateway"),
assert_not_found(GwNotFoundReq),
{400, BadReqInvalidStatus} = request(get, "/gateways?status=invalid_status"),
assert_bad_request(BadReqInvalidStatus),
{400, BadReqUCStatus} = request(get, "/gateways?status=UNLOADED"),
assert_bad_request(BadReqUCStatus),
{201, _} = request(post, "/gateways", #{name => <<"stomp">>}),
{200, StompGw1} = request(get, "/gateways/stomp"),
assert_feilds_apperence(
ok.
t_gateway(_) ->
{404, GwNotFoundReq1} = request(get, "/gateways/not_a_known_atom"),
assert_not_found(GwNotFoundReq1),
{404, GwNotFoundReq2} = request(get, "/gateways/undefined"),
assert_not_found(GwNotFoundReq2),
{204, _} = request(put, "/gateways/stomp", #{}),
{200, StompGw} = request(get, "/gateways/stomp"),
assert_fields_exist(
[name, status, enable, created_at, started_at],
StompGw1
StompGw
),
{204, _} = request(delete, "/gateways/stomp"),
{200, StompGw2} = request(get, "/gateways/stomp"),
assert_gw_unloaded(StompGw2),
{204, _} = request(put, "/gateways/stomp", #{enable => true}),
{200, #{enable := true}} = request(get, "/gateway/stomp"),
{204, _} = request(put, "/gateways/stomp", #{enable => false}),
{200, #{enable := false}} = request(get, "/gateway/stomp"),
{404, _} = request(put, "/gateways/undefined", #{}),
{400, _} = request(put, "/gateways/stomp", #{bad_key => "foo"}),
ok.
t_gateway_fail(_) ->
{204, _} = request(put, "/gateways/stomp", #{}),
{400, _} = request(put, "/gateways/stomp", #{}),
{204, _} = request(put, "/gateways/coap", #{}),
{400, _} = request(put, "/gateways/coap", #{}),
ok.
t_deprecated_gateway(_) ->
@ -88,21 +125,30 @@ t_deprecated_gateway(_) ->
lists:foreach(fun assert_gw_unloaded/1, Gateways),
{404, NotFoundReq} = request(get, "/gateway/uname_gateway"),
assert_not_found(NotFoundReq),
{201, _} = request(post, "/gateway", #{name => <<"stomp">>}),
{200, StompGw1} = request(get, "/gateway/stomp"),
assert_feilds_apperence(
{204, _} = request(put, "/gateway/stomp", #{}),
{200, StompGw} = request(get, "/gateway/stomp"),
assert_fields_exist(
[name, status, enable, created_at, started_at],
StompGw1
StompGw
),
{204, _} = request(delete, "/gateway/stomp"),
{200, StompGw2} = request(get, "/gateway/stomp"),
assert_gw_unloaded(StompGw2),
ok.
t_gateway_enable(_) ->
{204, _} = request(put, "/gateways/stomp", #{}),
{200, #{enable := Enable}} = request(get, "/gateway/stomp"),
NotEnable = not Enable,
{204, _} = request(put, "/gateways/stomp/enable/" ++ atom_to_list(NotEnable), undefined),
{200, #{enable := NotEnable}} = request(get, "/gateway/stomp"),
{204, _} = request(put, "/gateways/stomp/enable/" ++ atom_to_list(Enable), undefined),
{200, #{enable := Enable}} = request(get, "/gateway/stomp"),
{404, _} = request(put, "/gateways/undefined/enable/true", undefined),
{404, _} = request(put, "/gateways/not_a_known_atom/enable/true", undefined),
{404, _} = request(put, "/gateways/coap/enable/true", undefined),
ok.
t_gateway_stomp(_) ->
{200, Gw} = request(get, "/gateways/stomp"),
assert_gw_unloaded(Gw),
%% post
GwConf = #{
name => <<"stomp">>,
frame => #{
@ -114,20 +160,18 @@ t_gateway_stomp(_) ->
#{name => <<"def">>, type => <<"tcp">>, bind => <<"61613">>}
]
},
{201, _} = request(post, "/gateways", GwConf),
{204, _} = request(put, "/gateways/stomp", GwConf),
{200, ConfResp} = request(get, "/gateways/stomp"),
assert_confs(GwConf, ConfResp),
%% put
GwConf2 = emqx_map_lib:deep_merge(GwConf, #{frame => #{max_headers => 10}}),
{200, _} = request(put, "/gateways/stomp", maps:without([name, listeners], GwConf2)),
{204, _} = request(put, "/gateways/stomp", maps:without([name, listeners], GwConf2)),
{200, ConfResp2} = request(get, "/gateways/stomp"),
assert_confs(GwConf2, ConfResp2),
{204, _} = request(delete, "/gateways/stomp").
ok.
t_gateway_mqttsn(_) ->
{200, Gw} = request(get, "/gateways/mqttsn"),
assert_gw_unloaded(Gw),
%% post
GwConf = #{
name => <<"mqttsn">>,
gateway_id => 1,
@ -138,20 +182,18 @@ t_gateway_mqttsn(_) ->
#{name => <<"def">>, type => <<"udp">>, bind => <<"1884">>}
]
},
{201, _} = request(post, "/gateways", GwConf),
{204, _} = request(put, "/gateways/mqttsn", GwConf),
{200, ConfResp} = request(get, "/gateways/mqttsn"),
assert_confs(GwConf, ConfResp),
%% put
GwConf2 = emqx_map_lib:deep_merge(GwConf, #{predefined => []}),
{200, _} = request(put, "/gateways/mqttsn", maps:without([name, listeners], GwConf2)),
{204, _} = request(put, "/gateways/mqttsn", maps:without([name, listeners], GwConf2)),
{200, ConfResp2} = request(get, "/gateways/mqttsn"),
assert_confs(GwConf2, ConfResp2),
{204, _} = request(delete, "/gateways/mqttsn").
ok.
t_gateway_coap(_) ->
{200, Gw} = request(get, "/gateways/coap"),
assert_gw_unloaded(Gw),
%% post
GwConf = #{
name => <<"coap">>,
heartbeat => <<"60s">>,
@ -160,20 +202,18 @@ t_gateway_coap(_) ->
#{name => <<"def">>, type => <<"udp">>, bind => <<"5683">>}
]
},
{201, _} = request(post, "/gateways", GwConf),
{204, _} = request(put, "/gateways/coap", GwConf),
{200, ConfResp} = request(get, "/gateways/coap"),
assert_confs(GwConf, ConfResp),
%% put
GwConf2 = emqx_map_lib:deep_merge(GwConf, #{heartbeat => <<"10s">>}),
{200, _} = request(put, "/gateways/coap", maps:without([name, listeners], GwConf2)),
{204, _} = request(put, "/gateways/coap", maps:without([name, listeners], GwConf2)),
{200, ConfResp2} = request(get, "/gateways/coap"),
assert_confs(GwConf2, ConfResp2),
{204, _} = request(delete, "/gateways/coap").
ok.
t_gateway_lwm2m(_) ->
{200, Gw} = request(get, "/gateways/lwm2m"),
assert_gw_unloaded(Gw),
%% post
GwConf = #{
name => <<"lwm2m">>,
xml_dir => <<"../../lib/emqx_gateway/src/lwm2m/lwm2m_xml">>,
@ -192,20 +232,18 @@ t_gateway_lwm2m(_) ->
#{name => <<"def">>, type => <<"udp">>, bind => <<"5783">>}
]
},
{201, _} = request(post, "/gateways", GwConf),
{204, _} = request(put, "/gateways/lwm2m", GwConf),
{200, ConfResp} = request(get, "/gateways/lwm2m"),
assert_confs(GwConf, ConfResp),
%% put
GwConf2 = emqx_map_lib:deep_merge(GwConf, #{qmode_time_window => <<"10s">>}),
{200, _} = request(put, "/gateways/lwm2m", maps:without([name, listeners], GwConf2)),
{204, _} = request(put, "/gateways/lwm2m", maps:without([name, listeners], GwConf2)),
{200, ConfResp2} = request(get, "/gateways/lwm2m"),
assert_confs(GwConf2, ConfResp2),
{204, _} = request(delete, "/gateways/lwm2m").
ok.
t_gateway_exproto(_) ->
{200, Gw} = request(get, "/gateways/exproto"),
assert_gw_unloaded(Gw),
%% post
GwConf = #{
name => <<"exproto">>,
server => #{bind => <<"9100">>},
@ -214,15 +252,14 @@ t_gateway_exproto(_) ->
#{name => <<"def">>, type => <<"tcp">>, bind => <<"7993">>}
]
},
{201, _} = request(post, "/gateways", GwConf),
{204, _} = request(put, "/gateways/exproto", GwConf),
{200, ConfResp} = request(get, "/gateways/exproto"),
assert_confs(GwConf, ConfResp),
%% put
GwConf2 = emqx_map_lib:deep_merge(GwConf, #{server => #{bind => <<"9200">>}}),
{200, _} = request(put, "/gateways/exproto", maps:without([name, listeners], GwConf2)),
{204, _} = request(put, "/gateways/exproto", maps:without([name, listeners], GwConf2)),
{200, ConfResp2} = request(get, "/gateways/exproto"),
assert_confs(GwConf2, ConfResp2),
{204, _} = request(delete, "/gateways/exproto").
ok.
t_gateway_exproto_with_ssl(_) ->
{200, Gw} = request(get, "/gateways/exproto"),
@ -230,7 +267,6 @@ t_gateway_exproto_with_ssl(_) ->
SslSvrOpts = ssl_server_opts(),
SslCliOpts = ssl_client_opts(),
%% post
GwConf = #{
name => <<"exproto">>,
server => #{
@ -245,27 +281,22 @@ t_gateway_exproto_with_ssl(_) ->
#{name => <<"def">>, type => <<"tcp">>, bind => <<"7993">>}
]
},
{201, _} = request(post, "/gateways", GwConf),
{204, _} = request(put, "/gateways/exproto", GwConf),
{200, ConfResp} = request(get, "/gateways/exproto"),
assert_confs(GwConf, ConfResp),
%% put
GwConf2 = emqx_map_lib:deep_merge(GwConf, #{
server => #{
bind => <<"9200">>,
ssl_options => SslCliOpts
}
}),
{200, _} = request(put, "/gateways/exproto", maps:without([name, listeners], GwConf2)),
{204, _} = request(put, "/gateways/exproto", maps:without([name, listeners], GwConf2)),
{200, ConfResp2} = request(get, "/gateways/exproto"),
assert_confs(GwConf2, ConfResp2),
{204, _} = request(delete, "/gateways/exproto").
ok.
t_authn(_) ->
GwConf = #{name => <<"stomp">>},
{201, _} = request(post, "/gateways", GwConf),
ct:sleep(500),
{204, _} = request(get, "/gateways/stomp/authentication"),
init_gw("stomp"),
AuthConf = #{
mechanism => <<"password_based">>,
backend => <<"built_in_database">>,
@ -283,22 +314,18 @@ t_authn(_) ->
{204, _} = request(delete, "/gateways/stomp/authentication"),
{204, _} = request(get, "/gateways/stomp/authentication"),
{204, _} = request(delete, "/gateways/stomp").
ok.
t_authn_data_mgmt(_) ->
GwConf = #{name => <<"stomp">>},
{201, _} = request(post, "/gateways", GwConf),
ct:sleep(500),
{204, _} = request(get, "/gateways/stomp/authentication"),
init_gw("stomp"),
AuthConf = #{
mechanism => <<"password_based">>,
backend => <<"built_in_database">>,
user_id_type => <<"clientid">>
},
{201, _} = request(post, "/gateways/stomp/authentication", AuthConf),
ct:sleep(500),
{200, ConfResp} = request(get, "/gateways/stomp/authentication"),
{200, ConfResp} =
?retry(10, 10, {200, _} = request(get, "/gateways/stomp/authentication")),
assert_confs(AuthConf, ConfResp),
User1 = #{
@ -358,11 +385,10 @@ t_authn_data_mgmt(_) ->
{204, _} = request(delete, "/gateways/stomp/authentication"),
{204, _} = request(get, "/gateways/stomp/authentication"),
{204, _} = request(delete, "/gateways/stomp").
ok.
t_listeners_tcp(_) ->
GwConf = #{name => <<"stomp">>},
{201, _} = request(post, "/gateways", GwConf),
{204, _} = request(put, "/gateways/stomp", #{}),
{404, _} = request(get, "/gateways/stomp/listeners"),
LisConf = #{
name => <<"def">>,
@ -387,7 +413,7 @@ t_listeners_tcp(_) ->
{204, _} = request(delete, "/gateways/stomp/listeners/stomp:tcp:def"),
{404, _} = request(get, "/gateways/stomp/listeners/stomp:tcp:def"),
{204, _} = request(delete, "/gateways/stomp").
ok.
t_listeners_authn(_) ->
GwConf = #{
@ -400,9 +426,7 @@ t_listeners_authn(_) ->
}
]
},
{201, _} = request(post, "/gateways", GwConf),
ct:sleep(500),
{200, ConfResp} = request(get, "/gateways/stomp"),
ConfResp = init_gw("stomp", GwConf),
assert_confs(GwConf, ConfResp),
AuthConf = #{
@ -424,7 +448,7 @@ t_listeners_authn(_) ->
{204, _} = request(delete, Path),
%% FIXME: 204?
{204, _} = request(get, Path),
{204, _} = request(delete, "/gateways/stomp").
ok.
t_listeners_authn_data_mgmt(_) ->
GwConf = #{
@ -437,7 +461,7 @@ t_listeners_authn_data_mgmt(_) ->
}
]
},
{201, _} = request(post, "/gateways", GwConf),
{204, _} = request(put, "/gateways/stomp", GwConf),
{200, ConfResp} = request(get, "/gateways/stomp"),
assert_confs(GwConf, ConfResp),
@ -514,13 +538,10 @@ t_listeners_authn_data_mgmt(_) ->
{filename, "user-credentials.csv", CSVData}
]),
{204, _} = request(delete, "/gateways/stomp").
ok.
t_authn_fuzzy_search(_) ->
GwConf = #{name => <<"stomp">>},
{201, _} = request(post, "/gateways", GwConf),
{204, _} = request(get, "/gateways/stomp/authentication"),
init_gw("stomp"),
AuthConf = #{
mechanism => <<"password_based">>,
backend => <<"built_in_database">>,
@ -561,7 +582,25 @@ t_authn_fuzzy_search(_) ->
{204, _} = request(delete, "/gateways/stomp/authentication"),
{204, _} = request(get, "/gateways/stomp/authentication"),
{204, _} = request(delete, "/gateways/stomp").
ok.
%%--------------------------------------------------------------------
%% Helpers
init_gw(GwName) ->
init_gw(GwName, #{}).
init_gw(GwName, GwConf) ->
{204, _} = request(put, "/gateways/" ++ GwName, GwConf),
?retry(
10,
10,
begin
{200, #{status := Status} = RespConf} = request(get, "/gateways/" ++ GwName),
false = (Status == <<"unloaded">>),
RespConf
end
).
%%--------------------------------------------------------------------
%% Asserts

View File

@ -94,7 +94,7 @@ maybe_unconvert_listeners(Conf) when is_map(Conf) ->
maybe_unconvert_listeners(Conf) ->
Conf.
assert_feilds_apperence(Ks, Map) ->
assert_fields_exist(Ks, Map) ->
lists:foreach(
fun(K) ->
_ = maps:get(K, Map)

View File

@ -25,7 +25,7 @@
-import(
emqx_gateway_test_utils,
[
assert_feilds_apperence/2,
assert_fields_exist/2,
request/2,
request/3
]
@ -730,7 +730,7 @@ t_rest_clienit_info(_) ->
binary_to_list(ClientId),
{200, StompClient1} = request(get, ClientPath),
?assertEqual(StompClient, StompClient1),
assert_feilds_apperence(
assert_fields_exist(
[
proto_name,
awaiting_rel_max,
@ -787,7 +787,7 @@ t_rest_clienit_info(_) ->
{200, Subs} = request(get, ClientPath ++ "/subscriptions"),
?assertEqual(1, length(Subs)),
assert_feilds_apperence([topic, qos], lists:nth(1, Subs)),
assert_fields_exist([topic, qos], lists:nth(1, Subs)),
{201, _} = request(
post,

View File

@ -2,7 +2,7 @@
{application, emqx_management, [
{description, "EMQX Management API and CLI"},
% strict semver, bump manually!
{vsn, "5.0.7"},
{vsn, "5.0.8"},
{modules, []},
{registered, [emqx_management_sup]},
{applications, [kernel, stdlib, emqx_plugins, minirest, emqx]},

View File

@ -584,13 +584,13 @@ authz_cache(delete, #{bindings := Bindings}) ->
clean_authz_cache(Bindings).
subscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) ->
Opts = emqx_map_lib:unsafe_atom_key_map(TopicInfo),
Opts = to_topic_info(TopicInfo),
subscribe(Opts#{clientid => ClientID}).
subscribe_batch(post, #{bindings := #{clientid := ClientID}, body := TopicInfos}) ->
Topics =
[
emqx_map_lib:unsafe_atom_key_map(TopicInfo)
to_topic_info(TopicInfo)
|| TopicInfo <- TopicInfos
],
subscribe_batch(#{clientid => ClientID, topics => Topics}).
@ -973,3 +973,7 @@ format_authz_cache({{PubSub, Topic}, {AuthzResult, Timestamp}}) ->
result => AuthzResult,
updated_time => Timestamp
}.
to_topic_info(Data) ->
M = maps:with([<<"topic">>, <<"qos">>, <<"nl">>, <<"rap">>, <<"rh">>], Data),
emqx_map_lib:safe_atom_key_map(M).

View File

@ -268,7 +268,7 @@ config(put, #{body := Body}, Req) ->
global_zone_configs(get, _Params, _Req) ->
Paths = global_zone_roots(),
Zones = lists:foldl(
fun(Path, Acc) -> Acc#{Path => get_config_with_default([Path])} end,
fun(Path, Acc) -> maps:merge(Acc, get_config_with_default(Path)) end,
#{},
Paths
),
@ -343,7 +343,7 @@ get_full_config() ->
).
get_config_with_default(Path) ->
emqx_config:fill_defaults(emqx:get_raw_config(Path)).
emqx_config:fill_defaults(#{Path => emqx:get_raw_config([Path])}).
conf_path_from_querystr(Req) ->
case proplists:get_value(<<"conf_path">>, cowboy_req:parse_qs(Req)) of

View File

@ -133,6 +133,18 @@ t_global_zone(_Config) ->
BadZones = emqx_map_lib:deep_put([<<"mqtt">>, <<"max_qos_allowed">>], Zones, 3),
?assertMatch({error, {"HTTP/1.1", 400, _}}, update_global_zone(BadZones)),
%% Remove max_qos_allowed from raw config, but we still get default value(2).
Mqtt0 = emqx_conf:get_raw([<<"mqtt">>]),
?assertEqual(1, emqx_map_lib:deep_get([<<"max_qos_allowed">>], Mqtt0)),
Mqtt1 = maps:remove(<<"max_qos_allowed">>, Mqtt0),
ok = emqx_config:put_raw([<<"mqtt">>], Mqtt1),
Mqtt2 = emqx_conf:get_raw([<<"mqtt">>]),
?assertNot(maps:is_key(<<"max_qos_allowed">>, Mqtt2), Mqtt2),
{ok, #{<<"mqtt">> := Mqtt3}} = get_global_zone(),
%% the default value is 2
?assertEqual(2, emqx_map_lib:deep_get([<<"max_qos_allowed">>], Mqtt3)),
ok = emqx_config:put_raw([<<"mqtt">>], Mqtt0),
ok.
get_global_zone() ->

View File

@ -37,8 +37,7 @@
}).
all() ->
[t_banned_delayed].
%% emqx_common_test_helpers:all(?MODULE).
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF, #{

View File

@ -2,7 +2,7 @@
{application, emqx_retainer, [
{description, "EMQX Retainer"},
% strict semver, bump manually!
{vsn, "5.0.6"},
{vsn, "5.0.7"},
{modules, []},
{registered, [emqx_retainer_sup]},
{applications, [kernel, stdlib, emqx]},

View File

@ -20,6 +20,7 @@
-include("emqx_retainer.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% API
-export([
@ -286,7 +287,20 @@ do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) ->
end.
do_deliver([Msg | T], Pid, Topic) ->
Pid ! {deliver, Topic, Msg},
case emqx_banned:look_up({clientid, Msg#message.from}) of
[] ->
Pid ! {deliver, Topic, Msg},
ok;
_ ->
?tp(
notice,
ignore_retained_message_deliver,
#{
reason => "client is banned",
clientid => Msg#message.from
}
)
end,
do_deliver(T, Pid, Topic);
do_deliver([], _, _) ->
ok.

View File

@ -639,6 +639,46 @@ test_disable_then_start(_Config) ->
?assertNotEqual([], gproc_pool:active_workers(emqx_retainer_dispatcher)),
ok.
t_deliver_when_banned(_) ->
Client1 = <<"c1">>,
Client2 = <<"c2">>,
{ok, C1} = emqtt:start_link([{clientid, Client1}, {clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
lists:foreach(
fun(I) ->
Topic = erlang:list_to_binary(io_lib:format("retained/~p", [I])),
Msg = emqx_message:make(Client2, 0, Topic, <<"this is a retained message">>),
Msg2 = emqx_message:set_flag(retain, Msg),
emqx:publish(Msg2)
end,
lists:seq(1, 3)
),
Now = erlang:system_time(second),
Who = {clientid, Client2},
emqx_banned:create(#{
who => Who,
by => <<"test">>,
reason => <<"test">>,
at => Now,
until => Now + 120
}),
timer:sleep(100),
snabbkaffe:start_trace(),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, [{qos, 0}, {rh, 0}]),
timer:sleep(500),
Trace = snabbkaffe:collect_trace(),
?assertEqual(3, length(?of_kind(ignore_retained_message_deliver, Trace))),
snabbkaffe:stop(),
emqx_banned:delete(Who),
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/+">>),
ok = emqtt:disconnect(C1).
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------

View File

@ -45,6 +45,12 @@ emqx_statsd_schema {
zh: """指标的推送间隔。"""
}
}
tags {
desc {
en: """The tags for metrics."""
zh: """指标的标签。"""
}
}
enable {
desc {

View File

@ -1,5 +1,2 @@
-define(APP, emqx_statsd).
-define(DEFAULT_SAMPLE_TIME_INTERVAL, 10000).
-define(DEFAULT_FLUSH_TIME_INTERVAL, 10000).
-define(DEFAULT_HOST, "127.0.0.1").
-define(DEFAULT_PORT, 8125).
-define(STATSD, [statsd]).

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_statsd, [
{description, "An OTP application"},
{vsn, "5.0.2"},
{description, "EMQX Statsd"},
{vsn, "5.0.3"},
{registered, []},
{mod, {emqx_statsd_app, []}},
{applications, [

View File

@ -28,18 +28,17 @@
-include_lib("emqx/include/logger.hrl").
-export([
update/1,
start/0,
stop/0,
restart/0,
%% for rpc
%% for rpc: remove after 5.1.x
do_start/0,
do_stop/0,
do_restart/0
]).
%% Interface
-export([start_link/1]).
-export([start_link/0]).
%% Internal Exports
-export([
@ -51,40 +50,15 @@
terminate/2
]).
-record(state, {
timer :: reference() | undefined,
sample_time_interval :: pos_integer(),
flush_time_interval :: pos_integer(),
estatsd_pid :: pid()
}).
update(Config) ->
case
emqx_conf:update(
[statsd],
Config,
#{rawconf_with_defaults => true, override_to => cluster}
)
of
{ok, #{raw_config := NewConfigRows}} ->
ok = stop(),
case maps:get(<<"enable">>, Config, true) of
true ->
ok = restart();
false ->
ok = stop()
end,
{ok, NewConfigRows};
{error, Reason} ->
{error, Reason}
end.
-define(SAMPLE_TIMEOUT, sample_timeout).
%% Remove after 5.1.x
start() -> check_multicall_result(emqx_statsd_proto_v1:start(mria_mnesia:running_nodes())).
stop() -> check_multicall_result(emqx_statsd_proto_v1:stop(mria_mnesia:running_nodes())).
restart() -> check_multicall_result(emqx_statsd_proto_v1:restart(mria_mnesia:running_nodes())).
do_start() ->
emqx_statsd_sup:ensure_child_started(?APP, emqx_conf:get([statsd], #{})).
emqx_statsd_sup:ensure_child_started(?APP).
do_stop() ->
emqx_statsd_sup:ensure_child_stopped(?APP).
@ -94,59 +68,51 @@ do_restart() ->
ok = do_start(),
ok.
start_link(Opts) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([Opts]) ->
init([]) ->
process_flag(trap_exit, true),
Tags = tags(maps:get(tags, Opts, #{})),
{Host, Port} = maps:get(server, Opts, {?DEFAULT_HOST, ?DEFAULT_PORT}),
Opts1 = maps:without(
[
sample_time_interval,
flush_time_interval
],
Opts#{
tags => Tags,
host => Host,
port => Port,
prefix => <<"emqx">>
}
),
{ok, Pid} = estatsd:start_link(maps:to_list(Opts1)),
SampleTimeInterval = maps:get(sample_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL),
FlushTimeInterval = maps:get(flush_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL),
#{
tags := TagsRaw,
server := {Host, Port},
sample_time_interval := SampleTimeInterval,
flush_time_interval := FlushTimeInterval
} = emqx_conf:get([statsd]),
Tags = maps:fold(fun(K, V, Acc) -> [{to_bin(K), to_bin(V)} | Acc] end, [], TagsRaw),
Opts = [{tags, Tags}, {host, Host}, {port, Port}, {prefix, <<"emqx">>}],
{ok, Pid} = estatsd:start_link(Opts),
{ok,
ensure_timer(#state{
sample_time_interval = SampleTimeInterval,
flush_time_interval = FlushTimeInterval,
estatsd_pid = Pid
ensure_timer(#{
sample_time_interval => SampleTimeInterval,
flush_time_interval => FlushTimeInterval,
estatsd_pid => Pid
})}.
handle_call(_Req, _From, State) ->
{noreply, State}.
{reply, ignore, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(
{timeout, Ref, sample_timeout},
State = #state{
sample_time_interval = SampleTimeInterval,
flush_time_interval = FlushTimeInterval,
estatsd_pid = Pid,
timer = Ref
{timeout, Ref, ?SAMPLE_TIMEOUT},
State = #{
sample_time_interval := SampleTimeInterval,
flush_time_interval := FlushTimeInterval,
estatsd_pid := Pid,
timer := Ref
}
) ->
Metrics = emqx_metrics:all() ++ emqx_stats:getstats() ++ emqx_vm_data(),
SampleRate = SampleTimeInterval / FlushTimeInterval,
StatsdMetrics = [
{gauge, trans_metrics_name(Name), Value, SampleRate, []}
{gauge, Name, Value, SampleRate, []}
|| {Name, Value} <- Metrics
],
estatsd:submit(Pid, StatsdMetrics),
{noreply, ensure_timer(State)};
handle_info({'EXIT', Pid, Error}, State = #state{estatsd_pid = Pid}) ->
ok = estatsd:submit(Pid, StatsdMetrics),
{noreply, ensure_timer(State), hibernate};
handle_info({'EXIT', Pid, Error}, State = #{estatsd_pid := Pid}) ->
{stop, {shutdown, Error}, State};
handle_info(_Msg, State) ->
{noreply, State}.
@ -154,16 +120,13 @@ handle_info(_Msg, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
terminate(_Reason, #state{estatsd_pid = Pid}) ->
terminate(_Reason, #{estatsd_pid := Pid}) ->
estatsd:stop(Pid),
ok.
%%------------------------------------------------------------------------------
%% Internal function
%%------------------------------------------------------------------------------
trans_metrics_name(Name) ->
Name0 = atom_to_binary(Name, utf8),
binary_to_atom(<<"emqx.", Name0/binary>>, utf8).
emqx_vm_data() ->
Idle =
@ -179,12 +142,8 @@ emqx_vm_data() ->
{cpu_use, 100 - Idle}
] ++ emqx_vm:mem_info().
tags(Map) ->
Tags = maps:to_list(Map),
[{atom_to_binary(Key, utf8), Value} || {Key, Value} <- Tags].
ensure_timer(State = #state{sample_time_interval = SampleTimeInterval}) ->
State#state{timer = emqx_misc:start_timer(SampleTimeInterval, sample_timeout)}.
ensure_timer(State = #{sample_time_interval := SampleTimeInterval}) ->
State#{timer => emqx_misc:start_timer(SampleTimeInterval, ?SAMPLE_TIMEOUT)}.
check_multicall_result({Results, []}) ->
case
@ -201,3 +160,8 @@ check_multicall_result({Results, []}) ->
end;
check_multicall_result({_, _}) ->
error(multicall_failed).
to_bin(B) when is_binary(B) -> B;
to_bin(I) when is_integer(I) -> integer_to_binary(I);
to_bin(L) when is_list(L) -> list_to_binary(L);
to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).

View File

@ -77,15 +77,16 @@ statsd_config_schema() ->
statsd_example() ->
#{
enable => true,
flush_time_interval => "32s",
sample_time_interval => "32s",
server => "127.0.0.1:8125"
flush_time_interval => "30s",
sample_time_interval => "30s",
server => "127.0.0.1:8125",
tags => #{}
}.
statsd(get, _Params) ->
{200, emqx:get_raw_config([<<"statsd">>], #{})};
statsd(put, #{body := Body}) ->
case emqx_statsd:update(Body) of
case emqx_statsd_config:update(Body) of
{ok, NewConfig} ->
{200, NewConfig};
{error, Reason} ->

View File

@ -27,15 +27,8 @@
start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_statsd_sup:start_link(),
maybe_enable_statsd(),
emqx_statsd_config:add_handler(),
{ok, Sup}.
stop(_) ->
emqx_statsd_config:remove_handler(),
ok.
maybe_enable_statsd() ->
case emqx_conf:get([statsd, enable], false) of
true ->
emqx_statsd_sup:ensure_child_started(?APP, emqx_conf:get([statsd], #{}));
false ->
ok
end.

View File

@ -0,0 +1,54 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_statsd_config).
-behaviour(emqx_config_handler).
-include("emqx_statsd.hrl").
-export([add_handler/0, remove_handler/0]).
-export([post_config_update/5]).
-export([update/1]).
update(Config) ->
case
emqx_conf:update(
?STATSD,
Config,
#{rawconf_with_defaults => true, override_to => cluster}
)
of
{ok, #{raw_config := NewConfigRows}} ->
{ok, NewConfigRows};
{error, Reason} ->
{error, Reason}
end.
add_handler() ->
ok = emqx_config_handler:add_handler(?STATSD, ?MODULE),
ok.
remove_handler() ->
ok = emqx_config_handler:remove_handler(?STATSD),
ok.
post_config_update(?STATSD, _Req, #{enable := true}, _Old, _AppEnvs) ->
emqx_statsd_sup:ensure_child_stopped(?APP),
emqx_statsd_sup:ensure_child_started(?APP);
post_config_update(?STATSD, _Req, #{enable := false}, _Old, _AppEnvs) ->
emqx_statsd_sup:ensure_child_stopped(?APP);
post_config_update(_ConfPath, _Req, _NewConf, _OldConf, _AppEnvs) ->
ok.

View File

@ -25,7 +25,8 @@
namespace/0,
roots/0,
fields/1,
desc/1
desc/1,
validations/0
]).
namespace() -> "statsd".
@ -45,7 +46,8 @@ fields("statsd") ->
)},
{server, fun server/1},
{sample_time_interval, fun sample_interval/1},
{flush_time_interval, fun flush_interval/1}
{flush_time_interval, fun flush_interval/1},
{tags, fun tags/1}
].
desc("statsd") -> ?DESC(statsd);
@ -59,12 +61,37 @@ server(_) -> undefined.
sample_interval(type) -> emqx_schema:duration_ms();
sample_interval(required) -> true;
sample_interval(default) -> "10s";
sample_interval(default) -> "30s";
sample_interval(desc) -> ?DESC(?FUNCTION_NAME);
sample_interval(_) -> undefined.
flush_interval(type) -> emqx_schema:duration_ms();
flush_interval(required) -> true;
flush_interval(default) -> "10s";
flush_interval(default) -> "30s";
flush_interval(desc) -> ?DESC(?FUNCTION_NAME);
flush_interval(_) -> undefined.
tags(type) -> map();
tags(required) -> false;
tags(default) -> #{};
tags(desc) -> ?DESC(?FUNCTION_NAME);
tags(_) -> undefined.
validations() ->
[
{check_interval, fun check_interval/1}
].
check_interval(Conf) ->
case hocon_maps:get("statsd.sample_time_interval", Conf) of
undefined ->
ok;
Sample ->
Flush = hocon_maps:get("statsd.flush_time_interval", Conf),
case Sample =< Flush of
true ->
true;
false ->
{bad_interval, #{sample_time_interval => Sample, flush_time_interval => Flush}}
end
end.

View File

@ -10,7 +10,6 @@
-export([
start_link/0,
ensure_child_started/1,
ensure_child_started/2,
ensure_child_stopped/1
]).
@ -19,7 +18,7 @@
%% Helper macro for declaring children of supervisor
-define(CHILD(Mod, Opts), #{
id => Mod,
start => {Mod, start_link, [Opts]},
start => {Mod, start_link, Opts},
restart => permanent,
shutdown => 5000,
type => worker,
@ -29,13 +28,9 @@
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-spec ensure_child_started(supervisor:child_spec()) -> ok.
ensure_child_started(ChildSpec) when is_map(ChildSpec) ->
assert_started(supervisor:start_child(?MODULE, ChildSpec)).
-spec ensure_child_started(atom(), map()) -> ok.
ensure_child_started(Mod, Opts) when is_atom(Mod) andalso is_map(Opts) ->
assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, Opts))).
-spec ensure_child_started(atom()) -> ok.
ensure_child_started(Mod) when is_atom(Mod) ->
assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, []))).
%% @doc Stop the child worker process.
-spec ensure_child_stopped(any()) -> ok.
@ -50,13 +45,17 @@ ensure_child_stopped(ChildId) ->
end.
init([]) ->
{ok, {{one_for_one, 10, 3600}, []}}.
Children =
case emqx_conf:get([statsd, enable], false) of
true -> [?CHILD(emqx_statsd, [])];
false -> []
end,
{ok, {{one_for_one, 100, 3600}, Children}}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
assert_started({ok, _Pid}) -> ok;
assert_started({ok, _Pid, _Info}) -> ok;
assert_started({error, {already_started, _Pid}}) -> ok;
assert_started({error, Reason}) -> erlang:error(Reason).

View File

@ -5,28 +5,104 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-import(emqx_dashboard_api_test_helpers, [request/3, uri/1]).
-define(BASE_CONF, <<
"\n"
"statsd {\n"
"enable = true\n"
"flush_time_interval = 4s\n"
"sample_time_interval = 4s\n"
"server = \"127.0.0.1:8126\"\n"
"tags {\"t1\" = \"good\", test = 100}\n"
"}\n"
>>).
init_per_suite(Config) ->
emqx_common_test_helpers:start_apps([emqx_statsd]),
emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_dashboard, emqx_statsd],
fun set_special_configs/1
),
ok = emqx_common_test_helpers:load_config(emqx_statsd_schema, ?BASE_CONF, #{
raw_with_default => true
}),
Config.
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_statsd]).
emqx_common_test_helpers:stop_apps([emqx_statsd, emqx_dashboard, emqx_conf]).
set_special_configs(emqx_dashboard) ->
emqx_dashboard_api_test_helpers:set_default_config();
set_special_configs(_) ->
ok.
all() ->
emqx_common_test_helpers:all(?MODULE).
t_statsd(_) ->
{ok, Socket} = gen_udp:open(8125),
{ok, Socket} = gen_udp:open(8126, [{active, true}]),
receive
{udp, _Socket, _Host, _Port, Bin} ->
?assert(length(Bin) > 50)
after 11 * 1000 ->
?assert(true, failed)
{udp, Socket1, Host, Port, Data} ->
ct:pal("receive:~p~n", [{Socket, Socket1, Host, Port}]),
?assert(length(Data) > 50),
?assert(nomatch =/= string:find(Data, "\nemqx.cpu_use:"))
after 10 * 1000 ->
error(timeout)
end,
gen_udp:close(Socket).
t_management(_) ->
?assertMatch(ok, emqx_statsd:start()),
?assertMatch(ok, emqx_statsd:start()),
?assertMatch(ok, emqx_statsd:stop()),
?assertMatch(ok, emqx_statsd:stop()),
?assertMatch(ok, emqx_statsd:restart()).
t_rest_http(_) ->
{ok, Res0} = request(get),
?assertEqual(
#{
<<"enable">> => true,
<<"flush_time_interval">> => <<"4s">>,
<<"sample_time_interval">> => <<"4s">>,
<<"server">> => <<"127.0.0.1:8126">>,
<<"tags">> => #{<<"t1">> => <<"good">>, <<"test">> => 100}
},
Res0
),
{ok, Res1} = request(put, #{enable => false}),
?assertMatch(#{<<"enable">> := false}, Res1),
?assertEqual(maps:remove(<<"enable">>, Res0), maps:remove(<<"enable">>, Res1)),
{ok, Res2} = request(get),
?assertEqual(Res1, Res2),
?assertEqual(
error, request(put, #{sample_time_interval => "11s", flush_time_interval => "10s"})
),
{ok, _} = request(put, #{enable => true}),
ok.
t_kill_exit(_) ->
{ok, _} = request(put, #{enable => true}),
Pid = erlang:whereis(emqx_statsd),
?assertEqual(ignore, gen_server:call(Pid, whatever)),
?assertEqual(ok, gen_server:cast(Pid, whatever)),
?assertEqual(Pid, erlang:whereis(emqx_statsd)),
#{estatsd_pid := Estatsd} = sys:get_state(emqx_statsd),
?assert(erlang:exit(Estatsd, kill)),
?assertEqual(false, is_process_alive(Estatsd)),
ct:sleep(150),
Pid1 = erlang:whereis(emqx_statsd),
?assertNotEqual(Pid, Pid1),
#{estatsd_pid := Estatsd1} = sys:get_state(emqx_statsd),
?assertNotEqual(Estatsd, Estatsd1),
ok.
request(Method) -> request(Method, []).
request(Method, Body) ->
case request(Method, uri(["statsd"]), Body) of
{ok, 200, Res} ->
{ok, emqx_json:decode(Res, [return_maps])};
{ok, _Status, _} ->
error
end.

View File

@ -396,7 +396,7 @@ remsh() {
# Generate a random id
relx_gen_id() {
od -t x -N 4 /dev/urandom | head -n1 | awk '{print $2}'
od -t u -N 4 /dev/urandom | head -n1 | awk '{print $2 % 1000}'
}
call_nodetool() {

View File

@ -226,9 +226,14 @@ nodename(Name) ->
this_node_name(longnames, Name) ->
[Node, Host] = re:split(Name, "@", [{return, list}, unicode]),
list_to_atom(lists:concat(["remsh_maint_", Node, os:getpid(), "@", Host]));
list_to_atom(lists:concat(["remsh_maint_", Node, node_name_suffix_id(), "@", Host]));
this_node_name(shortnames, Name) ->
list_to_atom(lists:concat(["remsh_maint_", Name, os:getpid()])).
list_to_atom(lists:concat(["remsh_maint_", Name, node_name_suffix_id()])).
%% use the reversed value that from pid mod 1000 as the node name suffix
node_name_suffix_id() ->
Pid = os:getpid(),
string:slice(string:reverse(Pid), 0, 3).
%% For windows???
create_mnesia_dir(DataDir, NodeName) ->

View File

@ -2,7 +2,34 @@
## Enhancements
- Security enhancement for retained messages [#9326](https://github.com/emqx/emqx/pull/9326).
The retained messages will not be published if the publisher client is banned.
- Security enhancement for the `subscribe` API [#9355](https://github.com/emqx/emqx/pull/9355).
- Enhance the `banned` feature [#9367](https://github.com/emqx/emqx/pull/9367).
Now the corresponding session will be kicked when client is banned by `clientid`.
- Redesign `/gateways` API [9364](https://github.com/emqx/emqx/pull/9364).
Use `PUT /gateways/{name}` instead of `POST /gateways`, gateway gets 'loaded'
automatically if needed. Use `PUT /gateways/{name}/enable/{true|false}` to
enable or disable gateway. No more `DELETE /gateways/{name}`.
- Support `statsd {tags: {"user-defined-tag" = "tag-value"}` configure and improve stability of `emqx_statsd` [#9363](http://github.com/emqx/emqx/pull/9363).
- Improve node name generation rules to avoid potential atom table overflow risk [#9387](https://github.com/emqx/emqx/pull/9387).
## Bug fixes
- Fix `ssl.existingName` option of helm chart not working [#9307](https://github.com/emqx/emqx/issues/9307).
- Fix create trace sometime failed by end_at time has already passed. [#9303](https://github.com/emqx/emqx/pull/9303)
- Return 404 for status of unknown authenticator in `/authenticator/{id}/status` [#9328](https://github.com/emqx/emqx/pull/9328).
- Fix that JWT ACL rules are only applied if an `exp` claim is set [#9368](https://github.com/emqx/emqx/pull/9368).
- Fix that `/configs/global_zone` API cannot get the default value of the configuration [#9392](https://github.com/emqx/emqx/pull/9392).
- Fix mountpoint not working for will-msg [#9399](https://github.com/emqx/emqx/pull/9399).

View File

@ -2,7 +2,31 @@
## 增强
- 增强 `保留消息` 的安全性 [#9332](https://github.com/emqx/emqx/pull/9332)。
现在投递保留消息前,会先过滤掉来源客户端被封禁了的那些消息。
- 增强订阅 API 的安全性 [#9355](https://github.com/emqx/emqx/pull/9355)。
- 增加 `封禁` 功能 [#9367](https://github.com/emqx/emqx/pull/9367)。
现在客户端通过 `clientid` 被封禁时将会踢掉对应的会话。
- 重新设计了 /gateways API [9364](https://github.com/emqx/emqx/pull/9364)。
使用 PUT /gateways/{name} 代替了 POST /gateways现在网关将在需要时自动加载然后删除了 DELETE /gateways/{name},之后可以使用 PUT /gateways/{name}/enable/{true|false} 来开启或禁用网关。
- 支持 `statsd {tags: {"user-defined-tag" = "tag-value"}` 配置,并提升 `emqx_statsd` 的稳定性 [#9363](http://github.com/emqx/emqx/pull/9363)。
- 改进了节点名称生成规则,以避免潜在的原子表溢出风险 [#9387](https://github.com/emqx/emqx/pull/9387)。
## 修复
- 修复 helm chart 的 `ssl.existingName` 选项不起作用 [#9307](https://github.com/emqx/emqx/issues/9307)。
- 修复创建追踪日志时偶尔会报`end_at time has already passed`错误,导致创建失败。[#9303](https://github.com/emqx/emqx/pull/9303)
- 通过 `/authenticator/{id}/status` 请求未知认证器的状态时,将会返回 404。
- 修复 JWT ACL 规则只在设置了超期时间时才生效的问题 [#9368](https://github.com/emqx/emqx/pull/9368)。
- 修复 `/configs/global_zone` API 无法正确获取配置的默认值问题 [#9392](https://github.com/emqx/emqx/pull/9392)。
- 修复 mountpoint 配置未对遗嘱消息生效的问题 [#9399](https://github.com/emqx/emqx/pull/9399)

View File

@ -11,11 +11,10 @@ help() {
echo "$0 PROFILE [options]"
echo
echo "-h|--help: To display this usage information"
echo "--default: Print default vsn number. e.g. e.g. 5.0.0-ubuntu20.04-amd64"
echo "--long: Print long vsn number. e.g. 5.0.0-otp24.2.1-1-ubuntu20.04-amd64"
echo "--long: Print long vsn number. e.g. 5.0.0-ubuntu20.04-amd64"
echo " Otherwise short e.g. 5.0.0"
echo "--elixir: Include elixir version in the long version string"
echo " e.g. 5.0.0-elixir1.13.4-otp24.2.1-1-ubuntu20.04-amd64"
echo " e.g. 5.0.0-elixir-ubuntu20.04-amd64"
echo "--vsn_matcher: For --long option, replace the EMQX version with '*'"
echo " so it can be used in find commands"
}
@ -34,10 +33,6 @@ while [ "$#" -gt 0 ]; do
help
exit 0
;;
--default)
IS_DEFAULT_RELEASE='yes'
shift 1
;;
--long)
LONG_VERSION='yes'
shift 1
@ -123,19 +118,8 @@ if [ "${IS_MATCHER:-}" = 'yes' ]; then
PKG_VSN='*'
fi
OTP_VSN="${OTP_VSN:-$(./scripts/get-otp-vsn.sh)}"
SYSTEM="$(./scripts/get-distro.sh)"
case "$SYSTEM" in
windows*)
# directly build the default package for windows
IS_DEFAULT_RELEASE='yes'
;;
*)
true
;;
esac
UNAME_M="$(uname -m)"
case "$UNAME_M" in
x86_64)
@ -149,15 +133,10 @@ case "$UNAME_M" in
;;
esac
if [ "${IS_DEFAULT_RELEASE:-not-default-release}" = 'yes' ]; then
# when it's the default release, we do not add elixir or otp version
infix=''
if [ "${IS_ELIXIR:-}" = "yes" ]; then
infix='-elixir'
else
infix="-otp${OTP_VSN}"
if [ "${IS_ELIXIR:-}" = "yes" ]; then
ELIXIR_VSN="${ELIXIR_VSN:-$(./scripts/get-elixir-vsn.sh)}"
infix="-elixir${ELIXIR_VSN}${infix}"
fi
infix=''
fi
echo "${PKG_VSN}${infix}-${SYSTEM}-${ARCH}"

View File

@ -42,14 +42,29 @@ for keychain in ${keychains}; do
done
security -v list-keychains -s "${keychain_names[@]}" "${KEYCHAIN}"
# sign
codesign -s "${APPLE_DEVELOPER_IDENTITY}" -f --verbose=4 --timestamp --options=runtime "${REL_DIR}"/erts-*/bin/{beam.smp,dyn_erl,epmd,erl,erl_call,erl_child_setup,erlexec,escript,heart,inet_gethost,run_erl,to_erl}
codesign -s "${APPLE_DEVELOPER_IDENTITY}" -f --verbose=4 --timestamp --options=runtime "${REL_DIR}"/lib/asn1-*/priv/lib/asn1rt_nif.so
codesign -s "${APPLE_DEVELOPER_IDENTITY}" -f --verbose=4 --timestamp --options=runtime "${REL_DIR}"/lib/bcrypt-*/priv/bcrypt_nif.so
codesign -s "${APPLE_DEVELOPER_IDENTITY}" -f --verbose=4 --timestamp --options=runtime "${REL_DIR}"/lib/crypto-*/priv/lib/{crypto.so,otp_test_engine.so}
codesign -s "${APPLE_DEVELOPER_IDENTITY}" -f --verbose=4 --timestamp --options=runtime "${REL_DIR}"/lib/jiffy-*/priv/jiffy.so
codesign -s "${APPLE_DEVELOPER_IDENTITY}" -f --verbose=4 --timestamp --options=runtime "${REL_DIR}"/lib/jq-*/priv/{jq_nif1.so,libjq.1.dylib,libonig.4.dylib,erlang_jq_port}
codesign -s "${APPLE_DEVELOPER_IDENTITY}" -f --verbose=4 --timestamp --options=runtime "${REL_DIR}"/lib/os_mon-*/priv/bin/{cpu_sup,memsup}
codesign -s "${APPLE_DEVELOPER_IDENTITY}" -f --verbose=4 --timestamp --options=runtime "${REL_DIR}"/lib/rocksdb-*/priv/liberocksdb.so
codesign -s "${APPLE_DEVELOPER_IDENTITY}" -f --verbose=4 --timestamp --options=runtime "${REL_DIR}"/lib/runtime_tools-*/priv/lib/{dyntrace.so,trace_ip_drv.so,trace_file_drv.so}
find "${REL_DIR}/lib/" -name libquicer_nif.so -exec codesign -s "${APPLE_DEVELOPER_IDENTITY}" -f --verbose=4 --timestamp --options=runtime {} \;
# known runtime executables and binaries
codesign -s "${APPLE_DEVELOPER_IDENTITY}" -f --verbose=4 --timestamp --options=runtime \
"${REL_DIR}"/erts-*/bin/{beam.smp,dyn_erl,epmd,erl,erl_call,erl_child_setup,erlexec,escript,heart,inet_gethost,run_erl,to_erl}
codesign -s "${APPLE_DEVELOPER_IDENTITY}" -f --verbose=4 --timestamp --options=runtime \
"${REL_DIR}"/lib/runtime_tools-*/priv/lib/{dyntrace.so,trace_ip_drv.so,trace_file_drv.so}
codesign -s "${APPLE_DEVELOPER_IDENTITY}" -f --verbose=4 --timestamp --options=runtime \
"${REL_DIR}"/lib/os_mon-*/priv/bin/{cpu_sup,memsup}
codesign -s "${APPLE_DEVELOPER_IDENTITY}" -f --verbose=4 --timestamp --options=runtime \
"${REL_DIR}"/lib/jq-*/priv/{jq_nif1.so,libjq.1.dylib,libonig.4.dylib,erlang_jq_port}
# other files from runtime and dependencies
for f in \
asn1rt_nif.so \
bcrypt_nif.so \
crc32cer_nif.so \
crypto.so \
crypto_callback.so \
jiffy.so \
liberocksdb.so \
libquicer_nif.so \
odbcserver \
otp_test_engine.so \
sasl_auth.so \
snappyer.so \
; do
find "${REL_DIR}"/lib/ -name "$f" -exec codesign -s "${APPLE_DEVELOPER_IDENTITY}" -f --verbose=4 --timestamp --options=runtime {} \;
done