Merge pull request #986 from emqtt/emq20

merge emq20 branch code
This commit is contained in:
turtleDeng 2017-04-10 12:40:26 +08:00 committed by GitHub
commit b839fdc275
5 changed files with 53 additions and 13 deletions

View File

@ -1,5 +1,8 @@
# *EMQ* - Erlang MQTT Broker [![Build Status](https://travis-ci.org/emqtt/emqttd.svg?branch=master)](https://travis-ci.org/emqtt/emqttd)
# *EMQ* - Erlang MQTT Broker
[![Build Status](https://travis-ci.org/emqtt/emqttd.svg?branch=master)](https://travis-ci.org/emqtt/emqttd)
[<img src="http://slack.emqtt.io/badge.svg">](http://slack.emqtt.io)
*EMQ* (Erlang MQTT Broker) is a distributed, massively scalable, highly extensible MQTT message broker written in Erlang/OTP.

View File

@ -203,7 +203,7 @@ end}.
]}.
{mapping, "log.syslog.identity", "lager.handlers", [
{default, "emq"},
{default, "emqttd"},
{datatype, string}
]}.
@ -213,7 +213,7 @@ end}.
]}.
{mapping, "log.syslog.level", "lager.handlers", [
{default, err},
{default, error},
{datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency]}}
]}.

View File

@ -7,6 +7,6 @@
{env, []},
{mod, {emqttd_app, []}},
{maintainers, ["Feng Lee <feng@emqtt.io>"]},
{licenses, ["MIT"]},
{licenses, ["Apache-2.0"]},
{links, [{"Github", "https://github.com/emqtt/emqttd"}]}
]}.

View File

@ -225,6 +225,14 @@ handle_info({'EXIT', WsPid, Reason}, State = #wsclient_state{ws_pid = WsPid}) ->
?WSLOG(error, "shutdown: ~p",[Reason], State),
shutdown(Reason, State);
%% The session process exited unexpectedly.
handle_info({'EXIT', Pid, Reason}, State = #wsclient_state{proto_state = ProtoState}) ->
case emqttd_protocol:session(ProtoState) of
Pid -> stop(Reason, State);
_ -> ?WSLOG(error, "Unexpected EXIT: ~p, Reason: ~p", [Pid, Reason], State),
{noreply, State, hibernate}
end;
handle_info(Info, State) ->
?WSLOG(error, "Unexpected Info: ~p", [Info], State),
{noreply, State, hibernate}.

View File

@ -103,12 +103,15 @@ groups() ->
cli_subscriptions,
cli_bridges,
cli_plugins,
cli_listeners,
{listeners, [sequence],
[cli_listeners,
conflict_listeners
]},
cli_vm]},
{cleanSession, [sequence],
[cleanSession_validate,
cleanSession_validate1,
cleanSession_validate2]}].
cleanSession_validate1
]}].
init_per_suite(Config) ->
application:start(lager),
@ -619,6 +622,35 @@ cli_bridges(_) ->
cli_listeners(_) ->
emqttd_cli:listeners([]).
conflict_listeners(_) ->
F =
fun() ->
process_flag(trap_exit, true),
emqttc:start_link([{host, "localhost"},
{port, 1883},
{client_id, <<"c1">>},
{clean_sess, false}])
end,
spawn_link(F),
{ok, C2} = emqttc:start_link([{host, "localhost"},
{port, 1883},
{client_id, <<"c1">>},
{clean_sess, false}]),
timer:sleep(100),
Listeners =
lists:map(fun({{Protocol, ListenOn}, Pid}) ->
Key = atom_to_list(Protocol) ++ ":" ++ esockd:to_string(ListenOn),
{Key, [{acceptors, esockd:get_acceptors(Pid)},
{max_clients, esockd:get_max_clients(Pid)},
{current_clients, esockd:get_current_clients(Pid)},
{shutdown_count, esockd:get_shutdown_count(Pid)}]}
end, esockd:listeners()),
?assertEqual(1, proplists:get_value(current_clients, proplists:get_value("mqtt:tcp:1883", Listeners))),
?assertEqual([{conflict,1}], proplists:get_value(shutdown_count, proplists:get_value("mqtt:tcp:1883", Listeners))),
emqttc:disconnect(C2).
cli_vm(_) ->
emqttd_cli:vm([]),
emqttd_cli:vm(["ports"]).
@ -630,7 +662,6 @@ cleanSession_validate(_) ->
{clean_sess, false}]),
timer:sleep(10),
emqttc:subscribe(C1, <<"topic">>, qos0),
ok = emqttd_cli:sessions(["list", "persistent"]),
emqttc:disconnect(C1),
{ok, Pub} = emqttc:start_link([{host, "localhost"},
{port, 1883},
@ -644,7 +675,6 @@ cleanSession_validate(_) ->
{clean_sess, false}]),
timer:sleep(100),
Metrics = emqttd_metrics:all(),
ct:log("Metrics:~p~n", [Metrics]),
?assertEqual(1, proplists:get_value('messages/qos0/sent', Metrics)),
?assertEqual(1, proplists:get_value('messages/qos0/received', Metrics)),
emqttc:disconnect(Pub),
@ -657,7 +687,6 @@ cleanSession_validate1(_) ->
{clean_sess, true}]),
timer:sleep(10),
emqttc:subscribe(C1, <<"topic">>, qos1),
ok = emqttd_cli:sessions(["list", "transient"]),
emqttc:disconnect(C1),
{ok, Pub} = emqttc:start_link([{host, "localhost"},
{port, 1883},