Merge remote-tracking branch 'origin/main-v4.3' into merge-v43-to-v44

This commit is contained in:
Zaiming (Stone) Shi 2022-04-23 10:07:25 +02:00
commit 723f294119
25 changed files with 567 additions and 468 deletions

View File

@ -10,6 +10,14 @@ File format:
- One list item per change topic - One list item per change topic
Change log ends with a list of github PRs Change log ends with a list of github PRs
## v4.3.15
### Enhancements
* Made possible for EMQX to boot from a Linux directory which has white spaces in its path.
* Add support for JWT authorization [#7596]
Now MQTT clients may be authorized with respect to a specific claim containing publish/subscribe topic whitelists.
## v4.3.14 ## v4.3.14
### Enhancements ### Enhancements

View File

@ -46,6 +46,11 @@ auth.jwt.verify_claims = off
## - %u: username ## - %u: username
## - %c: clientid ## - %c: clientid
# auth.jwt.verify_claims.username = %u # auth.jwt.verify_claims.username = %u
## Name of the claim containg ACL rules
##
## Value: String
#auth.jwt.acl_claim_name = acl
``` ```
Load the Plugin Load the Plugin
@ -62,6 +67,33 @@ Example
mosquitto_pub -t 'pub' -m 'hello' -i test -u test -P eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJuYW1lIjoiYm9iIiwiYWdlIjoyOX0.bIV_ZQ8D5nQi0LT8AVkpM4Pd6wmlbpR9S8nOLJAsA8o mosquitto_pub -t 'pub' -m 'hello' -i test -u test -P eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJuYW1lIjoiYm9iIiwiYWdlIjoyOX0.bIV_ZQ8D5nQi0LT8AVkpM4Pd6wmlbpR9S8nOLJAsA8o
``` ```
ACL
---
JWT may contain lists of topics allowed for subscribing/publishing (ACL rules):
Payload example:
```json
{
"sub": "emqx",
"name": "John Doe",
"iat": 1516239022,
"exp": 1516239122,
"acl": {
"sub": [
"a/b",
"c/+",
"%u/%c"
],
"pub": [
"a/b",
"c/+",
"%u/%c"
]
}
}
```
Algorithms Algorithms
---------- ----------

View File

@ -47,3 +47,8 @@ auth.jwt.verify_claims = off
## For example, to verify that the username in the JWT payload is the same ## For example, to verify that the username in the JWT payload is the same
## as the client (MQTT protocol) username ## as the client (MQTT protocol) username
#auth.jwt.verify_claims.username = %u #auth.jwt.verify_claims.username = %u
## Name of the claim containg ACL rules
##
## Value: String
#auth.jwt.acl_claim_name = acl

View File

@ -47,3 +47,12 @@
end, [], cuttlefish_variable:filter_by_prefix("auth.jwt.verify_claims", Conf)) end, [], cuttlefish_variable:filter_by_prefix("auth.jwt.verify_claims", Conf))
end end
end}. end}.
{mapping, "auth.jwt.acl_claim_name", "emqx_auth_jwt.acl_claim_name", [
{default, "acl"},
{datatype, string}
]}.
{translation, "emqx_auth_jwt.acl_claim_name", fun(Conf) ->
list_to_binary(cuttlefish:conf_get("auth.jwt.acl_claim_name", Conf))
end}.

View File

