diff --git a/apps/emqx_auth_http/src/emqx_auth_http_app.erl b/apps/emqx_auth_http/src/emqx_auth_http_app.erl index 9b6b265d4..2988dac78 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http_app.erl +++ b/apps/emqx_auth_http/src/emqx_auth_http_app.erl @@ -106,7 +106,7 @@ load_hooks() -> ok = emqx_auth_http:register_metrics(), PoolOpts = proplists:get_value(pool_opts, AuthReq), PoolName = proplists:get_value(pool_name, AuthReq), - ehttpc_sup:start_pool(PoolName, PoolOpts), + {ok, _} = ehttpc_sup:start_pool(PoolName, PoolOpts), case application:get_env(?APP, super_req) of undefined -> emqx:hook('client.authenticate', {emqx_auth_http, check, [#{auth => maps:from_list(AuthReq), @@ -114,7 +114,7 @@ load_hooks() -> {ok, SuperReq} -> PoolOpts1 = proplists:get_value(pool_opts, SuperReq), PoolName1 = proplists:get_value(pool_name, SuperReq), - ehttpc_sup:start_pool(PoolName1, PoolOpts1), + {ok, _} = ehttpc_sup:start_pool(PoolName1, PoolOpts1), emqx:hook('client.authenticate', {emqx_auth_http, check, [#{auth => maps:from_list(AuthReq), super => maps:from_list(SuperReq)}]}) end @@ -125,7 +125,7 @@ load_hooks() -> ok = emqx_acl_http:register_metrics(), PoolOpts2 = proplists:get_value(pool_opts, ACLReq), PoolName2 = proplists:get_value(pool_name, ACLReq), - ehttpc_sup:start_pool(PoolName2, PoolOpts2), + {ok, _} = ehttpc_sup:start_pool(PoolName2, PoolOpts2), emqx:hook('client.check_acl', {emqx_acl_http, check_acl, [#{acl => maps:from_list(ACLReq)}]}) end, ok. @@ -133,9 +133,9 @@ load_hooks() -> unload_hooks() -> emqx:unhook('client.authenticate', {emqx_auth_http, check}), emqx:unhook('client.check_acl', {emqx_acl_http, check_acl}), - ehttpc_sup:stop_pool('emqx_auth_http/auth_req'), - ehttpc_sup:stop_pool('emqx_auth_http/super_req'), - ehttpc_sup:stop_pool('emqx_auth_http/acl_req'), + _ = ehttpc_sup:stop_pool('emqx_auth_http/auth_req'), + _ = ehttpc_sup:stop_pool('emqx_auth_http/super_req'), + _ = ehttpc_sup:stop_pool('emqx_auth_http/acl_req'), ok. parse_host(Host) -> diff --git a/apps/emqx_auth_mnesia/src/emqx_acl_mnesia_api.erl b/apps/emqx_auth_mnesia/src/emqx_acl_mnesia_api.erl index 38087135f..efc2d2fbb 100644 --- a/apps/emqx_auth_mnesia/src/emqx_acl_mnesia_api.erl +++ b/apps/emqx_auth_mnesia/src/emqx_acl_mnesia_api.erl @@ -129,7 +129,7 @@ add(_Bindings, Params) -> case Re of #{result := ok} -> return({ok, Re}); #{result := <<"ok">>} -> return({ok, Re}); - _ -> return({error, Re}) + _ -> return({error, {add, Re}}) end end. diff --git a/apps/emqx_auth_mnesia/src/emqx_auth_mnesia_cli.erl b/apps/emqx_auth_mnesia/src/emqx_auth_mnesia_cli.erl index e7c9d982b..adefa704b 100644 --- a/apps/emqx_auth_mnesia/src/emqx_auth_mnesia_cli.erl +++ b/apps/emqx_auth_mnesia/src/emqx_auth_mnesia_cli.erl @@ -59,12 +59,12 @@ insert_user(User = #emqx_user{login = Login}) -> %% @doc Update User -spec(update_user(tuple(), binary()) -> ok | {error, any()}). update_user(Login, NewPassword) -> - User = #emqx_user{login = Login, password = encrypted_data(NewPassword)}, - ret(mnesia:transaction(fun do_update_user/1, [User])). + ret(mnesia:transaction(fun do_update_user/2, [Login, encrypted_data(NewPassword)])). -do_update_user(User = #emqx_user{login = Login}) -> +do_update_user(Login, NewPassword) -> case mnesia:read(?TABLE, Login) of - [{?TABLE, Login, _, CreateAt}] -> mnesia:write(User#emqx_user{created_at = CreateAt}); + [#emqx_user{} = User] -> + mnesia:write(User#emqx_user{password = NewPassword}); [] -> mnesia:abort(noexisted) end. @@ -119,7 +119,7 @@ hash(Password, SaltBin, HashType) -> emqx_passwd:hash(HashType, <>). salt() -> - rand:seed(exsplus, erlang:timestamp()), + {_AlgHandler, _AlgState} = rand:seed(exsplus, erlang:timestamp()), Salt = rand:uniform(16#ffffffff), <>. %%-------------------------------------------------------------------- diff --git a/apps/emqx_coap/src/emqx_coap_server.erl b/apps/emqx_coap/src/emqx_coap_server.erl index 0d571fac3..4774bd310 100644 --- a/apps/emqx_coap/src/emqx_coap_server.erl +++ b/apps/emqx_coap/src/emqx_coap_server.erl @@ -55,7 +55,7 @@ start_listener({Proto, ListenOn, Opts}) -> io:format("Start coap:~s listener on ~s successfully.~n", [Proto, format(ListenOn)]); {error, Reason} -> - io:format(standard_error, "Failed to start coap:~s listener on ~s - ~0p~n!", + io:format(standard_error, "Failed to start coap:~s listener on ~s: ~0p~n", [Proto, format(ListenOn), Reason]), error(Reason) end. @@ -71,7 +71,7 @@ stop_listener({Proto, ListenOn, _Opts}) -> ok -> io:format("Stop coap:~s listener on ~s successfully.~n", [Proto, format(ListenOn)]); {error, Reason} -> - io:format(standard_error, "Failed to stop coap:~s listener on ~s - ~p~n.", + io:format(standard_error, "Failed to stop coap:~s listener on ~s: ~0p~n.", [Proto, format(ListenOn), Reason]) end, Ret. diff --git a/apps/emqx_exproto/src/emqx_exproto.erl b/apps/emqx_exproto/src/emqx_exproto.erl index 7d986ecdd..07f56ef1b 100644 --- a/apps/emqx_exproto/src/emqx_exproto.erl +++ b/apps/emqx_exproto/src/emqx_exproto.erl @@ -69,7 +69,7 @@ start_connection_handler_instance({_Proto, _LisType, _ListenOn, Opts}) -> {ok, _ClientChannelPid} -> {_Proto, _LisType, _ListenOn, [{handler, Name} | LisOpts]}; {error, Reason} -> - io:format(standard_error, "Failed to start ~s's connection handler - ~0p~n!", + io:format(standard_error, "Failed to start ~s's connection handler: ~0p~n", [Name, Reason]), error(Reason) end. @@ -85,7 +85,7 @@ start_server({Name, Port, SSLOptions}) -> io:format("Start ~s gRPC server on ~w successfully.~n", [Name, Port]); {error, Reason} -> - io:format(standard_error, "Failed to start ~s gRPC server on ~w - ~0p~n!", + io:format(standard_error, "Failed to start ~s gRPC server on ~w: ~0p~n", [Name, Port, Reason]), error({failed_start_server, Reason}) end. @@ -101,7 +101,7 @@ start_listener({Proto, LisType, ListenOn, Opts}) -> io:format("Start ~s listener on ~s successfully.~n", [Name, format(ListenOn)]); {error, Reason} -> - io:format(standard_error, "Failed to start ~s listener on ~s - ~0p~n!", + io:format(standard_error, "Failed to start ~s listener on ~s: ~0p~n", [Name, format(ListenOn), Reason]), error(Reason) end. @@ -132,7 +132,7 @@ stop_listener({Proto, LisType, ListenOn, Opts}) -> io:format("Stop ~s listener on ~s successfully.~n", [Name, format(ListenOn)]); {error, Reason} -> - io:format(standard_error, "Failed to stop ~s listener on ~s - ~p~n.", + io:format(standard_error, "Failed to stop ~s listener on ~s: ~0p~n", [Name, format(ListenOn), Reason]) end, StopRet. diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m_coap_server.erl b/apps/emqx_lwm2m/src/emqx_lwm2m_coap_server.erl index d366e44e1..47ea6a2ba 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m_coap_server.erl +++ b/apps/emqx_lwm2m/src/emqx_lwm2m_coap_server.erl @@ -47,7 +47,7 @@ start_listener({Proto, ListenOn, Opts}) -> io:format("Start lwm2m:~s listener on ~s successfully.~n", [Proto, format(ListenOn)]); {error, Reason} -> - io:format(standard_error, "Failed to start lwm2m:~s listener on ~s - ~0p~n!", + io:format(standard_error, "Failed to start lwm2m:~s listener on ~s: ~0p~n", [Proto, format(ListenOn), Reason]), error(Reason) end. @@ -63,7 +63,7 @@ stop_listener({Proto, ListenOn, _Opts}) -> ok -> io:format("Stop lwm2m:~s listener on ~s successfully.~n", [Proto, format(ListenOn)]); {error, Reason} -> - io:format(standard_error, "Failed to stop lwm2m:~s listener on ~s - ~p~n.", + io:format(standard_error, "Failed to stop lwm2m:~s listener on ~s: ~0p~n", [Proto, format(ListenOn), Reason]) end, Ret. diff --git a/lib-opensource/emqx_rule_engine/.gitignore b/apps/emqx_rule_engine/.gitignore similarity index 100% rename from lib-opensource/emqx_rule_engine/.gitignore rename to apps/emqx_rule_engine/.gitignore diff --git a/lib-opensource/emqx_rule_engine/README.md b/apps/emqx_rule_engine/README.md similarity index 100% rename from lib-opensource/emqx_rule_engine/README.md rename to apps/emqx_rule_engine/README.md diff --git a/lib-opensource/emqx_rule_engine/docs/api_examples.md b/apps/emqx_rule_engine/docs/api_examples.md similarity index 100% rename from lib-opensource/emqx_rule_engine/docs/api_examples.md rename to apps/emqx_rule_engine/docs/api_examples.md diff --git a/lib-opensource/emqx_rule_engine/docs/cli_examples.md b/apps/emqx_rule_engine/docs/cli_examples.md similarity index 100% rename from lib-opensource/emqx_rule_engine/docs/cli_examples.md rename to apps/emqx_rule_engine/docs/cli_examples.md diff --git a/lib-opensource/emqx_rule_engine/docs/design.md b/apps/emqx_rule_engine/docs/design.md similarity index 100% rename from lib-opensource/emqx_rule_engine/docs/design.md rename to apps/emqx_rule_engine/docs/design.md diff --git a/lib-opensource/emqx_rule_engine/etc/emqx_rule_engine.conf b/apps/emqx_rule_engine/etc/emqx_rule_engine.conf similarity index 100% rename from lib-opensource/emqx_rule_engine/etc/emqx_rule_engine.conf rename to apps/emqx_rule_engine/etc/emqx_rule_engine.conf diff --git a/lib-opensource/emqx_rule_engine/include/rule_actions.hrl b/apps/emqx_rule_engine/include/rule_actions.hrl similarity index 100% rename from lib-opensource/emqx_rule_engine/include/rule_actions.hrl rename to apps/emqx_rule_engine/include/rule_actions.hrl diff --git a/lib-opensource/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl similarity index 100% rename from lib-opensource/emqx_rule_engine/include/rule_engine.hrl rename to apps/emqx_rule_engine/include/rule_engine.hrl diff --git a/lib-opensource/emqx_rule_engine/priv/emqx_rule_engine.schema b/apps/emqx_rule_engine/priv/emqx_rule_engine.schema similarity index 100% rename from lib-opensource/emqx_rule_engine/priv/emqx_rule_engine.schema rename to apps/emqx_rule_engine/priv/emqx_rule_engine.schema diff --git a/lib-opensource/emqx_rule_engine/rebar.config b/apps/emqx_rule_engine/rebar.config similarity index 100% rename from lib-opensource/emqx_rule_engine/rebar.config rename to apps/emqx_rule_engine/rebar.config diff --git a/lib-opensource/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/src/emqx_rule_actions.erl rename to apps/emqx_rule_engine/src/emqx_rule_actions.erl diff --git a/lib-opensource/emqx_rule_engine/src/emqx_rule_actions_trans.erl b/apps/emqx_rule_engine/src/emqx_rule_actions_trans.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/src/emqx_rule_actions_trans.erl rename to apps/emqx_rule_engine/src/emqx_rule_actions_trans.erl diff --git a/lib-opensource/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src similarity index 100% rename from lib-opensource/emqx_rule_engine/src/emqx_rule_engine.app.src rename to apps/emqx_rule_engine/src/emqx_rule_engine.app.src diff --git a/lib-opensource/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/src/emqx_rule_engine.erl rename to apps/emqx_rule_engine/src/emqx_rule_engine.erl diff --git a/lib-opensource/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/src/emqx_rule_engine_api.erl rename to apps/emqx_rule_engine/src/emqx_rule_engine_api.erl diff --git a/lib-opensource/emqx_rule_engine/src/emqx_rule_engine_app.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/src/emqx_rule_engine_app.erl rename to apps/emqx_rule_engine/src/emqx_rule_engine_app.erl diff --git a/lib-opensource/emqx_rule_engine/src/emqx_rule_engine_cli.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/src/emqx_rule_engine_cli.erl rename to apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl diff --git a/lib-opensource/emqx_rule_engine/src/emqx_rule_engine_sup.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/src/emqx_rule_engine_sup.erl rename to apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl diff --git a/lib-opensource/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/src/emqx_rule_events.erl rename to apps/emqx_rule_engine/src/emqx_rule_events.erl diff --git a/lib-opensource/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/src/emqx_rule_funcs.erl rename to apps/emqx_rule_engine/src/emqx_rule_funcs.erl diff --git a/lib-opensource/emqx_rule_engine/src/emqx_rule_id.erl b/apps/emqx_rule_engine/src/emqx_rule_id.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/src/emqx_rule_id.erl rename to apps/emqx_rule_engine/src/emqx_rule_id.erl diff --git a/lib-opensource/emqx_rule_engine/src/emqx_rule_locker.erl b/apps/emqx_rule_engine/src/emqx_rule_locker.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/src/emqx_rule_locker.erl rename to apps/emqx_rule_engine/src/emqx_rule_locker.erl diff --git a/lib-opensource/emqx_rule_engine/src/emqx_rule_maps.erl b/apps/emqx_rule_engine/src/emqx_rule_maps.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/src/emqx_rule_maps.erl rename to apps/emqx_rule_engine/src/emqx_rule_maps.erl diff --git a/lib-opensource/emqx_rule_engine/src/emqx_rule_metrics.erl b/apps/emqx_rule_engine/src/emqx_rule_metrics.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/src/emqx_rule_metrics.erl rename to apps/emqx_rule_engine/src/emqx_rule_metrics.erl diff --git a/lib-opensource/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/src/emqx_rule_registry.erl rename to apps/emqx_rule_engine/src/emqx_rule_registry.erl diff --git a/lib-opensource/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/src/emqx_rule_runtime.erl rename to apps/emqx_rule_engine/src/emqx_rule_runtime.erl diff --git a/lib-opensource/emqx_rule_engine/src/emqx_rule_sqlparser.erl b/apps/emqx_rule_engine/src/emqx_rule_sqlparser.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/src/emqx_rule_sqlparser.erl rename to apps/emqx_rule_engine/src/emqx_rule_sqlparser.erl diff --git a/lib-opensource/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/src/emqx_rule_sqltester.erl rename to apps/emqx_rule_engine/src/emqx_rule_sqltester.erl diff --git a/lib-opensource/emqx_rule_engine/src/emqx_rule_utils.erl b/apps/emqx_rule_engine/src/emqx_rule_utils.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/src/emqx_rule_utils.erl rename to apps/emqx_rule_engine/src/emqx_rule_utils.erl diff --git a/lib-opensource/emqx_rule_engine/src/emqx_rule_validator.erl b/apps/emqx_rule_engine/src/emqx_rule_validator.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/src/emqx_rule_validator.erl rename to apps/emqx_rule_engine/src/emqx_rule_validator.erl diff --git a/lib-opensource/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl rename to apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl diff --git a/lib-opensource/emqx_rule_engine/test/emqx_rule_events_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/test/emqx_rule_events_SUITE.erl rename to apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl diff --git a/lib-opensource/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl rename to apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl diff --git a/lib-opensource/emqx_rule_engine/test/emqx_rule_id_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_id_SUITE.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/test/emqx_rule_id_SUITE.erl rename to apps/emqx_rule_engine/test/emqx_rule_id_SUITE.erl diff --git a/lib-opensource/emqx_rule_engine/test/emqx_rule_maps_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_maps_SUITE.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/test/emqx_rule_maps_SUITE.erl rename to apps/emqx_rule_engine/test/emqx_rule_maps_SUITE.erl diff --git a/lib-opensource/emqx_rule_engine/test/emqx_rule_metrics_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_metrics_SUITE.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/test/emqx_rule_metrics_SUITE.erl rename to apps/emqx_rule_engine/test/emqx_rule_metrics_SUITE.erl diff --git a/lib-opensource/emqx_rule_engine/test/emqx_rule_registry_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_registry_SUITE.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/test/emqx_rule_registry_SUITE.erl rename to apps/emqx_rule_engine/test/emqx_rule_registry_SUITE.erl diff --git a/lib-opensource/emqx_rule_engine/test/emqx_rule_utils_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_utils_SUITE.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/test/emqx_rule_utils_SUITE.erl rename to apps/emqx_rule_engine/test/emqx_rule_utils_SUITE.erl diff --git a/lib-opensource/emqx_rule_engine/test/emqx_rule_validator_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_validator_SUITE.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/test/emqx_rule_validator_SUITE.erl rename to apps/emqx_rule_engine/test/emqx_rule_validator_SUITE.erl diff --git a/lib-opensource/emqx_rule_engine/test/prop_rule_maps.erl b/apps/emqx_rule_engine/test/prop_rule_maps.erl similarity index 100% rename from lib-opensource/emqx_rule_engine/test/prop_rule_maps.erl rename to apps/emqx_rule_engine/test/prop_rule_maps.erl diff --git a/apps/emqx_sn/src/emqx_sn_app.erl b/apps/emqx_sn/src/emqx_sn_app.erl index 1f59edc9d..8a2f15865 100644 --- a/apps/emqx_sn/src/emqx_sn_app.erl +++ b/apps/emqx_sn/src/emqx_sn_app.erl @@ -71,7 +71,7 @@ start_listener({Proto, ListenOn, Options}) -> {ok, _} -> io:format("Start mqttsn:~s listener on ~s successfully.~n", [Proto, format(ListenOn)]); {error, Reason} -> - io:format(standard_error, "Failed to start mqttsn:~s listener on ~s - ~0p~n!", + io:format(standard_error, "Failed to start mqttsn:~s listener on ~s: ~0p~n", [Proto, format(ListenOn), Reason]), error(Reason) end. @@ -101,7 +101,7 @@ stop_listener({Proto, ListenOn, Opts}) -> ok -> io:format("Stop mqttsn:~s listener on ~s successfully.~n", [Proto, format(ListenOn)]); {error, Reason} -> - io:format(standard_error, "Failed to stop mqttsn:~s listener on ~s - ~p~n.", + io:format(standard_error, "Failed to stop mqttsn:~s listener on ~s: ~0p~n", [Proto, format(ListenOn), Reason]) end, StopRet. diff --git a/apps/emqx_sn/vars b/apps/emqx_sn/vars index bd0ac53e9..a170916f3 100644 --- a/apps/emqx_sn/vars +++ b/apps/emqx_sn/vars @@ -5,4 +5,4 @@ {platform_etc_dir, "etc"}. {platform_lib_dir, "lib"}. {platform_log_dir, "log"}. -{platform_plugins_dir, "plugins"}. \ No newline at end of file +{platform_plugins_dir, "plugins"}. diff --git a/apps/emqx_stomp/src/emqx_stomp.erl b/apps/emqx_stomp/src/emqx_stomp.erl index d7a6db9ee..3d6396d43 100644 --- a/apps/emqx_stomp/src/emqx_stomp.erl +++ b/apps/emqx_stomp/src/emqx_stomp.erl @@ -73,7 +73,7 @@ start_listener({Proto, ListenOn, Options}) -> {ok, _} -> io:format("Start stomp:~s listener on ~s successfully.~n", [Proto, format(ListenOn)]); {error, Reason} -> - io:format(standard_error, "Failed to start stomp:~s listener on ~s - ~0p~n!", + io:format(standard_error, "Failed to start stomp:~s listener on ~s: ~0p~n", [Proto, format(ListenOn), Reason]), error(Reason) end. @@ -102,7 +102,7 @@ stop_listener({Proto, ListenOn, Opts}) -> ok -> io:format("Stop stomp:~s listener on ~s successfully.~n", [Proto, format(ListenOn)]); {error, Reason} -> - io:format(standard_error, "Failed to stop stomp:~s listener on ~s - ~p~n.", + io:format(standard_error, "Failed to stop stomp:~s listener on ~s: ~0p~n", [Proto, format(ListenOn), Reason]) end, StopRet. diff --git a/bin/nodetool b/bin/nodetool index cf50509f9..1dfc93014 100755 --- a/bin/nodetool +++ b/bin/nodetool @@ -47,7 +47,7 @@ main(Args) -> {true, pong} -> ok; {false, pong} -> - io:format(standard_error, "Failed to connect to node ~p .\n", [TargetNode]), + io:format(standard_error, "Failed to connect to node ~p\n", [TargetNode]), halt(1); {_, pang} -> io:format(standard_error, "Node ~p not responding to pings.\n", [TargetNode]), diff --git a/elvis.config b/elvis.config index bb551e81a..bed4ba2e8 100644 --- a/elvis.config +++ b/elvis.config @@ -5,7 +5,7 @@ [ {config, [ - #{dirs => ["src", "apps/**/src", "lib-opensource/**/src"], + #{dirs => ["src", "apps/**/src", "lib-ce/**/src"], filter => "*.erl", ruleset => erl_files, rules => [ @@ -16,7 +16,7 @@ ]}} ] }, - #{dirs => ["test", "apps/**/test", "lib-opensource/**/src"], + #{dirs => ["test", "apps/**/test", "lib-ce/**/src"], filter => "*.erl", rules => [ {elvis_text_style, line_length, #{ limit => 100 diff --git a/get-dashboard.sh b/get-dashboard.sh index 885a68f77..991f2b39d 100755 --- a/get-dashboard.sh +++ b/get-dashboard.sh @@ -10,9 +10,9 @@ cd -P -- "$(dirname -- "$0")" DOWNLOAD_URL='https://github.com/emqx/emqx-dashboard-frontend/releases/download' if [ "$EMQX_ENTERPRISE" = 'true' ] || [ "$EMQX_ENTERPRISE" == '1' ]; then - DASHBOARD_PATH='lib-enterprise/emqx_dashboard/priv' + DASHBOARD_PATH='lib-ee/emqx_dashboard/priv' else - DASHBOARD_PATH='lib-opensource/emqx_dashboard/priv' + DASHBOARD_PATH='lib-ce/emqx_dashboard/priv' fi case $(uname) in diff --git a/lib-opensource/emqx_dashboard/.gitignore b/lib-ce/emqx_dashboard/.gitignore similarity index 100% rename from lib-opensource/emqx_dashboard/.gitignore rename to lib-ce/emqx_dashboard/.gitignore diff --git a/lib-opensource/emqx_dashboard/README.md b/lib-ce/emqx_dashboard/README.md similarity index 100% rename from lib-opensource/emqx_dashboard/README.md rename to lib-ce/emqx_dashboard/README.md diff --git a/lib-opensource/emqx_dashboard/etc/emqx_dashboard.conf b/lib-ce/emqx_dashboard/etc/emqx_dashboard.conf similarity index 100% rename from lib-opensource/emqx_dashboard/etc/emqx_dashboard.conf rename to lib-ce/emqx_dashboard/etc/emqx_dashboard.conf diff --git a/lib-opensource/emqx_dashboard/include/emqx_dashboard.hrl b/lib-ce/emqx_dashboard/include/emqx_dashboard.hrl similarity index 100% rename from lib-opensource/emqx_dashboard/include/emqx_dashboard.hrl rename to lib-ce/emqx_dashboard/include/emqx_dashboard.hrl diff --git a/lib-opensource/emqx_dashboard/priv/emqx_dashboard.schema b/lib-ce/emqx_dashboard/priv/emqx_dashboard.schema similarity index 100% rename from lib-opensource/emqx_dashboard/priv/emqx_dashboard.schema rename to lib-ce/emqx_dashboard/priv/emqx_dashboard.schema diff --git a/lib-opensource/emqx_dashboard/rebar.config b/lib-ce/emqx_dashboard/rebar.config similarity index 100% rename from lib-opensource/emqx_dashboard/rebar.config rename to lib-ce/emqx_dashboard/rebar.config diff --git a/lib-opensource/emqx_dashboard/src/emqx_dashboard.app.src b/lib-ce/emqx_dashboard/src/emqx_dashboard.app.src similarity index 100% rename from lib-opensource/emqx_dashboard/src/emqx_dashboard.app.src rename to lib-ce/emqx_dashboard/src/emqx_dashboard.app.src diff --git a/lib-opensource/emqx_dashboard/src/emqx_dashboard.erl b/lib-ce/emqx_dashboard/src/emqx_dashboard.erl similarity index 100% rename from lib-opensource/emqx_dashboard/src/emqx_dashboard.erl rename to lib-ce/emqx_dashboard/src/emqx_dashboard.erl diff --git a/lib-opensource/emqx_dashboard/src/emqx_dashboard_admin.erl b/lib-ce/emqx_dashboard/src/emqx_dashboard_admin.erl similarity index 100% rename from lib-opensource/emqx_dashboard/src/emqx_dashboard_admin.erl rename to lib-ce/emqx_dashboard/src/emqx_dashboard_admin.erl diff --git a/lib-opensource/emqx_dashboard/src/emqx_dashboard_api.erl b/lib-ce/emqx_dashboard/src/emqx_dashboard_api.erl similarity index 100% rename from lib-opensource/emqx_dashboard/src/emqx_dashboard_api.erl rename to lib-ce/emqx_dashboard/src/emqx_dashboard_api.erl diff --git a/lib-opensource/emqx_dashboard/src/emqx_dashboard_app.erl b/lib-ce/emqx_dashboard/src/emqx_dashboard_app.erl similarity index 100% rename from lib-opensource/emqx_dashboard/src/emqx_dashboard_app.erl rename to lib-ce/emqx_dashboard/src/emqx_dashboard_app.erl diff --git a/lib-opensource/emqx_dashboard/src/emqx_dashboard_cli.erl b/lib-ce/emqx_dashboard/src/emqx_dashboard_cli.erl similarity index 100% rename from lib-opensource/emqx_dashboard/src/emqx_dashboard_cli.erl rename to lib-ce/emqx_dashboard/src/emqx_dashboard_cli.erl diff --git a/lib-opensource/emqx_dashboard/src/emqx_dashboard_sup.erl b/lib-ce/emqx_dashboard/src/emqx_dashboard_sup.erl similarity index 100% rename from lib-opensource/emqx_dashboard/src/emqx_dashboard_sup.erl rename to lib-ce/emqx_dashboard/src/emqx_dashboard_sup.erl diff --git a/lib-opensource/emqx_dashboard/test/.placeholder b/lib-ce/emqx_dashboard/test/.placeholder similarity index 100% rename from lib-opensource/emqx_dashboard/test/.placeholder rename to lib-ce/emqx_dashboard/test/.placeholder diff --git a/lib-opensource/emqx_dashboard/test/emqx_dashboard_SUITE.erl b/lib-ce/emqx_dashboard/test/emqx_dashboard_SUITE.erl similarity index 100% rename from lib-opensource/emqx_dashboard/test/emqx_dashboard_SUITE.erl rename to lib-ce/emqx_dashboard/test/emqx_dashboard_SUITE.erl diff --git a/lib-opensource/emqx_management/.gitignore b/lib-ce/emqx_management/.gitignore similarity index 100% rename from lib-opensource/emqx_management/.gitignore rename to lib-ce/emqx_management/.gitignore diff --git a/lib-opensource/emqx_management/README.md b/lib-ce/emqx_management/README.md similarity index 100% rename from lib-opensource/emqx_management/README.md rename to lib-ce/emqx_management/README.md diff --git a/lib-opensource/emqx_management/etc/emqx_management.conf b/lib-ce/emqx_management/etc/emqx_management.conf similarity index 100% rename from lib-opensource/emqx_management/etc/emqx_management.conf rename to lib-ce/emqx_management/etc/emqx_management.conf diff --git a/lib-opensource/emqx_management/include/emqx_mgmt.hrl b/lib-ce/emqx_management/include/emqx_mgmt.hrl similarity index 100% rename from lib-opensource/emqx_management/include/emqx_mgmt.hrl rename to lib-ce/emqx_management/include/emqx_mgmt.hrl diff --git a/lib-opensource/emqx_management/priv/emqx_management.schema b/lib-ce/emqx_management/priv/emqx_management.schema similarity index 100% rename from lib-opensource/emqx_management/priv/emqx_management.schema rename to lib-ce/emqx_management/priv/emqx_management.schema diff --git a/lib-opensource/emqx_management/rebar.config b/lib-ce/emqx_management/rebar.config similarity index 100% rename from lib-opensource/emqx_management/rebar.config rename to lib-ce/emqx_management/rebar.config diff --git a/lib-opensource/emqx_management/src/emqx_management.app.src b/lib-ce/emqx_management/src/emqx_management.app.src similarity index 100% rename from lib-opensource/emqx_management/src/emqx_management.app.src rename to lib-ce/emqx_management/src/emqx_management.app.src diff --git a/lib-opensource/emqx_management/src/emqx_mgmt.erl b/lib-ce/emqx_management/src/emqx_mgmt.erl similarity index 99% rename from lib-opensource/emqx_management/src/emqx_mgmt.erl rename to lib-ce/emqx_management/src/emqx_mgmt.erl index 26b55b249..19474e5a1 100644 --- a/lib-opensource/emqx_management/src/emqx_mgmt.erl +++ b/lib-ce/emqx_management/src/emqx_mgmt.erl @@ -102,6 +102,7 @@ %% Listeners -export([ list_listeners/0 , list_listeners/1 + , restart_listener/2 ]). %% Alarms @@ -541,6 +542,7 @@ list_listeners(Node) when Node =:= node() -> Tcp = lists:map(fun({{Protocol, ListenOn}, _Pid}) -> #{protocol => Protocol, listen_on => ListenOn, + identifier => emqx_listeners:find_id_by_listen_on(ListenOn), acceptors => esockd:get_acceptors({Protocol, ListenOn}), max_conns => esockd:get_max_connections({Protocol, ListenOn}), current_conns => esockd:get_current_connections({Protocol, ListenOn}), @@ -559,6 +561,12 @@ list_listeners(Node) when Node =:= node() -> list_listeners(Node) -> rpc_call(Node, list_listeners, [Node]). +restart_listener(Node, Identifier) when Node =:= node() -> + emqx_listeners:restart_listener(Identifier); + +restart_listener(Node, Identifier) -> + rpc_call(Node, restart_listener, [Node, Identifier]). + %%-------------------------------------------------------------------- %% Get Alarms %%-------------------------------------------------------------------- @@ -977,3 +985,4 @@ action_to_prop_list({action_instance, ActionInstId, Name, FallbackActions, Args} {name, Name}, {fallbacks, actions_to_prop_list(FallbackActions)}, {args, Args}]. + diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_api.erl b/lib-ce/emqx_management/src/emqx_mgmt_api.erl similarity index 100% rename from lib-opensource/emqx_management/src/emqx_mgmt_api.erl rename to lib-ce/emqx_management/src/emqx_mgmt_api.erl diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_api_alarms.erl b/lib-ce/emqx_management/src/emqx_mgmt_api_alarms.erl similarity index 100% rename from lib-opensource/emqx_management/src/emqx_mgmt_api_alarms.erl rename to lib-ce/emqx_management/src/emqx_mgmt_api_alarms.erl diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_api_apps.erl b/lib-ce/emqx_management/src/emqx_mgmt_api_apps.erl similarity index 100% rename from lib-opensource/emqx_management/src/emqx_mgmt_api_apps.erl rename to lib-ce/emqx_management/src/emqx_mgmt_api_apps.erl diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_api_banned.erl b/lib-ce/emqx_management/src/emqx_mgmt_api_banned.erl similarity index 100% rename from lib-opensource/emqx_management/src/emqx_mgmt_api_banned.erl rename to lib-ce/emqx_management/src/emqx_mgmt_api_banned.erl diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_api_brokers.erl b/lib-ce/emqx_management/src/emqx_mgmt_api_brokers.erl similarity index 100% rename from lib-opensource/emqx_management/src/emqx_mgmt_api_brokers.erl rename to lib-ce/emqx_management/src/emqx_mgmt_api_brokers.erl diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_api_clients.erl b/lib-ce/emqx_management/src/emqx_mgmt_api_clients.erl similarity index 100% rename from lib-opensource/emqx_management/src/emqx_mgmt_api_clients.erl rename to lib-ce/emqx_management/src/emqx_mgmt_api_clients.erl diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_api_data.erl b/lib-ce/emqx_management/src/emqx_mgmt_api_data.erl similarity index 100% rename from lib-opensource/emqx_management/src/emqx_mgmt_api_data.erl rename to lib-ce/emqx_management/src/emqx_mgmt_api_data.erl diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_api_listeners.erl b/lib-ce/emqx_management/src/emqx_mgmt_api_listeners.erl similarity index 61% rename from lib-opensource/emqx_management/src/emqx_mgmt_api_listeners.erl rename to lib-ce/emqx_management/src/emqx_mgmt_api_listeners.erl index 153d20d38..7acbe8107 100644 --- a/lib-opensource/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/lib-ce/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -30,7 +30,19 @@ func => list, descr => "A list of listeners on the node"}). --export([list/2]). +-rest_api(#{name => restart_listener, + method => 'PUT', + path => "/listeners/:bin:identifier/restart", + func => restart, + descr => "Restart a listener in the cluster"}). + +-rest_api(#{name => restart_node_listener, + method => 'PUT', + path => "/nodes/:atom:node/listeners/:bin:identifier/restart", + func => restart, + descr => "Restart a listener on a node"}). + +-export([list/2, restart/2]). %% List listeners on a node. list(#{node := Node}, _Params) -> @@ -41,6 +53,21 @@ list(_Binding, _Params) -> return({ok, [#{node => Node, listeners => format(Listeners)} || {Node, Listeners} <- emqx_mgmt:list_listeners()]}). +%% Restart listeners on a node. +restart(#{node := Node, identifier := Identifier}, _Params) -> + case emqx_mgmt:restart_listener(Node, Identifier) of + ok -> return({ok, "Listener restarted."}); + {error, Error} -> return({error, Error}) + end; + +%% Restart listeners in the cluster. +restart(#{identifier := Identifier}, _Params) -> + Results = [{Node, emqx_mgmt:restart_listener(Node, Identifier)} || {Node, _Info} <- emqx_mgmt:list_nodes()], + case lists:filter(fun({_, Result}) -> Result =/= ok end, Results) of + [] -> return(ok); + Errors -> return({error, Errors}) + end. + format(Listeners) when is_list(Listeners) -> [ Info#{listen_on => list_to_binary(esockd:to_string(ListenOn))} || Info = #{listen_on := ListenOn} <- Listeners ]; diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_api_metrics.erl b/lib-ce/emqx_management/src/emqx_mgmt_api_metrics.erl similarity index 100% rename from lib-opensource/emqx_management/src/emqx_mgmt_api_metrics.erl rename to lib-ce/emqx_management/src/emqx_mgmt_api_metrics.erl diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_api_modules.erl b/lib-ce/emqx_management/src/emqx_mgmt_api_modules.erl similarity index 100% rename from lib-opensource/emqx_management/src/emqx_mgmt_api_modules.erl rename to lib-ce/emqx_management/src/emqx_mgmt_api_modules.erl diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_api_nodes.erl b/lib-ce/emqx_management/src/emqx_mgmt_api_nodes.erl similarity index 100% rename from lib-opensource/emqx_management/src/emqx_mgmt_api_nodes.erl rename to lib-ce/emqx_management/src/emqx_mgmt_api_nodes.erl diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_api_plugins.erl b/lib-ce/emqx_management/src/emqx_mgmt_api_plugins.erl similarity index 100% rename from lib-opensource/emqx_management/src/emqx_mgmt_api_plugins.erl rename to lib-ce/emqx_management/src/emqx_mgmt_api_plugins.erl diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_api_pubsub.erl b/lib-ce/emqx_management/src/emqx_mgmt_api_pubsub.erl similarity index 100% rename from lib-opensource/emqx_management/src/emqx_mgmt_api_pubsub.erl rename to lib-ce/emqx_management/src/emqx_mgmt_api_pubsub.erl diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_api_routes.erl b/lib-ce/emqx_management/src/emqx_mgmt_api_routes.erl similarity index 100% rename from lib-opensource/emqx_management/src/emqx_mgmt_api_routes.erl rename to lib-ce/emqx_management/src/emqx_mgmt_api_routes.erl diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_api_stats.erl b/lib-ce/emqx_management/src/emqx_mgmt_api_stats.erl similarity index 100% rename from lib-opensource/emqx_management/src/emqx_mgmt_api_stats.erl rename to lib-ce/emqx_management/src/emqx_mgmt_api_stats.erl diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/lib-ce/emqx_management/src/emqx_mgmt_api_subscriptions.erl similarity index 100% rename from lib-opensource/emqx_management/src/emqx_mgmt_api_subscriptions.erl rename to lib-ce/emqx_management/src/emqx_mgmt_api_subscriptions.erl diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_api_topic_metrics.erl b/lib-ce/emqx_management/src/emqx_mgmt_api_topic_metrics.erl similarity index 100% rename from lib-opensource/emqx_management/src/emqx_mgmt_api_topic_metrics.erl rename to lib-ce/emqx_management/src/emqx_mgmt_api_topic_metrics.erl diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_app.erl b/lib-ce/emqx_management/src/emqx_mgmt_app.erl similarity index 100% rename from lib-opensource/emqx_management/src/emqx_mgmt_app.erl rename to lib-ce/emqx_management/src/emqx_mgmt_app.erl diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_auth.erl b/lib-ce/emqx_management/src/emqx_mgmt_auth.erl similarity index 100% rename from lib-opensource/emqx_management/src/emqx_mgmt_auth.erl rename to lib-ce/emqx_management/src/emqx_mgmt_auth.erl diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_cli.erl b/lib-ce/emqx_management/src/emqx_mgmt_cli.erl similarity index 90% rename from lib-opensource/emqx_management/src/emqx_mgmt_cli.erl rename to lib-ce/emqx_management/src/emqx_mgmt_cli.erl index 8a9f57f67..8f3a09026 100644 --- a/lib-opensource/emqx_management/src/emqx_mgmt_cli.erl +++ b/lib-ce/emqx_management/src/emqx_mgmt_cli.erl @@ -157,7 +157,7 @@ cluster(["join", SNode]) -> ignore -> emqx_ctl:print("Ignore.~n"); {error, Error} -> - emqx_ctl:print("Failed to join the cluster: ~p~n", [Error]) + emqx_ctl:print("Failed to join the cluster: ~0p~n", [Error]) end; cluster(["leave"]) -> @@ -166,7 +166,7 @@ cluster(["leave"]) -> emqx_ctl:print("Leave the cluster successfully.~n"), cluster(["status"]); {error, Error} -> - emqx_ctl:print("Failed to leave the cluster: ~p~n", [Error]) + emqx_ctl:print("Failed to leave the cluster: ~0p~n", [Error]) end; cluster(["force-leave", SNode]) -> @@ -177,7 +177,7 @@ cluster(["force-leave", SNode]) -> ignore -> emqx_ctl:print("Ignore.~n"); {error, Error} -> - emqx_ctl:print("Failed to remove the node from cluster: ~p~n", [Error]) + emqx_ctl:print("Failed to remove the node from cluster: ~0p~n", [Error]) end; cluster(["status"]) -> @@ -510,49 +510,73 @@ trace_off(Who, Name) -> listeners([]) -> foreach(fun({{Protocol, ListenOn}, _Pid}) -> - Info = [{acceptors, esockd:get_acceptors({Protocol, ListenOn})}, + Info = [{listen_on, {string, emqx_listeners:format_listen_on(ListenOn)}}, + {acceptors, esockd:get_acceptors({Protocol, ListenOn})}, {max_conns, esockd:get_max_connections({Protocol, ListenOn})}, {current_conn, esockd:get_current_connections({Protocol, ListenOn})}, - {shutdown_count, esockd:get_shutdown_count({Protocol, ListenOn})}], - emqx_ctl:print("listener on ~s:~s~n", [Protocol, esockd:to_string(ListenOn)]), - foreach(fun({Key, Val}) -> - emqx_ctl:print(" ~-16s: ~w~n", [Key, Val]) - end, Info) + {shutdown_count, esockd:get_shutdown_count({Protocol, ListenOn})} + ], + emqx_ctl:print("~s~n", [listener_identifier(Protocol, ListenOn)]), + foreach(fun indent_print/1, Info) end, esockd:listeners()), foreach(fun({Protocol, Opts}) -> - Info = [{acceptors, maps:get(num_acceptors, proplists:get_value(transport_options, Opts, #{}), 0)}, + Port = proplists:get_value(port, Opts), + Info = [{listen_on, {string, emqx_listeners:format_listen_on(Port)}}, + {acceptors, maps:get(num_acceptors, proplists:get_value(transport_options, Opts, #{}), 0)}, {max_conns, proplists:get_value(max_connections, Opts)}, {current_conn, proplists:get_value(all_connections, Opts)}, {shutdown_count, []}], - emqx_ctl:print("listener on ~s:~p~n", [Protocol, proplists:get_value(port, Opts)]), - foreach(fun({Key, Val}) -> - emqx_ctl:print(" ~-16s: ~w~n", [Key, Val]) - end, Info) + emqx_ctl:print("~s~n", [listener_identifier(Protocol, Port)]), + foreach(fun indent_print/1, Info) end, ranch:info()); -listeners(["stop", Name = "http" ++ _N, ListenOn]) -> +listeners(["stop", Name = "http" ++ _N | _MaybePort]) -> + %% _MaybePort is to be backward compatible, to stop http listener, there is no need for the port number case minirest:stop_http(list_to_atom(Name)) of ok -> - emqx_ctl:print("Stop ~s listener on ~s successfully.~n", [Name, ListenOn]); + emqx_ctl:print("Stop ~s listener successfully.~n", [Name]); {error, Error} -> - emqx_ctl:print("Failed to stop ~s listener on ~s, error:~p~n", [Name, ListenOn, Error]) + emqx_ctl:print("Failed to stop ~s listener: ~0p~n", [Name, Error]) end; -listeners(["stop", Proto, ListenOn]) -> +listeners(["stop", "mqtt:" ++ _ = Identifier]) -> + stop_listener(emqx_listeners:find_by_id(Identifier), Identifier); + +listeners(["stop", _Proto, ListenOn]) -> + %% this clause is kept to be backward compatible ListenOn1 = case string:tokens(ListenOn, ":") of [Port] -> list_to_integer(Port); [IP, Port] -> {IP, list_to_integer(Port)} end, - case emqx_listeners:stop_listener({list_to_atom(Proto), ListenOn1, []}) of + stop_listener(emqx_listeners:find_by_listen_on(ListenOn1), ListenOn1); + +listeners(["restart", Identifier]) -> + case emqx_listeners:restart_listener(Identifier) of ok -> - emqx_ctl:print("Stop ~s listener on ~s successfully.~n", [Proto, ListenOn]); + emqx_ctl:print("Restarted ~s listener successfully.~n", [Identifier]); {error, Error} -> - emqx_ctl:print("Failed to stop ~s listener on ~s, error:~p~n", [Proto, ListenOn, Error]) + emqx_ctl:print("Failed to restart ~s listener: ~0p~n", [Identifier, Error]) end; listeners(_) -> emqx_ctl:usage([{"listeners", "List listeners"}, - {"listeners stop ", "Stop a listener"}]). + {"listeners stop ", "Stop a listener"}, + {"listeners stop ", "Stop a listener"}, + {"listeners restart ", "Restart a listener"} + ]). + +stop_listener(false, Input) -> + emqx_ctl:print("No such listener ~p~n", [Input]); +stop_listener(#{listen_on := ListenOn} = Listener, _Input) -> + ID = emqx_listeners:identifier(Listener), + ListenOnStr = emqx_listeners:format_listen_on(ListenOn), + case emqx_listeners:stop_listener(Listener) of + ok -> + emqx_ctl:print("Stop ~s listener on ~s successfully.~n", [ID, ListenOnStr]); + {error, Reason} -> + emqx_ctl:print("Failed to stop ~s listener on ~s: ~0p~n", + [ID, ListenOnStr, Reason]) + end. %%-------------------------------------------------------------------- %% @doc data Command @@ -707,3 +731,16 @@ format(_, Val) -> Val. bin(S) -> iolist_to_binary(S). + +indent_print({Key, {string, Val}}) -> + emqx_ctl:print(" ~-16s: ~s~n", [Key, Val]); +indent_print({Key, Val}) -> + emqx_ctl:print(" ~-16s: ~w~n", [Key, Val]). + +listener_identifier(Protocol, ListenOn) -> + case emqx_listeners:find_id_by_listen_on(ListenOn) of + false -> + "http" ++ _ = atom_to_list(Protocol); %% assert + ID -> + ID + end. diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_http.erl b/lib-ce/emqx_management/src/emqx_mgmt_http.erl similarity index 100% rename from lib-opensource/emqx_management/src/emqx_mgmt_http.erl rename to lib-ce/emqx_management/src/emqx_mgmt_http.erl diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_sup.erl b/lib-ce/emqx_management/src/emqx_mgmt_sup.erl similarity index 100% rename from lib-opensource/emqx_management/src/emqx_mgmt_sup.erl rename to lib-ce/emqx_management/src/emqx_mgmt_sup.erl diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_util.erl b/lib-ce/emqx_management/src/emqx_mgmt_util.erl similarity index 100% rename from lib-opensource/emqx_management/src/emqx_mgmt_util.erl rename to lib-ce/emqx_management/src/emqx_mgmt_util.erl diff --git a/lib-opensource/emqx_management/test/emqx_mgmt_SUITE.erl b/lib-ce/emqx_management/test/emqx_mgmt_SUITE.erl similarity index 93% rename from lib-opensource/emqx_management/test/emqx_mgmt_SUITE.erl rename to lib-ce/emqx_management/test/emqx_mgmt_SUITE.erl index e41b9ee6a..56db2d118 100644 --- a/lib-opensource/emqx_management/test/emqx_mgmt_SUITE.erl +++ b/lib-ce/emqx_management/test/emqx_mgmt_SUITE.erl @@ -49,7 +49,8 @@ groups() -> t_broker_cmd, t_router_cmd, t_subscriptions_cmd, - t_listeners_cmd + t_listeners_cmd_old, + t_listeners_cmd_new ]}]. apps() -> @@ -275,12 +276,35 @@ t_subscriptions_cmd(_) -> ?assertEqual(emqx_mgmt_cli:subscriptions(["del", "client", "b/b/c"]), "ok~n"), unmock_print(). -t_listeners_cmd(_) -> +t_listeners_cmd_old(_) -> + ok = emqx_listeners:ensure_all_started(), mock_print(), ?assertEqual(emqx_mgmt_cli:listeners([]), ok), ?assertEqual( - emqx_mgmt_cli:listeners(["stop", "wss", "8084"]), - "Stop wss listener on 8084 successfully.\n" + "Stop mqtt:wss:external listener on 0.0.0.0:8084 successfully.\n", + emqx_mgmt_cli:listeners(["stop", "wss", "8084"]) + ), + unmock_print(). + +t_listeners_cmd_new(_) -> + ok = emqx_listeners:ensure_all_started(), + mock_print(), + ?assertEqual(emqx_mgmt_cli:listeners([]), ok), + ?assertEqual( + "Stop mqtt:wss:external listener on 0.0.0.0:8084 successfully.\n", + emqx_mgmt_cli:listeners(["stop", "mqtt:wss:external"]) + ), + ?assertEqual( + emqx_mgmt_cli:listeners(["restart", "mqtt:tcp:external"]), + "Restarted mqtt:tcp:external listener successfully.\n" + ), + ?assertEqual( + emqx_mgmt_cli:listeners(["restart", "mqtt:ssl:external"]), + "Restarted mqtt:ssl:external listener successfully.\n" + ), + ?assertEqual( + emqx_mgmt_cli:listeners(["restart", "bad:listener:identifier"]), + "Failed to restart bad:listener:identifier listener: {no_such_listener,\"bad:listener:identifier\"}\n" ), unmock_print(). diff --git a/lib-opensource/emqx_management/test/emqx_mgmt_api_SUITE.erl b/lib-ce/emqx_management/test/emqx_mgmt_api_SUITE.erl similarity index 100% rename from lib-opensource/emqx_management/test/emqx_mgmt_api_SUITE.erl rename to lib-ce/emqx_management/test/emqx_mgmt_api_SUITE.erl diff --git a/lib-opensource/emqx_management/test/etc/emqx_management.conf b/lib-ce/emqx_management/test/etc/emqx_management.conf similarity index 100% rename from lib-opensource/emqx_management/test/etc/emqx_management.conf rename to lib-ce/emqx_management/test/etc/emqx_management.conf diff --git a/lib-opensource/emqx_management/test/etc/emqx_reloader.conf b/lib-ce/emqx_management/test/etc/emqx_reloader.conf similarity index 100% rename from lib-opensource/emqx_management/test/etc/emqx_reloader.conf rename to lib-ce/emqx_management/test/etc/emqx_reloader.conf diff --git a/lib-opensource/emqx_management/test/rfc6455_client.erl b/lib-ce/emqx_management/test/rfc6455_client.erl similarity index 100% rename from lib-opensource/emqx_management/test/rfc6455_client.erl rename to lib-ce/emqx_management/test/rfc6455_client.erl diff --git a/lib-opensource/emqx_modules/etc/emqx_modules.conf b/lib-ce/emqx_modules/etc/emqx_modules.conf similarity index 100% rename from lib-opensource/emqx_modules/etc/emqx_modules.conf rename to lib-ce/emqx_modules/etc/emqx_modules.conf diff --git a/lib-opensource/emqx_modules/priv/emqx_modules.schema b/lib-ce/emqx_modules/priv/emqx_modules.schema similarity index 100% rename from lib-opensource/emqx_modules/priv/emqx_modules.schema rename to lib-ce/emqx_modules/priv/emqx_modules.schema diff --git a/lib-opensource/emqx_modules/rebar.config b/lib-ce/emqx_modules/rebar.config similarity index 100% rename from lib-opensource/emqx_modules/rebar.config rename to lib-ce/emqx_modules/rebar.config diff --git a/lib-opensource/emqx_modules/src/emqx_mod_acl_internal.erl b/lib-ce/emqx_modules/src/emqx_mod_acl_internal.erl similarity index 100% rename from lib-opensource/emqx_modules/src/emqx_mod_acl_internal.erl rename to lib-ce/emqx_modules/src/emqx_mod_acl_internal.erl diff --git a/lib-opensource/emqx_modules/src/emqx_mod_delayed.erl b/lib-ce/emqx_modules/src/emqx_mod_delayed.erl similarity index 100% rename from lib-opensource/emqx_modules/src/emqx_mod_delayed.erl rename to lib-ce/emqx_modules/src/emqx_mod_delayed.erl diff --git a/lib-opensource/emqx_modules/src/emqx_mod_presence.erl b/lib-ce/emqx_modules/src/emqx_mod_presence.erl similarity index 100% rename from lib-opensource/emqx_modules/src/emqx_mod_presence.erl rename to lib-ce/emqx_modules/src/emqx_mod_presence.erl diff --git a/lib-opensource/emqx_modules/src/emqx_mod_rewrite.erl b/lib-ce/emqx_modules/src/emqx_mod_rewrite.erl similarity index 100% rename from lib-opensource/emqx_modules/src/emqx_mod_rewrite.erl rename to lib-ce/emqx_modules/src/emqx_mod_rewrite.erl diff --git a/lib-opensource/emqx_modules/src/emqx_mod_subscription.erl b/lib-ce/emqx_modules/src/emqx_mod_subscription.erl similarity index 100% rename from lib-opensource/emqx_modules/src/emqx_mod_subscription.erl rename to lib-ce/emqx_modules/src/emqx_mod_subscription.erl diff --git a/lib-opensource/emqx_modules/src/emqx_mod_sup.erl b/lib-ce/emqx_modules/src/emqx_mod_sup.erl similarity index 100% rename from lib-opensource/emqx_modules/src/emqx_mod_sup.erl rename to lib-ce/emqx_modules/src/emqx_mod_sup.erl diff --git a/lib-opensource/emqx_modules/src/emqx_mod_topic_metrics.erl b/lib-ce/emqx_modules/src/emqx_mod_topic_metrics.erl similarity index 100% rename from lib-opensource/emqx_modules/src/emqx_mod_topic_metrics.erl rename to lib-ce/emqx_modules/src/emqx_mod_topic_metrics.erl diff --git a/lib-opensource/emqx_modules/src/emqx_modules.app.src b/lib-ce/emqx_modules/src/emqx_modules.app.src similarity index 100% rename from lib-opensource/emqx_modules/src/emqx_modules.app.src rename to lib-ce/emqx_modules/src/emqx_modules.app.src diff --git a/lib-opensource/emqx_modules/src/emqx_modules.erl b/lib-ce/emqx_modules/src/emqx_modules.erl similarity index 100% rename from lib-opensource/emqx_modules/src/emqx_modules.erl rename to lib-ce/emqx_modules/src/emqx_modules.erl diff --git a/lib-opensource/emqx_modules/src/emqx_modules_app.erl b/lib-ce/emqx_modules/src/emqx_modules_app.erl similarity index 100% rename from lib-opensource/emqx_modules/src/emqx_modules_app.erl rename to lib-ce/emqx_modules/src/emqx_modules_app.erl diff --git a/lib-opensource/emqx_modules/test/emqx_mod_acl_internal_SUITE.erl b/lib-ce/emqx_modules/test/emqx_mod_acl_internal_SUITE.erl similarity index 100% rename from lib-opensource/emqx_modules/test/emqx_mod_acl_internal_SUITE.erl rename to lib-ce/emqx_modules/test/emqx_mod_acl_internal_SUITE.erl diff --git a/lib-opensource/emqx_modules/test/emqx_mod_delayed_SUITE.erl b/lib-ce/emqx_modules/test/emqx_mod_delayed_SUITE.erl similarity index 100% rename from lib-opensource/emqx_modules/test/emqx_mod_delayed_SUITE.erl rename to lib-ce/emqx_modules/test/emqx_mod_delayed_SUITE.erl diff --git a/lib-opensource/emqx_modules/test/emqx_mod_presence_SUITE.erl b/lib-ce/emqx_modules/test/emqx_mod_presence_SUITE.erl similarity index 100% rename from lib-opensource/emqx_modules/test/emqx_mod_presence_SUITE.erl rename to lib-ce/emqx_modules/test/emqx_mod_presence_SUITE.erl diff --git a/lib-opensource/emqx_modules/test/emqx_mod_rewrite_SUITE.erl b/lib-ce/emqx_modules/test/emqx_mod_rewrite_SUITE.erl similarity index 100% rename from lib-opensource/emqx_modules/test/emqx_mod_rewrite_SUITE.erl rename to lib-ce/emqx_modules/test/emqx_mod_rewrite_SUITE.erl diff --git a/lib-opensource/emqx_modules/test/emqx_mod_subscription_SUITE.erl b/lib-ce/emqx_modules/test/emqx_mod_subscription_SUITE.erl similarity index 100% rename from lib-opensource/emqx_modules/test/emqx_mod_subscription_SUITE.erl rename to lib-ce/emqx_modules/test/emqx_mod_subscription_SUITE.erl diff --git a/lib-opensource/emqx_modules/test/emqx_mod_sup_SUITE.erl b/lib-ce/emqx_modules/test/emqx_mod_sup_SUITE.erl similarity index 100% rename from lib-opensource/emqx_modules/test/emqx_mod_sup_SUITE.erl rename to lib-ce/emqx_modules/test/emqx_mod_sup_SUITE.erl diff --git a/lib-opensource/emqx_modules/test/emqx_mod_topic_metrics_SUITE.erl b/lib-ce/emqx_modules/test/emqx_mod_topic_metrics_SUITE.erl similarity index 100% rename from lib-opensource/emqx_modules/test/emqx_mod_topic_metrics_SUITE.erl rename to lib-ce/emqx_modules/test/emqx_mod_topic_metrics_SUITE.erl diff --git a/lib-opensource/emqx_modules/test/emqx_modules_SUITE.erl b/lib-ce/emqx_modules/test/emqx_modules_SUITE.erl similarity index 100% rename from lib-opensource/emqx_modules/test/emqx_modules_SUITE.erl rename to lib-ce/emqx_modules/test/emqx_modules_SUITE.erl diff --git a/apps/emqx_telemetry/.gitignore b/lib-ce/emqx_telemetry/.gitignore similarity index 100% rename from apps/emqx_telemetry/.gitignore rename to lib-ce/emqx_telemetry/.gitignore diff --git a/apps/emqx_telemetry/README.md b/lib-ce/emqx_telemetry/README.md similarity index 100% rename from apps/emqx_telemetry/README.md rename to lib-ce/emqx_telemetry/README.md diff --git a/apps/emqx_telemetry/etc/emqx_telemetry.conf b/lib-ce/emqx_telemetry/etc/emqx_telemetry.conf similarity index 100% rename from apps/emqx_telemetry/etc/emqx_telemetry.conf rename to lib-ce/emqx_telemetry/etc/emqx_telemetry.conf diff --git a/apps/emqx_telemetry/priv/emqx_telemetry.schema b/lib-ce/emqx_telemetry/priv/emqx_telemetry.schema similarity index 100% rename from apps/emqx_telemetry/priv/emqx_telemetry.schema rename to lib-ce/emqx_telemetry/priv/emqx_telemetry.schema diff --git a/apps/emqx_telemetry/rebar.config b/lib-ce/emqx_telemetry/rebar.config similarity index 100% rename from apps/emqx_telemetry/rebar.config rename to lib-ce/emqx_telemetry/rebar.config diff --git a/apps/emqx_telemetry/src/emqx_telemetry.app.src b/lib-ce/emqx_telemetry/src/emqx_telemetry.app.src similarity index 100% rename from apps/emqx_telemetry/src/emqx_telemetry.app.src rename to lib-ce/emqx_telemetry/src/emqx_telemetry.app.src diff --git a/apps/emqx_telemetry/src/emqx_telemetry.erl b/lib-ce/emqx_telemetry/src/emqx_telemetry.erl similarity index 100% rename from apps/emqx_telemetry/src/emqx_telemetry.erl rename to lib-ce/emqx_telemetry/src/emqx_telemetry.erl diff --git a/apps/emqx_telemetry/src/emqx_telemetry_api.erl b/lib-ce/emqx_telemetry/src/emqx_telemetry_api.erl similarity index 100% rename from apps/emqx_telemetry/src/emqx_telemetry_api.erl rename to lib-ce/emqx_telemetry/src/emqx_telemetry_api.erl diff --git a/apps/emqx_telemetry/src/emqx_telemetry_app.erl b/lib-ce/emqx_telemetry/src/emqx_telemetry_app.erl similarity index 100% rename from apps/emqx_telemetry/src/emqx_telemetry_app.erl rename to lib-ce/emqx_telemetry/src/emqx_telemetry_app.erl diff --git a/apps/emqx_telemetry/src/emqx_telemetry_sup.erl b/lib-ce/emqx_telemetry/src/emqx_telemetry_sup.erl similarity index 100% rename from apps/emqx_telemetry/src/emqx_telemetry_sup.erl rename to lib-ce/emqx_telemetry/src/emqx_telemetry_sup.erl diff --git a/apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl b/lib-ce/emqx_telemetry/test/emqx_telemetry_SUITE.erl similarity index 100% rename from apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl rename to lib-ce/emqx_telemetry/test/emqx_telemetry_SUITE.erl diff --git a/priv/emqx.schema b/priv/emqx.schema index 98a628f7a..43ef0214a 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1596,6 +1596,10 @@ end}. hidden ]}. +{mapping, "listener.ws.$name.peer_cert_as_username", "emqx.listeners", [ + {datatype, {enum, [cn, dn, crt]}} +]}. + {mapping, "listener.ws.$name.check_origin_enable", "emqx.listeners", [ {datatype, {enum, [true, false]}}, {default, false}, @@ -1743,6 +1747,10 @@ end}. {datatype, string} ]}. +{mapping, "listener.wss.$name.dhfile", "emqx.listeners", [ + {datatype, string} +]}. + {mapping, "listener.wss.$name.depth", "emqx.listeners", [ {default, 10}, {datatype, integer} @@ -2009,8 +2017,15 @@ end}. Other -> Other end end, - [{Atom(Type), ListenOnN, [{deflate_options, DeflateOpts(Prefix)}, - {tcp_options, TcpOpts(Prefix)} | LisOpts(Prefix)]}] + [#{ proto => Atom(Type) + , name => Name + , listen_on => ListenOnN + , opts => [ {deflate_options, DeflateOpts(Prefix)} + , {tcp_options, TcpOpts(Prefix)} + | LisOpts(Prefix) + ] + } + ] end, SslListeners = fun(Type, Name) -> Prefix = string:join(["listener", Type, Name], "."), @@ -2018,9 +2033,16 @@ end}. undefined -> []; ListenOn -> - [{Atom(Type), ListenOn, [{deflate_options, DeflateOpts(Prefix)}, - {tcp_options, TcpOpts(Prefix)}, - {ssl_options, SslOpts(Prefix)} | LisOpts(Prefix)]}] + [#{ proto => Atom(Type) + , name => Name + , listen_on => ListenOn + , opts => [ {deflate_options, DeflateOpts(Prefix)} + , {tcp_options, TcpOpts(Prefix)} + , {ssl_options, SslOpts(Prefix)} + | LisOpts(Prefix) + ] + } + ] end end, lists:flatten([TcpListeners(Type, Name) || {["listener", Type, Name, "endpoint"], ListenOn} diff --git a/rebar.config.erl b/rebar.config.erl index 0e2637daa..0f5017c7d 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -28,8 +28,8 @@ config() -> extra_lib_dir() -> EnterpriseFlag = os:getenv("EMQX_ENTERPRISE"), case EnterpriseFlag =:= "true" orelse EnterpriseFlag =:= "1" of - true -> "lib-enterprise"; - false -> "lib-opensource" + true -> "lib-ee"; + false -> "lib-ce" end. project_app_dirs() -> @@ -279,7 +279,7 @@ provide_bcrypt_release(ReleaseType) -> %% rebar3 does not handle umberella project's cross-app parse_transform well compile_and_load_pase_transforms(Dir) -> PtFiles = - [ "lib-opensource/emqx_rule_engine/src/emqx_rule_actions_trans.erl" + [ "apps/emqx_rule_engine/src/emqx_rule_actions_trans.erl" ], CompileOpts = [verbose,report_errors,report_warnings,return_errors,debug_info], lists:foreach(fun(PtFile) -> {ok, _Mod} = compile:file(path(Dir, PtFile), CompileOpts) end, PtFiles). diff --git a/src/emqx_access_rule.erl b/src/emqx_access_rule.erl index d8ff0f033..2931bc9f2 100644 --- a/src/emqx_access_rule.erl +++ b/src/emqx_access_rule.erl @@ -16,8 +16,6 @@ -module(emqx_access_rule). --include("emqx.hrl"). - %% APIs -export([ match/3 , compile/1 diff --git a/src/emqx_alarm.erl b/src/emqx_alarm.erl index f62e9e500..b17443c45 100644 --- a/src/emqx_alarm.erl +++ b/src/emqx_alarm.erl @@ -37,6 +37,7 @@ -export([ activate/1 , activate/2 , deactivate/1 + , deactivate/2 , delete_all_deactivated_alarms/0 , get_alarms/0 , get_alarms/1 @@ -132,7 +133,10 @@ activate(Name, Details) -> gen_server:call(?MODULE, {activate_alarm, Name, Details}). deactivate(Name) -> - gen_server:call(?MODULE, {deactivate_alarm, Name}). + gen_server:call(?MODULE, {deactivate_alarm, Name, no_details}). + +deactivate(Name, Details) -> + gen_server:call(?MODULE, {deactivate_alarm, Name, Details}). delete_all_deactivated_alarms() -> gen_server:call(?MODULE, delete_all_deactivated_alarms). @@ -183,34 +187,13 @@ handle_call({activate_alarm, Name, Details}, _From, State = #state{actions = Act {reply, ok, State} end; -handle_call({deactivate_alarm, Name}, _From, State = #state{actions = Actions, - size_limit = SizeLimit}) -> +handle_call({deactivate_alarm, Name, Details}, _From, State = #state{ + actions = Actions, size_limit = SizeLimit}) -> case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of [] -> {reply, {error, not_found}, State}; - [#activated_alarm{name = Name, - details = Details, - message = Message, - activate_at = ActivateAt}] -> - case SizeLimit > 0 andalso (mnesia:table_info(?DEACTIVATED_ALARM, size) >= SizeLimit) of - true -> - case mnesia:dirty_first(?DEACTIVATED_ALARM) of - '$end_of_table' -> - ok; - ActivateAt2 -> - mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2) - end; - false -> - ok - end, - Alarm = #deactivated_alarm{activate_at = ActivateAt, - name = Name, - details = Details, - message = Message, - deactivate_at = erlang:system_time(microsecond)}, - mnesia:dirty_delete(?ACTIVATED_ALARM, Name), - mnesia:dirty_write(?DEACTIVATED_ALARM, Alarm), - do_actions(deactivate, Alarm, Actions), + [Alarm] -> + deactivate_alarm(Details, SizeLimit, Actions, Alarm), {reply, ok, State} end; @@ -260,23 +243,50 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%------------------------------------------------------------------------------ +deactivate_alarm(Details, SizeLimit, Actions, #activated_alarm{ + activate_at = ActivateAt, name = Name, details = Details0, + message = Msg0}) -> + case SizeLimit > 0 andalso + (mnesia:table_info(?DEACTIVATED_ALARM, size) >= SizeLimit) of + true -> + case mnesia:dirty_first(?DEACTIVATED_ALARM) of + '$end_of_table' -> ok; + ActivateAt2 -> + mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2) + end; + false -> ok + end, + HistoryAlarm = make_deactivated_alarm(ActivateAt, Name, Details0, Msg0, + erlang:system_time(microsecond)), + DeActAlarm = make_deactivated_alarm(ActivateAt, Name, Details, + normalize_message(Name, Details), + erlang:system_time(microsecond)), + mnesia:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm), + mnesia:dirty_delete(?ACTIVATED_ALARM, Name), + do_actions(deactivate, DeActAlarm, Actions). + +make_deactivated_alarm(ActivateAt, Name, Details, Message, DeActivateAt) -> + #deactivated_alarm{ + activate_at = ActivateAt, + name = Name, + details = Details, + message = Message, + deactivate_at = DeActivateAt}. + deactivate_all_alarms() -> lists:foreach( - fun(#activated_alarm{ - name = Name, - details = Details, - message = Message, - activate_at = ActivateAt - }) -> - mnesia:dirty_write(?DEACTIVATED_ALARM, - #deactivated_alarm{ - activate_at = ActivateAt, - name = Name, - details = Details, - message = Message, - deactivate_at = erlang:system_time(microsecond) - }) - end, ets:tab2list(?ACTIVATED_ALARM)), + fun(#activated_alarm{name = Name, + details = Details, + message = Message, + activate_at = ActivateAt}) -> + mnesia:dirty_write(?DEACTIVATED_ALARM, + #deactivated_alarm{ + activate_at = ActivateAt, + name = Name, + details = Details, + message = Message, + deactivate_at = erlang:system_time(microsecond)}) + end, ets:tab2list(?ACTIVATED_ALARM)), clear_table(?ACTIVATED_ALARM). %% Delete all records from the given table, ignore result. @@ -355,6 +365,8 @@ normalize(#deactivated_alarm{activate_at = ActivateAt, deactivate_at => DeactivateAt, activated => false}. +normalize_message(Name, no_details) -> + list_to_binary(io_lib:format("~p", [Name])); normalize_message(high_system_memory_usage, #{high_watermark := HighWatermark}) -> list_to_binary(io_lib:format("System memory usage is higher than ~p%", [HighWatermark])); normalize_message(high_process_memory_usage, #{high_watermark := HighWatermark}) -> @@ -367,8 +379,7 @@ normalize_message(partition, #{occurred := Node}) -> list_to_binary(io_lib:format("Partition occurs at node ~s", [Node])); normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) -> list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID])); -normalize_message(<<"mqtt_conn/congested/", ClientId/binary>>, _) -> - list_to_binary(io_lib:format("MQTT connection for clientid '~s' is congested", [ClientId])); +normalize_message(<<"mqtt_conn/congested/", Info/binary>>, _) -> + list_to_binary(io_lib:format("MQTT connection congested: ~s", [Info])); normalize_message(_Name, _UnknownDetails) -> <<"Unknown alarm">>. - diff --git a/src/emqx_app.erl b/src/emqx_app.erl index 4be824fcb..cf4f8753d 100644 --- a/src/emqx_app.erl +++ b/src/emqx_app.erl @@ -32,11 +32,11 @@ start(_Type, _Args) -> print_banner(), ekka:start(), {ok, Sup} = emqx_sup:start_link(), + ok = start_autocluster(), ok = emqx_plugins:init(), _ = emqx_plugins:load(), emqx_boot:is_enabled(listeners) andalso (ok = emqx_listeners:start()), - start_autocluster(), register(emqx, self()), ok = emqx_alarm_handler:load(), print_vsn(), @@ -63,9 +63,8 @@ print_vsn() -> %%-------------------------------------------------------------------- %% Autocluster %%-------------------------------------------------------------------- - start_autocluster() -> ekka:callback(prepare, fun emqx:shutdown/1), ekka:callback(reboot, fun emqx:reboot/0), - ekka:autocluster(?APP). - + _ = ekka:autocluster(?APP), %% returns 'ok' or a pid or 'any()' as in spec + ok. diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 9def736cb..2b51b1d02 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -31,6 +31,7 @@ -export([ info/1 , info/2 + , set_conn_state/2 , stats/1 , caps/1 ]). @@ -87,7 +88,7 @@ pendings :: list() }). --opaque(channel() :: #channel{}). +-type(channel() :: #channel{}). -type(conn_state() :: idle | connecting | connected | disconnected). @@ -127,26 +128,26 @@ info(Keys, Channel) when is_list(Keys) -> [{Key, info(Key, Channel)} || Key <- Keys]; info(conninfo, #channel{conninfo = ConnInfo}) -> ConnInfo; -info(zone, #channel{clientinfo = #{zone := Zone}}) -> - Zone; -info(clientid, #channel{clientinfo = #{clientid := ClientId}}) -> - ClientId; -info(username, #channel{clientinfo = #{username := Username}}) -> - Username; -info(socktype, #channel{conninfo = #{socktype := SockType}}) -> - SockType; -info(peername, #channel{conninfo = #{peername := Peername}}) -> - Peername; -info(sockname, #channel{conninfo = #{sockname := Sockname}}) -> - Sockname; -info(proto_name, #channel{conninfo = #{proto_name := ProtoName}}) -> - ProtoName; -info(proto_ver, #channel{conninfo = #{proto_ver := ProtoVer}}) -> - ProtoVer; -info(connected_at, #channel{conninfo = #{connected_at := ConnectedAt}}) -> - ConnectedAt; +info(socktype, #channel{conninfo = ConnInfo}) -> + maps:get(socktype, ConnInfo, undefined); +info(peername, #channel{conninfo = ConnInfo}) -> + maps:get(peername, ConnInfo, undefined); +info(sockname, #channel{conninfo = ConnInfo}) -> + maps:get(sockname, ConnInfo, undefined); +info(proto_name, #channel{conninfo = ConnInfo}) -> + maps:get(proto_name, ConnInfo, undefined); +info(proto_ver, #channel{conninfo = ConnInfo}) -> + maps:get(proto_ver, ConnInfo, undefined); +info(connected_at, #channel{conninfo = ConnInfo}) -> + maps:get(connected_at, ConnInfo, undefined); info(clientinfo, #channel{clientinfo = ClientInfo}) -> ClientInfo; +info(zone, #channel{clientinfo = ClientInfo}) -> + maps:get(zone, ClientInfo, undefined); +info(clientid, #channel{clientinfo = ClientInfo}) -> + maps:get(clientid, ClientInfo, undefined); +info(username, #channel{clientinfo = ClientInfo}) -> + maps:get(username, ClientInfo, undefined); info(session, #channel{session = Session}) -> maybe_apply(fun emqx_session:info/1, Session); info(conn_state, #channel{conn_state = ConnState}) -> @@ -163,6 +164,9 @@ info(alias_maximum, #channel{alias_maximum = Limits}) -> Limits; info(timers, #channel{timers = Timers}) -> Timers. +set_conn_state(ConnState, Channel) -> + Channel#channel{conn_state = ConnState}. + %% TODO: Add more stats. -spec(stats(channel()) -> emqx_types:stats()). stats(#channel{session = Session})-> diff --git a/src/emqx_congestion.erl b/src/emqx_congestion.erl new file mode 100644 index 000000000..0e7992f2e --- /dev/null +++ b/src/emqx_congestion.erl @@ -0,0 +1,164 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_congestion). + +-export([ maybe_alarm_port_busy/3 + , maybe_alarm_port_busy/4 + , maybe_alarm_too_many_publish/5 + , maybe_alarm_too_many_publish/6 + , cancel_alarms/3 + ]). + +-elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_congestion]}}]). + +-define(ALARM_CONN_CONGEST(Channel, Reason), + list_to_binary( + io_lib:format("mqtt_conn/congested/~s/~s/~s", + [emqx_channel:info(clientid, Channel), + maps:get(username, emqx_channel:info(clientinfo, Channel), + <<"undefined">>), + Reason]))). + +-define(ALARM_CONN_INFO_KEYS, [socktype, sockname, peername, clientid, username, + proto_name, proto_ver, connected_at, conn_state]). +-define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]). +-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]). +-define(PROC_INFO_KEYS, [message_queue_len, memory, reductions]). +-define(ALARM_SENT(REASON), {alarm_sent, REASON}). +-define(ALL_ALARM_REASONS, [port_busy, too_many_publish]). +-define(CONFIRM_CLEAR(REASON), {alarm_confirm_clear, REASON}). +-define(CONFIRM_CLEAR_INTERVAL, 10000). + +maybe_alarm_port_busy(Socket, Transport, Channel) -> + maybe_alarm_port_busy(Socket, Transport, Channel, false). + +maybe_alarm_port_busy(Socket, Transport, Channel, ForceClear) -> + case is_tcp_congested(Socket, Transport) of + true -> alarm_congestion(Socket, Transport, Channel, port_busy); + false -> cancel_alarm_congestion(Socket, Transport, Channel, port_busy, + ForceClear) + end. + +maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount, + MaxBatchSize) -> + maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount, + MaxBatchSize, false). + +maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount, + PubMsgCount = _MaxBatchSize, _ForceClear) -> + %% we only alarm it when the process is "too busy" + alarm_congestion(Socket, Transport, Channel, too_many_publish); +maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount, + _MaxBatchSize, ForceClear) when PubMsgCount == 0 -> + %% but we clear the alarm until it is really "idle", to avoid sending + %% alarms and clears too frequently + cancel_alarm_congestion(Socket, Transport, Channel, too_many_publish, + ForceClear); +maybe_alarm_too_many_publish(_Socket, _Transport, _Channel, _PubMsgCount, + _MaxBatchSize, _ForceClear) -> + ok. + +cancel_alarms(Socket, Transport, Channel) -> + lists:foreach(fun(Reason) -> + do_cancel_alarm_congestion(Socket, Transport, Channel, Reason) + end, ?ALL_ALARM_REASONS). + +alarm_congestion(Socket, Transport, Channel, Reason) -> + case has_alarm_sent(Reason) of + false -> do_alarm_congestion(Socket, Transport, Channel, Reason); + true -> + %% pretend we have sent an alarm again + update_alarm_sent_at(Reason) + end. + +cancel_alarm_congestion(Socket, Transport, Channel, Reason, ForceClear) -> + case is_alarm_allowed_clear(Reason, ForceClear) of + true -> do_cancel_alarm_congestion(Socket, Transport, Channel, Reason); + false -> ok + end. + +do_alarm_congestion(Socket, Transport, Channel, Reason) -> + ok = update_alarm_sent_at(Reason), + AlarmDetails = tcp_congestion_alarm_details(Socket, Transport, Channel), + emqx_alarm:activate(?ALARM_CONN_CONGEST(Channel, Reason), AlarmDetails), + ok. + +do_cancel_alarm_congestion(Socket, Transport, Channel, Reason) -> + ok = remove_alarm_sent_at(Reason), + AlarmDetails = tcp_congestion_alarm_details(Socket, Transport, Channel), + emqx_alarm:deactivate(?ALARM_CONN_CONGEST(Channel, Reason), AlarmDetails), + ok. + +is_tcp_congested(Socket, Transport) -> + case Transport:getstat(Socket, [send_pend]) of + {ok, [{send_pend, N}]} when N > 0 -> true; + _ -> false + end. + +has_alarm_sent(Reason) -> + case get_alarm_sent_at(Reason) of + 0 -> false; + _ -> true + end. +update_alarm_sent_at(Reason) -> + erlang:put(?ALARM_SENT(Reason), timenow()), + ok. +remove_alarm_sent_at(Reason) -> + erlang:erase(?ALARM_SENT(Reason)), + ok. +get_alarm_sent_at(Reason) -> + case erlang:get(?ALARM_SENT(Reason)) of + undefined -> 0; + LastSentAt -> LastSentAt + end. + +is_alarm_allowed_clear(Reason, _ForceClear = true) -> + has_alarm_sent(Reason); +is_alarm_allowed_clear(Reason, _ForceClear = false) -> + %% only sent clears when the alarm was not triggered in the last + %% ?CONFIRM_CLEAR_INTERVAL time + case timenow() - get_alarm_sent_at(Reason) of + Elapse when Elapse >= ?CONFIRM_CLEAR_INTERVAL -> true; + _ -> false + end. + +timenow() -> + erlang:system_time(millisecond). + +%%============================================================================== +%% Alarm message +%%============================================================================== +tcp_congestion_alarm_details(Socket, Transport, Channel) -> + ProcInfo = process_info(self(), ?PROC_INFO_KEYS), + BasicInfo = [{pid, list_to_binary(pid_to_list(self()))} | ProcInfo], + Stat = case Transport:getstat(Socket, ?ALARM_SOCK_STATS_KEYS) of + {ok, Stat0} -> Stat0; + {error, _} -> [] + end, + Opts = case Transport:getopts(Socket, ?ALARM_SOCK_OPTS_KEYS) of + {ok, Opts0} -> Opts0; + {error, _} -> [] + end, + SockInfo = Stat ++ Opts, + ConnInfo = [conn_info(Key, Channel) || Key <- ?ALARM_CONN_INFO_KEYS], + maps:from_list(BasicInfo ++ ConnInfo ++ SockInfo). + +conn_info(Key, Channel) when Key =:= sockname; Key =:= peername -> + {IPStr, Port} = emqx_channel:info(Key, Channel), + {Key, iolist_to_binary([inet:ntoa(IPStr), ":", integer_to_list(Port)])}; +conn_info(Key, Channel) -> + {Key, emqx_channel:info(Key, Channel)}. diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index b682a3ed5..11a3a9418 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -29,7 +29,7 @@ -compile(nowarn_export_all). -endif. --elvis([{elvis_style, invalid_dynamic_call, #{ ignore => [emqx_connection]}}]). +-elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_connection]}}]). %% API -export([ start_link/3 @@ -276,7 +276,7 @@ recvloop(Parent, State = #state{idle_timeout = IdleTimeout}) -> Msg -> process_msg([Msg], Parent, ensure_stats_timer(IdleTimeout, State)) after - IdleTimeout -> + IdleTimeout + 100 -> hibernate(Parent, cancel_stats_timer(State)) end. @@ -389,8 +389,12 @@ handle_msg({Passive, _Sock}, State) handle_info(activate_socket, NState1); handle_msg(Deliver = {deliver, _Topic, _Msg}, - State = #state{active_n = ActiveN}) -> - Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], + #state{active_n = MaxBatchSize, transport = Transport, + socket = Socket, channel = Channel} = State) -> + Delivers0 = emqx_misc:drain_deliver(MaxBatchSize), + emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel, + length(Delivers0), MaxBatchSize), + Delivers = [Deliver|Delivers0], with_channel(handle_deliver, [Delivers], State); %% Something sent @@ -443,10 +447,12 @@ handle_msg(Msg, State) -> %% Terminate -spec terminate(any(), state()) -> no_return(). -terminate(Reason, State = #state{channel = Channel}) -> +terminate(Reason, State = #state{channel = Channel, transport = Transport, + socket = Socket}) -> ?LOG(debug, "Terminated due to ~p", [Reason]), - emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel)), - emqx_channel:terminate(Reason, Channel), + Channel1 = emqx_channel:set_conn_state(disconnected, Channel), + emqx_congestion:cancel_alarms(Socket, Transport, Channel1), + emqx_channel:terminate(Reason, Channel1), _ = close_socket(State), exit(Reason). @@ -502,8 +508,12 @@ handle_timeout(_TRef, limit_timeout, State) -> }, handle_info(activate_socket, NState); -handle_timeout(_TRef, emit_stats, State = - #state{channel = Channel}) -> +handle_timeout(_TRef, emit_stats, State = #state{active_n = MaxBatchSize, + channel = Channel, transport = Transport, socket = Socket}) -> + {_, MsgQLen} = erlang:process_info(self(), message_queue_len), + emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel, true), + emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel, + MsgQLen, MaxBatchSize, true), ClientId = emqx_channel:info(clientid, Channel), emqx_cm:set_chan_stats(ClientId, stats(State)), {ok, State#state{stats_timer = undefined}}; @@ -616,7 +626,7 @@ send(IoData, #state{transport = Transport, socket = Socket, channel = Channel}) Oct = iolist_size(IoData), ok = emqx_metrics:inc('bytes.sent', Oct), inc_counter(outgoing_bytes, Oct), - maybe_warn_congestion(Socket, Transport, Channel), + emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel), case Transport:async_send(Socket, IoData, [nosuspend]) of ok -> ok; Error = {error, _Reason} -> @@ -625,48 +635,6 @@ send(IoData, #state{transport = Transport, socket = Socket, channel = Channel}) ok end. -maybe_warn_congestion(Socket, Transport, Channel) -> - IsCongestAlarmSet = is_congestion_alarm_set(), - case is_congested(Socket, Transport) of - true when not IsCongestAlarmSet -> - ok = set_congestion_alarm(), - emqx_alarm:activate(?ALARM_TCP_CONGEST(Channel), - tcp_congestion_alarm_details(Socket, Transport, Channel)); - false when IsCongestAlarmSet -> - ok = clear_congestion_alarm(), - emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel)); - _ -> ok - end. - -is_congested(Socket, Transport) -> - case Transport:getstat(Socket, [send_pend]) of - {ok, [{send_pend, N}]} when N > 0 -> true; - _ -> false - end. - -is_congestion_alarm_set() -> - case erlang:get(conn_congested) of - true -> true; - _ -> false - end. -set_congestion_alarm() -> - erlang:put(conn_congested, true), ok. -clear_congestion_alarm() -> - erlang:put(conn_congested, false), ok. - -tcp_congestion_alarm_details(Socket, Transport, Channel) -> - {ok, Stat} = Transport:getstat(Socket, ?ALARM_SOCK_STATS_KEYS), - {ok, Opts} = Transport:getopts(Socket, ?ALARM_SOCK_OPTS_KEYS), - SockInfo = maps:from_list(Stat ++ Opts), - ConnInfo = maps:from_list([conn_info(Key, Channel) || Key <- ?ALARM_CONN_INFO_KEYS]), - maps:merge(ConnInfo, SockInfo). - -conn_info(Key, Channel) when Key =:= sockname; Key =:= peername -> - {IPStr, Port} = emqx_channel:info(Key, Channel), - {Key, iolist_to_binary([inet:ntoa(IPStr), ":", integer_to_list(Port)])}; -conn_info(Key, Channel) -> - {Key, emqx_channel:info(Key, Channel)}. - %%-------------------------------------------------------------------- %% Handle Info diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 3b642d55f..651be0e8e 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -21,6 +21,7 @@ %% APIs -export([ start/0 + , ensure_all_started/0 , restart/0 , stop/0 ]). @@ -28,30 +29,88 @@ -export([ start_listener/1 , start_listener/3 , stop_listener/1 - , stop_listener/3 , restart_listener/1 , restart_listener/3 ]). --type(listener() :: {esockd:proto(), esockd:listen_on(), [esockd:option()]}). +-export([ find_id_by_listen_on/1 + , find_by_listen_on/1 + , find_by_id/1 + , identifier/1 + , format_listen_on/1 + ]). -%%-------------------------------------------------------------------- -%% APIs -%%-------------------------------------------------------------------- +-type(listener() :: #{ name := binary() + , proto := esockd:proto() + , listen_on := esockd:listen_on() + , opts := [esockd:option()] + }). + +%% @doc Find listener identifier by listen-on. +%% Return empty string (binary) if listener is not found in config. +-spec(find_id_by_listen_on(esockd:listen_on()) -> binary() | false). +find_id_by_listen_on(ListenOn) -> + case find_by_listen_on(ListenOn) of + false -> false; + L -> identifier(L) + end. + +%% @doc Find listener by listen-on. +%% Return 'false' if not found. +-spec(find_by_listen_on(esockd:listen_on()) -> listener() | false). +find_by_listen_on(ListenOn) -> + find_by_listen_on(ListenOn, emqx:get_env(listeners, [])). + +%% @doc Find listener by identifier. +%% Return 'false' if not found. +-spec(find_by_id(string() | binary()) -> listener() | false). +find_by_id(Id) -> + find_by_id(iolist_to_binary(Id), emqx:get_env(listeners, [])). + +%% @doc Return the ID of the given listener. +-spec identifier(listener()) -> binary(). +identifier(#{proto := Proto, name := Name}) -> + identifier(Proto, Name). %% @doc Start all listeners. -spec(start() -> ok). start() -> lists:foreach(fun start_listener/1, emqx:get_env(listeners, [])). +%% @doc Ensure all configured listeners are started. +%% Raise exception if any of them failed to start. +-spec(ensure_all_started() -> ok). +ensure_all_started() -> + ensure_all_started(emqx:get_env(listeners, []), []). + +ensure_all_started([], []) -> ok; +ensure_all_started([], Failed) -> error(Failed); +ensure_all_started([L | Rest], Results) -> + #{proto := Proto, listen_on := ListenOn, opts := Options} = L, + NewResults = + case start_listener(Proto, ListenOn, Options) of + {ok, _Pid} -> + Results; + {error, {already_started, _Pid}} -> + Results; + {error, Reason} -> + [{identifier(L), Reason} | Results] + end, + ensure_all_started(Rest, NewResults). + +%% @doc Format address:port for logging. +-spec(format_listen_on(esockd:listen_on()) -> binary()). +format_listen_on(ListenOn) -> format(ListenOn). + -spec(start_listener(listener()) -> ok). -start_listener({Proto, ListenOn, Options}) -> +start_listener(#{proto := Proto, name := Name, listen_on := ListenOn, opts := Options}) -> + ID = identifier(Proto, Name), case start_listener(Proto, ListenOn, Options) of - {ok, _} -> io:format("Start mqtt:~s listener on ~s successfully.~n", - [Proto, format(ListenOn)]); + {ok, _} -> io:format("Start ~s listener on ~s successfully.~n", + [ID, format(ListenOn)]); {error, Reason} -> - io:format(standard_error, "Failed to start mqtt:~s listener on ~s - ~0p~n!", - [Proto, format(ListenOn), Reason]), + io:format(standard_error, "Failed to start mqtt listener ~s on ~s: ~0p~n", + [ID, format(ListenOn), Reason]), error(Reason) end. @@ -114,40 +173,42 @@ with_port({Addr, Port}, Opts = #{socket_opts := SocketOption}) -> restart() -> lists:foreach(fun restart_listener/1, emqx:get_env(listeners, [])). --spec(restart_listener(listener()) -> any()). -restart_listener({Proto, ListenOn, Options}) -> - restart_listener(Proto, ListenOn, Options). +-spec(restart_listener(listener() | string() | binary()) -> ok | {error, any()}). +restart_listener(#{proto := Proto, listen_on := ListenOn, opts := Options}) -> + restart_listener(Proto, ListenOn, Options); +restart_listener(Identifier) -> + case emqx_listeners:find_by_id(Identifier) of + false -> {error, {no_such_listener, Identifier}}; + Listener -> restart_listener(Listener) + end. --spec(restart_listener(esockd:proto(), esockd:listen_on(), [esockd:option()]) -> any()). +-spec(restart_listener(esockd:proto(), esockd:listen_on(), [esockd:option()]) -> + ok | {error, any()}). restart_listener(tcp, ListenOn, _Options) -> esockd:reopen('mqtt:tcp', ListenOn); restart_listener(Proto, ListenOn, _Options) when Proto == ssl; Proto == tls -> esockd:reopen('mqtt:ssl', ListenOn); restart_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws -> _ = cowboy:stop_listener(ws_name('mqtt:ws', ListenOn)), - start_listener(Proto, ListenOn, Options); + ok(start_listener(Proto, ListenOn, Options)); restart_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss -> _ = cowboy:stop_listener(ws_name('mqtt:wss', ListenOn)), - start_listener(Proto, ListenOn, Options); + ok(start_listener(Proto, ListenOn, Options)); restart_listener(Proto, ListenOn, _Opts) -> esockd:reopen(Proto, ListenOn). +ok(ok) -> ok; +ok({ok, _}) -> ok; +ok(Error) -> Error. + %% @doc Stop all listeners. -spec(stop() -> ok). stop() -> lists:foreach(fun stop_listener/1, emqx:get_env(listeners, [])). -spec(stop_listener(listener()) -> ok | {error, term()}). -stop_listener({Proto, ListenOn, Opts}) -> - StopRet = stop_listener(Proto, ListenOn, Opts), - case StopRet of - ok -> io:format("Stop mqtt:~s listener on ~s successfully.~n", - [Proto, format(ListenOn)]); - {error, Reason} -> - io:format(standard_error, "Failed to stop mqtt:~s listener on ~s - ~p~n.", - [Proto, format(ListenOn), Reason]) - end, - StopRet. +stop_listener(#{proto := Proto, listen_on := ListenOn, opts := Opts}) -> + stop_listener(Proto, ListenOn, Opts). -spec(stop_listener(esockd:proto(), esockd:listen_on(), [esockd:option()]) -> ok | {error, term()}). @@ -181,3 +242,19 @@ ws_name(Name, {_Addr, Port}) -> ws_name(Name, Port); ws_name(Name, Port) -> list_to_atom(lists:concat([Name, ":", Port])). + +identifier(Proto, Name) when is_atom(Proto) -> + identifier(atom_to_list(Proto), Name); +identifier(Proto, Name) -> + iolist_to_binary(["mqtt", ":", Proto, ":", Name]). + +find_by_listen_on(_ListenOn, []) -> false; +find_by_listen_on(ListenOn, [#{listen_on := ListenOn} = L | _]) -> L; +find_by_listen_on(ListenOn, [_ | Rest]) -> find_by_listen_on(ListenOn, Rest). + +find_by_id(_Id, []) -> false; +find_by_id(Id, [L | Rest]) -> + case identifier(L) =:= Id of + true -> L; + false -> find_by_id(Id, Rest) + end. diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index 46db21608..88acec4e7 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -22,6 +22,7 @@ -include("logger.hrl"). -include("types.hrl"). -include("emqx_mqtt.hrl"). +-include("emqx.hrl"). -logger_header("[Metrics]"). diff --git a/src/emqx_os_mon.erl b/src/emqx_os_mon.erl index f7449074d..0b057e1f8 100644 --- a/src/emqx_os_mon.erl +++ b/src/emqx_os_mon.erl @@ -145,12 +145,12 @@ handle_info({timeout, Timer, check}, State = #{timer := Timer, case emqx_vm:cpu_util() of %% TODO: should be improved? 0 -> State#{timer := undefined}; - Busy when Busy / 100 >= CPUHighWatermark -> + Busy when Busy >= CPUHighWatermark -> emqx_alarm:activate(high_cpu_usage, #{usage => Busy, high_watermark => CPUHighWatermark, low_watermark => CPULowWatermark}), ensure_check_timer(State); - Busy when Busy / 100 =< CPULowWatermark -> + Busy when Busy =< CPULowWatermark -> emqx_alarm:deactivate(high_cpu_usage), ensure_check_timer(State); _Busy -> diff --git a/sync-apps.sh b/sync-apps.sh index e5c36228f..1e8da78cc 100755 --- a/sync-apps.sh +++ b/sync-apps.sh @@ -15,17 +15,17 @@ apps=( "emqx_auth_redis" "emqx_bridge_mqtt" "emqx_coap" -# "emqx_dashboard" # moved to lib-opensource +# "emqx_dashboard" # moved to lib-ce "emqx_exhook" "emqx_exproto" "emqx_lua_hook" "emqx_lwm2m" -# "emqx_management" # moved to lib-opensource +# "emqx_management" # moved to lib-ce "emqx_prometheus" "emqx_psk_file" "emqx_recon" "emqx_retainer" -# "emqx_rule_engine" # moved to lib-opensource +# "emqx_rule_engine" # moved to lib-ce "emqx_sasl" "emqx_sn" "emqx_stomp" diff --git a/test/emqx_acl_cache_SUITE.erl b/test/emqx_acl_cache_SUITE.erl index a1ad061e6..8c7685aa2 100644 --- a/test/emqx_acl_cache_SUITE.erl +++ b/test/emqx_acl_cache_SUITE.erl @@ -56,7 +56,7 @@ t_clean_acl_cache(_) -> emqtt:stop(Client). % optimize?? -t_reload_aclfile_and_cleanall(Config) -> +t_reload_aclfile_and_cleanall(_Config) -> RasieMsg = fun() -> Self = self(), #{puback => fun(Msg) -> Self ! {puback, Msg} end, disconnected => fun(_) -> ok end, @@ -79,27 +79,6 @@ t_reload_aclfile_and_cleanall(Config) -> %% Check acl cache list [ClientPid] = emqx_cm:lookup_channels(<<"emqx_c">>), ?assert(length(gen_server:call(ClientPid, list_acl_cache)) > 0), - - %% Update acl file and reload mod_acl_internal - Path = filename:join([testdir(proplists:get_value(data_dir, Config)), "acl2.conf"]), - ok = file:write_file(Path, <<"{deny, all}.">>), - OldPath = emqx:get_env(acl_file), - % application:set_env(emqx, acl_file, Path), - emqx_mod_acl_internal:reload([{acl_file, Path}]), - - ?assert(length(gen_server:call(ClientPid, list_acl_cache)) == 0), - {ok, PktId2} = emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, qos1), - - receive - {puback, #{packet_id := PktId2, reason_code := Rc2}} -> - %% Not authorized - ?assertEqual(16#87, Rc2); - _ -> - ?assert(false) - end, - application:set_env(emqx, acl_file, OldPath), - file:delete(Path), - emqx_mod_acl_internal:reload([{acl_file, OldPath}]), emqtt:stop(Client). %% @private diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index 87dd75baf..b011c6978 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -54,6 +54,7 @@ init_per_suite(Config) -> ok = meck:expect(emqx_alarm, activate, fun(_, _) -> ok end), ok = meck:expect(emqx_alarm, deactivate, fun(_) -> ok end), + ok = meck:expect(emqx_alarm, deactivate, fun(_, _) -> ok end), Config. @@ -77,6 +78,9 @@ init_per_testcase(_TestCase, Config) -> (peercert, [sock]) -> undefined end), ok = meck:expect(emqx_transport, setopts, fun(_Sock, _Opts) -> ok end), + ok = meck:expect(emqx_transport, getopts, fun(_Sock, Options) -> + {ok, [{K, 0} || K <- Options]} + end), ok = meck:expect(emqx_transport, getstat, fun(_Sock, Options) -> {ok, [{K, 0} || K <- Options]} end), diff --git a/test/mqtt_protocol_v5_SUITE.erl b/test/mqtt_protocol_v5_SUITE.erl index 738fec169..814162a4a 100644 --- a/test/mqtt_protocol_v5_SUITE.erl +++ b/test/mqtt_protocol_v5_SUITE.erl @@ -183,11 +183,7 @@ t_batch_subscribe(_) -> {ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"batch_test">>}]), {ok, _} = emqtt:connect(Client), application:set_env(emqx, enable_acl_cache, false), - TempAcl = emqx_ct_helpers:deps_path(emqx, "test/emqx_access_SUITE_data/acl_temp.conf"), - file:write_file(TempAcl, "{deny, {client, \"batch_test\"}, subscribe, - [\"t1\", \"t2\", \"t3\"]}.\n"), - timer:sleep(10), - emqx_mod_acl_internal:reload([{acl_file, TempAcl}]), + application:set_env(emqx, acl_nomatch, deny), {ok, _, [?RC_NOT_AUTHORIZED, ?RC_NOT_AUTHORIZED, ?RC_NOT_AUTHORIZED]} = emqtt:subscribe(Client, [{<<"t1">>, qos1}, @@ -198,7 +194,7 @@ t_batch_subscribe(_) -> ?RC_NO_SUBSCRIPTION_EXISTED]} = emqtt:unsubscribe(Client, [<<"t1">>, <<"t2">>, <<"t3">>]), - file:delete(TempAcl), + application:set_env(emqx, acl_nomatch, allow), emqtt:disconnect(Client). t_connect_will_retain(_) -> @@ -336,64 +332,63 @@ t_connect_session_expiry_interval(_) -> ok = emqtt:disconnect(Client3). %% [MQTT-3.1.3-9] -t_connect_will_delay_interval(_) -> - process_flag(trap_exit, true), - Topic = nth(1, ?TOPICS), - Payload = "will message", - - {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), - {ok, _} = emqtt:connect(Client1), - {ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2), - - {ok, Client2} = emqtt:start_link([ - {clientid, <<"t_connect_will_delay_interval">>}, - {proto_ver, v5}, - {clean_start, true}, - {will_flag, true}, - {will_qos, 2}, - {will_topic, Topic}, - {will_payload, Payload}, - {will_props, #{'Will-Delay-Interval' => 3}}, - {properties, #{'Session-Expiry-Interval' => 7200}} - ]), - {ok, _} = emqtt:connect(Client2), - %% terminate the client without sending the DISCONNECT - emqtt:stop(Client2), - %% should not get the will msg in 2.5s - timer:sleep(1500), - ?assertEqual(0, length(receive_messages(1))), - %% should get the will msg in 4.5s - timer:sleep(1000), - ?assertEqual(1, length(receive_messages(1))), - - %% try again, but let the session expire quickly - {ok, Client3} = emqtt:start_link([ - {clientid, <<"t_connect_will_delay_interval">>}, - {proto_ver, v5}, - {clean_start, true}, - {will_flag, true}, - {will_qos, 2}, - {will_topic, Topic}, - {will_payload, Payload}, - {will_props, #{'Will-Delay-Interval' => 7200}}, - {properties, #{'Session-Expiry-Interval' => 3}} - ]), - {ok, _} = emqtt:connect(Client3), - %% terminate the client without sending the DISCONNECT - emqtt:stop(Client3), - %% should not get the will msg in 2.5s - timer:sleep(1500), - ?assertEqual(0, length(receive_messages(1))), - %% should get the will msg in 4.5s - timer:sleep(1000), - ?assertEqual(1, length(receive_messages(1))), - - ok = emqtt:disconnect(Client1), - - receive {'EXIT', _, _} -> ok - after 100 -> ok - end, - process_flag(trap_exit, false). +%% !!!REFACTOR NEED: +%t_connect_will_delay_interval(_) -> +% process_flag(trap_exit, true), +% Topic = nth(1, ?TOPICS), +% Payload = "will message", +% +% {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), +% {ok, _} = emqtt:connect(Client1), +% {ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2), +% +% {ok, Client2} = emqtt:start_link([ +% {clientid, <<"t_connect_will_delay_interval">>}, +% {proto_ver, v5}, +% {clean_start, true}, +% {will_flag, true}, +% {will_qos, 2}, +% {will_topic, Topic}, +% {will_payload, Payload}, +% {will_props, #{'Will-Delay-Interval' => 3}}, +% {properties, #{'Session-Expiry-Interval' => 7200}}, +% {keepalive, 2} +% ]), +% {ok, _} = emqtt:connect(Client2), +% timer:sleep(50), +% erlang:exit(Client2, kill), +% timer:sleep(2000), +% ?assertEqual(0, length(receive_messages(1))), +% timer:sleep(5000), +% ?assertEqual(1, length(receive_messages(1))), +% +% {ok, Client3} = emqtt:start_link([ +% {clientid, <<"t_connect_will_delay_interval">>}, +% {proto_ver, v5}, +% {clean_start, true}, +% {will_flag, true}, +% {will_qos, 2}, +% {will_topic, Topic}, +% {will_payload, Payload}, +% {will_props, #{'Will-Delay-Interval' => 7200}}, +% {properties, #{'Session-Expiry-Interval' => 3}}, +% {keepalive, 2} +% ]), +% {ok, _} = emqtt:connect(Client3), +% timer:sleep(50), +% erlang:exit(Client3, kill), +% +% timer:sleep(2000), +% ?assertEqual(0, length(receive_messages(1))), +% timer:sleep(5000), +% ?assertEqual(1, length(receive_messages(1))), +% +% ok = emqtt:disconnect(Client1), +% +% receive {'EXIT', _, _} -> ok +% after 100 -> ok +% end, +% process_flag(trap_exit, false). %% [MQTT-3.1.4-3] t_connect_duplicate_clientid(_) ->