diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index cc73155ba..19bbc1cf2 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -152,6 +152,7 @@ monitor_client(ClientId, Pid, State = #state{monitors = Monitors}) -> State#state{monitors = dict:store(MRef, {ClientId, Pid}, Monitors)}. erase_monitor(MRef, State = #state{monitors = Monitors}) -> + erlang:demonitor(MRef, [flush]), State#state{monitors = dict:erase(MRef, Monitors)}. setstats(State = #state{statsfun = StatsFun}) -> diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index eac7743bc..78e53da70 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -311,5 +311,6 @@ monitor_session(ClientId, SessPid, State = #state{monitors = Monitors}) -> State#state{monitors = dict:store(MRef, ClientId, Monitors)}. erase_monitor(MRef, State = #state{monitors = Monitors}) -> + erlang:demonitor(MRef, [flush]), State#state{monitors = dict:erase(MRef, Monitors)}. diff --git a/src/emqx_ws_client.erl b/src/emqx_ws_client.erl index e1861684b..40e824238 100644 --- a/src/emqx_ws_client.erl +++ b/src/emqx_ws_client.erl @@ -91,23 +91,30 @@ clean_acl_cache(CPid, Topic) -> init([Env, WsPid, Req, ReplyChannel]) -> process_flag(trap_exit, true), - true = link(WsPid), - {ok, Peername} = Req:get(peername), - Headers = mochiweb_headers:to_list( - mochiweb_request:get(headers, Req)), Conn = Req:get(connection), - ProtoState = emqx_protocol:init(Conn, Peername, send_fun(ReplyChannel), - [{ws_initial_headers, Headers} | Env]), - IdleTimeout = get_value(client_idle_timeout, Env, 30000), - EnableStats = get_value(client_enable_stats, Env, false), - ForceGcCount = emqx_gc:conn_max_gc_count(), - {ok, #wsclient_state{connection = Conn, - ws_pid = WsPid, - peername = Peername, - proto_state = ProtoState, - enable_stats = EnableStats, - force_gc_count = ForceGcCount}, - IdleTimeout, {backoff, 2000, 2000, 20000}, ?MODULE}. + true = link(WsPid), + case Req:get(peername) of + {ok, Peername} -> + Headers = mochiweb_headers:to_list(mochiweb_request:get(headers, Req)), + ProtoState = emqx_protocol:init(Conn, Peername, send_fun(ReplyChannel), + [{ws_initial_headers, Headers} | Env]), + IdleTimeout = get_value(client_idle_timeout, Env, 30000), + EnableStats = get_value(client_enable_stats, Env, false), + ForceGcCount = emqx_gc:conn_max_gc_count(), + {ok, #wsclient_state{connection = Conn, + ws_pid = WsPid, + peername = Peername, + proto_state = ProtoState, + enable_stats = EnableStats, + force_gc_count = ForceGcCount}, + IdleTimeout, {backoff, 2000, 2000, 20000}, ?MODULE}; + {error, enotconn} -> Conn:fast_close(), + exit(WsPid, normal), + exit(normal); + {error, Reason} -> Conn:fast_close(), + exit(WsPid, normal), + exit({shutdown, Reason}) + end. prioritise_call(Msg, _From, _Len, _State) -> case Msg of info -> 10; stats -> 10; state -> 10; _ -> 5 end. @@ -207,6 +214,9 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) -> ?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State), shutdown(conflict, State); +handle_info({shutdown, Reason}, State) -> + shutdown(Reason, State); + handle_info({keepalive, start, Interval}, State = #wsclient_state{connection = Conn}) -> ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], State), case emqx_keepalive:start(stat_fun(Conn), Interval, {keepalive, check}) of