diff --git a/README.md b/README.md index c2bbf5ebd..9bbd5f8eb 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,8 @@ -# *EMQ* - Erlang MQTT Broker [![Build Status](https://travis-ci.org/emqtt/emqttd.svg?branch=master)](https://travis-ci.org/emqtt/emqttd) +# *EMQ* - Erlang MQTT Broker + +[![Build Status](https://travis-ci.org/emqtt/emqttd.svg?branch=master)](https://travis-ci.org/emqtt/emqttd) +[](http://slack.emqtt.io) *EMQ* (Erlang MQTT Broker) is a distributed, massively scalable, highly extensible MQTT message broker written in Erlang/OTP. diff --git a/priv/emq.schema b/priv/emq.schema index 0f59cad69..9b20ea4c2 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -203,7 +203,7 @@ end}. ]}. {mapping, "log.syslog.identity", "lager.handlers", [ - {default, "emq"}, + {default, "emqttd"}, {datatype, string} ]}. @@ -213,7 +213,7 @@ end}. ]}. {mapping, "log.syslog.level", "lager.handlers", [ - {default, err}, + {default, error}, {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency]}} ]}. diff --git a/src/emqttd.app.src b/src/emqttd.app.src index 0eef4e383..ecb210e53 100644 --- a/src/emqttd.app.src +++ b/src/emqttd.app.src @@ -7,6 +7,6 @@ {env, []}, {mod, {emqttd_app, []}}, {maintainers, ["Feng Lee "]}, - {licenses, ["MIT"]}, + {licenses, ["Apache-2.0"]}, {links, [{"Github", "https://github.com/emqtt/emqttd"}]} ]}. diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index 68b00e501..d9795be37 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -225,6 +225,14 @@ handle_info({'EXIT', WsPid, Reason}, State = #wsclient_state{ws_pid = WsPid}) -> ?WSLOG(error, "shutdown: ~p",[Reason], State), shutdown(Reason, State); +%% The session process exited unexpectedly. +handle_info({'EXIT', Pid, Reason}, State = #wsclient_state{proto_state = ProtoState}) -> + case emqttd_protocol:session(ProtoState) of + Pid -> stop(Reason, State); + _ -> ?WSLOG(error, "Unexpected EXIT: ~p, Reason: ~p", [Pid, Reason], State), + {noreply, State, hibernate} + end; + handle_info(Info, State) -> ?WSLOG(error, "Unexpected Info: ~p", [Info], State), {noreply, State, hibernate}. diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index be60926fb..b5402830d 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -103,12 +103,15 @@ groups() -> cli_subscriptions, cli_bridges, cli_plugins, - cli_listeners, + {listeners, [sequence], + [cli_listeners, + conflict_listeners + ]}, cli_vm]}, {cleanSession, [sequence], [cleanSession_validate, - cleanSession_validate1, - cleanSession_validate2]}]. + cleanSession_validate1 + ]}]. init_per_suite(Config) -> application:start(lager), @@ -619,6 +622,35 @@ cli_bridges(_) -> cli_listeners(_) -> emqttd_cli:listeners([]). +conflict_listeners(_) -> + F = + fun() -> + process_flag(trap_exit, true), + emqttc:start_link([{host, "localhost"}, + {port, 1883}, + {client_id, <<"c1">>}, + {clean_sess, false}]) + end, + spawn_link(F), + + {ok, C2} = emqttc:start_link([{host, "localhost"}, + {port, 1883}, + {client_id, <<"c1">>}, + {clean_sess, false}]), + timer:sleep(100), + + Listeners = + lists:map(fun({{Protocol, ListenOn}, Pid}) -> + Key = atom_to_list(Protocol) ++ ":" ++ esockd:to_string(ListenOn), + {Key, [{acceptors, esockd:get_acceptors(Pid)}, + {max_clients, esockd:get_max_clients(Pid)}, + {current_clients, esockd:get_current_clients(Pid)}, + {shutdown_count, esockd:get_shutdown_count(Pid)}]} + end, esockd:listeners()), + ?assertEqual(1, proplists:get_value(current_clients, proplists:get_value("mqtt:tcp:1883", Listeners))), + ?assertEqual([{conflict,1}], proplists:get_value(shutdown_count, proplists:get_value("mqtt:tcp:1883", Listeners))), + emqttc:disconnect(C2). + cli_vm(_) -> emqttd_cli:vm([]), emqttd_cli:vm(["ports"]). @@ -630,7 +662,6 @@ cleanSession_validate(_) -> {clean_sess, false}]), timer:sleep(10), emqttc:subscribe(C1, <<"topic">>, qos0), - ok = emqttd_cli:sessions(["list", "persistent"]), emqttc:disconnect(C1), {ok, Pub} = emqttc:start_link([{host, "localhost"}, {port, 1883}, @@ -639,12 +670,11 @@ cleanSession_validate(_) -> emqttc:publish(Pub, <<"topic">>, <<"m1">>, [{qos, 0}]), timer:sleep(10), {ok, C11} = emqttc:start_link([{host, "localhost"}, - {port, 1883}, - {client_id, <<"c1">>}, - {clean_sess, false}]), + {port, 1883}, + {client_id, <<"c1">>}, + {clean_sess, false}]), timer:sleep(100), Metrics = emqttd_metrics:all(), - ct:log("Metrics:~p~n", [Metrics]), ?assertEqual(1, proplists:get_value('messages/qos0/sent', Metrics)), ?assertEqual(1, proplists:get_value('messages/qos0/received', Metrics)), emqttc:disconnect(Pub), @@ -657,7 +687,6 @@ cleanSession_validate1(_) -> {clean_sess, true}]), timer:sleep(10), emqttc:subscribe(C1, <<"topic">>, qos1), - ok = emqttd_cli:sessions(["list", "transient"]), emqttc:disconnect(C1), {ok, Pub} = emqttc:start_link([{host, "localhost"}, {port, 1883},