diff --git a/src/emqtt_client.erl b/src/emqtt_client.erl index 94b005a75..ba4fc7291 100644 --- a/src/emqtt_client.erl +++ b/src/emqtt_client.erl @@ -59,7 +59,7 @@ start_link() -> gen_server2:start_link(?MODULE, [], []). go(Pid, Sock) -> - gen_server2:call(Pid, {go, Sock}). + gen_server2:call(Pid, {go, Sock}, infinity). info(Pid) -> gen_server2:call(Pid, info). @@ -85,7 +85,7 @@ handle_call({go, Sock}, _From, _State) -> {ok, ConnStr} = emqtt_net:connection_string(Sock, inbound), %FIXME: merge to registry emqtt_client_monitor:mon(self()), - ?INFO("accepting connection (~s)", [ConnStr]), + ?ERROR("accepting connection (~s)", [ConnStr]), {reply, ok, control_throttle( #state{ socket = Sock, @@ -163,6 +163,7 @@ handle_info(keep_alive_timeout, #state{keep_alive=KeepAlive}=State) -> end; handle_info(Info, State) -> + ?ERROR("unext info :~p",[Info]), {stop, {badinfo, Info}, State}. terminate(_Reason, #state{client_id=ClientId, keep_alive=KeepAlive}) -> @@ -252,7 +253,7 @@ process_request(?CONNECT, ?ERROR_MSG("MQTT login failed - no credentials"), {?CONNACK_CREDENTIALS, State}; true -> - ?INFO("connect from clientid: ~s, ~p", [ClientId, AlivePeriod]), + ?INFO("connect from clientid: ~p, ~p", [ClientId, AlivePeriod]), ok = emqtt_registry:register(ClientId, self()), KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout), {?CONNACK_ACCEPT, @@ -261,6 +262,7 @@ process_request(?CONNECT, keep_alive = KeepAlive}} end end, + ?INFO("recv conn...:~p", [ReturnCode]), send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?CONNACK}, variable = #mqtt_frame_connack{ return_code = ReturnCode }}), @@ -394,12 +396,13 @@ send_will_msg(#state{will_msg = WillMsg }) -> emqtt_router:publish(WillMsg). send_frame(Sock, Frame) -> + ?INFO("send frame:~p", [Frame]), erlang:port_command(Sock, emqtt_frame:serialise(Frame)). %%---------------------------------------------------------------------------- network_error(Reason, State = #state{ conn_name = ConnStr}) -> - ?INFO("MQTT detected network error '~p' for ~p", [Reason, ConnStr]), + ?ERROR("MQTT detected network error '~p' for ~p", [Reason, ConnStr]), send_will_msg(State), % todo: flush channel after publish stop({shutdown, conn_closed}, State). diff --git a/src/emqtt_sup.erl b/src/emqtt_sup.erl index 38d324039..49db2e6c8 100644 --- a/src/emqtt_sup.erl +++ b/src/emqtt_sup.erl @@ -52,14 +52,21 @@ start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) -> %% =================================================================== init([Listeners]) -> + Listeners2 = lists:map(fun({Port, Args}) -> + {Port, Args}; + ({Port, Size, Args}) -> + [{Port+I, Args} || I <- lists:seq(0,Size)] + end, Listeners), + {ok, { {one_for_all, 5, 10}, [ + ?CHILD(emqtt_monitor, worker), ?CHILD(emqtt_auth, worker), ?CHILD(emqtt_retained, worker), ?CHILD(emqtt_router, worker), ?CHILD(emqtt_registry, worker), ?CHILD(emqtt_client_monitor, worker), ?CHILD(emqtt_client_sup, supervisor) - | listener_children(Listeners) ]} + | listener_children(lists:flatten(Listeners2)) ]} }. listener_children(Listeners) ->