merge master
This commit is contained in:
commit
ada53f8b68
10
CHANGELOG.md
10
CHANGELOG.md
|
@ -14,6 +14,16 @@ WebSocket
|
||||||
Presence Management....
|
Presence Management....
|
||||||
|
|
||||||
|
|
||||||
|
0.6.2-alpha (2015-04-24)
|
||||||
|
-------------------------
|
||||||
|
|
||||||
|
Bugfix: critical issue #54, #104, #106 - error when resume session
|
||||||
|
|
||||||
|
Improve: add emqttd_cm_sup module, and use 'hash' gproc_pool to register/unregister client ids
|
||||||
|
|
||||||
|
Improve: kick old client out when session is duplicated.
|
||||||
|
|
||||||
|
Improve: move mnesia dir config from etc/app.config to etc/vm.args
|
||||||
|
|
||||||
|
|
||||||
0.6.1-alpha (2015-04-20)
|
0.6.1-alpha (2015-04-20)
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
{application, emqtt,
|
{application, emqtt,
|
||||||
[
|
[
|
||||||
{description, "Erlang Common MQTT Library"},
|
{description, "Erlang MQTT Common Library"},
|
||||||
{vsn, "0.6.1"},
|
{vsn, "0.7.0"},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [
|
{applications, [
|
||||||
|
|
|
@ -77,7 +77,7 @@ start_servers(Sup) ->
|
||||||
{"emqttd event", emqttd_event},
|
{"emqttd event", emqttd_event},
|
||||||
{"emqttd trace", emqttd_trace},
|
{"emqttd trace", emqttd_trace},
|
||||||
{"emqttd pooler", {supervisor, emqttd_pooler_sup}},
|
{"emqttd pooler", {supervisor, emqttd_pooler_sup}},
|
||||||
{"emqttd client manager", emqttd_cm},
|
{"emqttd client manager", {supervisor, emqttd_cm_sup}},
|
||||||
{"emqttd session manager", emqttd_sm},
|
{"emqttd session manager", emqttd_sm},
|
||||||
{"emqttd session supervisor", {supervisor, emqttd_session_sup}, SessOpts},
|
{"emqttd session supervisor", {supervisor, emqttd_session_sup}, SessOpts},
|
||||||
{"emqttd pubsub", {supervisor, emqttd_pubsub_sup}, PubSubOpts},
|
{"emqttd pubsub", {supervisor, emqttd_pubsub_sup}, PubSubOpts},
|
||||||
|
|
|
@ -33,7 +33,7 @@
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
%% API Exports
|
%% API Exports
|
||||||
-export([start_link/0]).
|
-export([start_link/2]).
|
||||||
|
|
||||||
-export([lookup/1, register/1, unregister/1]).
|
-export([lookup/1, register/1, unregister/1]).
|
||||||
|
|
||||||
|
@ -43,7 +43,7 @@
|
||||||
|
|
||||||
-record(state, {tab, statsfun}).
|
-record(state, {tab, statsfun}).
|
||||||
|
|
||||||
-define(CLIENT_TAB, mqtt_client).
|
-define(POOL, cm).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% API
|
%%% API
|
||||||
|
@ -53,9 +53,11 @@
|
||||||
%% @doc Start client manager
|
%% @doc Start client manager
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec start_link() -> {ok, pid()} | ignore | {error, any()}.
|
-spec start_link(Id, TabId) -> {ok, pid()} | ignore | {error, any()} when
|
||||||
start_link() ->
|
Id :: pos_integer(),
|
||||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
TabId :: ets:tid().
|
||||||
|
start_link(Id, TabId) ->
|
||||||
|
gen_server:start_link(?MODULE, [Id, TabId], []).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc Lookup client pid with clientId
|
%% @doc Lookup client pid with clientId
|
||||||
|
@ -63,7 +65,7 @@ start_link() ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec lookup(ClientId :: binary()) -> pid() | undefined.
|
-spec lookup(ClientId :: binary()) -> pid() | undefined.
|
||||||
lookup(ClientId) when is_binary(ClientId) ->
|
lookup(ClientId) when is_binary(ClientId) ->
|
||||||
case ets:lookup(?CLIENT_TAB, ClientId) of
|
case ets:lookup(emqttd_cm_sup:table(), ClientId) of
|
||||||
[{_, Pid, _}] -> Pid;
|
[{_, Pid, _}] -> Pid;
|
||||||
[] -> undefined
|
[] -> undefined
|
||||||
end.
|
end.
|
||||||
|
@ -74,12 +76,8 @@ lookup(ClientId) when is_binary(ClientId) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec register(ClientId :: binary()) -> ok.
|
-spec register(ClientId :: binary()) -> ok.
|
||||||
register(ClientId) when is_binary(ClientId) ->
|
register(ClientId) when is_binary(ClientId) ->
|
||||||
Pid = self(),
|
CmPid = gproc_pool:pick_worker(?POOL, ClientId),
|
||||||
%% this is atomic
|
gen_server:call(CmPid, {register, ClientId, self()}, infinity).
|
||||||
case ets:insert_new(?CLIENT_TAB, {ClientId, Pid, undefined}) of
|
|
||||||
true -> gen_server:cast(?SERVER, {monitor, ClientId, Pid});
|
|
||||||
false -> gen_server:cast(?SERVER, {register, ClientId, Pid})
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc Unregister clientId with pid.
|
%% @doc Unregister clientId with pid.
|
||||||
|
@ -87,45 +85,42 @@ register(ClientId) when is_binary(ClientId) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec unregister(ClientId :: binary()) -> ok.
|
-spec unregister(ClientId :: binary()) -> ok.
|
||||||
unregister(ClientId) when is_binary(ClientId) ->
|
unregister(ClientId) when is_binary(ClientId) ->
|
||||||
gen_server:cast(?SERVER, {unregister, ClientId, self()}).
|
CmPid = gproc_pool:pick_worker(?POOL, ClientId),
|
||||||
|
gen_server:cast(CmPid, {unregister, ClientId, self()}).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% gen_server callbacks
|
%%% gen_server callbacks
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
init([]) ->
|
init([Id, TabId]) ->
|
||||||
TabId = ets:new(?CLIENT_TAB, [set,
|
gproc_pool:connect_worker(?POOL, {?MODULE, Id}),
|
||||||
named_table,
|
|
||||||
public,
|
|
||||||
{write_concurrency, true}]),
|
|
||||||
StatsFun = emqttd_broker:statsfun('clients/count', 'clients/max'),
|
StatsFun = emqttd_broker:statsfun('clients/count', 'clients/max'),
|
||||||
{ok, #state{tab = TabId, statsfun = StatsFun}}.
|
{ok, #state{tab = TabId, statsfun = StatsFun}}.
|
||||||
|
|
||||||
|
handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) ->
|
||||||
|
case ets:lookup(Tab, ClientId) of
|
||||||
|
[{_, Pid, _}] ->
|
||||||
|
lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]),
|
||||||
|
ignore;
|
||||||
|
[{_, OldPid, MRef}] ->
|
||||||
|
lager:error("clientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]),
|
||||||
|
OldPid ! {stop, duplicate_id, Pid},
|
||||||
|
erlang:demonitor(MRef),
|
||||||
|
ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)});
|
||||||
|
[] ->
|
||||||
|
ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)})
|
||||||
|
end,
|
||||||
|
{reply, ok, State};
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
lager:error("unexpected request: ~p", [Req]),
|
lager:error("unexpected request: ~p", [Req]),
|
||||||
{reply, {error, badreq}, State}.
|
{reply, {error, badreq}, State}.
|
||||||
|
|
||||||
handle_cast({register, ClientId, Pid}, State=#state{tab = Tab}) ->
|
handle_cast({unregister, ClientId, Pid}, State = #state{tab = TabId}) ->
|
||||||
case registerd(Tab, {ClientId, Pid}) of
|
case ets:lookup(TabId, ClientId) of
|
||||||
true ->
|
|
||||||
ignore;
|
|
||||||
false ->
|
|
||||||
ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)})
|
|
||||||
end,
|
|
||||||
{noreply, setstats(State)};
|
|
||||||
|
|
||||||
handle_cast({monitor, ClientId, Pid}, State = #state{tab = Tab}) ->
|
|
||||||
case ets:update_element(Tab, ClientId, {3, erlang:monitor(process, Pid)}) of
|
|
||||||
true -> ok;
|
|
||||||
false -> lager:error("failed to monitor clientId '~s' with pid ~p", [ClientId, Pid])
|
|
||||||
end,
|
|
||||||
{noreply, setstats(State)};
|
|
||||||
|
|
||||||
handle_cast({unregister, ClientId, Pid}, State) ->
|
|
||||||
case ets:lookup(?CLIENT_TAB, ClientId) of
|
|
||||||
[{_, Pid, MRef}] ->
|
[{_, Pid, MRef}] ->
|
||||||
erlang:demonitor(MRef, [flush]),
|
erlang:demonitor(MRef, [flush]),
|
||||||
ets:delete(?CLIENT_TAB, ClientId);
|
ets:delete(TabId, ClientId);
|
||||||
[_] ->
|
[_] ->
|
||||||
ignore;
|
ignore;
|
||||||
[] ->
|
[] ->
|
||||||
|
@ -136,8 +131,8 @@ handle_cast({unregister, ClientId, Pid}, State) ->
|
||||||
handle_cast(_Msg, State) ->
|
handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
|
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tab = TabId}) ->
|
||||||
ets:match_delete(?CLIENT_TAB, {'_', DownPid, MRef}),
|
ets:match_delete(TabId, {'_', DownPid, MRef}),
|
||||||
{noreply, setstats(State)};
|
{noreply, setstats(State)};
|
||||||
|
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
|
@ -152,21 +147,8 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
registerd(Tab, {ClientId, Pid}) ->
|
|
||||||
case ets:lookup(Tab, ClientId) of
|
|
||||||
[{_, Pid, _}] ->
|
|
||||||
lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]),
|
|
||||||
true;
|
|
||||||
[{_, OldPid, MRef}] ->
|
|
||||||
lager:error("clientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]),
|
|
||||||
OldPid ! {stop, duplicate_id, Pid},
|
|
||||||
erlang:demonitor(MRef),
|
|
||||||
false;
|
|
||||||
[] ->
|
|
||||||
false
|
|
||||||
end.
|
|
||||||
|
|
||||||
setstats(State = #state{statsfun = StatsFun}) ->
|
setstats(State = #state{tab = TabId, statsfun = StatsFun}) ->
|
||||||
StatsFun(ets:info(?CLIENT_TAB, size)), State.
|
StatsFun(ets:info(TabId, size)), State.
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,62 @@
|
||||||
|
%%%-----------------------------------------------------------------------------
|
||||||
|
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
|
||||||
|
%%%
|
||||||
|
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
%%% of this software and associated documentation files (the "Software"), to deal
|
||||||
|
%%% in the Software without restriction, including without limitation the rights
|
||||||
|
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
%%% copies of the Software, and to permit persons to whom the Software is
|
||||||
|
%%% furnished to do so, subject to the following conditions:
|
||||||
|
%%%
|
||||||
|
%%% The above copyright notice and this permission notice shall be included in all
|
||||||
|
%%% copies or substantial portions of the Software.
|
||||||
|
%%%
|
||||||
|
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
|
%%% SOFTWARE.
|
||||||
|
%%%-----------------------------------------------------------------------------
|
||||||
|
%%% @doc
|
||||||
|
%%% emqttd client manager supervisor.
|
||||||
|
%%%
|
||||||
|
%%% @end
|
||||||
|
%%%-----------------------------------------------------------------------------
|
||||||
|
-module(emqttd_cm_sup).
|
||||||
|
|
||||||
|
-author('feng@emqtt.io').
|
||||||
|
|
||||||
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
|
-behaviour(supervisor).
|
||||||
|
|
||||||
|
%% API
|
||||||
|
-export([start_link/0, table/0]).
|
||||||
|
|
||||||
|
%% Supervisor callbacks
|
||||||
|
-export([init/1]).
|
||||||
|
|
||||||
|
-define(CLIENT_TAB, mqtt_client).
|
||||||
|
|
||||||
|
start_link() ->
|
||||||
|
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||||
|
|
||||||
|
table() -> ?CLIENT_TAB.
|
||||||
|
|
||||||
|
init([]) ->
|
||||||
|
TabId = ets:new(?CLIENT_TAB, [set, named_table, public,
|
||||||
|
{write_concurrency, true}]),
|
||||||
|
Schedulers = erlang:system_info(schedulers),
|
||||||
|
gproc_pool:new(cm, hash, [{size, Schedulers}]),
|
||||||
|
Children = lists:map(
|
||||||
|
fun(I) ->
|
||||||
|
Name = {emqttd_cm, I},
|
||||||
|
gproc_pool:add_worker(cm, Name, I),
|
||||||
|
{Name, {emqttd_cm, start_link, [I, TabId]},
|
||||||
|
permanent, 10000, worker, [emqttd_cm]}
|
||||||
|
end, lists:seq(1, Schedulers)),
|
||||||
|
{ok, {{one_for_all, 10, 100}, Children}}.
|
||||||
|
|
||||||
|
|
|
@ -243,6 +243,10 @@ initial_state(ClientId, ClientPid) ->
|
||||||
State = initial_state(ClientId),
|
State = initial_state(ClientId),
|
||||||
State#session_state{client_pid = ClientPid}.
|
State#session_state{client_pid = ClientPid}.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc Start a session process.
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
start_link(SessOpts, ClientId, ClientPid) ->
|
start_link(SessOpts, ClientId, ClientPid) ->
|
||||||
gen_server:start_link(?MODULE, [SessOpts, ClientId, ClientPid], []).
|
gen_server:start_link(?MODULE, [SessOpts, ClientId, ClientPid], []).
|
||||||
|
|
||||||
|
@ -270,18 +274,33 @@ handle_call({unsubscribe, Topics}, _From, State) ->
|
||||||
{reply, ok, NewState};
|
{reply, ok, NewState};
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
{stop, {badreq, Req}, State}.
|
lager:error("Unexpected request: ~p", [Req]),
|
||||||
|
{reply, error, State}.
|
||||||
|
|
||||||
handle_cast({resume, ClientId, ClientPid}, State = #session_state{
|
handle_cast({resume, ClientId, ClientPid}, State = #session_state{
|
||||||
clientid = ClientId,
|
clientid = ClientId,
|
||||||
client_pid = undefined,
|
client_pid = OldClientPid,
|
||||||
msg_queue = Queue,
|
msg_queue = Queue,
|
||||||
awaiting_ack = AwaitingAck,
|
awaiting_ack = AwaitingAck,
|
||||||
awaiting_comp = AwaitingComp,
|
awaiting_comp = AwaitingComp,
|
||||||
expire_timer = ETimer}) ->
|
expire_timer = ETimer}) ->
|
||||||
|
|
||||||
lager:info([{client, ClientId}], "Session ~s resumed by ~p",[ClientId, ClientPid]),
|
lager:info([{client, ClientId}], "Session ~s resumed by ~p",[ClientId, ClientPid]),
|
||||||
%cancel timeout timer
|
|
||||||
erlang:cancel_timer(ETimer),
|
%% kick old client...
|
||||||
|
if
|
||||||
|
OldClientPid =:= undefined ->
|
||||||
|
ok;
|
||||||
|
OldClientPid =:= ClientPid ->
|
||||||
|
ok;
|
||||||
|
true ->
|
||||||
|
lager:error("Session '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, ClientPid, OldClientPid]),
|
||||||
|
unlink(OldClientPid),
|
||||||
|
OldClientPid ! {stop, duplicate_id, ClientPid}
|
||||||
|
end,
|
||||||
|
|
||||||
|
%% cancel timeout timer
|
||||||
|
emqttd_utils:cancel_timer(ETimer),
|
||||||
|
|
||||||
%% redelivery PUBREL
|
%% redelivery PUBREL
|
||||||
lists:foreach(fun(PacketId) ->
|
lists:foreach(fun(PacketId) ->
|
||||||
|
@ -328,7 +347,8 @@ handle_cast({destroy, ClientId}, State = #session_state{clientid = ClientId}) ->
|
||||||
{stop, normal, State};
|
{stop, normal, State};
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
{stop, {badmsg, Msg}, State}.
|
lager:critical("Unexpected Msg: ~p, State: ~p", [Msg, State]),
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({dispatch, {_From, Messages}}, State) when is_list(Messages) ->
|
handle_info({dispatch, {_From, Messages}}, State) when is_list(Messages) ->
|
||||||
F = fun(Message, S) -> dispatch(Message, S) end,
|
F = fun(Message, S) -> dispatch(Message, S) end,
|
||||||
|
@ -338,18 +358,21 @@ handle_info({dispatch, {_From, Message}}, State) ->
|
||||||
{noreply, dispatch(Message, State)};
|
{noreply, dispatch(Message, State)};
|
||||||
|
|
||||||
handle_info({'EXIT', ClientPid, Reason}, State = #session_state{clientid = ClientId,
|
handle_info({'EXIT', ClientPid, Reason}, State = #session_state{clientid = ClientId,
|
||||||
client_pid = ClientPid,
|
client_pid = ClientPid}) ->
|
||||||
expires = Expires}) ->
|
lager:error("Session: client ~s@~p exited, caused by ~p", [ClientId, ClientPid, Reason]),
|
||||||
lager:warning("Session: client ~s@~p exited, caused by ~p", [ClientId, ClientPid, Reason]),
|
{noreply, start_expire_timer(State#session_state{client_pid = undefined})};
|
||||||
Timer = erlang:send_after(Expires * 1000, self(), session_expired),
|
|
||||||
{noreply, State#session_state{client_pid = undefined, expire_timer = Timer}};
|
handle_info({'EXIT', ClientPid0, _Reason}, State = #session_state{client_pid = ClientPid}) ->
|
||||||
|
lager:error("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid0, ClientPid]),
|
||||||
|
{noreply, State};
|
||||||
|
|
||||||
handle_info(session_expired, State = #session_state{clientid = ClientId}) ->
|
handle_info(session_expired, State = #session_state{clientid = ClientId}) ->
|
||||||
lager:warning("Session ~s expired!", [ClientId]),
|
lager:warning("Session ~s expired!", [ClientId]),
|
||||||
{stop, {shutdown, expired}, State};
|
{stop, {shutdown, expired}, State};
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
{stop, {badinfo, Info}, State}.
|
lager:critical("Unexpected Info: ~p, State: ~p", [Info, State]),
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, _State) ->
|
terminate(_Reason, _State) ->
|
||||||
ok.
|
ok.
|
||||||
|
@ -384,4 +407,9 @@ next_msg_id(State = #session_state{message_id = 16#ffff}) ->
|
||||||
next_msg_id(State = #session_state{message_id = MsgId}) ->
|
next_msg_id(State = #session_state{message_id = MsgId}) ->
|
||||||
State#session_state{message_id = MsgId + 1}.
|
State#session_state{message_id = MsgId + 1}.
|
||||||
|
|
||||||
|
start_expire_timer(State = #session_state{expires = Expires,
|
||||||
|
expire_timer = OldTimer}) ->
|
||||||
|
emqttd_utils:cancel_timer(OldTimer),
|
||||||
|
Timer = erlang:send_after(Expires * 1000, self(), session_expired),
|
||||||
|
State#session_state{expire_timer = Timer}.
|
||||||
|
|
||||||
|
|
|
@ -49,7 +49,7 @@ start_session(ClientId, ClientPid) ->
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
init([SessOpts]) ->
|
init([SessOpts]) ->
|
||||||
{ok, {{simple_one_for_one, 0, 1},
|
{ok, {{simple_one_for_one, 10, 10},
|
||||||
[{session, {emqttd_session, start_link, [SessOpts]},
|
[{session, {emqttd_session, start_link, [SessOpts]},
|
||||||
transient, 10000, worker, [emqttd_session]}]}}.
|
transient, 10000, worker, [emqttd_session]}]}}.
|
||||||
|
|
||||||
|
|
|
@ -29,7 +29,8 @@
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
|
||||||
-export([apply_module_attributes/1,
|
-export([apply_module_attributes/1,
|
||||||
all_module_attributes/1]).
|
all_module_attributes/1,
|
||||||
|
cancel_timer/1]).
|
||||||
|
|
||||||
%% only {F, Args}...
|
%% only {F, Args}...
|
||||||
apply_module_attributes(Name) ->
|
apply_module_attributes(Name) ->
|
||||||
|
@ -78,3 +79,8 @@ ignore_lib_apps(Apps) ->
|
||||||
hipe, esockd, mochiweb],
|
hipe, esockd, mochiweb],
|
||||||
[App || App = {Name, _, _} <- Apps, not lists:member(Name, LibApps)].
|
[App || App = {Name, _, _} <- Apps, not lists:member(Name, LibApps)].
|
||||||
|
|
||||||
|
|
||||||
|
cancel_timer(undefined) ->
|
||||||
|
undefined;
|
||||||
|
cancel_timer(Ref) ->
|
||||||
|
catch erlang:cancel_timer(Ref).
|
||||||
|
|
Binary file not shown.
|
@ -24,7 +24,7 @@
|
||||||
"plugins/emqttd_plugin_demo"]}.
|
"plugins/emqttd_plugin_demo"]}.
|
||||||
|
|
||||||
{deps, [
|
{deps, [
|
||||||
{gproc, "0.3.*", {git, "git://github.com/uwiger/gproc.git", {branch, "master"}}},
|
{gproc, "0.4.*", {git, "git://github.com/uwiger/gproc.git", {branch, "master"}}},
|
||||||
{lager, ".*", {git, "git://github.com/basho/lager.git", {branch, "master"}}},
|
{lager, ".*", {git, "git://github.com/basho/lager.git", {branch, "master"}}},
|
||||||
{esockd, "2.*", {git, "git://github.com/emqtt/esockd.git", {branch, "master"}}},
|
{esockd, "2.*", {git, "git://github.com/emqtt/esockd.git", {branch, "master"}}},
|
||||||
{mochiweb, ".*", {git, "git://github.com/slimpp/mochiweb.git", {branch, "master"}}}
|
{mochiweb, ".*", {git, "git://github.com/slimpp/mochiweb.git", {branch, "master"}}}
|
||||||
|
|
|
@ -7,9 +7,6 @@
|
||||||
{sasl, [
|
{sasl, [
|
||||||
{sasl_error_logger, {file, "log/emqttd_sasl.log"}}
|
{sasl_error_logger, {file, "log/emqttd_sasl.log"}}
|
||||||
]},
|
]},
|
||||||
{mnesia, [
|
|
||||||
{dir, "data/mnesia"}
|
|
||||||
]},
|
|
||||||
{ssl, [
|
{ssl, [
|
||||||
%{versions, ['tlsv1.2', 'tlsv1.1']}
|
%{versions, ['tlsv1.2', 'tlsv1.1']}
|
||||||
]},
|
]},
|
||||||
|
@ -21,7 +18,7 @@
|
||||||
{handlers, [
|
{handlers, [
|
||||||
{lager_console_backend, info},
|
{lager_console_backend, info},
|
||||||
{lager_file_backend, [
|
{lager_file_backend, [
|
||||||
{formatter_config, [time, " [",severity,"] ", message, "\n"]},
|
{formatter_config, [time, " ", pid, " [",severity,"] ", message, "\n"]},
|
||||||
{file, "log/emqttd_info.log"},
|
{file, "log/emqttd_info.log"},
|
||||||
{level, info},
|
{level, info},
|
||||||
{size, 104857600},
|
{size, 104857600},
|
||||||
|
@ -29,7 +26,7 @@
|
||||||
{count, 30}
|
{count, 30}
|
||||||
]},
|
]},
|
||||||
{lager_file_backend, [
|
{lager_file_backend, [
|
||||||
{formatter_config, [time, " [",severity,"] ", message, "\n"]},
|
{formatter_config, [time, " ", pid, " [",severity,"] ", message, "\n"]},
|
||||||
{file, "log/emqttd_error.log"},
|
{file, "log/emqttd_error.log"},
|
||||||
{level, error},
|
{level, error},
|
||||||
{size, 104857600},
|
{size, 104857600},
|
||||||
|
|
|
@ -1,9 +1,20 @@
|
||||||
|
##-------------------------------------------------------------------------
|
||||||
## Name of the node
|
## Name of the node
|
||||||
|
##-------------------------------------------------------------------------
|
||||||
-name emqttd@127.0.0.1
|
-name emqttd@127.0.0.1
|
||||||
|
|
||||||
## Cookie for distributed erlang
|
## Cookie for distributed erlang
|
||||||
-setcookie emqttdsecretcookie
|
-setcookie emqttdsecretcookie
|
||||||
|
|
||||||
|
##-------------------------------------------------------------------------
|
||||||
|
## Mnesia dir. NOTICE: quote the dir with '" "'
|
||||||
|
##-------------------------------------------------------------------------
|
||||||
|
-mnesia dir '"data/mnesia"'
|
||||||
|
|
||||||
|
##-------------------------------------------------------------------------
|
||||||
|
## Flags
|
||||||
|
##-------------------------------------------------------------------------
|
||||||
|
|
||||||
## Heartbeat management; auto-restarts VM if it dies or becomes unresponsive
|
## Heartbeat management; auto-restarts VM if it dies or becomes unresponsive
|
||||||
## (Disabled by default..use with caution!)
|
## (Disabled by default..use with caution!)
|
||||||
##-heart
|
##-heart
|
||||||
|
@ -16,6 +27,10 @@
|
||||||
## max process numbers
|
## max process numbers
|
||||||
+P 1000000
|
+P 1000000
|
||||||
|
|
||||||
|
##-------------------------------------------------------------------------
|
||||||
|
## Env
|
||||||
|
##-------------------------------------------------------------------------
|
||||||
|
|
||||||
## Increase number of concurrent ports/sockets
|
## Increase number of concurrent ports/sockets
|
||||||
-env ERL_MAX_PORTS 4096
|
-env ERL_MAX_PORTS 4096
|
||||||
|
|
||||||
|
|
|
@ -1,19 +0,0 @@
|
||||||
#!/bin/sh
|
|
||||||
# -*- tab-width:4;indent-tabs-mode:nil -*-
|
|
||||||
# ex: ts=4 sw=4 et
|
|
||||||
|
|
||||||
# slimple publish
|
|
||||||
mosquitto_pub -t xxx/yyy -m hello
|
|
||||||
if [ "$?" == 0 ]; then
|
|
||||||
echo "[Success]: slimple publish"
|
|
||||||
else
|
|
||||||
echo "[Failure]: slimple publish"
|
|
||||||
fi
|
|
||||||
|
|
||||||
# publish will willmsg
|
|
||||||
mosquitto_pub -q 1 -t a/b/c -m hahah -u test -P public --will-topic /will --will-payload willmsg --will-qos 1
|
|
||||||
if [ "$?" == 0 ]; then
|
|
||||||
echo "[Success]: publish with willmsg"
|
|
||||||
else
|
|
||||||
echo "[Failure]: publish with willmsg"
|
|
||||||
fi
|
|
Loading…
Reference in New Issue