Merge pull request #1664 from emqtt/develop
Support to start listener on specified port when emqttd is runnning
This commit is contained in:
commit
0adc8b39af
16
Makefile
16
Makefile
|
@ -1,21 +1,21 @@
|
||||||
PROJECT = emqttd
|
PROJECT = emqttd
|
||||||
PROJECT_DESCRIPTION = Erlang MQTT Broker
|
PROJECT_DESCRIPTION = Erlang MQTT Broker
|
||||||
PROJECT_VERSION = 2.3.10
|
PROJECT_VERSION = 2.3.11
|
||||||
|
|
||||||
DEPS = goldrush gproc lager esockd ekka mochiweb pbkdf2 lager_syslog bcrypt clique jsx
|
DEPS = goldrush gproc lager esockd ekka mochiweb pbkdf2 lager_syslog bcrypt clique jsx
|
||||||
|
|
||||||
dep_goldrush = git https://github.com/basho/goldrush 0.1.9
|
dep_goldrush = git https://github.com/basho/goldrush 0.1.9
|
||||||
dep_gproc = git https://github.com/uwiger/gproc
|
dep_gproc = git https://github.com/uwiger/gproc 0.8.0
|
||||||
dep_getopt = git https://github.com/jcomellas/getopt v0.8.2
|
dep_getopt = git https://github.com/jcomellas/getopt v0.8.2
|
||||||
dep_lager = git https://github.com/basho/lager master
|
dep_lager = git https://github.com/basho/lager 3.2.4
|
||||||
dep_esockd = git https://github.com/emqtt/esockd v5.2.2
|
dep_esockd = git https://github.com/emqtt/esockd v5.2.2
|
||||||
dep_ekka = git https://github.com/emqtt/ekka v0.2.3
|
dep_ekka = git https://github.com/emqtt/ekka v0.2.3
|
||||||
dep_mochiweb = git https://github.com/emqtt/mochiweb v4.2.2
|
dep_mochiweb = git https://github.com/emqtt/mochiweb v4.2.2
|
||||||
dep_pbkdf2 = git https://github.com/emqtt/pbkdf2 2.0.1
|
dep_pbkdf2 = git https://github.com/emqtt/pbkdf2 2.0.1
|
||||||
dep_lager_syslog = git https://github.com/basho/lager_syslog
|
dep_lager_syslog = git https://github.com/basho/lager_syslog 3.0.1
|
||||||
dep_bcrypt = git https://github.com/smarkets/erlang-bcrypt master
|
dep_bcrypt = git https://github.com/smarkets/erlang-bcrypt 0.5.0.3
|
||||||
dep_clique = git https://github.com/emqtt/clique
|
dep_clique = git https://github.com/emqtt/clique v0.3.10
|
||||||
dep_jsx = git https://github.com/talentdeficit/jsx
|
dep_jsx = git https://github.com/talentdeficit/jsx v2.8.3
|
||||||
|
|
||||||
ERLC_OPTS += +debug_info
|
ERLC_OPTS += +debug_info
|
||||||
ERLC_OPTS += +'{parse_transform, lager_transform}'
|
ERLC_OPTS += +'{parse_transform, lager_transform}'
|
||||||
|
@ -23,7 +23,7 @@ ERLC_OPTS += +'{parse_transform, lager_transform}'
|
||||||
NO_AUTOPATCH = cuttlefish
|
NO_AUTOPATCH = cuttlefish
|
||||||
|
|
||||||
BUILD_DEPS = cuttlefish
|
BUILD_DEPS = cuttlefish
|
||||||
dep_cuttlefish = git https://github.com/emqtt/cuttlefish
|
dep_cuttlefish = git https://github.com/emqtt/cuttlefish v2.0.11
|
||||||
|
|
||||||
TEST_DEPS = emqttc emq_dashboard
|
TEST_DEPS = emqttc emq_dashboard
|
||||||
dep_emqttc = git https://github.com/emqtt/emqttc
|
dep_emqttc = git https://github.com/emqtt/emqttc
|
||||||
|
|
|
@ -439,7 +439,7 @@ end}.
|
||||||
ConsoleLogLevel = cuttlefish:conf_get("log.console.level", Conf),
|
ConsoleLogLevel = cuttlefish:conf_get("log.console.level", Conf),
|
||||||
ConsoleLogFile = cuttlefish:conf_get("log.console.file", Conf),
|
ConsoleLogFile = cuttlefish:conf_get("log.console.file", Conf),
|
||||||
|
|
||||||
ConsoleHandler = {lager_console_backend, [ConsoleLogLevel]},
|
ConsoleHandler = {lager_console_backend, ConsoleLogLevel},
|
||||||
ConsoleFileHandler = {lager_file_backend, [{file, ConsoleLogFile},
|
ConsoleFileHandler = {lager_file_backend, [{file, ConsoleLogFile},
|
||||||
{level, ConsoleLogLevel},
|
{level, ConsoleLogLevel},
|
||||||
{size, cuttlefish:conf_get("log.console.size", Conf)},
|
{size, cuttlefish:conf_get("log.console.size", Conf)},
|
||||||
|
|
|
@ -203,6 +203,7 @@ merge_sockopts(Options) ->
|
||||||
%% @doc Stop Listeners
|
%% @doc Stop Listeners
|
||||||
stop_listeners() -> lists:foreach(fun stop_listener/1, emqttd:env(listeners, [])).
|
stop_listeners() -> lists:foreach(fun stop_listener/1, emqttd:env(listeners, [])).
|
||||||
|
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
stop_listener({tcp, ListenOn, _Opts}) ->
|
stop_listener({tcp, ListenOn, _Opts}) ->
|
||||||
esockd:close('mqtt:tcp', ListenOn);
|
esockd:close('mqtt:tcp', ListenOn);
|
||||||
|
@ -239,4 +240,3 @@ merge_sockopts_test_() ->
|
||||||
?_assert(merge_sockopts(Opts) == [{sockopts, ?MQTT_SOCKOPTS} | Opts]).
|
?_assert(merge_sockopts(Opts) == [{sockopts, ?MQTT_SOCKOPTS} | Opts]).
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
|
|
|
@ -479,12 +479,16 @@ listeners([]) ->
|
||||||
end, Info)
|
end, Info)
|
||||||
end, esockd:listeners());
|
end, esockd:listeners());
|
||||||
|
|
||||||
|
listeners(["start", Proto, ListenOn]) ->
|
||||||
|
case emqttd_app:start_listener({list_to_atom(Proto), parse_listenon(ListenOn), []}) of
|
||||||
|
{ok, _Pid} ->
|
||||||
|
io:format("Start ~s listener on ~s successfully.~n", [Proto, ListenOn]);
|
||||||
|
{error, Error} ->
|
||||||
|
io:format("Failed to Start ~s listener on ~s, error:~p~n", [Proto, ListenOn, Error])
|
||||||
|
end;
|
||||||
|
|
||||||
listeners(["restart", Proto, ListenOn]) ->
|
listeners(["restart", Proto, ListenOn]) ->
|
||||||
ListenOn1 = case string:tokens(ListenOn, ":") of
|
case emqttd_app:restart_listener({list_to_atom(Proto), parse_listenon(ListenOn), []}) of
|
||||||
[Port] -> list_to_integer(Port);
|
|
||||||
[IP, Port] -> {IP, list_to_integer(Port)}
|
|
||||||
end,
|
|
||||||
case emqttd_app:restart_listener({list_to_atom(Proto), ListenOn1, []}) of
|
|
||||||
{ok, _Pid} ->
|
{ok, _Pid} ->
|
||||||
io:format("Restart ~s listener on ~s successfully.~n", [Proto, ListenOn]);
|
io:format("Restart ~s listener on ~s successfully.~n", [Proto, ListenOn]);
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
|
@ -492,11 +496,7 @@ listeners(["restart", Proto, ListenOn]) ->
|
||||||
end;
|
end;
|
||||||
|
|
||||||
listeners(["stop", Proto, ListenOn]) ->
|
listeners(["stop", Proto, ListenOn]) ->
|
||||||
ListenOn1 = case string:tokens(ListenOn, ":") of
|
case emqttd_app:stop_listener({list_to_atom(Proto), parse_listenon(ListenOn), []}) of
|
||||||
[Port] -> list_to_integer(Port);
|
|
||||||
[IP, Port] -> {IP, list_to_integer(Port)}
|
|
||||||
end,
|
|
||||||
case emqttd_app:stop_listener({list_to_atom(Proto), ListenOn1, []}) of
|
|
||||||
ok ->
|
ok ->
|
||||||
io:format("Stop ~s listener on ~s successfully.~n", [Proto, ListenOn]);
|
io:format("Stop ~s listener on ~s successfully.~n", [Proto, ListenOn]);
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
|
@ -605,3 +605,9 @@ format(_, Val) ->
|
||||||
Val.
|
Val.
|
||||||
|
|
||||||
bin(S) -> iolist_to_binary(S).
|
bin(S) -> iolist_to_binary(S).
|
||||||
|
|
||||||
|
parse_listenon(ListenOn) ->
|
||||||
|
case string:tokens(ListenOn, ":") of
|
||||||
|
[Port] -> list_to_integer(Port);
|
||||||
|
[IP, Port] -> {IP, list_to_integer(Port)}
|
||||||
|
end.
|
||||||
|
|
|
@ -386,6 +386,7 @@ handle_cast({subscribe, _From, TopicTable, AckFun},
|
||||||
SubMap1 =
|
SubMap1 =
|
||||||
case maps:find(Topic, SubMap) of
|
case maps:find(Topic, SubMap) of
|
||||||
{ok, NewQos} ->
|
{ok, NewQos} ->
|
||||||
|
emqttd_hooks:run('session.subscribed', [ClientId, Username], {Topic, Opts}),
|
||||||
?LOG(warning, "Duplicated subscribe: ~s, qos = ~w", [Topic, NewQos], State),
|
?LOG(warning, "Duplicated subscribe: ~s, qos = ~w", [Topic, NewQos], State),
|
||||||
SubMap;
|
SubMap;
|
||||||
{ok, OldQos} ->
|
{ok, OldQos} ->
|
||||||
|
|
Loading…
Reference in New Issue