rm emqtt_log.hrl
This commit is contained in:
parent
3e6b17146a
commit
a8dcb2bfe3
|
@ -1,85 +0,0 @@
|
||||||
%%-----------------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2014, Feng Lee <feng@slimchat.io>
|
|
||||||
%%
|
|
||||||
%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
||||||
%% of this software and associated documentation files (the "Software"), to deal
|
|
||||||
%% in the Software without restriction, including without limitation the rights
|
|
||||||
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
||||||
%% copies of the Software, and to permit persons to whom the Software is
|
|
||||||
%% furnished to do so, subject to the following conditions:
|
|
||||||
%%
|
|
||||||
%% The above copyright notice and this permission notice shall be included in all
|
|
||||||
%% copies or substantial portions of the Software.
|
|
||||||
%%
|
|
||||||
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
||||||
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
||||||
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
||||||
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
||||||
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
||||||
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
||||||
%% SOFTWARE.
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% Logging mechanism
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
-define(PRINT(Format, Args),
|
|
||||||
io:format(Format, Args)).
|
|
||||||
|
|
||||||
-define(PRINT_MSG(Msg),
|
|
||||||
io:format(Msg)).
|
|
||||||
|
|
||||||
-define(DEBUG(Format, Args),
|
|
||||||
lager:debug(Format, Args)).
|
|
||||||
|
|
||||||
-define(DEBUG_TRACE(Dest, Format, Args),
|
|
||||||
lager:debug(Dest, Format, Args)).
|
|
||||||
|
|
||||||
-define(DEBUG_MSG(Msg),
|
|
||||||
lager:debug(Msg)).
|
|
||||||
|
|
||||||
-define(INFO(Format, Args),
|
|
||||||
lager:info(Format, Args)).
|
|
||||||
|
|
||||||
-define(INFO_TRACE(Dest, Format, Args),
|
|
||||||
lager:info(Dest, Format, Args)).
|
|
||||||
|
|
||||||
-define(INFO_MSG(Msg),
|
|
||||||
lager:info(Msg)).
|
|
||||||
|
|
||||||
-define(WARN(Format, Args),
|
|
||||||
lager:warning(Format, Args)).
|
|
||||||
|
|
||||||
-define(WARN_TRACE(Dest, Format, Args),
|
|
||||||
lager:warning(Dest, Format, Args)).
|
|
||||||
|
|
||||||
-define(WARN_MSG(Msg),
|
|
||||||
lager:warning(Msg)).
|
|
||||||
|
|
||||||
-define(WARNING(Format, Args),
|
|
||||||
lager:warning(Format, Args)).
|
|
||||||
|
|
||||||
-define(WARNING_TRACE(Dest, Format, Args),
|
|
||||||
lager:warning(Dest, Format, Args)).
|
|
||||||
|
|
||||||
-define(WARNING_MSG(Msg),
|
|
||||||
lager:warning(Msg)).
|
|
||||||
|
|
||||||
-define(ERROR(Format, Args),
|
|
||||||
lager:error(Format, Args)).
|
|
||||||
|
|
||||||
-define(ERROR_TRACE(Dest, Format, Args),
|
|
||||||
lager:error(Dest, Format, Args)).
|
|
||||||
|
|
||||||
-define(ERROR_MSG(Msg),
|
|
||||||
lager:error(Msg)).
|
|
||||||
|
|
||||||
-define(CRITICAL(Format, Args),
|
|
||||||
lager:critical(Format, Args)).
|
|
||||||
|
|
||||||
-define(CRITICAL_TRACE(Dest, Format, Args),
|
|
||||||
lager:critical(Dest, Format, Args)).
|
|
||||||
|
|
||||||
-define(CRITICAL_MSG(Msg),
|
|
||||||
lager:critical(Msg)).
|
|
||||||
|
|
|
@ -24,13 +24,15 @@
|
||||||
|
|
||||||
-author('feng@slimchat.io').
|
-author('feng@slimchat.io').
|
||||||
|
|
||||||
-include("emqtt_log.hrl").
|
|
||||||
|
|
||||||
-behaviour(application).
|
-behaviour(application).
|
||||||
|
|
||||||
%% Application callbacks
|
%% Application callbacks
|
||||||
-export([start/2, stop/1]).
|
-export([start/2, stop/1]).
|
||||||
|
|
||||||
|
-define(PRINT_MSG(Msg), io:format(Msg)).
|
||||||
|
|
||||||
|
-define(PRINT(Format, Args), io:format(Format, Args)).
|
||||||
|
|
||||||
%% ===================================================================
|
%% ===================================================================
|
||||||
%% Application callbacks
|
%% Application callbacks
|
||||||
%% ===================================================================
|
%% ===================================================================
|
||||||
|
|
|
@ -26,8 +26,6 @@
|
||||||
|
|
||||||
-include("emqtt.hrl").
|
-include("emqtt.hrl").
|
||||||
|
|
||||||
-include("emqtt_log.hrl").
|
|
||||||
|
|
||||||
-export([start_link/0,
|
-export([start_link/0,
|
||||||
add/2,
|
add/2,
|
||||||
check/1, check/2,
|
check/1, check/2,
|
||||||
|
@ -73,7 +71,6 @@ init([]) ->
|
||||||
ok = AuthMod:init(Opts),
|
ok = AuthMod:init(Opts),
|
||||||
ets:new(?TAB, [named_table, protected]),
|
ets:new(?TAB, [named_table, protected]),
|
||||||
ets:insert(?TAB, {mod, AuthMod}),
|
ets:insert(?TAB, {mod, AuthMod}),
|
||||||
?PRINT("emqtt authmod is ~p", [AuthMod]),
|
|
||||||
{ok, undefined}.
|
{ok, undefined}.
|
||||||
|
|
||||||
authmod(Name) when is_atom(Name) ->
|
authmod(Name) when is_atom(Name) ->
|
||||||
|
|
|
@ -37,8 +37,6 @@
|
||||||
|
|
||||||
-include("emqtt.hrl").
|
-include("emqtt.hrl").
|
||||||
|
|
||||||
-include("emqtt_log.hrl").
|
|
||||||
|
|
||||||
-include("emqtt_frame.hrl").
|
-include("emqtt_frame.hrl").
|
||||||
|
|
||||||
%%Client State...
|
%%Client State...
|
||||||
|
@ -91,7 +89,7 @@ handle_info(timeout, State) ->
|
||||||
|
|
||||||
handle_info({stop, duplicate_id}, State=#state{conn_name=ConnName}) ->
|
handle_info({stop, duplicate_id}, State=#state{conn_name=ConnName}) ->
|
||||||
%%TODO:
|
%%TODO:
|
||||||
%?ERROR("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]),
|
%lager:error("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]),
|
||||||
stop({shutdown, duplicate_id}, State);
|
stop({shutdown, duplicate_id}, State);
|
||||||
|
|
||||||
%%TODO: ok??
|
%%TODO: ok??
|
||||||
|
@ -116,7 +114,7 @@ handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
|
||||||
handle_info(keep_alive_timeout, #state{keep_alive=KeepAlive}=State) ->
|
handle_info(keep_alive_timeout, #state{keep_alive=KeepAlive}=State) ->
|
||||||
case emqtt_keep_alive:state(KeepAlive) of
|
case emqtt_keep_alive:state(KeepAlive) of
|
||||||
idle ->
|
idle ->
|
||||||
?INFO("keep_alive timeout: ~p", [State#state.conn_name]),
|
lager:info("keep_alive timeout: ~p", [State#state.conn_name]),
|
||||||
{stop, normal, State};
|
{stop, normal, State};
|
||||||
active ->
|
active ->
|
||||||
KeepAlive1 = emqtt_keep_alive:reset(KeepAlive),
|
KeepAlive1 = emqtt_keep_alive:reset(KeepAlive),
|
||||||
|
@ -124,7 +122,7 @@ handle_info(keep_alive_timeout, #state{keep_alive=KeepAlive}=State) ->
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
?ERROR("badinfo :~p",[Info]),
|
lager:error("badinfo :~p",[Info]),
|
||||||
{stop, {badinfo, Info}, State}.
|
{stop, {badinfo, Info}, State}.
|
||||||
|
|
||||||
terminate(_Reason, #state{proto_state = ProtoState}) ->
|
terminate(_Reason, #state{proto_state = ProtoState}) ->
|
||||||
|
@ -165,7 +163,7 @@ process_received_bytes(Bytes,
|
||||||
State#state{ parse_state = emqtt_frame:initial_state(),
|
State#state{ parse_state = emqtt_frame:initial_state(),
|
||||||
proto_state = ProtoState1 });
|
proto_state = ProtoState1 });
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
?ERROR("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]),
|
lager:error("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]),
|
||||||
stop({shutdown, Error}, State);
|
stop({shutdown, Error}, State);
|
||||||
{error, Error, ProtoState1} ->
|
{error, Error, ProtoState1} ->
|
||||||
stop({shutdown, Error}, State#state{proto_state = ProtoState1});
|
stop({shutdown, Error}, State#state{proto_state = ProtoState1});
|
||||||
|
@ -173,14 +171,14 @@ process_received_bytes(Bytes,
|
||||||
stop(normal, State#state{proto_state = ProtoState1})
|
stop(normal, State#state{proto_state = ProtoState1})
|
||||||
end;
|
end;
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
?ERROR("MQTT detected framing error ~p for connection ~p~n", [ConnStr, Error]),
|
lager:error("MQTT detected framing error ~p for connection ~p~n", [ConnStr, Error]),
|
||||||
stop({shutdown, Error}, State)
|
stop({shutdown, Error}, State)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%----------------------------------------------------------------------------
|
%%----------------------------------------------------------------------------
|
||||||
network_error(Reason,
|
network_error(Reason,
|
||||||
State = #state{ conn_name = ConnStr}) ->
|
State = #state{ conn_name = ConnStr}) ->
|
||||||
?ERROR("MQTT detected network error '~p' for ~p", [Reason, ConnStr]),
|
lager:error("MQTT detected network error '~p' for ~p", [Reason, ConnStr]),
|
||||||
%%TODO: where to SEND WILL MSG??
|
%%TODO: where to SEND WILL MSG??
|
||||||
%%send_will_msg(State),
|
%%send_will_msg(State),
|
||||||
% todo: flush channel after publish
|
% todo: flush channel after publish
|
||||||
|
|
|
@ -25,8 +25,6 @@
|
||||||
|
|
||||||
-author('feng@slimchat.io').
|
-author('feng@slimchat.io').
|
||||||
|
|
||||||
-include("emqtt_log.hrl").
|
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
@ -85,7 +83,7 @@ init(Args) ->
|
||||||
handle_call({create, ClientId, Pid}, _From, State) ->
|
handle_call({create, ClientId, Pid}, _From, State) ->
|
||||||
case ets:lookup(?TAB, ClientId) of
|
case ets:lookup(?TAB, ClientId) of
|
||||||
[{_, Pid, _}] ->
|
[{_, Pid, _}] ->
|
||||||
?ERROR("client '~s' has been registered with ~p", [ClientId, Pid]),
|
lager:error("client '~s' has been registered with ~p", [ClientId, Pid]),
|
||||||
ignore;
|
ignore;
|
||||||
[{_, OldPid, MRef}] ->
|
[{_, OldPid, MRef}] ->
|
||||||
OldPid ! {stop, duplicate_id},
|
OldPid ! {stop, duplicate_id},
|
||||||
|
@ -107,7 +105,7 @@ handle_cast({destroy, ClientId, Pid}, State) when is_binary(ClientId) ->
|
||||||
[_] ->
|
[_] ->
|
||||||
ignore;
|
ignore;
|
||||||
[] ->
|
[] ->
|
||||||
?ERROR("cannot find client '~s' with ~p", [ClientId, Pid])
|
lager:error("cannot find client '~s' with ~p", [ClientId, Pid])
|
||||||
end,
|
end,
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,11 @@
|
||||||
|
|
||||||
-include("emqtt.hrl").
|
-include("emqtt.hrl").
|
||||||
|
|
||||||
-include("emqtt_log.hrl").
|
-define(PRINT_MSG(Msg),
|
||||||
|
io:format(Msg)).
|
||||||
|
|
||||||
|
-define(PRINT(Format, Args),
|
||||||
|
io:format(Format, Args)).
|
||||||
|
|
||||||
-export([status/1,
|
-export([status/1,
|
||||||
cluster/1,
|
cluster/1,
|
||||||
|
|
|
@ -26,8 +26,6 @@
|
||||||
|
|
||||||
-include("emqtt.hrl").
|
-include("emqtt.hrl").
|
||||||
|
|
||||||
-include("emqtt_log.hrl").
|
|
||||||
|
|
||||||
-import(proplists, [get_value/2, get_value/3]).
|
-import(proplists, [get_value/2, get_value/3]).
|
||||||
|
|
||||||
-export([handle/1]).
|
-export([handle/1]).
|
||||||
|
@ -45,7 +43,7 @@ handle(Req) ->
|
||||||
|
|
||||||
handle('POST', "/mqtt/publish", Req) ->
|
handle('POST', "/mqtt/publish", Req) ->
|
||||||
Params = mochiweb_request:parse_post(Req),
|
Params = mochiweb_request:parse_post(Req),
|
||||||
?INFO("~p~n", [Params]),
|
lager:info("~p~n", [Params]),
|
||||||
Topic = list_to_binary(get_value("topic", Params)),
|
Topic = list_to_binary(get_value("topic", Params)),
|
||||||
Message = list_to_binary(get_value("message", Params)),
|
Message = list_to_binary(get_value("message", Params)),
|
||||||
emqtt_pubsub:publish(#mqtt_msg {
|
emqtt_pubsub:publish(#mqtt_msg {
|
||||||
|
|
|
@ -24,8 +24,6 @@
|
||||||
|
|
||||||
-author('feng@slimchat.io').
|
-author('feng@slimchat.io').
|
||||||
|
|
||||||
-include("emqtt_log.hrl").
|
|
||||||
|
|
||||||
-behavior(gen_server).
|
-behavior(gen_server).
|
||||||
|
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
|
@ -50,7 +48,6 @@ start_link() ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
init([]) ->
|
init([]) ->
|
||||||
erlang:system_monitor(self(), [{long_gc, 5000}, {large_heap, 1000000}, busy_port]),
|
erlang:system_monitor(self(), [{long_gc, 5000}, {large_heap, 1000000}, busy_port]),
|
||||||
?INFO("monitor is started...[ok]", []),
|
|
||||||
{ok, #state{}}.
|
{ok, #state{}}.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
|
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
|
||||||
|
@ -62,7 +59,7 @@ init([]) ->
|
||||||
%% Description: Handling call messages
|
%% Description: Handling call messages
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
handle_call(Request, _From, State) ->
|
handle_call(Request, _From, State) ->
|
||||||
?ERROR("unexpected request: ~p", [Request]),
|
lager:error("unexpected request: ~p", [Request]),
|
||||||
{reply, {error, unexpected_request}, State}.
|
{reply, {error, unexpected_request}, State}.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Function: handle_cast(Msg, State) -> {noreply, State} |
|
%% Function: handle_cast(Msg, State) -> {noreply, State} |
|
||||||
|
@ -71,7 +68,7 @@ handle_call(Request, _From, State) ->
|
||||||
%% Description: Handling cast messages
|
%% Description: Handling cast messages
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?ERROR("unexpected msg: ~p", [Msg]),
|
lager:error("unexpected msg: ~p", [Msg]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Function: handle_info(Info, State) -> {noreply, State} |
|
%% Function: handle_info(Info, State) -> {noreply, State} |
|
||||||
|
@ -80,22 +77,22 @@ handle_cast(Msg, State) ->
|
||||||
%% Description: Handling all non call/cast messages
|
%% Description: Handling all non call/cast messages
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
handle_info({monitor, GcPid, long_gc, Info}, State) ->
|
handle_info({monitor, GcPid, long_gc, Info}, State) ->
|
||||||
?ERROR("long_gc: gcpid = ~p, ~p ~n ~p", [GcPid, process_info(GcPid,
|
lager:error("long_gc: gcpid = ~p, ~p ~n ~p", [GcPid, process_info(GcPid,
|
||||||
[registered_name, memory, message_queue_len,heap_size,total_heap_size]), Info]),
|
[registered_name, memory, message_queue_len,heap_size,total_heap_size]), Info]),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_info({monitor, GcPid, large_heap, Info}, State) ->
|
handle_info({monitor, GcPid, large_heap, Info}, State) ->
|
||||||
?ERROR("large_heap: gcpid = ~p,~p ~n ~p", [GcPid, process_info(GcPid,
|
lager:error("large_heap: gcpid = ~p,~p ~n ~p", [GcPid, process_info(GcPid,
|
||||||
[registered_name, memory, message_queue_len,heap_size,total_heap_size]), Info]),
|
[registered_name, memory, message_queue_len,heap_size,total_heap_size]), Info]),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_info({monitor, SusPid, busy_port, Port}, State) ->
|
handle_info({monitor, SusPid, busy_port, Port}, State) ->
|
||||||
?ERROR("busy_port: suspid = ~p, port = ~p", [process_info(SusPid,
|
lager:error("busy_port: suspid = ~p, port = ~p", [process_info(SusPid,
|
||||||
[registered_name, memory, message_queue_len,heap_size,total_heap_size]), Port]),
|
[registered_name, memory, message_queue_len,heap_size,total_heap_size]), Port]),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
?ERROR("unexpected info: ~p", [Info]),
|
lager:error("unexpected info: ~p", [Info]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -24,8 +24,6 @@
|
||||||
|
|
||||||
-include("emqtt.hrl").
|
-include("emqtt.hrl").
|
||||||
|
|
||||||
-include("emqtt_log.hrl").
|
|
||||||
|
|
||||||
-include("emqtt_frame.hrl").
|
-include("emqtt_frame.hrl").
|
||||||
|
|
||||||
-record(proto_state, {
|
-record(proto_state, {
|
||||||
|
@ -75,7 +73,7 @@ info(#proto_state{ message_id = MsgId,
|
||||||
|
|
||||||
handle_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
|
handle_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
|
||||||
State = #proto_state{client_id = ClientId}) ->
|
State = #proto_state{client_id = ClientId}) ->
|
||||||
?INFO("frame from ~s: ~p", [ClientId, Frame]),
|
lager:info("frame from ~s: ~p", [ClientId, Frame]),
|
||||||
case validate_frame(Type, Frame) of
|
case validate_frame(Type, Frame) of
|
||||||
ok ->
|
ok ->
|
||||||
handle_request(Type, Frame, State);
|
handle_request(Type, Frame, State);
|
||||||
|
@ -101,10 +99,10 @@ handle_request(?CONNECT,
|
||||||
_ ->
|
_ ->
|
||||||
case emqtt_auth:check(Username, Password) of
|
case emqtt_auth:check(Username, Password) of
|
||||||
false ->
|
false ->
|
||||||
?ERROR_MSG("MQTT login failed - no credentials"),
|
lager:error_MSG("MQTT login failed - no credentials"),
|
||||||
{?CONNACK_CREDENTIALS, State};
|
{?CONNACK_CREDENTIALS, State};
|
||||||
true ->
|
true ->
|
||||||
?INFO("connect from clientid: ~p, ~p", [ClientId, AlivePeriod]),
|
lager:info("connect from clientid: ~p, ~p", [ClientId, AlivePeriod]),
|
||||||
%%TODO:
|
%%TODO:
|
||||||
%%KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout),
|
%%KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout),
|
||||||
emqtt_cm:create(ClientId, self()),
|
emqtt_cm:create(ClientId, self()),
|
||||||
|
@ -113,7 +111,7 @@ handle_request(?CONNECT,
|
||||||
client_id = ClientId }}
|
client_id = ClientId }}
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
?INFO("recv conn...:~p", [ReturnCode]),
|
lager:info("recv conn...:~p", [ReturnCode]),
|
||||||
send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?CONNACK},
|
send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?CONNACK},
|
||||||
variable = #mqtt_frame_connack{
|
variable = #mqtt_frame_connack{
|
||||||
return_code = ReturnCode }}),
|
return_code = ReturnCode }}),
|
||||||
|
@ -216,7 +214,7 @@ handle_request(?PINGREQ, #mqtt_frame{}, #proto_state{socket=Sock}=State) ->
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
|
||||||
handle_request(?DISCONNECT, #mqtt_frame{}, State=#proto_state{client_id=ClientId}) ->
|
handle_request(?DISCONNECT, #mqtt_frame{}, State=#proto_state{client_id=ClientId}) ->
|
||||||
?INFO("~s disconnected", [ClientId]),
|
lager:info("~s disconnected", [ClientId]),
|
||||||
{stop, State}.
|
{stop, State}.
|
||||||
|
|
||||||
-spec send_message(Message, State) -> {ok, NewState} when
|
-spec send_message(Message, State) -> {ok, NewState} when
|
||||||
|
@ -259,7 +257,7 @@ send_message(Message, State = #proto_state{socket = Sock, message_id = MsgId}) -
|
||||||
end.
|
end.
|
||||||
|
|
||||||
send_frame(Sock, Frame) ->
|
send_frame(Sock, Frame) ->
|
||||||
?INFO("send frame:~p", [Frame]),
|
lager:info("send frame:~p", [Frame]),
|
||||||
erlang:port_command(Sock, emqtt_frame:serialise(Frame)).
|
erlang:port_command(Sock, emqtt_frame:serialise(Frame)).
|
||||||
|
|
||||||
%%TODO: fix me later...
|
%%TODO: fix me later...
|
||||||
|
@ -312,7 +310,7 @@ validate_frame(?UNSUBSCRIBE, #mqtt_frame{variable = #mqtt_frame_subscribe{topic_
|
||||||
not emqtt_topic:validate({subscribe, Topic})],
|
not emqtt_topic:validate({subscribe, Topic})],
|
||||||
case ErrTopics of
|
case ErrTopics of
|
||||||
[] -> ok;
|
[] -> ok;
|
||||||
_ -> ?ERROR("error topics: ~p", [ErrTopics]), {error, badtopic}
|
_ -> lager:error("error topics: ~p", [ErrTopics]), {error, badtopic}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
validate_frame(?SUBSCRIBE, #mqtt_frame{variable = #mqtt_frame_subscribe{topic_table = Topics}}) ->
|
validate_frame(?SUBSCRIBE, #mqtt_frame{variable = #mqtt_frame_subscribe{topic_table = Topics}}) ->
|
||||||
|
@ -320,7 +318,7 @@ validate_frame(?SUBSCRIBE, #mqtt_frame{variable = #mqtt_frame_subscribe{topic_ta
|
||||||
not (emqtt_topic:validate({subscribe, Topic}) and (Qos < 3))],
|
not (emqtt_topic:validate({subscribe, Topic}) and (Qos < 3))],
|
||||||
case ErrTopics of
|
case ErrTopics of
|
||||||
[] -> ok;
|
[] -> ok;
|
||||||
_ -> ?ERROR("error topics: ~p", [ErrTopics]), {error, badtopic}
|
_ -> lager:error("error topics: ~p", [ErrTopics]), {error, badtopic}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
validate_frame(_Type, _Frame) ->
|
validate_frame(_Type, _Frame) ->
|
||||||
|
|
|
@ -26,8 +26,6 @@
|
||||||
|
|
||||||
-include("emqtt.hrl").
|
-include("emqtt.hrl").
|
||||||
|
|
||||||
-include("emqtt_log.hrl").
|
|
||||||
|
|
||||||
-include("emqtt_topic.hrl").
|
-include("emqtt_topic.hrl").
|
||||||
|
|
||||||
-include_lib("stdlib/include/qlc.hrl").
|
-include_lib("stdlib/include/qlc.hrl").
|
||||||
|
@ -174,7 +172,7 @@ handle_cast(Msg, State) ->
|
||||||
handle_info({'DOWN', Mon, _Type, _Object, _Info}, State) ->
|
handle_info({'DOWN', Mon, _Type, _Object, _Info}, State) ->
|
||||||
case get({submon, Mon}) of
|
case get({submon, Mon}) of
|
||||||
undefined ->
|
undefined ->
|
||||||
?ERROR("unexpected 'DOWN': ~p", [Mon]);
|
lager:error("unexpected 'DOWN': ~p", [Mon]);
|
||||||
SubPid ->
|
SubPid ->
|
||||||
erase({submon, Mon}),
|
erase({submon, Mon}),
|
||||||
erase({subscriber, SubPid}),
|
erase({subscriber, SubPid}),
|
||||||
|
|
|
@ -46,8 +46,6 @@
|
||||||
|
|
||||||
-include("emqtt.hrl").
|
-include("emqtt.hrl").
|
||||||
|
|
||||||
-include("emqtt_log.hrl").
|
|
||||||
|
|
||||||
-export([start_link/0,
|
-export([start_link/0,
|
||||||
lookup/1,
|
lookup/1,
|
||||||
insert/2,
|
insert/2,
|
||||||
|
|
Loading…
Reference in New Issue