Fixed conflicts
This commit is contained in:
commit
666d9706a3
|
@ -779,7 +779,7 @@ listener.tcp.internal.acceptors = 4
|
||||||
## Maximum number of concurrent MQTT/TCP connections.
|
## Maximum number of concurrent MQTT/TCP connections.
|
||||||
##
|
##
|
||||||
## Value: Number
|
## Value: Number
|
||||||
listener.tcp.internal.max_connections = 10240000
|
listener.tcp.internal.max_connections = 1024000
|
||||||
|
|
||||||
## Maximum internal connections per second.
|
## Maximum internal connections per second.
|
||||||
##
|
##
|
||||||
|
|
|
@ -34,16 +34,21 @@
|
||||||
-export([match/3]).
|
-export([match/3]).
|
||||||
|
|
||||||
-define(ALLOW_DENY(A), ((A =:= allow) orelse (A =:= deny))).
|
-define(ALLOW_DENY(A), ((A =:= allow) orelse (A =:= deny))).
|
||||||
|
-define(PUBSUB(A), ((A =:= subscribe) orelse (A =:= publish) orelse (A =:= pubsub))).
|
||||||
|
|
||||||
%% @doc Compile Access Rule.
|
%% @doc Compile Access Rule.
|
||||||
compile({A, all}) when ?ALLOW_DENY(A) ->
|
compile({A, all}) when ?ALLOW_DENY(A) ->
|
||||||
{A, all};
|
{A, all};
|
||||||
|
|
||||||
compile({A, Who, Access, Topic}) when ?ALLOW_DENY(A), is_binary(Topic) ->
|
compile({A, Who, Access, Topic}) when ?ALLOW_DENY(A), ?PUBSUB(Access), is_binary(Topic) ->
|
||||||
{A, compile(who, Who), Access, [compile(topic, Topic)]};
|
{A, compile(who, Who), Access, [compile(topic, Topic)]};
|
||||||
|
|
||||||
compile({A, Who, Access, TopicFilters}) when ?ALLOW_DENY(A) ->
|
compile({A, Who, Access, TopicFilters}) when ?ALLOW_DENY(A), ?PUBSUB(Access) ->
|
||||||
{A, compile(who, Who), Access, [compile(topic, Topic) || Topic <- TopicFilters]}.
|
{A, compile(who, Who), Access, [compile(topic, Topic) || Topic <- TopicFilters]};
|
||||||
|
|
||||||
|
compile(Rule) ->
|
||||||
|
emqx_logger:error("[ACCESS_RULE] Malformed rule: ~p", [Rule]),
|
||||||
|
{error, bad_rule}.
|
||||||
|
|
||||||
compile(who, all) ->
|
compile(who, all) ->
|
||||||
all;
|
all;
|
||||||
|
|
|
@ -45,19 +45,27 @@ all_rules() ->
|
||||||
|
|
||||||
-spec(init([File :: string()]) -> {ok, #{}}).
|
-spec(init([File :: string()]) -> {ok, #{}}).
|
||||||
init([File]) ->
|
init([File]) ->
|
||||||
ok = emqx_tables:new(?ACL_RULE_TAB, [set, public, {read_concurrency, true}]),
|
_ = emqx_tables:new(?ACL_RULE_TAB, [set, public, {read_concurrency, true}]),
|
||||||
true = load_rules_from_file(File),
|
ok = load_rules_from_file(File),
|
||||||
{ok, #{acl_file => File}}.
|
{ok, #{acl_file => File}}.
|
||||||
|
|
||||||
load_rules_from_file(AclFile) ->
|
load_rules_from_file(AclFile) ->
|
||||||
{ok, Terms} = file:consult(AclFile),
|
case file:consult(AclFile) of
|
||||||
Rules = [emqx_access_rule:compile(Term) || Term <- Terms],
|
{ok, Terms} ->
|
||||||
lists:foreach(fun(PubSub) ->
|
Rules = [emqx_access_rule:compile(Term) || Term <- Terms],
|
||||||
ets:insert(?ACL_RULE_TAB, {PubSub,
|
lists:foreach(fun(PubSub) ->
|
||||||
lists:filter(fun(Rule) -> filter(PubSub, Rule) end, Rules)})
|
ets:insert(?ACL_RULE_TAB, {PubSub,
|
||||||
end, [publish, subscribe]),
|
lists:filter(fun(Rule) -> filter(PubSub, Rule) end, Rules)})
|
||||||
ets:insert(?ACL_RULE_TAB, {all_rules, Terms}).
|
end, [publish, subscribe]),
|
||||||
|
ets:insert(?ACL_RULE_TAB, {all_rules, Terms}),
|
||||||
|
ok;
|
||||||
|
{error, Reason} ->
|
||||||
|
emqx_logger:error("[ACL_INTERNAL] Failed to read ~s: ~p", [AclFile, Reason]),
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
filter(_PubSub, {error, _}) ->
|
||||||
|
false;
|
||||||
filter(_PubSub, {allow, all}) ->
|
filter(_PubSub, {allow, all}) ->
|
||||||
true;
|
true;
|
||||||
filter(_PubSub, {deny, all}) ->
|
filter(_PubSub, {deny, all}) ->
|
||||||
|
@ -100,9 +108,11 @@ match(Credentials, Topic, [Rule|Rules]) ->
|
||||||
-spec(reload_acl(state()) -> ok | {error, term()}).
|
-spec(reload_acl(state()) -> ok | {error, term()}).
|
||||||
reload_acl(#{acl_file := AclFile}) ->
|
reload_acl(#{acl_file := AclFile}) ->
|
||||||
case catch load_rules_from_file(AclFile) of
|
case catch load_rules_from_file(AclFile) of
|
||||||
true ->
|
ok ->
|
||||||
emqx_logger:info("Reload acl_file ~s successfully", [AclFile]),
|
emqx_logger:info("Reload acl_file ~s successfully", [AclFile]),
|
||||||
ok;
|
ok;
|
||||||
|
{error, Error} ->
|
||||||
|
{error, Error};
|
||||||
{'EXIT', Error} ->
|
{'EXIT', Error} ->
|
||||||
{error, Error}
|
{error, Error}
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -104,7 +104,7 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
ensure_expiry_timer(State) ->
|
ensure_expiry_timer(State) ->
|
||||||
State#{expiry_timer := emqx_misc:start_timer(timer:seconds(2), expire)}.
|
State#{expiry_timer := emqx_misc:start_timer(timer:seconds(1), expire)}.
|
||||||
-else.
|
-else.
|
||||||
ensure_expiry_timer(State) ->
|
ensure_expiry_timer(State) ->
|
||||||
State#{expiry_timer := emqx_misc:start_timer(timer:minutes(1), expire)}.
|
State#{expiry_timer := emqx_misc:start_timer(timer:minutes(1), expire)}.
|
||||||
|
|
|
@ -620,14 +620,12 @@ maybe_use_username_as_clientid(ClientId, undefined, _PState) ->
|
||||||
ClientId;
|
ClientId;
|
||||||
maybe_use_username_as_clientid(ClientId, Username, #pstate{zone = Zone}) ->
|
maybe_use_username_as_clientid(ClientId, Username, #pstate{zone = Zone}) ->
|
||||||
case emqx_zone:get_env(Zone, use_username_as_clientid, false) of
|
case emqx_zone:get_env(Zone, use_username_as_clientid, false) of
|
||||||
true ->
|
true -> Username;
|
||||||
Username;
|
false -> ClientId
|
||||||
false ->
|
|
||||||
ClientId
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Assign a clientid
|
%% Assign a clientId
|
||||||
|
|
||||||
maybe_assign_client_id(PState = #pstate{client_id = <<>>, ack_props = AckProps}) ->
|
maybe_assign_client_id(PState = #pstate{client_id = <<>>, ack_props = AckProps}) ->
|
||||||
ClientId = emqx_guid:to_base62(emqx_guid:gen()),
|
ClientId = emqx_guid:to_base62(emqx_guid:gen()),
|
||||||
|
@ -651,41 +649,30 @@ try_open_session(PState = #pstate{zone = Zone,
|
||||||
clean_start => CleanStart,
|
clean_start => CleanStart,
|
||||||
will_msg => WillMsg
|
will_msg => WillMsg
|
||||||
},
|
},
|
||||||
SessAttrs1 = lists:foldl(fun set_session_attrs/2, SessAttrs, [{max_inflight, PState}, {expiry_interval, PState}, {topic_alias_maximum, PState}]),
|
SessAttrs1 = lists:foldl(fun set_session_attrs/2, SessAttrs, [{max_inflight, PState}, {expiry_interval, PState}]),
|
||||||
case emqx_sm:open_session(SessAttrs1) of
|
case emqx_sm:open_session(SessAttrs1) of
|
||||||
{ok, SPid} ->
|
{ok, SPid} ->
|
||||||
{ok, SPid, false};
|
{ok, SPid, false};
|
||||||
Other -> Other
|
Other -> Other
|
||||||
end.
|
end.
|
||||||
|
|
||||||
set_session_attrs({max_inflight, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) ->
|
set_session_attrs({max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}}, SessAttrs) ->
|
||||||
maps:put(max_inflight, if
|
maps:put(max_inflight, get_property('Receive-Maximum', ConnProps, 65535), SessAttrs);
|
||||||
ProtoVer =:= ?MQTT_PROTO_V5 ->
|
|
||||||
get_property('Receive-Maximum', ConnProps, 65535);
|
|
||||||
true ->
|
|
||||||
emqx_zone:get_env(Zone, max_inflight, 65535)
|
|
||||||
end, SessAttrs);
|
|
||||||
set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps, clean_start = CleanStart}}, SessAttrs) ->
|
|
||||||
maps:put(expiry_interval, if
|
|
||||||
ProtoVer =:= ?MQTT_PROTO_V5 ->
|
|
||||||
get_property('Session-Expiry-Interval', ConnProps, 0);
|
|
||||||
true ->
|
|
||||||
case CleanStart of
|
|
||||||
true -> 0;
|
|
||||||
false ->
|
|
||||||
emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff)
|
|
||||||
end
|
|
||||||
end, SessAttrs);
|
|
||||||
set_session_attrs({topic_alias_maximum, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) ->
|
|
||||||
maps:put(topic_alias_maximum, if
|
|
||||||
ProtoVer =:= ?MQTT_PROTO_V5 ->
|
|
||||||
get_property('Topic-Alias-Maximum', ConnProps, 0);
|
|
||||||
true ->
|
|
||||||
emqx_zone:get_env(Zone, max_topic_alias, 0)
|
|
||||||
end, SessAttrs);
|
|
||||||
set_session_attrs({_, #pstate{}}, SessAttrs) ->
|
|
||||||
SessAttrs.
|
|
||||||
|
|
||||||
|
set_session_attrs({max_inflight, #pstate{zone = Zone}}, SessAttrs) ->
|
||||||
|
maps:put(max_inflight, emqx_zone:get_env(Zone, max_inflight, 65535), SessAttrs);
|
||||||
|
|
||||||
|
set_session_attrs({expiry_interval, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}}, SessAttrs) ->
|
||||||
|
maps:put(expiry_interval, get_property('Session-Expiry-Interval', ConnProps, 0), SessAttrs);
|
||||||
|
|
||||||
|
set_session_attrs({expiry_interval, #pstate{zone = Zone, clean_start = CleanStart}}, SessAttrs) ->
|
||||||
|
maps:put(expiry_interval, case CleanStart of
|
||||||
|
true -> 0;
|
||||||
|
false -> emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff)
|
||||||
|
end, SessAttrs);
|
||||||
|
|
||||||
|
set_session_attrs(_, SessAttrs) ->
|
||||||
|
SessAttrs.
|
||||||
|
|
||||||
authenticate(Credentials, Password) ->
|
authenticate(Credentials, Password) ->
|
||||||
case emqx_access_control:authenticate(Credentials, Password) of
|
case emqx_access_control:authenticate(Credentials, Password) of
|
||||||
|
|
|
@ -147,8 +147,6 @@
|
||||||
%% Created at
|
%% Created at
|
||||||
created_at :: erlang:timestamp(),
|
created_at :: erlang:timestamp(),
|
||||||
|
|
||||||
topic_alias_maximum :: pos_integer(),
|
|
||||||
|
|
||||||
will_msg :: emqx:message(),
|
will_msg :: emqx:message(),
|
||||||
|
|
||||||
will_delay_timer :: reference() | undefined
|
will_delay_timer :: reference() | undefined
|
||||||
|
@ -341,7 +339,6 @@ init([Parent, #{zone := Zone,
|
||||||
clean_start := CleanStart,
|
clean_start := CleanStart,
|
||||||
expiry_interval := ExpiryInterval,
|
expiry_interval := ExpiryInterval,
|
||||||
max_inflight := MaxInflight,
|
max_inflight := MaxInflight,
|
||||||
topic_alias_maximum := TopicAliasMaximum,
|
|
||||||
will_msg := WillMsg}]) ->
|
will_msg := WillMsg}]) ->
|
||||||
emqx_logger:set_metadata_client_id(ClientId),
|
emqx_logger:set_metadata_client_id(ClientId),
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
|
@ -367,7 +364,6 @@ init([Parent, #{zone := Zone,
|
||||||
deliver_stats = 0,
|
deliver_stats = 0,
|
||||||
enqueue_stats = 0,
|
enqueue_stats = 0,
|
||||||
created_at = os:timestamp(),
|
created_at = os:timestamp(),
|
||||||
topic_alias_maximum = TopicAliasMaximum,
|
|
||||||
will_msg = WillMsg
|
will_msg = WillMsg
|
||||||
},
|
},
|
||||||
ok = emqx_sm:register_session(ClientId, self()),
|
ok = emqx_sm:register_session(ClientId, self()),
|
||||||
|
@ -519,22 +515,23 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%% RESUME:
|
%% RESUME:
|
||||||
handle_cast({resume, #{conn_pid := ConnPid,
|
handle_cast({resume, #{conn_pid := ConnPid,
|
||||||
will_msg := WillMsg,
|
will_msg := WillMsg,
|
||||||
expiry_interval := SessionExpiryInterval,
|
expiry_interval := ExpiryInterval,
|
||||||
max_inflight := MaxInflight,
|
max_inflight := MaxInflight}},
|
||||||
topic_alias_maximum := TopicAliasMaximum}}, State = #state{client_id = ClientId,
|
State = #state{client_id = ClientId,
|
||||||
conn_pid = OldConnPid,
|
conn_pid = OldConnPid,
|
||||||
clean_start = CleanStart,
|
clean_start = CleanStart,
|
||||||
retry_timer = RetryTimer,
|
retry_timer = RetryTimer,
|
||||||
await_rel_timer = AwaitTimer,
|
await_rel_timer = AwaitTimer,
|
||||||
expiry_timer = ExpireTimer,
|
expiry_timer = ExpireTimer,
|
||||||
will_delay_timer = WillDelayTimer}) ->
|
will_delay_timer = WillDelayTimer}) ->
|
||||||
|
|
||||||
?LOG(info, "Resumed by connection ~p ", [ConnPid], State),
|
?LOG(info, "Resumed by connection ~p ", [ConnPid], State),
|
||||||
|
|
||||||
%% Cancel Timers
|
%% Cancel Timers
|
||||||
lists:foreach(fun emqx_misc:cancel_timer/1, [RetryTimer, AwaitTimer, ExpireTimer, WillDelayTimer]),
|
lists:foreach(fun emqx_misc:cancel_timer/1,
|
||||||
|
[RetryTimer, AwaitTimer, ExpireTimer, WillDelayTimer]),
|
||||||
|
|
||||||
case kick(ClientId, OldConnPid, ConnPid) of
|
case kick(ClientId, OldConnPid, ConnPid) of
|
||||||
ok -> ?LOG(warning, "Connection ~p kickout ~p", [ConnPid, OldConnPid], State);
|
ok -> ?LOG(warning, "Connection ~p kickout ~p", [ConnPid, OldConnPid], State);
|
||||||
|
@ -543,19 +540,18 @@ handle_cast({resume, #{conn_pid := ConnPid,
|
||||||
|
|
||||||
true = link(ConnPid),
|
true = link(ConnPid),
|
||||||
|
|
||||||
State1 = State#state{conn_pid = ConnPid,
|
State1 = State#state{conn_pid = ConnPid,
|
||||||
binding = binding(ConnPid),
|
binding = binding(ConnPid),
|
||||||
old_conn_pid = OldConnPid,
|
old_conn_pid = OldConnPid,
|
||||||
clean_start = false,
|
clean_start = false,
|
||||||
retry_timer = undefined,
|
retry_timer = undefined,
|
||||||
awaiting_rel = #{},
|
awaiting_rel = #{},
|
||||||
await_rel_timer = undefined,
|
await_rel_timer = undefined,
|
||||||
expiry_timer = undefined,
|
expiry_timer = undefined,
|
||||||
expiry_interval = SessionExpiryInterval,
|
expiry_interval = ExpiryInterval,
|
||||||
inflight = emqx_inflight:update_size(MaxInflight, State#state.inflight),
|
inflight = emqx_inflight:update_size(MaxInflight, State#state.inflight),
|
||||||
topic_alias_maximum = TopicAliasMaximum,
|
will_delay_timer = undefined,
|
||||||
will_delay_timer = undefined,
|
will_msg = WillMsg},
|
||||||
will_msg = WillMsg},
|
|
||||||
|
|
||||||
%% Clean Session: true -> false???
|
%% Clean Session: true -> false???
|
||||||
CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)),
|
CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)),
|
||||||
|
@ -574,9 +570,10 @@ handle_cast(Msg, State) ->
|
||||||
|
|
||||||
%% Batch dispatch
|
%% Batch dispatch
|
||||||
handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) ->
|
handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) ->
|
||||||
{noreply, lists:foldl(fun(Msg, NewState) ->
|
noreply(lists:foldl(
|
||||||
element(2, handle_info({dispatch, Topic, Msg}, NewState))
|
fun(Msg, St) ->
|
||||||
end, State, Msgs)};
|
element(2, handle_info({dispatch, Topic, Msg}, St))
|
||||||
|
end, State, Msgs));
|
||||||
|
|
||||||
%% Dispatch message
|
%% Dispatch message
|
||||||
handle_info({dispatch, Topic, Msg = #message{}}, State) ->
|
handle_info({dispatch, Topic, Msg = #message{}}, State) ->
|
||||||
|
@ -585,12 +582,11 @@ handle_info({dispatch, Topic, Msg = #message{}}, State) ->
|
||||||
%% Require ack, but we do not have connection
|
%% Require ack, but we do not have connection
|
||||||
%% negative ack the message so it can try the next subscriber in the group
|
%% negative ack the message so it can try the next subscriber in the group
|
||||||
ok = emqx_shared_sub:nack_no_connection(Msg),
|
ok = emqx_shared_sub:nack_no_connection(Msg),
|
||||||
noreply(State);
|
{noreply, State};
|
||||||
false ->
|
false ->
|
||||||
handle_dispatch(Topic, Msg, State)
|
noreply(handle_dispatch(Topic, Msg, State))
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
|
||||||
%% Do nothing if the client has been disconnected.
|
%% Do nothing if the client has been disconnected.
|
||||||
handle_info({timeout, Timer, retry_delivery}, State = #state{conn_pid = undefined, retry_timer = Timer}) ->
|
handle_info({timeout, Timer, retry_delivery}, State = #state{conn_pid = undefined, retry_timer = Timer}) ->
|
||||||
noreply(State#state{retry_timer = undefined});
|
noreply(State#state{retry_timer = undefined});
|
||||||
|
@ -678,23 +674,14 @@ maybe_shutdown(Pid, Reason) ->
|
||||||
has_connection(#state{conn_pid = Pid}) ->
|
has_connection(#state{conn_pid = Pid}) ->
|
||||||
is_pid(Pid) andalso is_process_alive(Pid).
|
is_pid(Pid) andalso is_process_alive(Pid).
|
||||||
|
|
||||||
handle_dispatch(Topic, Msg = #message{headers = Headers},
|
handle_dispatch(Topic, Msg, State = #state{subscriptions = SubMap}) ->
|
||||||
State = #state{subscriptions = SubMap,
|
case maps:find(Topic, SubMap) of
|
||||||
topic_alias_maximum = TopicAliasMaximum
|
{ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} ->
|
||||||
}) ->
|
run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}, {subid, SubId}], Msg, State);
|
||||||
TopicAlias = maps:get('Topic-Alias', Headers, undefined),
|
{ok, #{nl := Nl, qos := QoS, rap := Rap}} ->
|
||||||
if
|
run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}], Msg, State);
|
||||||
TopicAlias =:= undefined orelse TopicAlias =< TopicAliasMaximum ->
|
error ->
|
||||||
noreply(case maps:find(Topic, SubMap) of
|
dispatch(emqx_message:unset_flag(dup, Msg), State)
|
||||||
{ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} ->
|
|
||||||
run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}, {subid, SubId}], Msg, State);
|
|
||||||
{ok, #{nl := Nl, qos := QoS, rap := Rap}} ->
|
|
||||||
run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}], Msg, State);
|
|
||||||
error ->
|
|
||||||
dispatch(emqx_message:unset_flag(dup, Msg), State)
|
|
||||||
end);
|
|
||||||
true ->
|
|
||||||
noreply(State)
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
suback(_From, undefined, _ReasonCodes) ->
|
suback(_From, undefined, _ReasonCodes) ->
|
||||||
|
|
|
@ -355,7 +355,8 @@ compile_rule(_) ->
|
||||||
{deny, all, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} =
|
{deny, all, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} =
|
||||||
compile({deny, all, subscribe, ["$SYS/#", "#"]}),
|
compile({deny, all, subscribe, ["$SYS/#", "#"]}),
|
||||||
{allow, all} = compile({allow, all}),
|
{allow, all} = compile({allow, all}),
|
||||||
{deny, all} = compile({deny, all}).
|
{deny, all} = compile({deny, all}),
|
||||||
|
{error, bad_rule} = compile({test, malformed}).
|
||||||
|
|
||||||
match_rule(_) ->
|
match_rule(_) ->
|
||||||
User = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{127,0,0,1}, 2948}},
|
User = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{127,0,0,1}, 2948}},
|
||||||
|
|
Loading…
Reference in New Issue