fix router
This commit is contained in:
parent
a0017c3186
commit
a34ae660dc
|
@ -26,10 +26,10 @@
|
|||
|
||||
-define(MQTT_SOCKOPTS, [
|
||||
binary,
|
||||
{packet, raw},
|
||||
{reuseaddr, true},
|
||||
{backlog, 512},
|
||||
{nodelay, false}
|
||||
{packet, raw},
|
||||
{reuseaddr, true},
|
||||
{backlog, 512},
|
||||
{nodelay, false}
|
||||
]).
|
||||
|
||||
listen(Listeners) when is_list(Listeners) ->
|
||||
|
@ -43,3 +43,4 @@ listen({http, Port, Options}) ->
|
|||
MFArgs = {emqtt_http, handle, []},
|
||||
mochiweb:start_http(Port, Options, MFArgs).
|
||||
|
||||
|
||||
|
|
|
@ -40,7 +40,7 @@ check(undefined, _) -> false;
|
|||
|
||||
check(_, undefined) -> false;
|
||||
|
||||
check(Username, Password) when is_binary(Username) ->
|
||||
check(Username, Password) when is_binary(Username), is_binary(Password) ->
|
||||
PasswdHash = crypto:hash(md5, Password),
|
||||
case mnesia:dirty_read(emqtt_user, Username) of
|
||||
[#emqtt_user{passwdhash=PasswdHash}] -> true;
|
||||
|
@ -48,7 +48,12 @@ check(Username, Password) when is_binary(Username) ->
|
|||
end.
|
||||
|
||||
add(Username, Password) when is_binary(Username) and is_binary(Password) ->
|
||||
mnesia:dirty_write(#emqtt_user{username=Username, passwdhash=crypto:hash(md5, Password)}).
|
||||
mnesia:dirty_write(
|
||||
#emqtt_user{
|
||||
username=Username,
|
||||
passwdhash=crypto:hash(md5, Password)
|
||||
}
|
||||
).
|
||||
|
||||
delete(Username) when is_binary(Username) ->
|
||||
mnesia:dirty_delete(emqtt_user, Username).
|
||||
|
|
|
@ -22,9 +22,13 @@
|
|||
|
||||
-module(emqtt_client).
|
||||
|
||||
-author('feng@slimchat.io').
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
-export([start_link/1, info/1, go/2, stop/2]).
|
||||
-export([start_link/1,
|
||||
info/1,
|
||||
go/2]).
|
||||
|
||||
-export([init/1,
|
||||
handle_call/3,
|
||||
|
@ -69,11 +73,8 @@ info(Pid) ->
|
|||
go(Pid, Sock) ->
|
||||
gen_server:call(Pid, {go, Sock}).
|
||||
|
||||
stop(Pid, Error) ->
|
||||
gen_server:cast(Pid, {stop, Error}).
|
||||
|
||||
init([Sock]) ->
|
||||
{ok, #state{socket = Sock}}.
|
||||
{ok, #state{socket = Sock}, 1000}.
|
||||
|
||||
handle_call({go, Sock}, _From, State=#state{socket = Sock}) ->
|
||||
{ok, ConnStr} = emqtt_net:connection_string(Sock, inbound),
|
||||
|
@ -101,14 +102,17 @@ handle_call(info, _From, #state{conn_name=ConnName,
|
|||
handle_call(_Req, _From, State) ->
|
||||
{reply, ok, State}.
|
||||
|
||||
handle_cast({stop, duplicate_id}, State=#state{conn_name=ConnName, client_id=ClientId}) ->
|
||||
?ERROR("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]),
|
||||
stop({shutdown, duplicate_id}, State);
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
{stop, {badmsg, Msg}, State}.
|
||||
|
||||
handle_info({route, Msg}, #state{socket = Sock, message_id=MsgId} = State) ->
|
||||
handle_info(timeout, State) ->
|
||||
stop({shutdown, timeout}, State);
|
||||
|
||||
handle_info({stop, duplicate_id}, State=#state{conn_name=ConnName, client_id=ClientId}) ->
|
||||
?ERROR("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]),
|
||||
stop({shutdown, duplicate_id}, State);
|
||||
|
||||
handle_info({dispatch, Msg}, #state{socket = Sock, message_id=MsgId} = State) ->
|
||||
|
||||
#mqtt_msg{retain = Retain,
|
||||
qos = Qos,
|
||||
|
@ -155,7 +159,6 @@ handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
|
|||
network_error(Reason, State);
|
||||
|
||||
handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
|
||||
?ERROR("sock error: ~p~n", [Reason]),
|
||||
{noreply, State};
|
||||
|
||||
handle_info(keep_alive_timeout, #state{keep_alive=KeepAlive}=State) ->
|
||||
|
@ -169,24 +172,17 @@ handle_info(keep_alive_timeout, #state{keep_alive=KeepAlive}=State) ->
|
|||
end;
|
||||
|
||||
handle_info(Info, State) ->
|
||||
?ERROR("unext info :~p",[Info]),
|
||||
?ERROR("badinfo :~p",[Info]),
|
||||
{stop, {badinfo, Info}, State}.
|
||||
|
||||
terminate(_Reason, #state{client_id=ClientId, keep_alive=KeepAlive}) ->
|
||||
ok = emqtt_registry:unregister(ClientId),
|
||||
terminate(_Reason, #state{keep_alive=KeepAlive}) ->
|
||||
emqtt_keep_alive:cancel(KeepAlive),
|
||||
emqtt_cm:destroy(self()),
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
throw_on_error(E, Thunk) ->
|
||||
case Thunk() of
|
||||
{error, Reason} -> throw({E, Reason});
|
||||
{ok, Res} -> Res;
|
||||
Res -> Res
|
||||
end.
|
||||
|
||||
async_recv(Sock, Length, infinity) when is_port(Sock) ->
|
||||
prim_inet:async_recv(Sock, Length, -1);
|
||||
|
||||
|
@ -259,7 +255,7 @@ process_request(?CONNECT,
|
|||
{?CONNACK_CREDENTIALS, State};
|
||||
true ->
|
||||
?INFO("connect from clientid: ~p, ~p", [ClientId, AlivePeriod]),
|
||||
ok = emqtt_registry:register(ClientId, self()),
|
||||
emqtt_cm:create(ClientId, self()),
|
||||
KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout),
|
||||
{?CONNACK_ACCEPT,
|
||||
State #state{ will_msg = make_will_msg(Var),
|
||||
|
@ -362,7 +358,6 @@ process_request(?UNSUBSCRIBE,
|
|||
{ok, State};
|
||||
|
||||
process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock, keep_alive=KeepAlive}=State) ->
|
||||
%?INFO("PINGREQ...",[]),
|
||||
%Keep alive timer
|
||||
KeepAlive1 = emqtt_keep_alive:reset(KeepAlive),
|
||||
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}),
|
||||
|
@ -398,7 +393,7 @@ make_will_msg(#mqtt_frame_connect{ will_retain = Retain,
|
|||
send_will_msg(#state{will_msg = undefined}) ->
|
||||
ignore;
|
||||
send_will_msg(#state{will_msg = WillMsg }) ->
|
||||
emqtt_router:publish(WillMsg).
|
||||
emqtt_pubsub:publish(WillMsg).
|
||||
|
||||
send_frame(Sock, Frame) ->
|
||||
?INFO("send frame:~p", [Frame]),
|
||||
|
|
|
@ -66,13 +66,14 @@ lookup(ClientId) ->
|
|||
-spec create(ClientId :: binary(), Pid :: pid()) -> ok.
|
||||
create(ClientId, Pid) ->
|
||||
case lookup(ClientId) of
|
||||
Pid ->
|
||||
ignore;
|
||||
OldPid when is_pid(OldPid) ->
|
||||
%%TODO: FIX STOP...
|
||||
emqtt_client:stop(OldPid, duplicate_id);
|
||||
OldPid ! {stop, duplicate_id},
|
||||
ets:insert(emqtt_client, {ClientId, Pid});
|
||||
undefined ->
|
||||
ok
|
||||
end,
|
||||
ets:insert(emqtt_client, {ClientId, Pid}).
|
||||
ets:insert(emqtt_client, {ClientId, Pid})
|
||||
end.
|
||||
|
||||
-spec destroy(binary() | pid()) -> ok.
|
||||
destroy(ClientId) when is_binary(ClientId) ->
|
||||
|
|
|
@ -1,16 +0,0 @@
|
|||
|
||||
%% The contents of this file are subject to the Mozilla Public License
|
||||
%% Version 1.1 (the "License"); you may not use this file except in
|
||||
%% compliance with the License. You may obtain a copy of the License
|
||||
%% at http://www.mozilla.org/MPL/
|
||||
%%
|
||||
%% Software distributed under the License is distributed on an "AS IS"
|
||||
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
|
||||
%% the License for the specific language governing rights and
|
||||
%% limitations under the License.
|
||||
%%
|
||||
%% Developer of the eMQTT Code is <ery.lee@gmail.com>
|
||||
%% Copyright (c) 2012 Ery Lee. All rights reserved.
|
||||
%%
|
||||
-module(emqtt_lib).
|
||||
|
|
@ -24,6 +24,8 @@
|
|||
|
||||
-author('feng@slimchat.io').
|
||||
|
||||
%%TODO: FIXME Later...
|
||||
|
||||
%%
|
||||
%% <<MQTT_V3.1_Protocol_Specific>>
|
||||
|
||||
|
@ -76,11 +78,10 @@ delete(Topic) ->
|
|||
gen_server:cast(?MODULE, {delete, Topic}).
|
||||
|
||||
send(Topic, Client) ->
|
||||
[Client ! {route, Msg} ||{_, Msg} <- lookup(Topic)].
|
||||
[Client ! {dispatch, Msg} ||{_, Msg} <- lookup(Topic)].
|
||||
|
||||
init([]) ->
|
||||
ets:new(retained_msg, [set, protected, named_table]),
|
||||
?INFO("~p is started.", [?MODULE]),
|
||||
{ok, #state{}}.
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
|
|
Loading…
Reference in New Issue