@ -1,6 +1,6 @@
{application, emqx_auth_jwt, {application, emqx_auth_jwt,
[{description, "EMQ X Authentication with JWT"}, [{description, "EMQ X Authentication with JWT"},
{vsn, "4.4.1"}, % strict semver, bump manually! {vsn, "4.4.2"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_auth_jwt_sup]}, {registered, [emqx_auth_jwt_sup]},
{applications, [kernel,stdlib,jose]}, {applications, [kernel,stdlib,jose]},

View File

@ -1,14 +1,15 @@
%% -*-: erlang -*- %% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN, {VSN,
[ [
{<<"4\\.4\\.0">>, [ {<<"4\\.4\\.[0-1]">>, [
{load_module, emqx_auth_jwt_svr, brutal_purge, soft_purge, []} {restart_application,emqx_auth_jwt}
]}, ]},
{<<".*">>, []} {<<".*">>, []}
], ],
[ [
{<<"4\\.4\\.0">>, [ {<<"4\\.4\\.[0-1]">>, [
{load_module, emqx_auth_jwt_svr, brutal_purge, soft_purge, []} {restart_application,emqx_auth_jwt}
]}, ]},
{<<".*">>, []} {<<".*">>, []}
] ]

View File

@ -22,7 +22,8 @@
-logger_header("[JWT]"). -logger_header("[JWT]").
-export([ register_metrics/0 -export([ register_metrics/0
, check/3 , check_auth/3
, check_acl/5
, description/0 , description/0
]). ]).
@ -46,16 +47,14 @@ register_metrics() ->
%% Authentication callbacks %% Authentication callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
check(ClientInfo, AuthResult, #{pid := Pid, check_auth(ClientInfo, AuthResult, #{from := From, checklists := Checklists}) ->
from := From,
checklists := Checklists}) ->
case maps:find(From, ClientInfo) of case maps:find(From, ClientInfo) of
error -> error ->
ok = emqx_metrics:inc(?AUTH_METRICS(ignore)); ok = emqx_metrics:inc(?AUTH_METRICS(ignore));
{ok, undefined} -> {ok, undefined} ->
ok = emqx_metrics:inc(?AUTH_METRICS(ignore)); ok = emqx_metrics:inc(?AUTH_METRICS(ignore));
{ok, Token} -> {ok, Token} ->
case emqx_auth_jwt_svr:verify(Pid, Token) of case emqx_auth_jwt_svr:verify(Token) of
{error, not_found} -> {error, not_found} ->
ok = emqx_metrics:inc(?AUTH_METRICS(ignore)); ok = emqx_metrics:inc(?AUTH_METRICS(ignore));
{error, not_token} -> {error, not_token} ->
@ -68,12 +67,38 @@ check(ClientInfo, AuthResult, #{pid := Pid,
end end
end. end.
check_acl(ClientInfo = #{jwt_claims := Claims},
PubSub,
Topic,
_NoMatchAction,
#{acl_claim_name := AclClaimName}) ->
Deadline = erlang:system_time(second),
case Claims of
#{AclClaimName := Acl, <<"exp">> := Exp}
when is_integer(Exp) andalso Exp >= Deadline ->
verify_acl(ClientInfo, Acl, PubSub, Topic);
_ -> ignore
end.
description() -> "Authentication with JWT". description() -> "Authentication with JWT".
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Verify Claims %% Verify Claims
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
verify_acl(ClientInfo, #{<<"sub">> := SubTopics}, subscribe, Topic) when is_list(SubTopics) ->
verify_acl(ClientInfo, SubTopics, Topic);
verify_acl(ClientInfo, #{<<"pub">> := PubTopics}, publish, Topic) when is_list(PubTopics) ->
verify_acl(ClientInfo, PubTopics, Topic);
verify_acl(_ClientInfo, _Acl, _PubSub, _Topic) -> {stop, deny}.
verify_acl(_ClientInfo, [], _Topic) -> {stop, deny};
verify_acl(ClientInfo, [AclTopic | AclTopics], Topic) ->
case match_topic(ClientInfo, AclTopic, Topic) of
true -> {stop, allow};
false -> verify_acl(ClientInfo, AclTopics, Topic)
end.
verify_claims(Checklists, Claims, ClientInfo) -> verify_claims(Checklists, Claims, ClientInfo) ->
case do_verify_claims(feedvar(Checklists, ClientInfo), Claims) of case do_verify_claims(feedvar(Checklists, ClientInfo), Claims) of
{error, Reason} -> {error, Reason} ->
@ -97,3 +122,9 @@ feedvar(Checklists, #{username := Username, clientid := ClientId}) ->
({K, <<"%c">>}) -> {K, ClientId}; ({K, <<"%c">>}) -> {K, ClientId};
({K, Expected}) -> {K, Expected} ({K, Expected}) -> {K, Expected}
end, Checklists). end, Checklists).
match_topic(ClientInfo, AclTopic, Topic) ->
AclTopicWords = emqx_topic:words(AclTopic),
TopicWords = emqx_topic:words(Topic),
AclTopicRendered = emqx_access_rule:feed_var(ClientInfo, AclTopicWords),
emqx_topic:match(TopicWords, AclTopicRendered).

View File

@ -31,16 +31,20 @@
start(_Type, _Args) -> start(_Type, _Args) ->
{ok, Sup} = supervisor:start_link({local, ?MODULE}, ?MODULE, []), {ok, Sup} = supervisor:start_link({local, ?MODULE}, ?MODULE, []),
{ok, Pid} = start_auth_server(jwks_svr_options()), {ok, _} = start_auth_server(jwks_svr_options()),
ok = emqx_auth_jwt:register_metrics(), ok = emqx_auth_jwt:register_metrics(),
AuthEnv0 = auth_env(),
AuthEnv1 = AuthEnv0#{pid => Pid},
_ = emqx:hook('client.authenticate', {emqx_auth_jwt, check, [AuthEnv1]}), AuthEnv = auth_env(),
{ok, Sup, AuthEnv1}. _ = emqx:hook('client.authenticate', {emqx_auth_jwt, check_auth, [AuthEnv]}),
stop(AuthEnv) -> AclEnv = acl_env(),
emqx:unhook('client.authenticate', {emqx_auth_jwt, check, [AuthEnv]}). _ = emqx:hook('client.check_acl', {emqx_auth_jwt, check_acl, [AclEnv]}),
{ok, Sup}.
stop(_State) ->
emqx:unhook('client.authenticate', {emqx_auth_jwt, check_auth}),
emqx:unhook('client.check_acl', {emqx_auth_jwt, check_acl}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Dummy supervisor %% Dummy supervisor
@ -69,6 +73,9 @@ auth_env() ->
, checklists => Checklists , checklists => Checklists
}. }.
acl_env() ->
#{acl_claim_name => env(acl_claim_name, <<"acl">>)}.
jwks_svr_options() -> jwks_svr_options() ->
[{K, V} || {K, V} [{K, V} || {K, V}
<- [{secret, env(secret, undefined)}, <- [{secret, env(secret, undefined)},

View File

@ -26,7 +26,7 @@
%% APIs %% APIs
-export([start_link/1]). -export([start_link/1]).
-export([verify/2]). -export([verify/1]).
%% gen_server callbacks %% gen_server callbacks
-export([ init/1 -export([ init/1
@ -44,8 +44,9 @@
| {interval, pos_integer()}. | {interval, pos_integer()}.
-define(INTERVAL, 300000). -define(INTERVAL, 300000).
-define(TAB, ?MODULE).
-record(state, {static, remote, addr, tref, intv}). -record(state, {addr, tref, intv}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% APIs %% APIs
@ -55,13 +56,13 @@
start_link(Options) -> start_link(Options) ->
gen_server:start_link(?MODULE, [Options], []). gen_server:start_link(?MODULE, [Options], []).
-spec verify(pid(), binary()) -spec verify(binary())
-> {error, term()} -> {error, term()}
| {ok, Payload :: map()}. | {ok, Payload :: map()}.
verify(S, JwsCompacted) when is_binary(JwsCompacted) -> verify(JwsCompacted) when is_binary(JwsCompacted) ->
case catch jose_jws:peek(JwsCompacted) of case catch jose_jws:peek(JwsCompacted) of
{'EXIT', _} -> {error, not_token}; {'EXIT', _} -> {error, not_token};
_ -> gen_server:call(S, {verify, JwsCompacted}) _ -> do_verify(JwsCompacted)
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -70,12 +71,12 @@ verify(S, JwsCompacted) when is_binary(JwsCompacted) ->
init([Options]) -> init([Options]) ->
ok = jose:json_module(jiffy), ok = jose:json_module(jiffy),
_ = ets:new(?TAB, [set, protected, named_table]),
{Static, Remote} = do_init_jwks(Options), {Static, Remote} = do_init_jwks(Options),
true = ets:insert(?TAB, [{static, Static}, {remote, Remote}]),
Intv = proplists:get_value(interval, Options, ?INTERVAL), Intv = proplists:get_value(interval, Options, ?INTERVAL),
{ok, reset_timer( {ok, reset_timer(
#state{ #state{
static = Static,
remote = Remote,
addr = proplists:get_value(jwks_addr, Options), addr = proplists:get_value(jwks_addr, Options),
intv = Intv})}. intv = Intv})}.
@ -105,9 +106,6 @@ do_init_jwks(Options) ->
Remote = K2J(jwks_addr, fun request_jwks/1), Remote = K2J(jwks_addr, fun request_jwks/1),
{[J ||J <- [OctJwk, PemJwk], J /= undefined], Remote}. {[J ||J <- [OctJwk, PemJwk], J /= undefined], Remote}.
handle_call({verify, JwsCompacted}, _From, State) ->
handle_verify(JwsCompacted, State);
handle_call(_Req, _From, State) -> handle_call(_Req, _From, State) ->
{reply, ok, State}. {reply, ok, State}.
@ -116,7 +114,7 @@ handle_cast(_Msg, State) ->
handle_info({timeout, _TRef, refresh}, State = #state{addr = Addr}) -> handle_info({timeout, _TRef, refresh}, State = #state{addr = Addr}) ->
NState = try NState = try
State#state{remote = request_jwks(Addr)} true = ets:insert(?TAB, {remote, request_jwks(Addr)})
catch _:_ -> catch _:_ ->
State State
end, end,
@ -136,24 +134,10 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal funcs %% Internal funcs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_verify(JwsCompacted, keys(Type) ->
State = #state{static = Static, remote = Remote}) -> case ets:lookup(?TAB, Type) of
try [{_, Keys}] -> Keys;
Jwks = case emqx_json:decode(jose_jws:peek_protected(JwsCompacted), [return_maps]) of [] -> []
#{<<"kid">> := Kid} when Remote /= undefined ->
[J || J <- Remote, maps:get(<<"kid">>, J#jose_jwk.fields, undefined) =:= Kid];
_ -> Static
end,
case Jwks of
[] -> {reply, {error, not_found}, State};
_ ->
{reply, do_verify(JwsCompacted, Jwks), State}
end
catch
Class : Reason : Stk ->
?LOG(error, "Handle JWK crashed: ~p, ~p, stacktrace: ~p~n",
[Class, Reason, Stk]),
{reply, {error, invalid_signature}, State}
end. end.
request_jwks(Addr) -> request_jwks(Addr) ->
@ -181,6 +165,26 @@ cancel_timer(State = #state{tref = TRef}) ->
_ = erlang:cancel_timer(TRef), _ = erlang:cancel_timer(TRef),
State#state{tref = undefined}. State#state{tref = undefined}.
do_verify(JwsCompacted) ->
try
Remote = keys(remote),
Jwks = case emqx_json:decode(jose_jws:peek_protected(JwsCompacted), [return_maps]) of
#{<<"kid">> := Kid} when Remote /= undefined ->
[J || J <- Remote, maps:get(<<"kid">>, J#jose_jwk.fields, undefined) =:= Kid];
_ -> keys(static)
end,
case Jwks of
[] -> {error, not_found};
_ ->
do_verify(JwsCompacted, Jwks)
end
catch
Class : Reason : Stk ->
?LOG(error, "verify JWK crashed: ~p, ~p, stacktrace: ~p~n",
[Class, Reason, Stk]),
{error, invalid_signature}
end.
do_verify(_JwsCompated, []) -> do_verify(_JwsCompated, []) ->
{error, invalid_signature}; {error, invalid_signature};
do_verify(JwsCompacted, [Jwk|More]) -> do_verify(JwsCompacted, [Jwk|More]) ->
@ -214,11 +218,12 @@ check_claims(Claims) ->
do_check_claim([], Claims) -> do_check_claim([], Claims) ->
Claims; Claims;
do_check_claim([{K, F}|More], Claims) -> do_check_claim([{K, F}|More], Claims) ->
case maps:take(K, Claims) of case Claims of
error -> do_check_claim(More, Claims); #{K := V} ->
{V, NClaims} ->
case F(V) of case F(V) of
true -> do_check_claim(More, NClaims); true -> do_check_claim(More, Claims);
_ -> {false, K} _ -> {false, K}
end end;
_ ->
do_check_claim(More, Claims)
end. end.

View File

@ -19,29 +19,18 @@
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(APP, emqx_auth_jwt). all() -> emqx_ct:all(?MODULE).
all() -> init_per_testcase(TestCase, Config) ->
[{group, emqx_auth_jwt}]. ?MODULE:TestCase(init, Config),
groups() ->
[{emqx_auth_jwt, [sequence], [ t_check_auth
, t_check_claims
, t_check_claims_clientid
, t_check_claims_username
, t_check_claims_kid_in_header
]}
].
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx_auth_jwt], fun set_special_configs/1), emqx_ct_helpers:start_apps([emqx_auth_jwt], fun set_special_configs/1),
Config. Config.
end_per_suite(_Config) -> end_per_testcase(_Case, _Config) ->
emqx_ct_helpers:stop_apps([emqx_auth_jwt]). emqx_ct_helpers:stop_apps([emqx_auth_jwt]).
set_special_configs(emqx) -> set_special_configs(emqx) ->
@ -78,7 +67,9 @@ sign(Payload, Alg, Key) ->
%% Testcases %% Testcases
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
t_check_auth(_) -> t_check_auth(init, _Config) ->
application:unset_env(emqx_auth_jwt, verify_claims).
t_check_auth(_Config) ->
Plain = #{clientid => <<"client1">>, username => <<"plain">>, zone => external}, Plain = #{clientid => <<"client1">>, username => <<"plain">>, zone => external},
Jwt = sign([{clientid, <<"client1">>}, Jwt = sign([{clientid, <<"client1">>},
{username, <<"plain">>}, {username, <<"plain">>},
@ -102,10 +93,9 @@ t_check_auth(_) ->
?assertEqual({error, invalid_signature}, Result2), ?assertEqual({error, invalid_signature}, Result2),
?assertMatch({error, _}, emqx_access_control:authenticate(Plain#{password => <<"asd">>})). ?assertMatch({error, _}, emqx_access_control:authenticate(Plain#{password => <<"asd">>})).
t_check_claims(_) -> t_check_claims(init, _Config) ->
application:set_env(emqx_auth_jwt, verify_claims, [{sub, <<"value">>}]), application:set_env(emqx_auth_jwt, verify_claims, [{sub, <<"value">>}]).
application:stop(emqx_auth_jwt), application:start(emqx_auth_jwt), t_check_claims(_Config) ->
Plain = #{clientid => <<"client1">>, username => <<"plain">>, zone => external}, Plain = #{clientid => <<"client1">>, username => <<"plain">>, zone => external},
Jwt = sign([{client_id, <<"client1">>}, Jwt = sign([{client_id, <<"client1">>},
{username, <<"plain">>}, {username, <<"plain">>},
@ -120,9 +110,9 @@ t_check_claims(_) ->
ct:pal("Auth result for the invalid jwt: ~p~n", [Result2]), ct:pal("Auth result for the invalid jwt: ~p~n", [Result2]),
?assertEqual({error, invalid_signature}, Result2). ?assertEqual({error, invalid_signature}, Result2).
t_check_claims_clientid(_) -> t_check_claims_clientid(init, _Config) ->
application:set_env(emqx_auth_jwt, verify_claims, [{clientid, <<"%c">>}]), application:set_env(emqx_auth_jwt, verify_claims, [{clientid, <<"%c">>}]).
application:stop(emqx_auth_jwt), application:start(emqx_auth_jwt), t_check_claims_clientid(_Config) ->
Plain = #{clientid => <<"client23">>, username => <<"plain">>, zone => external}, Plain = #{clientid => <<"client23">>, username => <<"plain">>, zone => external},
Jwt = sign([{clientid, <<"client23">>}, Jwt = sign([{clientid, <<"client23">>},
{username, <<"plain">>}, {username, <<"plain">>},
@ -136,10 +126,9 @@ t_check_claims_clientid(_) ->
ct:pal("Auth result for the invalid jwt: ~p~n", [Result2]), ct:pal("Auth result for the invalid jwt: ~p~n", [Result2]),
?assertEqual({error, invalid_signature}, Result2). ?assertEqual({error, invalid_signature}, Result2).
t_check_claims_username(_) -> t_check_claims_username(init, _Config) ->
application:set_env(emqx_auth_jwt, verify_claims, [{username, <<"%u">>}]), application:set_env(emqx_auth_jwt, verify_claims, [{username, <<"%u">>}]).
application:stop(emqx_auth_jwt), application:start(emqx_auth_jwt), t_check_claims_username(_Config) ->
Plain = #{clientid => <<"client23">>, username => <<"plain">>, zone => external}, Plain = #{clientid => <<"client23">>, username => <<"plain">>, zone => external},
Jwt = sign([{client_id, <<"client23">>}, Jwt = sign([{client_id, <<"client23">>},
{username, <<"plain">>}, {username, <<"plain">>},
@ -153,8 +142,9 @@ t_check_claims_username(_) ->
ct:pal("Auth result for the invalid jwt: ~p~n", [Result3]), ct:pal("Auth result for the invalid jwt: ~p~n", [Result3]),
?assertEqual({error, invalid_signature}, Result3). ?assertEqual({error, invalid_signature}, Result3).
t_check_claims_kid_in_header(_) -> t_check_claims_kid_in_header(init, _Config) ->
application:set_env(emqx_auth_jwt, verify_claims, []), application:set_env(emqx_auth_jwt, verify_claims, []).
t_check_claims_kid_in_header(_Config) ->
Plain = #{clientid => <<"client23">>, username => <<"plain">>, zone => external}, Plain = #{clientid => <<"client23">>, username => <<"plain">>, zone => external},
Jwt = sign([{clientid, <<"client23">>}, Jwt = sign([{clientid, <<"client23">>},
{username, <<"plain">>}, {username, <<"plain">>},
@ -164,3 +154,125 @@ t_check_claims_kid_in_header(_) ->
Result0 = emqx_access_control:authenticate(Plain#{password => Jwt}), Result0 = emqx_access_control:authenticate(Plain#{password => Jwt}),
ct:pal("Auth result: ~p~n", [Result0]), ct:pal("Auth result: ~p~n", [Result0]),
?assertMatch({ok, #{auth_result := success, jwt_claims := _}}, Result0). ?assertMatch({ok, #{auth_result := success, jwt_claims := _}}, Result0).
t_check_jwt_acl(init, _Config) ->
application:set_env(emqx_auth_jwt, verify_claims, [{sub, <<"value">>}]).
t_check_jwt_acl(_Config) ->
Jwt = sign([{client_id, <<"client1">>},
{username, <<"plain">>},
{sub, value},
{acl, [{sub, [<<"a/b">>]},
{pub, [<<"c/d">>]}]},
{exp, os:system_time(seconds) + 10}],
<<"HS256">>,
<<"emqxsecret">>),
{ok, C} = emqtt:start_link(
[{clean_start, true},
{proto_ver, v5},
{client_id, <<"client1">>},
{password, Jwt}]),
{ok, _} = emqtt:connect(C),
?assertMatch(
{ok, #{}, [0]},
emqtt:subscribe(C, <<"a/b">>, 0)),
?assertMatch(
ok,
emqtt:publish(C, <<"c/d">>, <<"hi">>, 0)),
?assertMatch(
{ok, #{}, [?RC_NOT_AUTHORIZED]},
emqtt:subscribe(C, <<"c/d">>, 0)),
ok = emqtt:publish(C, <<"a/b">>, <<"hi">>, 0),
receive
{publish, #{topic := <<"a/b">>}} ->
?assert(false, "Publish to `a/b` should not be allowed")
after 100 -> ok
end,
ok = emqtt:disconnect(C).
t_check_jwt_acl_no_recs(init, _Config) ->
application:set_env(emqx_auth_jwt, verify_claims, [{sub, <<"value">>}]).
t_check_jwt_acl_no_recs(_Config) ->
Jwt = sign([{client_id, <<"client1">>},
{username, <<"plain">>},
{sub, value},
{acl, []},
{exp, os:system_time(seconds) + 10}],
<<"HS256">>,
<<"emqxsecret">>),
{ok, C} = emqtt:start_link(
[{clean_start, true},
{proto_ver, v5},
{client_id, <<"client1">>},
{password, Jwt}]),
{ok, _} = emqtt:connect(C),
?assertMatch(
{ok, #{}, [?RC_NOT_AUTHORIZED]},
emqtt:subscribe(C, <<"a/b">>, 0)),
ok = emqtt:disconnect(C).
t_check_jwt_acl_no_acl_claim(init, _Config) ->
application:set_env(emqx_auth_jwt, verify_claims, [{sub, <<"value">>}]).
t_check_jwt_acl_no_acl_claim(_Config) ->
Jwt = sign([{client_id, <<"client1">>},
{username, <<"plain">>},
{sub, value},
{exp, os:system_time(seconds) + 10}],
<<"HS256">>,
<<"emqxsecret">>),
{ok, C} = emqtt:start_link(
[{clean_start, true},
{proto_ver, v5},
{client_id, <<"client1">>},
{password, Jwt}]),
{ok, _} = emqtt:connect(C),
?assertMatch(
{ok, #{}, [?RC_NOT_AUTHORIZED]},
emqtt:subscribe(C, <<"a/b">>, 0)),
ok = emqtt:disconnect(C).
t_check_jwt_acl_expire(init, _Config) ->
application:set_env(emqx_auth_jwt, verify_claims, [{sub, <<"value">>}]).
t_check_jwt_acl_expire(_Config) ->
Jwt = sign([{client_id, <<"client1">>},
{username, <<"plain">>},
{sub, value},
{acl, [{sub, [<<"a/b">>]}]},
{exp, os:system_time(seconds) + 1}],
<<"HS256">>,
<<"emqxsecret">>),
{ok, C} = emqtt:start_link(
[{clean_start, true},
{proto_ver, v5},
{client_id, <<"client1">>},
{password, Jwt}]),
{ok, _} = emqtt:connect(C),
?assertMatch(
{ok, #{}, [0]},
emqtt:subscribe(C, <<"a/b">>, 0)),
?assertMatch(
{ok, #{}, [0]},
emqtt:unsubscribe(C, <<"a/b">>)),
timer:sleep(2000),
?assertMatch(
{ok, #{}, [?RC_NOT_AUTHORIZED]},
emqtt:subscribe(C, <<"a/b">>, 0)),
ok = emqtt:disconnect(C).

View File

@ -1,6 +1,6 @@
{application, emqx_rule_engine, {application, emqx_rule_engine,
[{description, "EMQ X Rule Engine"}, [{description, "EMQ X Rule Engine"},
{vsn, "4.4.3"}, % strict semver, bump manually! {vsn, "4.4.4"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_rule_engine_sup, emqx_rule_registry]}, {registered, [emqx_rule_engine_sup, emqx_rule_registry]},
{applications, [kernel,stdlib,rulesql,getopt]}, {applications, [kernel,stdlib,rulesql,getopt]},

View File

@ -44,8 +44,6 @@ do_nested_get([], Val, _OrgData, _Default) ->
nested_put(Key, Val, Data) when not is_map(Data), nested_put(Key, Val, Data) when not is_map(Data),
not is_list(Data) -> not is_list(Data) ->
nested_put(Key, Val, #{}); nested_put(Key, Val, #{});
nested_put(_, undefined, Map) ->
Map;
nested_put({var, Key}, Val, Map) -> nested_put({var, Key}, Val, Map) ->
general_map_put({key, Key}, Val, Map, Map); general_map_put({key, Key}, Val, Map, Map);
nested_put({path, Path}, Val, Map) when is_list(Path) -> nested_put({path, Path}, Val, Map) when is_list(Path) ->
@ -65,8 +63,6 @@ general_map_get(Key, Map, OrgData, Default) ->
(not_found) -> Default (not_found) -> Default
end). end).
general_map_put(_Key, undefined, Map, _OrgData) ->
Map;
general_map_put(Key, Val, Map, OrgData) -> general_map_put(Key, Val, Map, OrgData) ->
general_find(Key, Map, OrgData, general_find(Key, Map, OrgData,
fun fun

View File

@ -41,6 +41,7 @@
t_nested_put_map(_) -> t_nested_put_map(_) ->
?assertEqual(#{a => 1}, nested_put(?path([a]), 1, #{})), ?assertEqual(#{a => 1}, nested_put(?path([a]), 1, #{})),
?assertEqual(#{a => undefined}, nested_put(?path([a]), undefined, #{})),
?assertEqual(#{a => a}, nested_put(?path([a]), a, #{})), ?assertEqual(#{a => a}, nested_put(?path([a]), a, #{})),
?assertEqual(#{a => 1}, nested_put(?path([a]), 1, not_map)), ?assertEqual(#{a => 1}, nested_put(?path([a]), 1, not_map)),
?assertEqual(#{a => #{b => b}}, nested_put(?path([a,b]), b, #{})), ?assertEqual(#{a => #{b => b}}, nested_put(?path([a,b]), b, #{})),
@ -172,4 +173,3 @@ all() ->
suite() -> suite() ->
[{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}]. [{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}].

231
bin/emqx
View File

@ -9,9 +9,9 @@ if [ "$DEBUG" -eq 1 ]; then
set -x set -x
fi fi
ROOT_DIR="$(cd "$(dirname "$(readlink "$0" || echo "$0")")"/..; pwd -P)" RUNNER_ROOT_DIR="$(cd "$(dirname "$(readlink "$0" || echo "$0")")"/..; pwd -P)"
# shellcheck disable=SC1090 # shellcheck disable=SC1090
. "$ROOT_DIR"/releases/emqx_vars . "$RUNNER_ROOT_DIR"/releases/emqx_vars
RUNNER_SCRIPT="$RUNNER_BIN_DIR/$REL_NAME" RUNNER_SCRIPT="$RUNNER_BIN_DIR/$REL_NAME"
CODE_LOADING_MODE="${CODE_LOADING_MODE:-embedded}" CODE_LOADING_MODE="${CODE_LOADING_MODE:-embedded}"
@ -36,6 +36,12 @@ ERTS_LIB_DIR="$ERTS_DIR/../lib"
# Echo to stderr on errors # Echo to stderr on errors
echoerr() { echo "$*" 1>&2; } echoerr() { echo "$*" 1>&2; }
assert_node_alive() {
if ! relx_nodetool "ping" > /dev/null; then
die "node_is_not_running!" 1
fi
}
check_eralng_start() { check_eralng_start() {
"$BINDIR/$PROGNAME" -noshell -boot "$REL_DIR/start_clean" -s crypto start -s init stop "$BINDIR/$PROGNAME" -noshell -boot "$REL_DIR/start_clean" -s crypto start -s init stop
} }
@ -65,16 +71,80 @@ fi
# cuttlefish try to read environment variables starting with "EMQX_" # cuttlefish try to read environment variables starting with "EMQX_"
export CUTTLEFISH_ENV_OVERRIDE_PREFIX='EMQX_' export CUTTLEFISH_ENV_OVERRIDE_PREFIX='EMQX_'
relx_usage() { usage() {
command="$1" local command="$1"
case "$command" in case "$command" in
start)
echo "Start EMQX service in daemon mode"
;;
stop)
echo "Stop the running EMQX program"
;;
console)
echo "Boot up EMQX service in an interactive Erlang shell"
echo "This command needs a tty"
;;
console_clean)
echo "This command does NOT boot up the EMQX service"
echo "It only starts an interactive Erlang shell with all the"
echo "EMQX code available"
;;
foreground)
echo "Start EMQX in foreground mode without an interactive shell"
;;
pid)
echo "Print out EMQX process identifier"
;;
ping)
echo "Check if the EMQX node is up and running"
echo "This command exit with 0 silently if node is running"
;;
escript)
echo "Execute a escript using the Erlang runtime from EMQX package installation"
echo "For example $REL_NAME escript /path/to/my/escript my_arg1 my_arg2"
;;
attach)
echo "This command is applicable when EMQX is started in daemon mode."
echo "It attaches the current shell to EMQX's control console"
echo "through a named pipe."
echo "WARNING: try to use the safer alternative, remote_console command."
;;
remote_console)
echo "Start an interactive shell running an Erlang node which "
echo "hidden-connects to the running EMQX node".
echo "This command is mostly used for troubleshooting."
;;
ertspath)
echo "Print path to Erlang runtime dir"
;;
rpc)
echo "Usge $REL_NAME rpc MODULE FUNCTION [ARGS, ...]"
echo "Connect to the EMQX node and make an Erlang RPC"
echo "This command blocks for at most 60 seconds."
echo "It exits with non-zero code in case of any RPC failure"
echo "including connection error and runtime exception"
;;
rpcterms)
echo "Usge $REL_NAME rpcterms MODULE FUNCTION [ARGS, ...]"
echo "Connect to the EMQX node and make an Erlang RPC"
echo "The result of the RPC call is pretty-printed as an "
echo "Erlang term"
;;
root_dir)
echo "Print EMQX installation root dir"
;;
eval)
echo "Evaluate an Erlang expression in the EMQX node"
;;
versions)
echo "List installed EMQX versions and their status"
;;
unpack) unpack)
echo "Usage: $REL_NAME unpack [VERSION]" echo "Usage: $REL_NAME unpack [VERSION]"
echo "Unpacks a release package VERSION, it assumes that this" echo "Unpacks a release package VERSION, it assumes that this"
echo "release package tarball has already been deployed at one" echo "release package tarball has already been deployed at one"
echo "of the following locations:" echo "of the following locations:"
echo " releases/<relname>-<version>.tar.gz"
echo " releases/<relname>-<version>.zip" echo " releases/<relname>-<version>.zip"
;; ;;
install) install)
@ -82,7 +152,6 @@ relx_usage() {
echo "Installs a release package VERSION, it assumes that this" echo "Installs a release package VERSION, it assumes that this"
echo "release package tarball has already been deployed at one" echo "release package tarball has already been deployed at one"
echo "of the following locations:" echo "of the following locations:"
echo " releases/<relname>-<version>.tar.gz"
echo " releases/<relname>-<version>.zip" echo " releases/<relname>-<version>.zip"
echo "" echo ""
echo " --no-permanent Install release package VERSION but" echo " --no-permanent Install release package VERSION but"
@ -98,7 +167,6 @@ relx_usage() {
echo "Upgrades the currently running release to VERSION, it assumes" echo "Upgrades the currently running release to VERSION, it assumes"
echo "that a release package tarball has already been deployed at one" echo "that a release package tarball has already been deployed at one"
echo "of the following locations:" echo "of the following locations:"
echo " releases/<relname>-<version>.tar.gz"
echo " releases/<relname>-<version>.zip" echo " releases/<relname>-<version>.zip"
echo "" echo ""
echo " --no-permanent Install release package VERSION but" echo " --no-permanent Install release package VERSION but"
@ -109,18 +177,51 @@ relx_usage() {
echo "Downgrades the currently running release to VERSION, it assumes" echo "Downgrades the currently running release to VERSION, it assumes"
echo "that a release package tarball has already been deployed at one" echo "that a release package tarball has already been deployed at one"
echo "of the following locations:" echo "of the following locations:"
echo " releases/<relname>-<version>.tar.gz"
echo " releases/<relname>-<version>.zip" echo " releases/<relname>-<version>.zip"
echo "" echo ""
echo " --no-permanent Install release package VERSION but" echo " --no-permanent Install release package VERSION but"
echo " don't make it permanent" echo " don't make it permanent"
;; ;;
*) *)
echo "Usage: $REL_NAME {start|start_boot <file>|ertspath|foreground|stop|restart|reboot|pid|ping|console|console_clean|console_boot <file>|attach|remote_console|upgrade|downgrade|install|uninstall|versions|escript|rpc|rpcterms|eval|root_dir}" echo "Usage: $REL_NAME COMMAND [help]"
echo ''
echo "Commonly used COMMANDs:"
echo " start: Start EMQX in daemon mode"
echo " console: Start EMQX in an interactive Erlang shell"
echo " foreground: Start EMQX in foreground mode without an interactive shell"
echo " stop: Stop the running EMQX node"
echo " ctl: Administration commands, execute '$REL_NAME ctl help' for more details"
echo ''
echo "More:"
echo " Shell attach: remote_console | attach"
echo " Up/Down-grade: upgrade | downgrade | install | uninstall"
echo " Install info: ertspath | root_dir | versions"
echo " Runtime info: pid | ping | versions"
echo " Advanced: console_clean | escript | rpc | rpcterms | eval"
echo ''
echo "Execute '$REL_NAME COMMAND help' for more information"
;; ;;
esac esac
} }
COMMAND="${1:-}"
if [ -z "$COMMAND" ]; then
usage 'help'
exit 1
elif [ "$COMMAND" = 'help' ]; then
usage 'help'
exit 0
fi
if [ "${2:-}" = 'help' ]; then
## 'ctl' command has its own usage info
if [ "$COMMAND" != 'ctl' ]; then
usage "$COMMAND"
exit 0
fi
fi
# Simple way to check the correct user and fail early # Simple way to check the correct user and fail early
check_user() { check_user() {
# Validate that the user running the script is the owner of the # Validate that the user running the script is the owner of the
@ -140,7 +241,6 @@ check_user() {
fi fi
} }
# Make sure the user running this script is the owner and/or su to that user # Make sure the user running this script is the owner and/or su to that user
check_user "$@" check_user "$@"
ES=$? ES=$?
@ -151,7 +251,8 @@ fi
if [ -z "$WITH_EPMD" ]; then if [ -z "$WITH_EPMD" ]; then
EPMD_ARG="-start_epmd false -epmd_module ekka_epmd -proto_dist ekka" EPMD_ARG="-start_epmd false -epmd_module ekka_epmd -proto_dist ekka"
else else
EPMD_ARG="-start_epmd true $PROTO_DIST_ARG" PROTO_DIST=$(grep -E '^[ \t]*cluster.proto_dist[ \t]*=[ \t]*' "$RUNNER_ETC_DIR/emqx.conf" 2> /dev/null | tail -1 | awk -F"= " '{print $NF}')
EPMD_ARG="-start_epmd true -proto_dist $PROTO_DIST"
fi fi
# Warn the user if ulimit -n is less than 1024 # Warn the user if ulimit -n is less than 1024
@ -162,9 +263,6 @@ if [ "$ULIMIT_F" -lt 1024 ]; then
echo "!!!!" echo "!!!!"
fi fi
# By default, use cuttlefish to generate app.config and vm.args
CUTTLEFISH="${USE_CUTTLEFISH:-yes}"
SED_REPLACE="sed -i " SED_REPLACE="sed -i "
case $(sed --help 2>&1) in case $(sed --help 2>&1) in
*GNU*) SED_REPLACE="sed -i ";; *GNU*) SED_REPLACE="sed -i ";;
@ -235,20 +333,16 @@ relx_start_command() {
"$START_OPTION" "$START_OPTION"
} }
trim() {
echo -e "${1}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//'
}
# Function to generate app.config and vm.args # Function to generate app.config and vm.args
generate_config() { generate_config() {
## Delete the *.siz files first or it cann't start after ## Delete the *.siz files first or it cann't start after
## changing the config 'log.rotation.size' ## changing the config 'log.rotation.size'
rm -rf "${RUNNER_LOG_DIR}"/*.siz rm -rf "${RUNNER_LOG_DIR}"/*.siz
if [ "$CUTTLEFISH" != "yes" ]; then
# Note: we have added a parameter '-vm_args' to this. It
# appears redundant but it is not! the erlang vm allows us to
# access all arguments to the erl command EXCEPT '-args_file',
# so in order to get access to this file location from within
# the vm, we need to pass it in twice.
CONFIG_ARGS=" -config $RUNNER_ETC_DIR/app.config -args_file $RUNNER_ETC_DIR/vm.args -vm_args $RUNNER_ETC_DIR/vm.args "
else
EMQX_LICENSE_CONF_OPTION="" EMQX_LICENSE_CONF_OPTION=""
if [ "${EMQX_LICENSE_CONF:-}" != "" ]; then if [ "${EMQX_LICENSE_CONF:-}" != "" ]; then
EMQX_LICENSE_CONF_OPTION="-i ${EMQX_LICENSE_CONF}" EMQX_LICENSE_CONF_OPTION="-i ${EMQX_LICENSE_CONF}"
@ -264,16 +358,18 @@ generate_config() {
echo "$CUTTLEFISH_OUTPUT" echo "$CUTTLEFISH_OUTPUT"
exit $RESULT exit $RESULT
fi fi
# print override from environment variables (EMQX_*) ## transform a single line args list like '-config ... -args_file ... -vm_args ...' to lines and get path for each file respectively
echo "$CUTTLEFISH_OUTPUT" | sed -e '$d' ## NOTE: the -args_file and -vm_args are the same file passed twice because args_file is used by beam, but not possible to get at runtime
CONFIG_ARGS=$(echo "$CUTTLEFISH_OUTPUT" | tail -n 1) ## by calling init:get_arguments/0
lines="$(echo "$CUTTLEFISH_OUTPUT" | tail -1 | sed 's/-config/\nconfig=/g' | sed 's/-args_file/\nargs_file=/g' | sed 's/-vm_args/\nvm_args=/g')"
CONFIG_FILE="$(trim "$(echo -e "$lines" | grep 'config=' | sed 's/config=//g')")"
CUTTLE_GEN_ARG_FILE="$(trim "$(echo -e "$lines" | grep 'vm_args=' | sed 's/vm_args=//g')")"
## Merge cuttlefish generated *.args into the vm.args ## Merge cuttlefish generated *.args into the vm.args
CUTTLE_GEN_ARG_FILE=$(echo "$CONFIG_ARGS" | sed -n 's/^.*\(vm_args[[:space:]]\)//p' | awk '{print $1}')
TMP_ARG_FILE="$RUNNER_DATA_DIR/configs/vm.args.tmp" TMP_ARG_FILE="$RUNNER_DATA_DIR/configs/vm.args.tmp"
cp "$RUNNER_ETC_DIR/vm.args" "$TMP_ARG_FILE" cp "$RUNNER_ETC_DIR/vm.args" "$TMP_ARG_FILE"
echo "" >> "$TMP_ARG_FILE" echo "" >> "$TMP_ARG_FILE"
echo "-pa ${REL_DIR}/consolidated" >> "$TMP_ARG_FILE" echo "-pa \"${REL_DIR}/consolidated\"" >> "$TMP_ARG_FILE"
sed '/^#/d' "$CUTTLE_GEN_ARG_FILE" | sed '/^$/d' | while IFS='' read -r ARG_LINE || [ -n "$ARG_LINE" ]; do sed '/^#/d' "$CUTTLE_GEN_ARG_FILE" | sed '/^$/d' | while IFS='' read -r ARG_LINE || [ -n "$ARG_LINE" ]; do
ARG_KEY=$(echo "$ARG_LINE" | awk '{$NF="";print}') ARG_KEY=$(echo "$ARG_LINE" | awk '{$NF="";print}')
ARG_VALUE=$(echo "$ARG_LINE" | awk '{print $NF}') ARG_VALUE=$(echo "$ARG_LINE" | awk '{print $NF}')
@ -289,7 +385,7 @@ generate_config() {
TMP_ARG_VALUE=$(grep "^$ARG_KEY" "$TMP_ARG_FILE" | awk '{print $NF}') TMP_ARG_VALUE=$(grep "^$ARG_KEY" "$TMP_ARG_FILE" | awk '{print $NF}')
if [ "$ARG_VALUE" != "$TMP_ARG_VALUE" ] ; then if [ "$ARG_VALUE" != "$TMP_ARG_VALUE" ] ; then
if [ -n "$TMP_ARG_VALUE" ]; then if [ -n "$TMP_ARG_VALUE" ]; then
sh -c "$SED_REPLACE 's/^$ARG_KEY.*$/$ARG_LINE/' $TMP_ARG_FILE" sh -c "$SED_REPLACE 's/^$ARG_KEY.*$/$ARG_LINE/' \"$TMP_ARG_FILE\""
else else
echo "$ARG_LINE" >> "$TMP_ARG_FILE" echo "$ARG_LINE" >> "$TMP_ARG_FILE"
fi fi
@ -297,11 +393,9 @@ generate_config() {
fi fi
done done
mv -f "$TMP_ARG_FILE" "$CUTTLE_GEN_ARG_FILE" mv -f "$TMP_ARG_FILE" "$CUTTLE_GEN_ARG_FILE"
fi
# shellcheck disable=SC2086 if ! relx_nodetool chkconfig -config "$CONFIG_FILE"; then
if ! relx_nodetool chkconfig $CONFIG_ARGS; then echoerr "Error reading $CONFIG_FILE"
echoerr "Error reading $CONFIG_ARGS"
exit 1 exit 1
fi fi
} }
@ -383,15 +477,6 @@ wait_for() {
done done
} }
# Use $CWD/etc/sys.config if exists
if [ -z "$RELX_CONFIG_PATH" ]; then
if [ -f "$RUNNER_ETC_DIR/sys.config" ]; then
RELX_CONFIG_PATH="-config $RUNNER_ETC_DIR/sys.config"
else
RELX_CONFIG_PATH=""
fi
fi
IS_BOOT_COMMAND='no' IS_BOOT_COMMAND='no'
case "$1" in case "$1" in
start|start_boot) start|start_boot)
@ -414,9 +499,10 @@ if [ -z "$NAME_ARG" ]; then
if [ "$IS_BOOT_COMMAND" = 'no' ]; then if [ "$IS_BOOT_COMMAND" = 'no' ]; then
# for non-boot commands, inspect vm.<time>.args for node name # for non-boot commands, inspect vm.<time>.args for node name
# shellcheck disable=SC2012,SC2086 # shellcheck disable=SC2012,SC2086
LATEST_VM_ARGS="$(ls -t $RUNNER_DATA_DIR/configs/vm.*.args | head -1)" LATEST_VM_ARGS="$(ls -t $RUNNER_DATA_DIR/configs/vm.*.args 2>/dev/null | head -1)"
if [ -z "$LATEST_VM_ARGS" ]; then if [ -z "$LATEST_VM_ARGS" ]; then
echo "For command $1, there is no vm.*.args config file found in $RUNNER_DATA_DIR/configs/" echoerr "For command $1, there is no vm.*.args config file found in $RUNNER_DATA_DIR/configs/"
echoerr "Please make sure the node is initialized (started for at least once)"
exit 1 exit 1
fi fi
NODENAME="$(grep -E '^-name' "$LATEST_VM_ARGS" | awk '{print $2}')" NODENAME="$(grep -E '^-name' "$LATEST_VM_ARGS" | awk '{print $2}')"
@ -465,14 +551,6 @@ if [ -z "$COOKIE" ]; then
exit 1 exit 1
fi fi
# Support for IPv6 Dist. See: https://github.com/emqtt/emqttd/issues/1460
PROTO_DIST=$(grep -E '^[ \t]*cluster.proto_dist[ \t]*=[ \t]*' "$RUNNER_ETC_DIR/emqx.conf" 2> /dev/null | tail -1 | awk -F"= " '{print $NF}')
if [ -z "$PROTO_DIST" ]; then
PROTO_DIST_ARG=""
else
PROTO_DIST_ARG="-proto_dist $PROTO_DIST"
fi
cd "$ROOTDIR" cd "$ROOTDIR"
# User can specify an sname without @hostname # User can specify an sname without @hostname
@ -710,14 +788,17 @@ case "$1" in
# Store passed arguments since they will be erased by `set` # Store passed arguments since they will be erased by `set`
ARGS="$*" ARGS="$*"
# shellcheck disable=SC2086 # $RELX_CONFIG_PATH $CONFIG_ARGS $EPMD_ARG are supposed to be split by whitespace # shellcheck disable=SC2086 # $EPMD_ARG is supposed to be split by whitespace
# Build an array of arguments to pass to exec later on # Build an array of arguments to pass to exec later on
# Build it here because this command will be used for logging. # Build it here because this command will be used for logging.
set -- "$BINDIR/erlexec" \ set -- "$BINDIR/erlexec" \
-boot "$BOOTFILE" -mode "$CODE_LOADING_MODE" \ -boot "$BOOTFILE" -mode "$CODE_LOADING_MODE" \
-boot_var ERTS_LIB_DIR "$ERTS_LIB_DIR" \ -boot_var ERTS_LIB_DIR "$ERTS_LIB_DIR" \
-mnesia dir "\"${MNESIA_DATA_DIR}\"" \ -mnesia dir "\"${MNESIA_DATA_DIR}\"" \
$RELX_CONFIG_PATH $CONFIG_ARGS $EPMD_ARG -config "$CONFIG_FILE" \
-args_file "$CUTTLE_GEN_ARG_FILE" \
-vm_args "$CUTTLE_GEN_ARG_FILE" \
$EPMD_ARG
# Log the startup # Log the startup
logger -t "${REL_NAME}[$$]" "$* -- ${1+$ARGS}" logger -t "${REL_NAME}[$$]" "$* -- ${1+$ARGS}"
@ -751,14 +832,17 @@ case "$1" in
# Store passed arguments since they will be erased by `set` # Store passed arguments since they will be erased by `set`
ARGS="$*" ARGS="$*"
# shellcheck disable=SC2086 # $RELX_CONFIG_PATH $CONFIG_ARGS $EPMD_ARG are supposed to be split by whitespace # shellcheck disable=SC2086 # $EPMD_ARG is supposed to be split by whitespace
# Build an array of arguments to pass to exec later on # Build an array of arguments to pass to exec later on
# Build it here because this command will be used for logging. # Build it here because this command will be used for logging.
set -- "$BINDIR/erlexec" $FOREGROUNDOPTIONS \ set -- "$BINDIR/erlexec" $FOREGROUNDOPTIONS \
-boot "$REL_DIR/$BOOTFILE" -mode "$CODE_LOADING_MODE" \ -boot "$REL_DIR/$BOOTFILE" -mode "$CODE_LOADING_MODE" \
-boot_var ERTS_LIB_DIR "$ERTS_LIB_DIR" \ -boot_var ERTS_LIB_DIR "$ERTS_LIB_DIR" \
-mnesia dir "\"${MNESIA_DATA_DIR}\"" \ -mnesia dir "\"${MNESIA_DATA_DIR}\"" \
$RELX_CONFIG_PATH $CONFIG_ARGS $EPMD_ARG -config "$CONFIG_FILE" \
-args_file "$CUTTLE_GEN_ARG_FILE" \
-vm_args "$CUTTLE_GEN_ARG_FILE" \
$EPMD_ARG
# Log the startup # Log the startup
logger -t "${REL_NAME}[$$]" "$* -- ${1+$ARGS}" logger -t "${REL_NAME}[$$]" "$* -- ${1+$ARGS}"
@ -769,50 +853,35 @@ case "$1" in
ertspath) ertspath)
echo "$ERTS_PATH" echo "$ERTS_PATH"
;; ;;
rpc)
# Make sure a node IS running
if ! relx_nodetool "ping" > /dev/null; then
echo "Node is not running!"
exit 1
fi
ctl)
assert_node_alive
shift shift
relx_nodetool rpc_infinity emqx_ctl run_command "$@"
;;
rpc)
assert_node_alive
shift
relx_nodetool rpc "$@" relx_nodetool rpc "$@"
;; ;;
rpcterms) rpcterms)
# Make sure a node IS running assert_node_alive
if ! relx_nodetool "ping" > /dev/null; then
echo "Node is not running!"
exit 1
fi
shift shift
relx_nodetool rpcterms "$@" relx_nodetool rpcterms "$@"
;; ;;
root_dir) root_dir)
# Make sure a node IS running assert_node_alive
if ! relx_nodetool "ping" > /dev/null; then
echo "Node is not running!"
exit 1
fi
shift shift
relx_nodetool "eval" 'code:root_dir()' relx_nodetool "eval" 'code:root_dir()'
;; ;;
eval) eval)
# Make sure a node IS running assert_node_alive
if ! relx_nodetool "ping" > /dev/null; then
echo "Node is not running!"
exit 1
fi
shift shift
relx_nodetool "eval" "$@" relx_nodetool "eval" "$@"
;; ;;
*) *)
relx_usage "$1" usage "$COMMAND"
exit 1 exit 1
;; ;;
esac esac

View File

@ -8,8 +8,10 @@
:: * restart - run the stop command and start command :: * restart - run the stop command and start command
:: * uninstall - uninstall the service and kill a running node :: * uninstall - uninstall the service and kill a running node
:: * ping - check if the node is running :: * ping - check if the node is running
:: * ctl - run management commands
:: * console - start the Erlang release in a `werl` Windows shell :: * console - start the Erlang release in a `werl` Windows shell
:: * attach - connect to a running node and open an interactive console :: * attach - connect to a running node and open an interactive console
:: * remote_console - same as attach
:: * list - display a listing of installed Erlang services :: * list - display a listing of installed Erlang services
:: * usage - display available commands :: * usage - display available commands
@ -24,6 +26,9 @@
@set script=%~n0 @set script=%~n0
@set EPMD_ARG=-start_epmd false -epmd_module ekka_epmd -proto_dist ekka
@set ERL_FLAGS=%EPMD_ARG%
:: Discover the release root directory from the directory :: Discover the release root directory from the directory
:: of this script :: of this script
@set script_dir=%~dp0 @set script_dir=%~dp0
@ -46,7 +51,7 @@
@set service_name=%rel_name%_%rel_vsn% @set service_name=%rel_name%_%rel_vsn%
@set bindir=%erts_dir%\bin @set bindir=%erts_dir%\bin
@set progname=erl.exe @set progname=erl.exe
@set clean_boot_script=%rel_root_dir%\bin\start_clean @set clean_boot_script=%rel_dir%\start_clean
@set erlsrv="%bindir%\erlsrv.exe" @set erlsrv="%bindir%\erlsrv.exe"
@set epmd="%bindir%\epmd.exe" @set epmd="%bindir%\epmd.exe"
@set escript="%bindir%\escript.exe" @set escript="%bindir%\escript.exe"
@ -83,8 +88,10 @@
::@if "%1"=="downgrade" @goto relup ::@if "%1"=="downgrade" @goto relup
@if "%1"=="console" @goto console @if "%1"=="console" @goto console
@if "%1"=="ping" @goto ping @if "%1"=="ping" @goto ping
@if "%1"=="ctl" @goto ctl
@if "%1"=="list" @goto list @if "%1"=="list" @goto list
@if "%1"=="attach" @goto attach @if "%1"=="attach" @goto attach
@if "%1"=="remote_console" @goto attach
@if "%1"=="" @goto usage @if "%1"=="" @goto usage
@echo Unknown command: "%1" @echo Unknown command: "%1"
@ -239,7 +246,7 @@ cd /d %rel_root_dir%
@echo off @echo off
cd /d %rel_root_dir% cd /d %rel_root_dir%
@echo on @echo on
@start "bin\%rel_name% console" %werl% -boot "%boot_script%" %args% %erl_exe% -boot "%boot_script%" %args%
@echo emqx is started! @echo emqx is started!
@goto :eof @goto :eof
@ -248,6 +255,12 @@ cd /d %rel_root_dir%
@%escript% %nodetool% ping %node_type% "%node_name%" -setcookie "%node_cookie%" @%escript% %nodetool% ping %node_type% "%node_name%" -setcookie "%node_cookie%"
@goto :eof @goto :eof
:: ctl to execute management commands
:ctl
@for /f "usebackq tokens=1*" %%i in (`echo %*`) DO @ set params=%%j
@%escript% %nodetool% %node_type% "%node_name%" -setcookie "%node_cookie%" rpc_infinity emqx_ctl run_command %params%
@goto :eof
:: List installed Erlang services :: List installed Erlang services
:list :list
@%erlsrv% list %service_name% @%erlsrv% list %service_name%
@ -256,8 +269,7 @@ cd /d %rel_root_dir%
:: Attach to a running node :: Attach to a running node
:attach :attach
:: @start "%node_name% attach" :: @start "%node_name% attach"
@start "%node_name% attach" %werl% -boot "%clean_boot_script%" ^ %erl_exe% -boot "%clean_boot_script%" -remsh %node_name% %node_type% remsh_%node_name% -setcookie %node_cookie%
-remsh %node_name% %node_type% console_%node_name% -setcookie %node_cookie%
@goto :eof @goto :eof
:: Trim variable :: Trim variable

View File

@ -2,83 +2,5 @@
# -*- tab-width:4;indent-tabs-mode:nil -*- # -*- tab-width:4;indent-tabs-mode:nil -*-
# ex: ts=4 sw=4 et # ex: ts=4 sw=4 et
set -e THIS_DIR="$(cd "$(dirname "$(readlink "$0" || echo "$0")")" || true; pwd -P)"
exec "$THIS_DIR/emqx" ctl "$@"
ROOT_DIR="$(cd "$(dirname "$(readlink "$0" || echo "$0")")"/..; pwd -P)"
# shellcheck disable=SC1090
. "$ROOT_DIR"/releases/emqx_vars
export RUNNER_ROOT_DIR
export REL_VSN
# shellcheck disable=SC2012,SC2086
LATEST_VM_ARGS="$(ls -t $RUNNER_DATA_DIR/configs/vm.*.args | head -1)"
if [ -z "$LATEST_VM_ARGS" ]; then
echo "No vm.*.args config file found in $RUNNER_DATA_DIR/configs/"
exit 1
fi
# Echo to stderr on errors
echoerr() { echo "$@" 1>&2; }
if [ -z "$WITH_EPMD" ]; then
EPMD_ARG="-start_epmd false -epmd_module ekka_epmd -proto_dist ekka"
else
EPMD_ARG="-start_epmd true"
fi
relx_get_nodename() {
id="longname$(relx_gen_id)-${NAME}"
"$BINDIR/erl" -boot start_clean -eval '[Host] = tl(string:tokens(atom_to_list(node()),"@")), io:format("~s~n", [Host]), halt()' -noshell "${NAME_TYPE}" "$id"
}
# Control a node
relx_nodetool() {
command="$1"; shift
ERL_FLAGS="$ERL_FLAGS $EPMD_ARG $PROTO_DIST_ARG" \
ERL_LIBS="${LIB_EKKA_DIR}:${ERL_LIBS:-}" \
"$ERTS_DIR/bin/escript" "$ROOTDIR/bin/nodetool" "$NAME_TYPE" "$NAME" \
-setcookie "$COOKIE" "$command" "$@"
}
if [ -z "$NAME_ARG" ]; then
NODENAME="${EMQX_NODE_NAME:-}"
[ -z "$NODENAME" ] && [ -n "$EMQX_NAME" ] && [ -n "$EMQX_HOST" ] && NODENAME="${EMQX_NAME}@${EMQX_HOST}"
[ -z "$NODENAME" ] && NODENAME="$(grep -E '^-name' "$LATEST_VM_ARGS" | awk '{print $2}')"
if [ -z "$NODENAME" ]; then
echoerr "vm.args needs to have a -name parameter."
echoerr " -sname is not supported."
echoerr "perhaps you do not have read permissions on $RUNNER_ETC_DIR/emqx.conf"
exit 1
else
NAME_ARG="-name ${NODENAME# *}"
fi
fi
# Extract the name type and name from the NAME_ARG for REMSH
NAME_TYPE="$(echo "$NAME_ARG" | awk '{print $1}')"
NAME="$(echo "$NAME_ARG" | awk '{print $2}')"
COOKIE="${EMQX_NODE_COOKIE:-}"
[ -z "$COOKIE" ] && COOKIE="$(grep -E '^-setcookie' "$LATEST_VM_ARGS" | awk '{print $2}')"
if [ -z "$COOKIE" ]; then
echoerr "Please set node.cookie in $RUNNER_ETC_DIR/emqx.conf or override from environment variable EMQX_NODE_COOKIE"
exit 1
fi
# Support for IPv6 Dist. See: https://github.com/emqtt/emqttd/issues/1460
PROTO_DIST=$(grep -E '^[ \t]*cluster.proto_dist[ \t]*=[ \t]*' "$RUNNER_ETC_DIR"/emqx.conf 2> /dev/null | tail -1 | awk -F"= " '{print $NF}')
if [ -z "$PROTO_DIST" ]; then
PROTO_DIST_ARG=""
else
PROTO_DIST_ARG="-proto_dist $PROTO_DIST"
fi
export ROOTDIR="$RUNNER_ROOT_DIR"
export ERTS_DIR="$ROOTDIR/erts-$ERTS_VSN"
export BINDIR="$ERTS_DIR/bin"
cd "$ROOTDIR"
relx_nodetool rpc emqx_ctl run_command "$@"

View File

@ -2,91 +2,10 @@
@set args=%* @set args=%*
:: Set variables that describe the release
@set rel_name=emqx
@set rel_vsn={{ release_version }}
@set erts_vsn={{ erts_vsn }}
@set erl_opts={{ erl_opts }}
:: Discover the release root directory from the directory :: Discover the release root directory from the directory
:: of this script :: of this script
@set script_dir=%~dp0 @set script_dir=%~dp0
@for %%A in ("%script_dir%\..") do @( @for %%A in ("%script_dir%\..") do @(
set rel_root_dir=%%~fA set rel_root_dir=%%~fA
) )
@set rel_dir=%rel_root_dir%\releases\%rel_vsn% @%rel_root_dir%\bin\emqx.cmd ctl %args%
@set emqx_conf=%rel_root_dir%\etc\emqx.conf
@call :find_erts_dir
@set bindir=%erts_dir%\bin
@set progname=erl.exe
@set escript="%bindir%\escript.exe"
@set nodetool="%rel_root_dir%\bin\nodetool"
@set node_type="-name"
:: Extract node name from emqx.conf
@for /f "usebackq delims=\= tokens=2" %%I in (`findstr /b node\.name "%emqx_conf%"`) do @(
@call :set_trim node_name %%I
)
:: Extract node cookie from emqx.conf
@for /f "usebackq delims=\= tokens=2" %%I in (`findstr /b node\.cookie "%emqx_conf%"`) do @(
@call :set_trim node_cookie= %%I
)
:: Write the erl.ini file to set up paths relative to this script
@call :write_ini
:: If a start.boot file is not present, copy one from the named .boot file
@if not exist "%rel_dir%\start.boot" (
copy "%rel_dir%\%rel_name%.boot" "%rel_dir%\start.boot" >nul
)
@%escript% %nodetool% %node_type% "%node_name%" -setcookie "%node_cookie%" rpc emqx_ctl run_command %args%
:: Find the ERTS dir
:find_erts_dir
@set possible_erts_dir=%rel_root_dir%\erts-%erts_vsn%
@if exist "%possible_erts_dir%" (
call :set_erts_dir_from_default
) else (
call :set_erts_dir_from_erl
)
@goto :eof
:: Set the ERTS dir from the passed in erts_vsn
:set_erts_dir_from_default
@set erts_dir=%possible_erts_dir%
@set rootdir=%rel_root_dir%
@goto :eof
:: Set the ERTS dir from erl
:set_erts_dir_from_erl
@for /f "delims=" %%i in ('where erl') do @(
set erl=%%i
)
@set dir_cmd="%erl%" -noshell -eval "io:format(\"~s\", [filename:nativename(code:root_dir())])." -s init stop
@for /f %%i in ('%%dir_cmd%%') do @(
set erl_root=%%i
)
@set erts_dir=%erl_root%\erts-%erts_vsn%
@set rootdir=%erl_root%
@goto :eof
:: Write the erl.ini file
:write_ini
@set erl_ini=%erts_dir%\bin\erl.ini
@set converted_bindir=%bindir:\=\\%
@set converted_rootdir=%rootdir:\=\\%
@echo [erlang] > "%erl_ini%"
@echo Bindir=%converted_bindir% >> "%erl_ini%"
@echo Progname=%progname% >> "%erl_ini%"
@echo Rootdir=%converted_rootdir% >> "%erl_ini%"
@goto :eof
:: Trim variable
:set_trim
@set %1=%2
@goto :eof

View File

@ -266,7 +266,8 @@ unpack_zipballs(RelNameStr, Version) ->
GzFile = filename:absname(filename:join(["releases", RelNameStr ++ "-" ++ Version ++ ".tar.gz"])), GzFile = filename:absname(filename:join(["releases", RelNameStr ++ "-" ++ Version ++ ".tar.gz"])),
ZipFiles = filelib:wildcard(filename:join(["releases", RelNameStr ++ "-*" ++ Version ++ "*.zip"])), ZipFiles = filelib:wildcard(filename:join(["releases", RelNameStr ++ "-*" ++ Version ++ "*.zip"])),
?INFO("unzip ~p", [ZipFiles]), ?INFO("unzip ~p", [ZipFiles]),
[begin lists:foreach(
fun(Zip) ->
TmdTarD = "/tmp/emqx_untar_" ++ integer_to_list(erlang:system_time()), TmdTarD = "/tmp/emqx_untar_" ++ integer_to_list(erlang:system_time()),
ok = filelib:ensure_dir(filename:join([TmdTarD, "dummy"])), ok = filelib:ensure_dir(filename:join([TmdTarD, "dummy"])),
{ok, _} = file:copy(Zip, filename:join([TmdTarD, "emqx.zip"])), {ok, _} = file:copy(Zip, filename:join([TmdTarD, "emqx.zip"])),
@ -274,10 +275,11 @@ unpack_zipballs(RelNameStr, Version) ->
{ok, _FileList} = zip:unzip("emqx.zip"), {ok, _FileList} = zip:unzip("emqx.zip"),
ok = file:set_cwd(filename:join([TmdTarD, "emqx"])), ok = file:set_cwd(filename:join([TmdTarD, "emqx"])),
ok = erl_tar:create(GzFile, filelib:wildcard("*"), [compressed]) ok = erl_tar:create(GzFile, filelib:wildcard("*"), [compressed])
end || Zip <- ZipFiles] end,
ZipFiles)
after after
% restore cwd % restore cwd
file:set_cwd(Cwd) ok = file:set_cwd(Cwd)
end. end.
first_value(_Fun, []) -> no_value; first_value(_Fun, []) -> no_value;

View File

@ -6,7 +6,6 @@
REL_VSN="{{ release_version }}" REL_VSN="{{ release_version }}"
ERTS_VSN="{{ erts_vsn }}" ERTS_VSN="{{ erts_vsn }}"
ERL_OPTS="{{ erl_opts }}" ERL_OPTS="{{ erl_opts }}"
RUNNER_ROOT_DIR="{{ runner_root_dir }}"
RUNNER_BIN_DIR="{{ runner_bin_dir }}" RUNNER_BIN_DIR="{{ runner_bin_dir }}"
RUNNER_LOG_DIR="{{ runner_log_dir }}" RUNNER_LOG_DIR="{{ runner_log_dir }}"
RUNNER_LIB_DIR="{{ runner_lib_dir }}" RUNNER_LIB_DIR="{{ runner_lib_dir }}"

View File

@ -19,10 +19,10 @@ BuildRoot: %{_tmppath}/%{_name}-%{_version}-root
Provides: %{_name} Provides: %{_name}
AutoReq: 0 AutoReq: 0
%if "%{_arch} %{?rhel}" == "amd64 7" %if "%{_arch} %{?rhel}" == "x86_64 7"
Requires: openssl11 libatomic Requires: openssl11 libatomic procps which findutils
%else %else
Requires: libatomic Requires: libatomic procps which findutils
%endif %endif
%description %description

View File

@ -216,7 +216,6 @@ overlay_vars_pkg(bin) ->
, {platform_lib_dir, "lib"} , {platform_lib_dir, "lib"}
, {platform_log_dir, "log"} , {platform_log_dir, "log"}
, {platform_plugins_dir, "etc/plugins"} , {platform_plugins_dir, "etc/plugins"}
, {runner_root_dir, "$(cd $(dirname $(readlink $0 || echo $0))/..; pwd -P)"}
, {runner_bin_dir, "$RUNNER_ROOT_DIR/bin"} , {runner_bin_dir, "$RUNNER_ROOT_DIR/bin"}
, {runner_etc_dir, "$RUNNER_ROOT_DIR/etc"} , {runner_etc_dir, "$RUNNER_ROOT_DIR/etc"}
, {runner_lib_dir, "$RUNNER_ROOT_DIR/lib"} , {runner_lib_dir, "$RUNNER_ROOT_DIR/lib"}
@ -231,7 +230,6 @@ overlay_vars_pkg(pkg) ->
, {platform_lib_dir, ""} , {platform_lib_dir, ""}
, {platform_log_dir, "/var/log/emqx"} , {platform_log_dir, "/var/log/emqx"}
, {platform_plugins_dir, "/var/lib/emqx/plugins"} , {platform_plugins_dir, "/var/lib/emqx/plugins"}
, {runner_root_dir, "/usr/lib/emqx"}
, {runner_bin_dir, "/usr/bin"} , {runner_bin_dir, "/usr/bin"}
, {runner_etc_dir, "/etc/emqx"} , {runner_etc_dir, "/etc/emqx"}
, {runner_lib_dir, "$RUNNER_ROOT_DIR/lib"} , {runner_lib_dir, "$RUNNER_ROOT_DIR/lib"}

View File

@ -19,6 +19,7 @@
%% APIs %% APIs
-export([ match/3 -export([ match/3
, compile/1 , compile/1
, feed_var/2
]). ]).
-export_type([rule/0]). -export_type([rule/0]).
@ -158,4 +159,3 @@ feed_var(ClientInfo = #{username := Username}, [<<"%u">>|Words], Acc) ->
feed_var(ClientInfo, Words, [Username|Acc]); feed_var(ClientInfo, Words, [Username|Acc]);
feed_var(ClientInfo, [W|Words], Acc) -> feed_var(ClientInfo, [W|Words], Acc) ->
feed_var(ClientInfo, Words, [W|Acc]). feed_var(ClientInfo, Words, [W|Acc]).

View File

@ -273,7 +273,6 @@ parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin,
(PacketId =/= undefined) andalso (PacketId =/= undefined) andalso
StrictMode andalso validate_packet_id(PacketId), StrictMode andalso validate_packet_id(PacketId),
{Properties, Payload} = parse_properties(Rest1, Ver, StrictMode), {Properties, Payload} = parse_properties(Rest1, Ver, StrictMode),
ok = ensure_topic_name_valid(StrictMode, TopicName, Properties),
Publish = #mqtt_packet_publish{topic_name = TopicName, Publish = #mqtt_packet_publish{topic_name = TopicName,
packet_id = PacketId, packet_id = PacketId,
properties = Properties properties = Properties
@ -360,7 +359,6 @@ parse_will_message(Packet = #mqtt_packet_connect{will_flag = true,
{Props, Rest} = parse_properties(Bin, Ver, StrictMode), {Props, Rest} = parse_properties(Bin, Ver, StrictMode),
{Topic, Rest1} = parse_utf8_string(Rest, StrictMode), {Topic, Rest1} = parse_utf8_string(Rest, StrictMode),
{Payload, Rest2} = parse_binary_data(Rest1), {Payload, Rest2} = parse_binary_data(Rest1),
ok = ensure_topic_name_valid(StrictMode, Topic, Props),
{Packet#mqtt_packet_connect{will_props = Props, {Packet#mqtt_packet_connect{will_props = Props,
will_topic = Topic, will_topic = Topic,
will_payload = Payload will_payload = Payload
@ -526,15 +524,6 @@ parse_binary_data(Bin)
when 2 > byte_size(Bin) -> when 2 > byte_size(Bin) ->
error(malformed_binary_data_length). error(malformed_binary_data_length).
ensure_topic_name_valid(false, _TopicName, _Properties) ->
ok;
ensure_topic_name_valid(true, TopicName, _Properties) when TopicName =/= <<>> ->
ok;
ensure_topic_name_valid(true, <<>>, #{'Topic-Alias' := _}) ->
ok;
ensure_topic_name_valid(true, <<>>, _) ->
error(empty_topic_name).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Serialize MQTT Packet %% Serialize MQTT Packet
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -46,8 +46,6 @@ groups() ->
t_parse_frame_malformed_variable_byte_integer, t_parse_frame_malformed_variable_byte_integer,
t_parse_frame_variable_byte_integer, t_parse_frame_variable_byte_integer,
t_parse_malformed_utf8_string, t_parse_malformed_utf8_string,
t_parse_empty_topic_name,
t_parse_empty_topic_name_with_alias,
t_parse_frame_proxy_protocol %% proxy_protocol_config_disabled packet. t_parse_frame_proxy_protocol %% proxy_protocol_config_disabled packet.
]}, ]},
{connect, [parallel], {connect, [parallel],
@ -164,21 +162,6 @@ t_parse_malformed_utf8_string(_) ->
ParseState = emqx_frame:initial_parse_state(#{strict_mode => true}), ParseState = emqx_frame:initial_parse_state(#{strict_mode => true}),
?catch_error(utf8_string_invalid, emqx_frame:parse(MalformedPacket, ParseState)). ?catch_error(utf8_string_invalid, emqx_frame:parse(MalformedPacket, ParseState)).
t_parse_empty_topic_name(_) ->
Packet = ?PUBLISH_PACKET(?QOS_1, <<>>, 1, #{}, <<>>),
?assertEqual(Packet, parse_serialize(Packet, #{strict_mode => false})),
?catch_error(empty_topic_name, parse_serialize(Packet, #{strict_mode => true})).
t_parse_empty_topic_name_with_alias(_) ->
Props = #{'Topic-Alias' => 16#AB},
Packet = ?PUBLISH_PACKET(?QOS_1, <<>>, 1, Props, <<>>),
?assertEqual(
Packet, parse_serialize(Packet, #{strict_mode => false, version => ?MQTT_PROTO_V5})
),
?assertEqual(
Packet, parse_serialize(Packet, #{strict_mode => true, version => ?MQTT_PROTO_V5})
).
t_parse_frame_proxy_protocol(_) -> t_parse_frame_proxy_protocol(_) ->
BinList = [ <<"PROXY TCP4 ">>, <<"PROXY TCP6 ">>, <<"PROXY UNKNOWN">> BinList = [ <<"PROXY TCP4 ">>, <<"PROXY TCP6 ">>, <<"PROXY UNKNOWN">>
, <<"\r\n\r\n\0\r\nQUIT\n">>], , <<"\r\n\r\n\0\r\nQUIT\n">>],