Merge branch 'emqx30' into emqx30-feng

This commit is contained in:
Feng Lee 2018-08-30 09:45:29 +08:00
commit da1285ad3c
11 changed files with 237 additions and 271 deletions

View File

@ -24,8 +24,8 @@ ERLC_OPTS += +'{parse_transform, lager_transform}'
BUILD_DEPS = cuttlefish
dep_cuttlefish = git https://github.com/emqx/cuttlefish emqx30
TEST_DEPS = emqx_ct_helplers
dep_emqx_ct_helplers = git git@github.com:emqx/emqx-ct-helpers
#TEST_DEPS = emqx_ct_helplers
#dep_emqx_ct_helplers = git git@github.com:emqx/emqx-ct-helpers
TEST_ERLC_OPTS += +debug_info
TEST_ERLC_OPTS += +'{parse_transform, lager_transform}'

View File

@ -20,7 +20,9 @@
-include("emqx_mqtt.hrl").
-export([start_link/3]).
-export([info/1, stats/1, kick/1]).
-export([info/1, attrs/1]).
-export([stats/1]).
-export([kick/1]).
-export([session/1]).
%% gen_server callbacks
@ -49,7 +51,7 @@
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
-define(LOG(Level, Format, Args, State),
emqx_logger:Level("Client(~s): " ++ Format,
emqx_logger:Level("MQTT(~s): " ++ Format,
[esockd_net:format(State#state.peername) | Args])).
start_link(Transport, Socket, Options) ->
@ -59,17 +61,58 @@ start_link(Transport, Socket, Options) ->
%% API
%%------------------------------------------------------------------------------
info(CPid) ->
call(CPid, info).
%% for debug
info(CPid) when is_pid(CPid) ->
call(CPid, info);
stats(CPid) ->
call(CPid, stats).
info(#state{transport = Transport,
socket = Socket,
peername = Peername,
sockname = Sockname,
conn_state = ConnState,
await_recv = AwaitRecv,
rate_limit = RateLimit,
publish_limit = PubLimit,
proto_state = ProtoState}) ->
ConnInfo = [{socktype, Transport:type(Socket)},
{peername, Peername},
{sockname, Sockname},
{conn_state, ConnState},
{await_recv, AwaitRecv},
{rate_limit, esockd_rate_limit:info(RateLimit)},
{publish_limit, esockd_rate_limit:info(PubLimit)}],
ProtoInfo = emqx_protocol:info(ProtoState),
lists:usort(lists:append(ConnInfo, ProtoInfo)).
kick(CPid) ->
call(CPid, kick).
%% for dashboard
attrs(CPid) when is_pid(CPid) ->
call(CPid, attrs);
session(CPid) ->
call(CPid, session).
attrs(#state{peername = Peername,
sockname = Sockname,
proto_state = ProtoState}) ->
SockAttrs = [{peername, Peername},
{sockname, Sockname}],
ProtoAttrs = emqx_protocol:attrs(ProtoState),
lists:usort(lists:append(SockAttrs, ProtoAttrs)).
%% Conn stats
stats(CPid) when is_pid(CPid) ->
call(CPid, stats);
stats(#state{transport = Transport,
socket = Socket,
proto_state = ProtoState}) ->
lists:append([emqx_misc:proc_stats(),
emqx_protocol:stats(ProtoState),
case Transport:getstat(Socket, ?SOCK_STATS) of
{ok, Ss} -> Ss;
{error, _} -> []
end]).
kick(CPid) -> call(CPid, kick).
session(CPid) -> call(CPid, session).
call(CPid, Req) ->
gen_server:call(CPid, Req, infinity).
@ -131,38 +174,17 @@ send_fun(Transport, Socket, Peername) ->
end
end.
handle_call(info, _From, State = #state{transport = Transport,
socket = Socket,
peername = Peername,
sockname = Sockname,
conn_state = ConnState,
await_recv = AwaitRecv,
rate_limit = RateLimit,
publish_limit = PubLimit,
proto_state = ProtoState}) ->
ConnInfo = [{socktype, Transport:type(Socket)},
{peername, Peername},
{sockname, Sockname},
{conn_state, ConnState},
{await_recv, AwaitRecv},
{rate_limit, esockd_rate_limit:info(RateLimit)},
{publish_limit, esockd_rate_limit:info(PubLimit)}],
ProtoInfo = emqx_protocol:info(ProtoState),
{reply, lists:usort(lists:append([ConnInfo, ProtoInfo])), State};
handle_call(info, _From, State) ->
{reply, info(State), State};
handle_call(stats, _From, State = #state{transport = Transport,
socket = Socket,
proto_state = ProtoState}) ->
ProcStats = emqx_misc:proc_stats(),
ProtoStats = emqx_protocol:stats(ProtoState),
SockStats = case Transport:getstat(Socket, ?SOCK_STATS) of
{ok, Ss} -> Ss;
{error, _} -> []
end,
{reply, lists:append([ProcStats, ProtoStats, SockStats]), State};
handle_call(attrs, _From, State) ->
{reply, attrs(State), State};
handle_call(stats, _From, State) ->
{reply, stats(State), State};
handle_call(kick, _From, State) ->
{stop, {shutdown, kick}, ok, State};
{stop, {shutdown, kicked}, ok, State};
handle_call(session, _From, State = #state{proto_state = ProtoState}) ->
{reply, emqx_protocol:session(ProtoState), State};
@ -186,8 +208,7 @@ handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
end;
handle_info(emit_stats, State = #state{proto_state = ProtoState}) ->
Stats = element(2, handle_call(stats, undefined, State)),
emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), Stats),
emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
{noreply, State#state{stats_timer = undefined}, hibernate};
handle_info(timeout, State) ->

View File

@ -47,7 +47,7 @@ banned(ClientId) ->
%%--------------------------------------------------------------------
init([]) ->
_ = ets:new(banned, [public, ordered_set, named_table]),
%% ets:new(banned, [public, ordered_set, named_table]),
{ok, #state{}}.
handle_call(_Request, _From, State) ->

View File

@ -1,68 +0,0 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_flow_control).
-behaviour(gen_server).
%% API
-export([start_link/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, {}).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
%% @doc Starts the server
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([]) ->
{ok, #state{}}.
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

View File

@ -19,8 +19,6 @@
%% Memory: (10, 100, 1000)
%%
%%-record
-export([conn_max_gc_count/0, reset_conn_gc_count/2, maybe_force_gc/2,
maybe_force_gc/3]).

View File

@ -28,8 +28,8 @@
load(Topics) ->
emqx_hooks:add('session.created', fun ?MODULE:on_session_created/3, [Topics]).
on_session_created(#{client_id := ClientId}, SessInfo, Topics) ->
Username = proplists:get_value(username, SessInfo),
on_session_created(#{client_id := ClientId}, SessAttrs, Topics) ->
Username = proplists:get_value(username, SessAttrs),
Replace = fun(Topic) ->
rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic))
end,

View File

@ -17,7 +17,11 @@
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-export([init/2, info/1, caps/1, stats/1]).
-export([init/2]).
-export([info/1]).
-export([attrs/1]).
-export([caps/1]).
-export([stats/1]).
-export([client_id/1]).
-export([credentials/1]).
-export([parser/1]).
@ -28,22 +32,6 @@
-export([send/2]).
-export([shutdown/2]).
%%-record(mqtt_client, {
%% client_id :: binary() | undefined,
%% client_pid :: pid(),
%% username :: binary() | undefined,
%% peername :: {inet:ip_address(), inet:port_number()},
%% clean_start :: boolean(),
%% proto_ver :: emqx_mqtt_types:version(),
%% keepalive = 0 :: non_neg_integer(),
%% will_topic :: undefined | binary(),
%% mountpoint :: undefined | binary(),
%% connected_at :: erlang:timestamp(),
%% attributes :: map()
%% }).
-record(pstate, {
zone,
sendfun,
@ -61,6 +49,7 @@
clean_start,
topic_aliases,
packet_size,
will_topic,
will_msg,
keepalive,
mountpoint,
@ -81,7 +70,7 @@
-endif.
-define(LOG(Level, Format, Args, PState),
emqx_logger:Level([{client, PState#pstate.client_id}], "Client(~s@~s): " ++ Format,
emqx_logger:Level([{client, PState#pstate.client_id}], "MQTT(~s@~s): " ++ Format,
[PState#pstate.client_id, esockd_net:format(PState#pstate.peername) | Args])).
%%------------------------------------------------------------------------------
@ -127,33 +116,46 @@ set_username(_Username, PState) ->
%% API
%%------------------------------------------------------------------------------
info(#pstate{zone = Zone,
client_id = ClientId,
username = Username,
peername = Peername,
proto_ver = ProtoVer,
proto_name = ProtoName,
clean_start = CleanStart,
conn_props = ConnProps,
keepalive = Keepalive,
mountpoint = Mountpoint,
is_super = IsSuper,
is_bridge = IsBridge,
connected = Connected,
connected_at = ConnectedAt}) ->
info(PState = #pstate{conn_props = ConnProps,
ack_props = AclProps,
session = Session,
topic_aliases = Aliases,
will_msg = WillMsg,
enable_acl = EnableAcl}) ->
attrs(PState) ++ [{conn_props, ConnProps},
{ack_props, AclProps},
{session, Session},
{topic_aliases, Aliases},
{will_msg, WillMsg},
{enable_acl, EnableAcl}].
attrs(#pstate{zone = Zone,
client_id = ClientId,
username = Username,
peername = Peername,
peercert = Peercert,
clean_start = CleanStart,
proto_ver = ProtoVer,
proto_name = ProtoName,
keepalive = Keepalive,
will_topic = WillTopic,
mountpoint = Mountpoint,
is_super = IsSuper,
is_bridge = IsBridge,
connected_at = ConnectedAt}) ->
[{zone, Zone},
{client_id, ClientId},
{username, Username},
{peername, Peername},
{peercert, Peercert},
{proto_ver, ProtoVer},
{proto_name, ProtoName},
{conn_props, ConnProps},
{clean_start, CleanStart},
{keepalive, Keepalive},
{will_topic, WillTopic},
{mountpoint, Mountpoint},
{is_super, IsSuper},
{is_bridge, IsBridge},
{connected, Connected},
{connected_at, ConnectedAt}].
caps(#pstate{zone = Zone}) ->
@ -254,6 +256,7 @@ process_packet(?CONNECT_PACKET(
clean_start = CleanStart,
keepalive = Keepalive,
properties = ConnProps,
will_topic = WillTopic,
client_id = ClientId,
username = Username,
password = Password} = Connect), PState) ->
@ -269,9 +272,9 @@ process_packet(?CONNECT_PACKET(
clean_start = CleanStart,
keepalive = Keepalive,
conn_props = ConnProps,
will_topic = WillTopic,
will_msg = WillMsg,
is_bridge = IsBridge,
connected = true,
connected_at = os:timestamp()}),
connack(
@ -284,8 +287,8 @@ process_packet(?CONNECT_PACKET(
%% Open session
case try_open_session(PState3) of
{ok, SPid, SP} ->
PState4 = PState3#pstate{session = SPid},
ok = emqx_cm:register_connection(client_id(PState4), info(PState4)),
PState4 = PState3#pstate{session = SPid, connected = true},
ok = emqx_cm:register_connection(client_id(PState4), attrs(PState4)),
%% Start keepalive
start_keepalive(Keepalive, PState4),
%% Success
@ -521,7 +524,8 @@ try_open_session(#pstate{zone = Zone,
username => Username,
clean_start => CleanStart,
conn_props => ConnProps}) of
{ok, SPid} -> {ok, SPid, false};
{ok, SPid} ->
{ok, SPid, false};
Other -> Other
end.

View File

@ -44,7 +44,8 @@
-include("emqx_mqtt.hrl").
-export([start_link/1]).
-export([info/1, stats/1]).
-export([info/1, attrs/1]).
-export([stats/1]).
-export([resume/2, discard/2]).
-export([subscribe/2, subscribe/4]).
-export([publish/3]).
@ -94,8 +95,8 @@
%% Client <- Broker: Inflight QoS1, QoS2 messages sent to the client but unacked.
inflight :: emqx_inflight:inflight(),
%% Max Inflight Size
max_inflight = 32 :: non_neg_integer(),
%% Max Inflight Size. DEPRECATED: Get from inflight
%% max_inflight = 32 :: non_neg_integer(),
%% Retry interval for redelivering QoS1/2 messages
retry_interval = 20000 :: timeout(),
@ -145,11 +146,6 @@
-define(TIMEOUT, 60000).
-define(INFO_KEYS, [clean_start, client_id, username, binding, conn_pid, old_conn_pid,
next_pkt_id, max_subscriptions, subscriptions, upgrade_qos, inflight,
max_inflight, retry_interval, mqueue, awaiting_rel, max_awaiting_rel,
await_rel_timeout, expiry_interval, enable_stats, created_at]).
-define(LOG(Level, Format, Args, State),
emqx_logger:Level([{client, State#state.client_id}],
"Session(~s): " ++ Format, [State#state.client_id | Args])).
@ -160,6 +156,77 @@ start_link(SessAttrs) ->
IdleTimeout = maps:get(idle_timeout, SessAttrs, 30000),
gen_server:start_link(?MODULE, SessAttrs, [{hibernate_after, IdleTimeout}]).
%% @doc Get session info
-spec(info(pid() | #state{}) -> list({atom(), term()})).
info(SPid) when is_pid(SPid) ->
gen_server:call(SPid, info, infinity);
info(State = #state{conn_pid = ConnPid,
next_pkt_id = PktId,
max_subscriptions = MaxSubscriptions,
subscriptions = Subscriptions,
upgrade_qos = UpgradeQoS,
inflight = Inflight,
retry_interval = RetryInterval,
mqueue = MQueue,
awaiting_rel = AwaitingRel,
max_awaiting_rel = MaxAwaitingRel,
await_rel_timeout = AwaitRelTimeout}) ->
attrs(State) ++ [{conn_pid, ConnPid},
{next_pkt_id, PktId},
{max_subscriptions, MaxSubscriptions},
{subscriptions, Subscriptions},
{upgrade_qos, UpgradeQoS},
{inflight, Inflight},
{retry_interval, RetryInterval},
{mqueue_len, MQueue},
{awaiting_rel, AwaitingRel},
{max_awaiting_rel, MaxAwaitingRel},
{await_rel_timeout, AwaitRelTimeout}].
%% @doc Get session attrs
-spec(attrs(pid() | #state{}) -> list({atom(), term()})).
attrs(SPid) when is_pid(SPid) ->
gen_server:call(SPid, attrs, infinity);
attrs(#state{clean_start = CleanStart,
binding = Binding,
client_id = ClientId,
username = Username,
expiry_interval = ExpiryInterval,
created_at = CreatedAt}) ->
[{clean_start, CleanStart},
{binding, Binding},
{client_id, ClientId},
{username, Username},
{expiry_interval, ExpiryInterval div 1000},
{created_at, CreatedAt}].
-spec(stats(pid() | #state{}) -> list({atom(), non_neg_integer()})).
stats(SPid) when is_pid(SPid) ->
gen_server:call(SPid, stats, infinity);
stats(#state{max_subscriptions = MaxSubscriptions,
subscriptions = Subscriptions,
inflight = Inflight,
mqueue = MQueue,
max_awaiting_rel = MaxAwaitingRel,
awaiting_rel = AwaitingRel,
deliver_stats = DeliverMsg,
enqueue_stats = EnqueueMsg}) ->
lists:append(emqx_misc:proc_stats(),
[{max_subscriptions, MaxSubscriptions},
{subscriptions_num, maps:size(Subscriptions)},
{max_inflight, emqx_inflight:max_size(Inflight)},
{inflight_len, emqx_inflight:size(Inflight)},
{max_mqueue, emqx_mqueue:max_len(MQueue)},
{mqueue_len, emqx_mqueue:len(MQueue)},
{mqueue_dropped, emqx_mqueue:dropped(MQueue)},
{max_awaiting_rel, MaxAwaitingRel},
{awaiting_rel_len, maps:size(AwaitingRel)},
{deliver_msg, DeliverMsg},
{enqueue_msg, EnqueueMsg}]).
%%------------------------------------------------------------------------------
%% PubSub API
%%------------------------------------------------------------------------------
@ -229,70 +296,6 @@ unsubscribe(SPid, PacketId, Properties, TopicFilters) ->
resume(SPid, ConnPid) ->
gen_server:cast(SPid, {resume, ConnPid}).
%% @doc Get session info
-spec(info(pid() | #state{}) -> list(tuple())).
info(SPid) when is_pid(SPid) ->
gen_server:call(SPid, info);
info(#state{clean_start = CleanStart,
binding = Binding,
client_id = ClientId,
username = Username,
max_subscriptions = MaxSubscriptions,
subscriptions = Subscriptions,
upgrade_qos = UpgradeQoS,
inflight = Inflight,
max_inflight = MaxInflight,
retry_interval = RetryInterval,
mqueue = MQueue,
awaiting_rel = AwaitingRel,
max_awaiting_rel = MaxAwaitingRel,
await_rel_timeout = AwaitRelTimeout,
expiry_interval = ExpiryInterval,
created_at = CreatedAt}) ->
[{clean_start, CleanStart},
{binding, Binding},
{client_id, ClientId},
{username, Username},
{max_subscriptions, MaxSubscriptions},
{subscriptions, maps:size(Subscriptions)},
{upgrade_qos, UpgradeQoS},
{inflight, emqx_inflight:size(Inflight)},
{max_inflight, MaxInflight},
{retry_interval, RetryInterval},
{mqueue_len, emqx_mqueue:len(MQueue)},
{awaiting_rel, maps:size(AwaitingRel)},
{max_awaiting_rel, MaxAwaitingRel},
{await_rel_timeout, AwaitRelTimeout},
{expiry_interval, ExpiryInterval},
{created_at, CreatedAt}].
-spec(stats(pid() | #state{}) -> list({atom(), non_neg_integer()})).
stats(SPid) when is_pid(SPid) ->
gen_server:call(SPid, stats, infinity);
stats(#state{max_subscriptions = MaxSubscriptions,
subscriptions = Subscriptions,
inflight = Inflight,
max_inflight = MaxInflight,
mqueue = MQueue,
max_awaiting_rel = MaxAwaitingRel,
awaiting_rel = AwaitingRel,
deliver_stats = DeliverMsg,
enqueue_stats = EnqueueMsg}) ->
lists:append(emqx_misc:proc_stats(),
[{max_subscriptions, MaxSubscriptions},
{subscriptions, maps:size(Subscriptions)},
{max_inflight, MaxInflight},
{inflight_len, emqx_inflight:size(Inflight)},
{max_mqueue, emqx_mqueue:max_len(MQueue)},
{mqueue_len, emqx_mqueue:len(MQueue)},
{mqueue_dropped, emqx_mqueue:dropped(MQueue)},
{max_awaiting_rel, MaxAwaitingRel},
{awaiting_rel_len, maps:size(AwaitingRel)},
{deliver_msg, DeliverMsg},
{enqueue_msg, EnqueueMsg}]).
%% @doc Discard the session
-spec(discard(pid(), emqx_types:client_id()) -> ok).
discard(SPid, ClientId) ->
@ -311,7 +314,7 @@ init(#{zone := Zone,
username := Username,
conn_pid := ConnPid,
clean_start := CleanStart,
conn_props := _ConnProps}) ->
conn_props := ConnProps}) ->
process_flag(trap_exit, true),
true = link(ConnPid),
MaxInflight = get_env(Zone, max_inflight),
@ -323,21 +326,26 @@ init(#{zone := Zone,
subscriptions = #{},
max_subscriptions = get_env(Zone, max_subscriptions, 0),
upgrade_qos = get_env(Zone, upgrade_qos, false),
max_inflight = MaxInflight,
inflight = emqx_inflight:new(MaxInflight),
mqueue = init_mqueue(Zone, ClientId),
retry_interval = get_env(Zone, retry_interval, 0),
awaiting_rel = #{},
await_rel_timeout = get_env(Zone, await_rel_timeout),
max_awaiting_rel = get_env(Zone, max_awaiting_rel),
expiry_interval = get_env(Zone, session_expiry_interval),
expiry_interval = expire_interval(Zone, ConnProps),
enable_stats = get_env(Zone, enable_stats, true),
deliver_stats = 0,
enqueue_stats = 0,
deliver_stats = 0,
enqueue_stats = 0,
created_at = os:timestamp()},
emqx_sm:register_session(ClientId, info(State)),
emqx_sm:register_session(ClientId, [{zone, Zone} | attrs(State)]),
emqx_sm:set_session_stats(ClientId, stats(State)),
emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]),
{ok, ensure_stats_timer(State), hibernate}.
{ok, State}.
expire_interval(_Zone, #{'Session-Expiry-Interval' := I}) ->
I * 1000;
expire_interval(Zone, _ConnProps) -> %% Maybe v3.1.1
get_env(Zone, session_expiry_interval, 0).
init_mqueue(Zone, ClientId) ->
emqx_mqueue:new(ClientId, #{type => simple,
@ -399,6 +407,9 @@ handle_call({pubrel, PacketId, _ReasonCode}, _From,
handle_call(info, _From, State) ->
reply(info(State), State);
handle_call(attrs, _From, State) ->
reply(attrs(State), State);
handle_call(stats, _From, State) ->
reply(stats(State), State);
@ -501,7 +512,7 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId,
%% Clean Session: true -> false?
CleanStart andalso emqx_sm:set_session_attrs(ClientId, info(State1)),
emqx_hooks:run('session.resumed', [#{client_id => ClientId}, info(State)]),
emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(State)]),
%% Replay delivery and Dequeue pending messages
{noreply, ensure_stats_timer(dequeue(retry_delivery(true, State1)))};
@ -541,20 +552,18 @@ handle_info({timeout, _Timer, expired}, State) ->
?LOG(info, "Expired, shutdown now.", [], State),
shutdown(expired, State);
handle_info({'EXIT', ConnPid, _Reason}, State = #state{clean_start= true, conn_pid = ConnPid}) ->
{stop, normal, State};
handle_info({'EXIT', ConnPid, Reason}, State = #state{clean_start = true, conn_pid = ConnPid}) ->
{stop, Reason, State};
handle_info({'EXIT', ConnPid, Reason}, State = #state{clean_start = false,
conn_pid = ConnPid,
expiry_interval = Interval}) ->
?LOG(info, "Connection ~p EXIT for ~p", [ConnPid, Reason], State),
ExpireTimer = emqx_misc:start_timer(Interval, expired),
State1 = State#state{conn_pid = undefined, expiry_timer = ExpireTimer},
{noreply, State1};
handle_info({'EXIT', ConnPid, Reason}, State = #state{expiry_interval = 0, conn_pid = ConnPid}) ->
{stop, Reason, State};
handle_info({'EXIT', Pid, _Reason}, State = #state{old_conn_pid = Pid}) ->
handle_info({'EXIT', ConnPid, _Reason}, State = #state{clean_start = false, conn_pid = ConnPid}) ->
{noreply, ensure_expire_timer(State#state{conn_pid = undefined})};
handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) ->
%% ignore
{noreply, State, hibernate};
{noreply, State#state{old_conn_pid = undefined}, hibernate};
handle_info({'EXIT', Pid, Reason}, State = #state{conn_pid = ConnPid}) ->
?LOG(error, "unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p",
@ -571,6 +580,7 @@ handle_info(Info, State) ->
terminate(Reason, #state{client_id = ClientId}) ->
emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]),
%%TODO: notify conn_pid to shutdown?
emqx_sm:unregister_session(ClientId).
code_change(_OldVsn, State, _Extra) ->
@ -819,6 +829,11 @@ ensure_await_rel_timer(State = #state{await_rel_timer = undefined, await_rel_tim
ensure_await_rel_timer(State) ->
State.
ensure_expire_timer(State = #state{expiry_interval = Interval}) when Interval > 0 ->
State#state{expiry_timer = emqx_misc:start_timer(Interval, expired)};
ensure_expire_timer(State) ->
State.
%%------------------------------------------------------------------------------
%% Reset Dup
@ -837,8 +852,7 @@ next_pkt_id(State = #state{next_pkt_id = Id}) ->
%%------------------------------------------------------------------------------
%% Ensure stats timer
ensure_stats_timer(State = #state{enable_stats = true,
stats_timer = undefined}) ->
ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined}) ->
State#state{stats_timer = erlang:send_after(30000, self(), emit_stats)};
ensure_stats_timer(State) ->
State.
@ -857,5 +871,5 @@ reply(Reply, State) ->
shutdown(Reason, State) ->
{stop, {shutdown, Reason}, State}.
%%TODO: maybe_gc(State) -> State.
%% TODO: maybe_gc(State) -> State.

View File

@ -139,18 +139,18 @@ unregister_mod(_) ->
[] = ?AC:lookup_mods(auth).
check_acl_1(_) ->
SelfUser = #client{id = <<"client1">>, username = <<"testuser">>},
SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>},
allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>),
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
deny = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1/x/y">>),
allow = ?AC:check_acl(SelfUser, publish, <<"users/testuser/1">>),
allow = ?AC:check_acl(SelfUser, subscribe, <<"a/b/c">>).
check_acl_2(_) ->
SelfUser = #client{id = <<"client2">>, username = <<"xyz">>},
SelfUser = #{client_id => <<"client2">>, username => <<"xyz">>},
deny = ?AC:check_acl(SelfUser, subscribe, <<"a/b/c">>).
acl_cache_basic(_) ->
SelfUser = #client{id = <<"client1">>, username = <<"testuser">>},
SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>},
not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>),
not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
@ -163,8 +163,7 @@ acl_cache_basic(_) ->
acl_cache_expiry(_) ->
application:set_env(emqx, acl_cache_ttl, 100),
SelfUser = #client{id = <<"client1">>, username = <<"testuser">>},
SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>},
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
ct:sleep(150),
@ -174,7 +173,7 @@ acl_cache_expiry(_) ->
acl_cache_full(_) ->
application:set_env(emqx, acl_cache_max_size, 1),
SelfUser = #client{id = <<"client1">>, username = <<"testuser">>},
SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>},
allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>),
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
@ -189,7 +188,7 @@ acl_cache_cleanup(_) ->
application:set_env(emqx, acl_cache_ttl, 100),
application:set_env(emqx, acl_cache_max_size, 2),
SelfUser = #client{id = <<"client1">>, username = <<"testuser">>},
SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>},
allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>),
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
@ -334,7 +333,6 @@ cache_auto_cleanup(_) ->
%%--------------------------------------------------------------------
compile_rule(_) ->
{allow, {'and', [{ipaddr, {{127,0,0,1}, {127,0,0,1}, 32}},
{user, <<"user">>}]}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} =
compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"user">>}]}, subscribe, ["$SYS/#", "#"]}),
@ -360,8 +358,8 @@ compile_rule(_) ->
{deny, all} = compile({deny, all}).
match_rule(_) ->
User = #client{peername = {{127,0,0,1}, 2948}, id = <<"testClient">>, username = <<"TestUser">>},
User2 = #client{peername = {{192,168,0,10}, 3028}, id = <<"testClient">>, username = <<"TestUser">>},
User = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{127,0,0,1}, 2948}},
User2 = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{192,168,0,10}, 3028}},
{matched, allow} = match(User, <<"Test/Topic">>, {allow, all}),
{matched, deny} = match(User, <<"Test/Topic">>, {deny, all}),
@ -371,8 +369,7 @@ match_rule(_) ->
nomatch = match(User, <<"d/e/f/x">>, compile({allow, {user, "admin"}, pubsub, ["d/e/f/#"]})),
{matched, allow} = match(User, <<"testTopics/testClient">>, compile({allow, {client, "testClient"}, publish, ["testTopics/testClient"]})),
{matched, allow} = match(User, <<"clients/testClient">>, compile({allow, all, pubsub, ["clients/%c"]})),
{matched, allow} = match(#client{username = <<"user2">>}, <<"users/user2/abc/def">>,
compile({allow, all, subscribe, ["users/%u/#"]})),
{matched, allow} = match(#{username => <<"user2">>}, <<"users/user2/abc/def">>, compile({allow, all, subscribe, ["users/%u/#"]})),
{matched, deny} = match(User, <<"d/e/f">>, compile({deny, all, subscribe, ["$SYS/#", "#"]})),
Rule = compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, <<"Topic">>}),
nomatch = match(User, <<"Topic">>, Rule),
@ -380,3 +377,4 @@ match_rule(_) ->
{matched, allow} = match(User, <<"Topic">>, AndRule),
OrRule = compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, ["Topic"]}),
{matched, allow} = match(User, <<"Topic">>, OrRule).

View File

@ -36,15 +36,12 @@ stop(CPid) ->
gen_server:call(CPid, stop).
init([ClientId]) ->
{ok,
#state{clean_start = true,
client_id = ClientId}
}.
{ok, #state{clean_start = true, client_id = ClientId}}.
handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) ->
Attrs = #{ zone => Zone,
client_id => ClientId,
client_pid => ClientPid,
conn_pid => ClientPid,
clean_start => true,
username => undefined,
conn_props => undefined
@ -52,7 +49,7 @@ handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) ->
{ok, SessPid} = emqx_sm:open_session(Attrs),
{reply, {ok, SessPid}, State#state{
clean_start = true,
client_id = ClientId,
client_id = ClientId,
client_pid = ClientPid
}};

View File

@ -14,22 +14,23 @@
-module(emqx_sm_SUITE).
-include("emqx.hrl").
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
all() -> [t_open_close_session].
t_open_close_session(_) ->
emqx_ct_broker_helpers:run_setup_steps(),
{ok, ClientPid} = emqx_mock_client:start_link(<<"client">>),
Attrs = #{clean_start => true, client_id => <<"client">>, client_pid => ClientPid, zone => internal, username => <<"zhou">>, conn_props => ref},
Attrs = #{clean_start => true, client_id => <<"client">>, conn_pid => ClientPid,
zone => internal, username => <<"zhou">>, conn_props => #{}},
{ok, _SPid} = emqx_sm:open_session(Attrs),
[{<<"client">>, SPid}] = emqx_sm:lookup_session(<<"client">>),
SPid = emqx_sm:lookup_session_pid(<<"client">>),
{ok, NewClientPid} = emqx_mock_client:start_link(<<"client">>),
{ok, SPid, true} = emqx_sm:open_session(Attrs#{clean_start => false, client_pid => NewClientPid}),
{ok, NewConnPid} = emqx_mock_client:start_link(<<"client">>),
{ok, SPid, true} = emqx_sm:open_session(Attrs#{clean_start => false, conn_pid => NewConnPid}),
[{<<"client">>, SPid}] = emqx_sm:lookup_session(<<"client">>),
SAttrs = emqx_sm:get_session_attrs({<<"client">>, SPid}),
<<"client">> = proplists:get_value(client_id, SAttrs),
@ -38,3 +39,4 @@ t_open_close_session(_) ->
{open, true} = emqx_sm:get_session_stats(Session),
ok = emqx_sm:close_session(SPid),
[] = emqx_sm:lookup_session(<<"client">>).