Merge develop to X
This commit is contained in:
commit
e52c303014
|
@ -152,6 +152,7 @@ monitor_client(ClientId, Pid, State = #state{monitors = Monitors}) ->
|
||||||
State#state{monitors = dict:store(MRef, {ClientId, Pid}, Monitors)}.
|
State#state{monitors = dict:store(MRef, {ClientId, Pid}, Monitors)}.
|
||||||
|
|
||||||
erase_monitor(MRef, State = #state{monitors = Monitors}) ->
|
erase_monitor(MRef, State = #state{monitors = Monitors}) ->
|
||||||
|
erlang:demonitor(MRef, [flush]),
|
||||||
State#state{monitors = dict:erase(MRef, Monitors)}.
|
State#state{monitors = dict:erase(MRef, Monitors)}.
|
||||||
|
|
||||||
setstats(State = #state{statsfun = StatsFun}) ->
|
setstats(State = #state{statsfun = StatsFun}) ->
|
||||||
|
|
|
@ -311,5 +311,6 @@ monitor_session(ClientId, SessPid, State = #state{monitors = Monitors}) ->
|
||||||
State#state{monitors = dict:store(MRef, ClientId, Monitors)}.
|
State#state{monitors = dict:store(MRef, ClientId, Monitors)}.
|
||||||
|
|
||||||
erase_monitor(MRef, State = #state{monitors = Monitors}) ->
|
erase_monitor(MRef, State = #state{monitors = Monitors}) ->
|
||||||
|
erlang:demonitor(MRef, [flush]),
|
||||||
State#state{monitors = dict:erase(MRef, Monitors)}.
|
State#state{monitors = dict:erase(MRef, Monitors)}.
|
||||||
|
|
||||||
|
|
|
@ -91,23 +91,30 @@ clean_acl_cache(CPid, Topic) ->
|
||||||
|
|
||||||
init([Env, WsPid, Req, ReplyChannel]) ->
|
init([Env, WsPid, Req, ReplyChannel]) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
true = link(WsPid),
|
|
||||||
{ok, Peername} = Req:get(peername),
|
|
||||||
Headers = mochiweb_headers:to_list(
|
|
||||||
mochiweb_request:get(headers, Req)),
|
|
||||||
Conn = Req:get(connection),
|
Conn = Req:get(connection),
|
||||||
ProtoState = emqx_protocol:init(Conn, Peername, send_fun(ReplyChannel),
|
true = link(WsPid),
|
||||||
[{ws_initial_headers, Headers} | Env]),
|
case Req:get(peername) of
|
||||||
IdleTimeout = get_value(client_idle_timeout, Env, 30000),
|
{ok, Peername} ->
|
||||||
EnableStats = get_value(client_enable_stats, Env, false),
|
Headers = mochiweb_headers:to_list(mochiweb_request:get(headers, Req)),
|
||||||
ForceGcCount = emqx_gc:conn_max_gc_count(),
|
ProtoState = emqx_protocol:init(Conn, Peername, send_fun(ReplyChannel),
|
||||||
{ok, #wsclient_state{connection = Conn,
|
[{ws_initial_headers, Headers} | Env]),
|
||||||
ws_pid = WsPid,
|
IdleTimeout = get_value(client_idle_timeout, Env, 30000),
|
||||||
peername = Peername,
|
EnableStats = get_value(client_enable_stats, Env, false),
|
||||||
proto_state = ProtoState,
|
ForceGcCount = emqx_gc:conn_max_gc_count(),
|
||||||
enable_stats = EnableStats,
|
{ok, #wsclient_state{connection = Conn,
|
||||||
force_gc_count = ForceGcCount},
|
ws_pid = WsPid,
|
||||||
IdleTimeout, {backoff, 2000, 2000, 20000}, ?MODULE}.
|
peername = Peername,
|
||||||
|
proto_state = ProtoState,
|
||||||
|
enable_stats = EnableStats,
|
||||||
|
force_gc_count = ForceGcCount},
|
||||||
|
IdleTimeout, {backoff, 2000, 2000, 20000}, ?MODULE};
|
||||||
|
{error, enotconn} -> Conn:fast_close(),
|
||||||
|
exit(WsPid, normal),
|
||||||
|
exit(normal);
|
||||||
|
{error, Reason} -> Conn:fast_close(),
|
||||||
|
exit(WsPid, normal),
|
||||||
|
exit({shutdown, Reason})
|
||||||
|
end.
|
||||||
|
|
||||||
prioritise_call(Msg, _From, _Len, _State) ->
|
prioritise_call(Msg, _From, _Len, _State) ->
|
||||||
case Msg of info -> 10; stats -> 10; state -> 10; _ -> 5 end.
|
case Msg of info -> 10; stats -> 10; state -> 10; _ -> 5 end.
|
||||||
|
@ -207,6 +214,9 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
|
||||||
?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State),
|
?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State),
|
||||||
shutdown(conflict, State);
|
shutdown(conflict, State);
|
||||||
|
|
||||||
|
handle_info({shutdown, Reason}, State) ->
|
||||||
|
shutdown(Reason, State);
|
||||||
|
|
||||||
handle_info({keepalive, start, Interval}, State = #wsclient_state{connection = Conn}) ->
|
handle_info({keepalive, start, Interval}, State = #wsclient_state{connection = Conn}) ->
|
||||||
?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], State),
|
?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], State),
|
||||||
case emqx_keepalive:start(stat_fun(Conn), Interval, {keepalive, check}) of
|
case emqx_keepalive:start(stat_fun(Conn), Interval, {keepalive, check}) of
|
||||||
|
|
Loading…
Reference in New Issue