From ec88e3a404dc70d59ef2f7cc539c5af1cae3036d Mon Sep 17 00:00:00 2001 From: DDDHuang <904897578@qq.com> Date: Fri, 23 Jul 2021 17:55:46 +0800 Subject: [PATCH 01/51] chore: sys uptime by millisecond --- apps/emqx/src/emqx_sys.erl | 21 +++---------------- apps/emqx/test/emqx_sys_SUITE.erl | 6 ------ apps/emqx/test/props/prop_emqx_sys.erl | 2 +- apps/emqx_management/src/emqx_mgmt.erl | 2 +- .../src/emqx_mgmt_api_nodes.erl | 4 ++-- 5 files changed, 7 insertions(+), 28 deletions(-) diff --git a/apps/emqx/src/emqx_sys.erl b/apps/emqx/src/emqx_sys.erl index 2f3f782e6..8881de5cb 100644 --- a/apps/emqx/src/emqx_sys.erl +++ b/apps/emqx/src/emqx_sys.erl @@ -90,7 +90,7 @@ version() -> emqx_app:get_release(). sysdescr() -> emqx_app:get_description(). %% @doc Get sys uptime --spec(uptime() -> string()). +-spec(uptime() -> Milliseconds :: integer()). uptime() -> gen_server:call(?SYS, uptime). @@ -143,7 +143,7 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({timeout, TRef, heartbeat}, State = #state{heartbeat = TRef}) -> - publish_any(uptime, iolist_to_binary(uptime(State))), + publish_any(uptime, integer_to_binary(uptime(State))), publish_any(datetime, iolist_to_binary(datetime())), {noreply, heartbeat(State)}; @@ -168,22 +168,7 @@ terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) -> %%----------------------------------------------------------------------------- uptime(#state{start_time = Ts}) -> - Secs = timer:now_diff(erlang:timestamp(), Ts) div 1000000, - lists:flatten(uptime(seconds, Secs)). -uptime(seconds, Secs) when Secs < 60 -> - [integer_to_list(Secs), " seconds"]; -uptime(seconds, Secs) -> - [uptime(minutes, Secs div 60), integer_to_list(Secs rem 60), " seconds"]; -uptime(minutes, M) when M < 60 -> - [integer_to_list(M), " minutes, "]; -uptime(minutes, M) -> - [uptime(hours, M div 60), integer_to_list(M rem 60), " minutes, "]; -uptime(hours, H) when H < 24 -> - [integer_to_list(H), " hours, "]; -uptime(hours, H) -> - [uptime(days, H div 24), integer_to_list(H rem 24), " hours, "]; -uptime(days, D) -> - [integer_to_list(D), " days, "]. + timer:now_diff(erlang:timestamp(), Ts) div 1000. publish_any(Name, Value) -> _ = publish(Name, Value), diff --git a/apps/emqx/test/emqx_sys_SUITE.erl b/apps/emqx/test/emqx_sys_SUITE.erl index 65f09caf6..354e1a8a2 100644 --- a/apps/emqx/test/emqx_sys_SUITE.erl +++ b/apps/emqx/test/emqx_sys_SUITE.erl @@ -39,12 +39,6 @@ end_per_suite(_Config) -> % t_sysdescr(_) -> % error('TODO'). -t_uptime(_) -> - ?assertEqual(<<"1 seconds">>, iolist_to_binary(emqx_sys:uptime(seconds, 1))), - ?assertEqual(<<"1 minutes, 0 seconds">>, iolist_to_binary(emqx_sys:uptime(seconds, 60))), - ?assertEqual(<<"1 hours, 0 minutes, 0 seconds">>, iolist_to_binary(emqx_sys:uptime(seconds, 3600))), - ?assertEqual(<<"1 days, 0 hours, 0 minutes, 0 seconds">>, iolist_to_binary(emqx_sys:uptime(seconds, 86400))). - % t_datetime(_) -> % error('TODO'). diff --git a/apps/emqx/test/props/prop_emqx_sys.erl b/apps/emqx/test/props/prop_emqx_sys.erl index 170611061..3b2b8b94c 100644 --- a/apps/emqx/test/props/prop_emqx_sys.erl +++ b/apps/emqx/test/props/prop_emqx_sys.erl @@ -114,7 +114,7 @@ postcondition(_State, {call, emqx_sys, info, []}, Info) -> postcondition(_State, {call, emqx_sys, version, []}, Version) -> is_list(Version); postcondition(_State, {call, emqx_sys, uptime, []}, Uptime) -> - is_list(Uptime); + is_integer(Uptime); postcondition(_State, {call, emqx_sys, datetime, []}, Datetime) -> is_list(Datetime); postcondition(_State, {call, emqx_sys, sysdescr, []}, Sysdescr) -> diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index eae5563ba..8c915a950 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -147,7 +147,7 @@ node_info(Node) when Node =:= node() -> max_fds => proplists:get_value(max_fds, lists:usort(lists:flatten(erlang:system_info(check_io)))), connections => ets:info(emqx_channel, size), node_status => 'Running', - uptime => iolist_to_binary(proplists:get_value(uptime, BrokerInfo)), + uptime => proplists:get_value(uptime, BrokerInfo), version => iolist_to_binary(proplists:get_value(version, BrokerInfo)) }; node_info(Node) -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_nodes.erl b/apps/emqx_management/src/emqx_mgmt_api_nodes.erl index bd92abf98..6ead07b07 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_nodes.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_nodes.erl @@ -82,8 +82,8 @@ node_schema() -> type => integer, description => <<"Number of used processes">>}, uptime => #{ - type => string, - description => <<"EMQ X Broker runtime">>}, + type => integer, + description => <<"EMQ X Broker runtime, millisecond">>}, version => #{ type => string, description => <<"EMQ X Broker version">>}, From ca50dfe6867db329651f02777efd108d6a62d965 Mon Sep 17 00:00:00 2001 From: DDDHuang <904897578@qq.com> Date: Mon, 2 Aug 2021 10:03:28 +0800 Subject: [PATCH 02/51] fix: add cleints api query params doc --- .../src/emqx_mgmt_api_clients.erl | 88 ++++++++++++++++++- 1 file changed, 87 insertions(+), 1 deletion(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index b99d6c6ef..97b6559f0 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -41,7 +41,7 @@ -export([do_subscribe/3]). -define(CLIENT_QS_SCHEMA, {emqx_channel_info, - [ {<<"clientid">>, binary} + [ {<<"node">>, atom} , {<<"username">>, binary} , {<<"zone">>, atom} , {<<"ip_address">>, ip} @@ -227,6 +227,92 @@ clients_api() -> Metadata = #{ get => #{ description => <<"List clients">>, + parameters => [ + #{ + name => node, + in => query, + required => false, + schema => #{type => string} + }, + #{ + name => username, + in => query, + required => false, + schema => #{type => string} + }, + #{ + name => zone, + in => query, + required => false, + schema => #{type => string} + }, + #{ + name => ip_address, + in => query, + required => false, + schema => #{type => string} + }, + #{ + name => conn_state, + in => query, + required => false, + schema => #{type => string} + }, + #{ + name => clean_start, + in => query, + required => false, + schema => #{type => string} + }, + #{ + name => proto_name, + in => query, + required => false, + schema => #{type => string} + }, + #{ + name => proto_ver, + in => query, + required => false, + schema => #{type => string} + }, + #{ + name => like_clientid, + in => query, + required => false, + schema => #{type => string} + }, + #{ + name => like_username, + in => query, + required => false, + schema => #{type => string} + }, + #{ + name => gte_created_at, + in => query, + required => false, + schema => #{type => string} + }, + #{ + name => lte_created_at, + in => query, + required => false, + schema => #{type => string} + }, + #{ + name => gte_connected_at, + in => query, + required => false, + schema => #{type => string} + }, + #{ + name => lte_connected_at, + in => query, + required => false, + schema => #{type => string} + } + ], responses => #{ <<"200">> => emqx_mgmt_util:response_array_schema(<<"List clients 200 OK">>, client)}}}, {"/clients", Metadata, clients}. From 52ff6e1b3ef272b38ca70d5cc1e1957bfd3cd37c Mon Sep 17 00:00:00 2001 From: DDDHuang <904897578@qq.com> Date: Mon, 2 Aug 2021 14:59:49 +0800 Subject: [PATCH 03/51] fix: listeners id param name --- apps/emqx_management/src/emqx_mgmt.erl | 6 +- .../src/emqx_mgmt_api_listeners.erl | 82 +++++++++---------- .../test/emqx_mgmt_listeners_api_SUITE.erl | 41 ++++++---- 3 files changed, 67 insertions(+), 62 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 20f1aa108..d4a222630 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -502,10 +502,10 @@ listener_id_filter(Identifier, Listeners) -> -spec manage_listener(Operation :: start_listener|stop_listener|restart_listener, Param :: map()) -> ok | {error, Reason :: term()}. -manage_listener(Operation, #{identifier := Identifier, node := Node}) when Node =:= node()-> - erlang:apply(emqx_listeners, Operation, [Identifier]); +manage_listener(Operation, #{id := ID, node := Node}) when Node =:= node()-> + erlang:apply(emqx_listeners, Operation, [ID]); manage_listener(Operation, Param = #{node := Node}) -> - rpc_call(Node, restart_listener, [Operation, Param]). + rpc_call(Node, manage_listener, [Operation, Param]). %%-------------------------------------------------------------------- %% Get Alarms diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index e845d2679..78bdba615 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -35,7 +35,7 @@ api_spec() -> { [ listeners_api(), - restart_listeners_api(), + listener_api(), nodes_listeners_api(), nodes_listener_api(), manage_listeners_api(), @@ -53,21 +53,21 @@ listener_schema() -> type => string, description => <<"Node">>, example => node()}, - identifier => #{ + id => #{ type => string, description => <<"Identifier">>}, acceptors => #{ type => integer, - description => <<"Number of Acceptor proce">>}, + description => <<"Number of Acceptor process">>}, max_conn => #{ type => integer, description => <<"Maximum number of allowed connection">>}, type => #{ type => string, - description => <<"Plugin decription">>}, + description => <<"Listener type">>}, listen_on => #{ type => string, - description => <<"Litening port">>}, + description => <<"Listening port">>}, running => #{ type => boolean, description => <<"Open or close">>}, @@ -84,24 +84,24 @@ listeners_api() -> emqx_mgmt_util:response_array_schema(<<"List all listeners">>, listener)}}}, {"/listeners", Metadata, listeners}. -restart_listeners_api() -> +listener_api() -> Metadata = #{ get => #{ description => <<"List listeners by listener ID">>, - parameters => [param_path_identifier()], + parameters => [param_path_id()], responses => #{ <<"404">> => emqx_mgmt_util:response_error_schema(<<"Listener id not found">>, ['BAD_LISTENER_ID']), <<"200">> => emqx_mgmt_util:response_array_schema(<<"List listener info ok">>, listener)}}}, - {"/listeners/:identifier", Metadata, listener}. + {"/listeners/:id", Metadata, listener}. manage_listeners_api() -> Metadata = #{ get => #{ description => <<"Restart listeners in cluster">>, parameters => [ - param_path_identifier(), + param_path_id(), param_path_operation()], responses => #{ <<"500">> => @@ -114,15 +114,15 @@ manage_listeners_api() -> ['BAD_REQUEST']), <<"200">> => emqx_mgmt_util:response_schema(<<"Operation success">>)}}}, - {"/listeners/:identifier/:operation", Metadata, manage_listeners}. + {"/listeners/:id/:operation", Metadata, manage_listeners}. manage_nodes_listeners_api() -> Metadata = #{ - get => #{ + put => #{ description => <<"Restart listeners in cluster">>, parameters => [ param_path_node(), - param_path_identifier(), + param_path_id(), param_path_operation()], responses => #{ <<"500">> => @@ -135,20 +135,20 @@ manage_nodes_listeners_api() -> ['BAD_REQUEST']), <<"200">> => emqx_mgmt_util:response_schema(<<"Operation success">>)}}}, - {"/node/:node/listeners/:identifier/:operation", Metadata, manage_nodes_listeners}. + {"/node/:node/listeners/:id/:operation", Metadata, manage_nodes_listeners}. nodes_listeners_api() -> Metadata = #{ get => #{ description => <<"Get listener info in one node">>, - parameters => [param_path_node(), param_path_identifier()], + parameters => [param_path_node(), param_path_id()], responses => #{ <<"404">> => emqx_mgmt_util:response_error_schema(<<"Node name or listener id not found">>, ['BAD_NODE_NAME', 'BAD_LISTENER_ID']), <<"200">> => emqx_mgmt_util:response_schema(<<"Get listener info ok">>, listener)}}}, - {"/nodes/:node/listeners/:identifier", Metadata, node_listener}. + {"/nodes/:node/listeners/:id", Metadata, node_listener}. nodes_listener_api() -> Metadata = #{ @@ -172,10 +172,10 @@ param_path_node() -> example => node() }. -param_path_identifier() -> +param_path_id() -> {Example,_} = hd(emqx_mgmt:list_listeners(node())), #{ - name => identifier, + name => id, in => path, schema => #{type => string}, required => true, @@ -199,8 +199,8 @@ listeners(get, _Request) -> list(). listener(get, Request) -> - ListenerID = binary_to_atom(cowboy_req:binding(identifier, Request)), - get_listeners(#{identifier => ListenerID}). + ID = binary_to_atom(cowboy_req:binding(id, Request)), + get_listeners(#{id => ID}). node_listeners(get, Request) -> Node = binary_to_atom(cowboy_req:binding(node, Request)), @@ -208,19 +208,19 @@ node_listeners(get, Request) -> node_listener(get, Request) -> Node = binary_to_atom(cowboy_req:binding(node, Request)), - ListenerID = binary_to_atom(cowboy_req:binding(identifier, Request)), - get_listeners(#{node => Node, identifier => ListenerID}). + ID = binary_to_atom(cowboy_req:binding(id, Request)), + get_listeners(#{node => Node, id => ID}). manage_listeners(_, Request) -> - Identifier = binary_to_atom(cowboy_req:binding(identifier, Request)), + ID = binary_to_atom(cowboy_req:binding(id, Request)), Operation = binary_to_atom(cowboy_req:binding(operation, Request)), - manage(Operation, #{identifier => Identifier}). + manage(Operation, #{id => ID}). manage_nodes_listeners(_, Request) -> Node = binary_to_atom(cowboy_req:binding(node, Request)), - Identifier = binary_to_atom(cowboy_req:binding(identifier, Request)), + ID = binary_to_atom(cowboy_req:binding(id, Request)), Operation = binary_to_atom(cowboy_req:binding(operation, Request)), - manage(Operation, #{identifier => Identifier, node => Node}). + manage(Operation, #{id => ID, node => Node}). %%%============================================================================================== @@ -231,8 +231,8 @@ list() -> get_listeners(Param) -> case list_listener(Param) of {error, not_found} -> - Identifier = maps:get(identifier, Param), - Reason = list_to_binary(io_lib:format("Error listener identifier ~p", [Identifier])), + ID = maps:get(id, Param), + Reason = list_to_binary(io_lib:format("Error listener id ~p", [ID])), {404, #{code => 'BAD_LISTENER_ID', message => Reason}}; {error, nodedown} -> Node = maps:get(node, Param), @@ -240,8 +240,8 @@ get_listeners(Param) -> Response = #{code => 'BAD_NODE_NAME', message => Reason}, {404, Response}; [] -> - Identifier = maps:get(identifier, Param), - Reason = list_to_binary(io_lib:format("Error listener identifier ~p", [Identifier])), + ID = maps:get(id, Param), + Reason = list_to_binary(io_lib:format("Error listener id ~p", [ID])), {404, #{code => 'BAD_LISTENER_ID', message => Reason}}; Data -> {200, Data} @@ -252,8 +252,8 @@ manage(Operation0, Param) -> Operation = maps:get(Operation0, OperationMap), case list_listener(Param) of {error, not_found} -> - Identifier = maps:get(identifier, Param), - Reason = list_to_binary(io_lib:format("Error listener identifier ~p", [Identifier])), + ID = maps:get(id, Param), + Reason = list_to_binary(io_lib:format("Error listener id ~p", [ID])), {404, #{code => 'BAD_LISTENER_ID', message => Reason}}; {error, nodedown} -> Node = maps:get(node, Param), @@ -261,8 +261,8 @@ manage(Operation0, Param) -> Response = #{code => 'BAD_NODE_NAME', message => Reason}, {404, Response}; [] -> - Identifier = maps:get(identifier, Param), - Reason = list_to_binary(io_lib:format("Error listener identifier ~p", [Identifier])), + ID = maps:get(id, Param), + Reason = list_to_binary(io_lib:format("Error listener id ~p", [ID])), {404, #{code => 'RESOURCE_NOT_FOUND', message => Reason}}; ListenersOrSingleListener -> manage_(Operation, ListenersOrSingleListener) @@ -278,14 +278,14 @@ manage_(Operation, Listeners) when is_list(Listeners) -> Errors -> case lists:filter(fun({error, {already_started, _}}) -> false; (_) -> true end, Results) of [] -> - Identifier = maps:get(identifier, hd(Listeners)), - Message = list_to_binary(io_lib:format("Already Started: ~s", [Identifier])), + ID = maps:get(id, hd(Listeners)), + Message = list_to_binary(io_lib:format("Already Started: ~s", [ID])), {400, #{code => 'BAD_REQUEST', message => Message}}; _ -> case lists:filter(fun({error,not_found}) -> false; (_) -> true end, Results) of [] -> - Identifier = maps:get(identifier, hd(Listeners)), - Message = list_to_binary(io_lib:format("Already Stoped: ~s", [Identifier])), + ID = maps:get(id, hd(Listeners)), + Message = list_to_binary(io_lib:format("Already Stopped: ~s", [ID])), {400, #{code => 'BAD_REQUEST', message => Message}}; _ -> Reason = list_to_binary(io_lib:format("~p", [Errors])), @@ -299,9 +299,9 @@ manage_(Operation, Listeners) when is_list(Listeners) -> list_listener(Params) -> format(list_listener_(Params)). -list_listener_(#{node := Node, identifier := Identifier}) -> +list_listener_(#{node := Node, id := Identifier}) -> emqx_mgmt:get_listener(Node, Identifier); -list_listener_(#{identifier := Identifier}) -> +list_listener_(#{id := Identifier}) -> emqx_mgmt:list_listeners_by_id(Identifier); list_listener_(#{node := Node}) -> emqx_mgmt:list_listeners(Node); @@ -314,9 +314,9 @@ format(Listeners) when is_list(Listeners) -> format({error, Reason}) -> {error, Reason}; -format({Identifier, Conf}) -> +format({ID, Conf}) -> #{ - identifier => Identifier, + id => ID, node => maps:get(node, Conf), acceptors => maps:get(acceptors, Conf), max_conn => maps:get(max_connections, Conf), diff --git a/apps/emqx_management/test/emqx_mgmt_listeners_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_listeners_api_SUITE.erl index e3d50f57c..07697243e 100644 --- a/apps/emqx_management/test/emqx_mgmt_listeners_api_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_listeners_api_SUITE.erl @@ -49,36 +49,41 @@ t_list_node_listeners(_) -> t_get_listeners(_) -> LocalListener = emqx_mgmt_api_listeners:format(hd(emqx_mgmt:list_listeners())), - Identifier = maps:get(identifier, LocalListener), - Path = emqx_mgmt_api_test_util:api_path(["listeners", atom_to_list(Identifier)]), + ID = maps:get(id, LocalListener), + Path = emqx_mgmt_api_test_util:api_path(["listeners", atom_to_list(ID)]), get_api(Path). t_get_node_listeners(_) -> LocalListener = emqx_mgmt_api_listeners:format(hd(emqx_mgmt:list_listeners())), - Identifier = maps:get(identifier, LocalListener), + ID = maps:get(id, LocalListener), Path = emqx_mgmt_api_test_util:api_path( - ["nodes", atom_to_binary(node(), utf8), "listeners", atom_to_list(Identifier)]), + ["nodes", atom_to_binary(node(), utf8), "listeners", atom_to_list(ID)]), get_api(Path). -t_stop_listener(_) -> - LocalListener = emqx_mgmt_api_listeners:format(hd(emqx_mgmt:list_listeners())), - Identifier = maps:get(identifier, LocalListener), - Path = emqx_mgmt_api_test_util:api_path(["listeners", atom_to_list(Identifier), "stop"]), +t_manage_listener(_) -> + ID = "default:mqtt_tcp", + manage_listener(ID, "stop", false), + manage_listener(ID, "start", true), + manage_listener(ID, "restart", true). + +manage_listener(ID, Operation, Running) -> + Path = emqx_mgmt_api_test_util:api_path(["listeners", ID, Operation]), {ok, _} = emqx_mgmt_api_test_util:request_api(get, Path), - GetPath = emqx_mgmt_api_test_util:api_path(["listeners", atom_to_list(Identifier)]), + timer:sleep(500), + GetPath = emqx_mgmt_api_test_util:api_path(["listeners", ID]), {ok, ListenersResponse} = emqx_mgmt_api_test_util:request_api(get, GetPath), Listeners = emqx_json:decode(ListenersResponse, [return_maps]), - [listener_stats(Listener, false) || Listener <- Listeners]. + [listener_stats(Listener, Running) || Listener <- Listeners]. get_api(Path) -> {ok, ListenersData} = emqx_mgmt_api_test_util:request_api(get, Path), LocalListeners = emqx_mgmt_api_listeners:format(emqx_mgmt:list_listeners()), case emqx_json:decode(ListenersData, [return_maps]) of [Listener] -> - Identifier = binary_to_atom(maps:get(<<"identifier">>, Listener), utf8), + ID = binary_to_atom(maps:get(<<"id">>, Listener), utf8), Filter = fun(Local) -> - maps:get(identifier, Local) =:= Identifier + maps:get(id, Local) =:= ID end, LocalListener = hd(lists:filter(Filter, LocalListeners)), comparison_listener(LocalListener, Listener); @@ -86,28 +91,28 @@ get_api(Path) -> ?assertEqual(erlang:length(LocalListeners), erlang:length(Listeners)), Fun = fun(LocalListener) -> - Identifier = maps:get(identifier, LocalListener), - IdentifierBinary = atom_to_binary(Identifier, utf8), + ID = maps:get(id, LocalListener), + IDBinary = atom_to_binary(ID, utf8), Filter = fun(Listener) -> - maps:get(<<"identifier">>, Listener) =:= IdentifierBinary + maps:get(<<"id">>, Listener) =:= IDBinary end, Listener = hd(lists:filter(Filter, Listeners)), comparison_listener(LocalListener, Listener) end, lists:foreach(Fun, LocalListeners); Listener when is_map(Listener) -> - Identifier = binary_to_atom(maps:get(<<"identifier">>, Listener), utf8), + ID = binary_to_atom(maps:get(<<"id">>, Listener), utf8), Filter = fun(Local) -> - maps:get(identifier, Local) =:= Identifier + maps:get(id, Local) =:= ID end, LocalListener = hd(lists:filter(Filter, LocalListeners)), comparison_listener(LocalListener, Listener) end. comparison_listener(Local, Response) -> - ?assertEqual(maps:get(identifier, Local), binary_to_atom(maps:get(<<"identifier">>, Response))), + ?assertEqual(maps:get(id, Local), binary_to_atom(maps:get(<<"id">>, Response))), ?assertEqual(maps:get(node, Local), binary_to_atom(maps:get(<<"node">>, Response))), ?assertEqual(maps:get(acceptors, Local), maps:get(<<"acceptors">>, Response)), ?assertEqual(maps:get(max_conn, Local), maps:get(<<"max_conn">>, Response)), From 8e2b1aed322404461b420d64993b1b85cc67bc42 Mon Sep 17 00:00:00 2001 From: DDDHuang <904897578@qq.com> Date: Mon, 2 Aug 2021 17:20:15 +0800 Subject: [PATCH 04/51] fix: publish api params --- .../src/emqx_mgmt_api_publish.erl | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_publish.erl b/apps/emqx_management/src/emqx_mgmt_api_publish.erl index 29e162b11..1e4555160 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_publish.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_publish.erl @@ -26,36 +26,32 @@ api_spec() -> { - [publish_api(), publish_batch_api()], + [publish_api(), publish_bulk_api()], [message_schema()] }. publish_api() -> + Schema = #{ + type => object, + properties => maps:without([id], message_properties()) + }, MeteData = #{ post => #{ description => <<"Publish">>, - 'requestBody' => #{ - content => #{ - 'application/json' => #{ - schema => #{ - type => object, - properties => maps:with([id], message_properties())}}}}, + 'requestBody' => emqx_mgmt_util:request_body_schema(Schema), responses => #{ <<"200">> => emqx_mgmt_util:response_schema(<<"publish ok">>, message)}}}, {"/publish", MeteData, publish}. -publish_batch_api() -> +publish_bulk_api() -> + Schema = #{ + type => object, + properties => maps:without([id], message_properties()) + }, MeteData = #{ post => #{ description => <<"publish">>, - 'requestBody' => #{ - content => #{ - 'application/json' => #{ - schema => #{ - type => array, - items => #{ - type => object, - properties => maps:with([id], message_properties())}}}}}, + 'requestBody' => emqx_mgmt_util:request_body_array_schema(Schema), responses => #{ <<"200">> => emqx_mgmt_util:response_array_schema(<<"publish ok">>, message)}}}, {"/publish/bulk", MeteData, publish_batch}. From 07a4d54a25df3f0f2eb51048871262dccabfed1f Mon Sep 17 00:00:00 2001 From: DDDHuang <904897578@qq.com> Date: Mon, 2 Aug 2021 14:08:16 +0800 Subject: [PATCH 05/51] fix: unsubscribe api; query params --- apps/emqx_management/src/emqx_mgmt_api.erl | 8 +- .../src/emqx_mgmt_api_clients.erl | 89 ++++++++++--------- .../test/emqx_mgmt_clients_api_SUITE.erl | 8 +- 3 files changed, 57 insertions(+), 48 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index fbf926540..61cae83cc 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -87,8 +87,8 @@ node_query(Node, Params, {Tab, QsSchema}, QueryFun) -> {_, Rows} = do_query(Node, Qs, QueryFun, Start, Limit+1), Meta = #{page => Page, limit => Limit}, NMeta = case CodCnt =:= 0 of - true -> Meta#{count => count(Tab), hasnext => length(Rows) > Limit}; - _ -> Meta#{count => -1, hasnext => length(Rows) > Limit} + true -> Meta#{count => count(Tab)}; + _ -> Meta#{count => length(Rows)} end, #{meta => NMeta, data => lists:sublist(Rows, Limit)}. @@ -120,8 +120,8 @@ cluster_query(Params, {Tab, QsSchema}, QueryFun) -> Rows = do_cluster_query(Nodes, Qs, QueryFun, Start, Limit+1, []), Meta = #{page => Page, limit => Limit}, NMeta = case CodCnt =:= 0 of - true -> Meta#{count => count(Tab, Nodes), hasnext => length(Rows) > Limit}; - _ -> Meta#{count => -1, hasnext => length(Rows) > Limit} + true -> Meta#{count => count(Tab, Nodes)}; + _ -> Meta#{count => length(Rows)} end, #{meta => NMeta, data => lists:sublist(Rows, Limit)}. diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 97b6559f0..3c0379d2d 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -32,6 +32,7 @@ , subscriptions/2 , authz_cache/2 , subscribe/2 + , unsubscribe/2 , subscribe_batch/2]). -export([ query/3 @@ -70,7 +71,8 @@ apis() -> , client_api() , clients_authz_cache_api() , clients_subscriptions_api() - , subscribe_api()]. + , subscribe_api() + , unsubscribe_api()]. schemas() -> Client = #{ @@ -211,17 +213,7 @@ schemas() -> } } }, - Subscription = #{ - subscription => #{ - type => object, - properties => #{ - topic => #{ - type => string}, - qos => #{ - type => integer, - enum => [0,1,2]}}} - }, - [Client, AuthzCache, Subscription]. + [Client, AuthzCache]. clients_api() -> Metadata = #{ @@ -370,6 +362,15 @@ clients_authz_cache_api() -> {"/clients/:clientid/authz_cache", Metadata, authz_cache}. clients_subscriptions_api() -> + SubscriptionSchema = #{ + type => object, + properties => #{ + topic => #{ + type => string}, + qos => #{ + type => integer, + enum => [0,1,2]}} + }, Metadata = #{ get => #{ description => <<"Get client subscriptions">>, @@ -380,10 +381,33 @@ clients_subscriptions_api() -> required => true }], responses => #{ - <<"200">> => emqx_mgmt_util:response_array_schema(<<"Get client subscriptions">>, subscription)}} + <<"200">> => + emqx_mgmt_util:response_array_schema(<<"Get client subscriptions">>, SubscriptionSchema)}} }, {"/clients/:clientid/subscriptions", Metadata, subscriptions}. +unsubscribe_api() -> + Metadata = #{ + post => #{ + description => <<"Unsubscribe">>, + parameters => [ + #{ + name => clientid, + in => path, + schema => #{type => string}, + required => true + } + ], + 'requestBody' => emqx_mgmt_util:request_body_schema(#{ + type => object, + properties => #{ + topic => #{ + type => string, + description => <<"Topic">>}}}), + responses => #{ + <<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>), + <<"200">> => emqx_mgmt_util:response_schema(<<"Unsubscribe ok">>)}}}, + {"/clients/:clientid/unsubscribe", Metadata, unsubscribe}. subscribe_api() -> Metadata = #{ post => #{ @@ -407,32 +431,14 @@ subscribe_api() -> description => <<"QoS">>}}}), responses => #{ <<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>), - <<"200">> => emqx_mgmt_util:response_schema(<<"Subscribe ok">>)}}, - delete => #{ - description => <<"Unsubscribe">>, - parameters => [ - #{ - name => clientid, - in => path, - schema => #{type => string}, - required => true - }, - #{ - name => topic, - in => query, - schema => #{type => string}, - required => true - } - ], - responses => #{ - <<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>), - <<"200">> => emqx_mgmt_util:response_schema(<<"Unsubscribe ok">>)}}}, + <<"200">> => emqx_mgmt_util:response_schema(<<"Subscribe ok">>)}}}, {"/clients/:clientid/subscribe", Metadata, subscribe}. %%%============================================================================================== %% parameters trans -clients(get, _Request) -> - list(#{}). +clients(get, Request) -> + Params = cowboy_req:parse_qs(Request), + list(Params). client(get, Request) -> ClientID = cowboy_req:binding(clientid, Request), @@ -456,11 +462,13 @@ subscribe(post, Request) -> TopicInfo = emqx_json:decode(Body, [return_maps]), Topic = maps:get(<<"topic">>, TopicInfo), Qos = maps:get(<<"qos">>, TopicInfo, 0), - subscribe(#{clientid => ClientID, topic => Topic, qos => Qos}); + subscribe(#{clientid => ClientID, topic => Topic, qos => Qos}). -subscribe(delete, Request) -> +unsubscribe(post, Request) -> ClientID = cowboy_req:binding(clientid, Request), - #{topic := Topic} = cowboy_req:match_qs([topic], Request), + {ok, Body, _} = cowboy_req:read_body(Request), + TopicInfo = emqx_json:decode(Body, [return_maps]), + Topic = maps:get(<<"topic">>, TopicInfo), unsubscribe(#{clientid => ClientID, topic => Topic}). %% TODO: batch @@ -488,7 +496,7 @@ subscriptions(get, Request) -> %% api apply list(Params) -> - Response = emqx_mgmt_api:cluster_query(maps:to_list(Params), ?CLIENT_QS_SCHEMA, ?query_fun), + Response = emqx_mgmt_api:cluster_query(Params, ?CLIENT_QS_SCHEMA, ?query_fun), {200, Response}. lookup(#{clientid := ClientID}) -> @@ -572,7 +580,7 @@ format_channel_info({_, ClientInfo, ClientStats}) -> ClientInfoMap3 = maps:put(ip_address, IpAddress, ClientInfoMap2), ClientInfoMap = maps:put(connected, Connected, ClientInfoMap3), RemoveList = [ - auth_result + auth_result , peername , sockname , peerhost @@ -581,6 +589,7 @@ format_channel_info({_, ClientInfo, ClientStats}) -> , conn_props , peercert , sockstate + , subscriptions , receive_maximum , protocol , is_superuser diff --git a/apps/emqx_management/test/emqx_mgmt_clients_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_clients_api_SUITE.erl index dbe2d83fa..9f754b4e1 100644 --- a/apps/emqx_management/test/emqx_mgmt_clients_api_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_clients_api_SUITE.erl @@ -94,15 +94,15 @@ t_clients(_) -> %% post /clients/:clientid/subscribe SubscribeBody = #{topic => Topic, qos => Qos}, - SubscribePath = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId1), "subscribe"]), + SubscribePath = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId1), "subscribe"]), {ok, _} = emqx_mgmt_api_test_util:request_api(post, SubscribePath, "", AuthHeader, SubscribeBody), timer:sleep(100), [{{_, AfterSubTopic}, #{qos := AfterSubQos}}] = emqx_mgmt:lookup_subscriptions(ClientId1), ?assertEqual(AfterSubTopic, Topic), ?assertEqual(AfterSubQos, Qos), - %% delete /clients/:clientid/subscribe - UnSubscribeQuery = "topic=" ++ binary_to_list(Topic), - {ok, _} = emqx_mgmt_api_test_util:request_api(delete, SubscribePath, UnSubscribeQuery, AuthHeader), + %% post /clients/:clientid/unsubscribe + UnSubscribePath = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId1), "unsubscribe"]), + {ok, _} = emqx_mgmt_api_test_util:request_api(post, UnSubscribePath, "", AuthHeader, SubscribeBody), timer:sleep(100), ?assertEqual([], emqx_mgmt:lookup_subscriptions(Client1)). From 4bbd39855063feaa727ecd48067e9ef0a487d865 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Wed, 28 Jul 2021 11:05:49 +0200 Subject: [PATCH 06/51] refactor(emqx): create emqx_machine app --- apps/.gitkeep | 0 apps/emqx/src/emqx.app.src | 2 +- apps/emqx/src/emqx_app.erl | 78 +++++----------------- apps/emqx/src/emqx_schema.erl | 2 +- apps/emqx_machine/src/emqx_machine.app.src | 15 +++++ apps/emqx_machine/src/emqx_machine_app.erl | 77 +++++++++++++++++++++ apps/emqx_machine/src/emqx_machine_sup.erl | 34 ++++++++++ rebar.config.erl | 3 +- 8 files changed, 148 insertions(+), 63 deletions(-) delete mode 100644 apps/.gitkeep create mode 100644 apps/emqx_machine/src/emqx_machine.app.src create mode 100644 apps/emqx_machine/src/emqx_machine_app.erl create mode 100644 apps/emqx_machine/src/emqx_machine_sup.erl diff --git a/apps/.gitkeep b/apps/.gitkeep deleted file mode 100644 index e69de29bb..000000000 diff --git a/apps/emqx/src/emqx.app.src b/apps/emqx/src/emqx.app.src index 546b70f14..57f7cd57f 100644 --- a/apps/emqx/src/emqx.app.src +++ b/apps/emqx/src/emqx.app.src @@ -1,6 +1,6 @@ {application, emqx, [{id, "emqx"}, - {description, "EMQ X"}, + {description, "EMQ X Core"}, {vsn, "5.0.0"}, % strict semver, bump manually! {modules, []}, {registered, []}, diff --git a/apps/emqx/src/emqx_app.erl b/apps/emqx/src/emqx_app.erl index a35c90747..47e886b9f 100644 --- a/apps/emqx/src/emqx_app.erl +++ b/apps/emqx/src/emqx_app.erl @@ -46,24 +46,15 @@ start(_Type, _Args) -> ok = maybe_load_config(), - ok = set_backtrace_depth(), - print_otp_version_warning(), - print_banner(), %% Load application first for ekka_mnesia scanner - _ = load_ce_modules(), ekka:start(), ok = ekka_rlog:wait_for_shards(?EMQX_SHARDS, infinity), - false == os:getenv("EMQX_NO_QUIC") - andalso application:ensure_all_started(quicer), + ok = maybe_start_quicer(), {ok, Sup} = emqx_sup:start_link(), + ok = maybe_start_listeners(), ok = start_autocluster(), - % ok = emqx_plugins:init(), - _ = emqx_plugins:load(), - _ = start_ce_modules(), - emqx_boot:is_enabled(listeners) andalso (ok = emqx_listeners:start()), - register(emqx, self()), ok = emqx_alarm_handler:load(), - print_vsn(), + register(emqx, self()), {ok, Sup}. prep_stop(_State) -> @@ -89,52 +80,22 @@ maybe_load_config() -> emqx_config:init_load(emqx_schema, ConfFiles) end. -set_backtrace_depth() -> - Depth = emqx_config:get([node, backtrace_depth]), - _ = erlang:system_flag(backtrace_depth, Depth), - ok. +maybe_start_listeners() -> + case emqx_boot:is_enabled(listeners) of + true -> + ok = emqx_listeners:start(); + false -> + ok + end. --ifndef(EMQX_ENTERPRISE). -load_ce_modules() -> - application:load(emqx_modules). -start_ce_modules() -> - application:ensure_all_started(emqx_modules). --else. -load_ce_modules() -> - ok. -start_ce_modules() -> - ok. --endif. - -%%-------------------------------------------------------------------- -%% Print Banner -%%-------------------------------------------------------------------- - --if(?OTP_RELEASE> 22). -print_otp_version_warning() -> ok. --else. -print_otp_version_warning() -> - ?ULOG("WARNING: Running on Erlang/OTP version ~p. Recommended: 23~n", - [?OTP_RELEASE]). --endif. % OTP_RELEASE - --ifndef(TEST). - -print_banner() -> - ?ULOG("Starting ~s on node ~s~n", [?APP, node()]). - -print_vsn() -> - ?ULOG("~s ~s is running now!~n", [get_description(), get_release()]). - --else. % TEST - -print_vsn() -> - ok. - -print_banner() -> - ok. - --endif. % TEST +maybe_start_quicer() -> + case os:getenv("EMQX_NO_QUIC") of + X when X =:= "1" orelse X =:= "true" -> + ok; + _ -> + {ok, _} = application:ensure_all_started(quicer), + ok + end. get_description() -> {ok, Descr0} = application:get_key(?APP, description), @@ -164,9 +125,6 @@ get_release() -> release_in_macro() -> element(2, ?EMQX_RELEASE). -%%-------------------------------------------------------------------- -%% Autocluster -%%-------------------------------------------------------------------- start_autocluster() -> ekka:callback(prepare, fun emqx:shutdown/1), ekka:callback(reboot, fun emqx:reboot/0), diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 3fd060d9f..a35dfefc2 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -160,7 +160,7 @@ fields("node") -> , {"dist_net_ticktime", t(duration(), "vm_args.-kernel net_ticktime", "2m")} , {"dist_listen_min", t(range(1024, 65535), "kernel.inet_dist_listen_min", 6369)} , {"dist_listen_max", t(range(1024, 65535), "kernel.inet_dist_listen_max", 6369)} - , {"backtrace_depth", t(integer(), undefined, 23)} + , {"backtrace_depth", t(integer(), "emqx_machine.backtrace_depth", 23)} ]; fields("rpc") -> diff --git a/apps/emqx_machine/src/emqx_machine.app.src b/apps/emqx_machine/src/emqx_machine.app.src new file mode 100644 index 000000000..21da4a4c8 --- /dev/null +++ b/apps/emqx_machine/src/emqx_machine.app.src @@ -0,0 +1,15 @@ +{application, emqx_machine, + [{id, "emqx_machine"}, + {description, "The EMQ X Machine"}, + {vsn, "0.1.0"}, % strict semver, bump manually! + {modules, []}, + {registered, []}, + {applications, [kernel,stdlib]}, + {mod, {emqx_machine_app,[]}}, + {env, []}, + {licenses, ["Apache-2.0"]}, + {maintainers, ["EMQ X Team "]}, + {links, [{"Homepage", "https://emqx.io/"}, + {"Github", "https://github.com/emqx/emqx"} + ]} +]}. diff --git a/apps/emqx_machine/src/emqx_machine_app.erl b/apps/emqx_machine/src/emqx_machine_app.erl new file mode 100644 index 000000000..f86bb57e8 --- /dev/null +++ b/apps/emqx_machine/src/emqx_machine_app.erl @@ -0,0 +1,77 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_machine_app). + +-export([ start/2 + , stop/1 + , prep_stop/1 + ]). + +-behaviour(application). + +-include_lib("emqx/include/logger.hrl"). + +start(_Type, _Args) -> + ok = set_backtrace_depth(), + ok = print_otp_version_warning(), + _ = load_modules(), + + {ok, _} = application:ensure_all_started(emqx), + + _ = emqx_plugins:load(), + _ = start_modules(), + + ok = print_vsn(), + emqx_machine_sup:start_link(). + +prep_stop(_State) -> + application:stop(emqx). + +stop(_State) -> + ok. + +set_backtrace_depth() -> + {ok, Depth} = application:get_env(emqx_machine, backtrace_depth), + _ = erlang:system_flag(backtrace_depth, Depth), + ok. + +-if(?OTP_RELEASE > 22). +print_otp_version_warning() -> ok. +-else. +print_otp_version_warning() -> + ?ULOG("WARNING: Running on Erlang/OTP version ~p. Recommended: 23~n", + [?OTP_RELEASE]). +-endif. % OTP_RELEASE > 22 + +-ifdef(TEST). +print_vsn() -> ok. +-else. % TEST +print_vsn() -> + ?ULOG("~s ~s is running now!~n", [emqx_app:get_description(), emqx_app:get_release()]). +-endif. % TEST + +-ifndef(EMQX_ENTERPRISE). +load_modules() -> + application:load(emqx_modules). +start_modules() -> + application:ensure_all_started(emqx_modules). +-else. +load_modules() -> + ok. +start_modules() -> + ok. +-endif. diff --git a/apps/emqx_machine/src/emqx_machine_sup.erl b/apps/emqx_machine/src/emqx_machine_sup.erl new file mode 100644 index 000000000..367e6d13e --- /dev/null +++ b/apps/emqx_machine/src/emqx_machine_sup.erl @@ -0,0 +1,34 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_machine_sup). + +-behaviour(supervisor). + +-export([ start_link/0 + ]). + +-export([init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + SupFlags = #{strategy => one_for_all, + intensity => 0, + period => 1 + }, + {ok, {SupFlags, []}}. diff --git a/rebar.config.erl b/rebar.config.erl index ac84ead5a..d8a6e7ea2 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -264,7 +264,8 @@ relx_apps(ReleaseType) -> , inets , compiler , runtime_tools - , emqx + , {emqx, load} % started by emqx_machine + , emqx_machine , {mnesia, load} , {ekka, load} , {emqx_plugin_libs, load} From 522d8e0a4aee397395523e02e966c7721acd3ae9 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Wed, 28 Jul 2021 15:19:52 +0200 Subject: [PATCH 07/51] refactor(config): split config and schema --- apps/emqx/etc/emqx.conf | 699 ------------------ apps/emqx/src/emqx_schema.erl | 328 +------- apps/emqx_machine/etc/emqx_machine.conf | 696 +++++++++++++++++ apps/emqx_machine/src/emqx_machine_app.erl | 11 + apps/emqx_machine/src/emqx_machine_schema.erl | 421 +++++++++++ bin/emqx | 9 +- extension_schemas.config | 22 - rebar.config.erl | 7 +- scripts/merge-config.escript | 6 +- 9 files changed, 1142 insertions(+), 1057 deletions(-) create mode 100644 apps/emqx_machine/etc/emqx_machine.conf create mode 100644 apps/emqx_machine/src/emqx_machine_schema.erl delete mode 100644 extension_schemas.config diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index 46f81a87d..5200f5239 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -1,702 +1,3 @@ -## master-88df1713 - -## NOTE: The configurations in this file will be overridden by -## `/data/emqx_overrides.conf` - -##================================================================== -## Node -##================================================================== -node { - ## Node name. - ## See: http://erlang.org/doc/reference_manual/distributed.html - ## - ## @doc node.name - ## ValueType: NodeName - ## Default: emqx@127.0.0.1 - name: "emqx@127.0.0.1" - - ## Cookie for distributed node communication. - ## - ## @doc node.cookie - ## ValueType: String - ## Default: emqxsecretcookie - cookie: emqxsecretcookie - - ## Data dir for the node - ## - ## @doc node.data_dir - ## ValueType: Folder - ## Default: "{{ platform_data_dir }}/" - data_dir: "{{ platform_data_dir }}/" - - ## Dir of crash dump file. - ## - ## @doc node.crash_dump_dir - ## ValueType: Folder - ## Default: "{{ platform_log_dir }}/" - crash_dump_dir: "{{ platform_log_dir }}/" - - ## Global GC Interval. - ## - ## @doc node.global_gc_interval - ## ValueType: Duration - ## Default: 15m - global_gc_interval: 15m - - ## Sets the net_kernel tick time in seconds. - ## Notice that all communicating nodes are to have the same - ## TickTime value specified. - ## - ## See: http://www.erlang.org/doc/man/kernel_app.html#net_ticktime - ## - ## @doc node.dist_net_ticktime - ## ValueType: Number - ## Default: 2m - dist_net_ticktime: 2m - - ## Sets the port range for the listener socket of a distributed - ## Erlang node. - ## Note that if there are firewalls between clustered nodes, this - ## port segment for nodes’ communication should be allowed. - ## - ## See: http://www.erlang.org/doc/man/kernel_app.html - ## - ## @doc node.dist_listen_min - ## ValueType: Integer - ## Range: [1024,65535] - ## Default: 6369 - dist_listen_min: 6369 - - ## Sets the port range for the listener socket of a distributed - ## Erlang node. - ## Note that if there are firewalls between clustered nodes, this - ## port segment for nodes’ communication should be allowed. - ## - ## See: http://www.erlang.org/doc/man/kernel_app.html - ## - ## @doc node.dist_listen_max - ## ValueType: Integer - ## Range: [1024,65535] - ## Default: 6369 - dist_listen_max: 6369 - - ## Sets the maximum depth of call stack back-traces in the exit - ## reason element of 'EXIT' tuples. - ## The flag also limits the stacktrace depth returned by - ## process_info item current_stacktrace. - ## - ## @doc node.backtrace_depth - ## ValueType: Integer - ## Range: [0,1024] - ## Default: 23 - backtrace_depth: 23 - -} - -##================================================================== -## Cluster -##================================================================== -cluster { - ## Cluster name. - ## - ## @doc cluster.name - ## ValueType: String - ## Default: emqxcl - name: emqxcl - - ## Enable cluster autoheal from network partition. - ## - ## @doc cluster.autoheal - ## ValueType: Boolean - ## Default: true - autoheal: true - - ## Autoclean down node. A down node will be removed from the cluster - ## if this value > 0. - ## - ## @doc cluster.autoclean - ## ValueType: Duration - ## Default: 5m - autoclean: 5m - - ## Node discovery strategy to join the cluster. - ## - ## @doc cluster.discovery_strategy - ## ValueType: manual | static | mcast | dns | etcd | k8s - ## - manual: Manual join command - ## - static: Static node list - ## - mcast: IP Multicast - ## - dns: DNS A Record - ## - etcd: etcd - ## - k8s: Kubernetes - ## - ## Default: manual - discovery_strategy: manual - - ##---------------------------------------------------------------- - ## Cluster using static node list - ##---------------------------------------------------------------- - static { - ## Node list of the cluster - ## - ## @doc cluster.static.seeds - ## ValueType: Array - ## Default: [] - seeds: ["emqx1@127.0.0.1", "emqx2@127.0.0.1"] - } - - ##---------------------------------------------------------------- - ## Cluster using IP Multicast - ##---------------------------------------------------------------- - mcast { - ## IP Multicast Address. - ## - ## @doc cluster.mcast.addr - ## ValueType: IPAddress - ## Default: "239.192.0.1" - addr: "239.192.0.1" - - ## Multicast Ports. - ## - ## @doc cluster.mcast.ports - ## ValueType: Array - ## Default: [4369, 4370] - ports: [4369, 4370] - - ## Multicast Iface. - ## - ## @doc cluster.mcast.iface - ## ValueType: IPAddress - ## Default: "0.0.0.0" - iface: "0.0.0.0" - - ## Multicast Ttl. - ## - ## @doc cluster.mcast.ttl - ## ValueType: Integer - ## Range: [0,255] - ## Default: 255 - ttl: 255 - - ## Multicast loop. - ## - ## @doc cluster.mcast.loop - ## ValueType: Boolean - ## Default: true - loop: true - } - - ##---------------------------------------------------------------- - ## Cluster using DNS A records - ##---------------------------------------------------------------- - dns { - ## DNS name. - ## - ## @doc cluster.dns.name - ## ValueType: String - ## Default: localhost - name: localhost - - ## The App name is used to build 'node.name' with IP address. - ## - ## @doc cluster.dns.app - ## ValueType: String - ## Default: emqx - app: emqx - } - - ##---------------------------------------------------------------- - ## Cluster using etcd - ##---------------------------------------------------------------- - etcd { - ## Etcd server list, seperated by ','. - ## - ## @doc cluster.etcd.server - ## ValueType: URL - ## Required: true - server: "http://127.0.0.1:2379" - - ## The prefix helps build nodes path in etcd. Each node in the cluster - ## will create a path in etcd: v2/keys/// - ## - ## @doc cluster.etcd.prefix - ## ValueType: String - ## Default: emqxcl - prefix: emqxcl - - ## The TTL for node's path in etcd. - ## - ## @doc cluster.etcd.node_ttl - ## ValueType: Duration - ## Default: 1m - node_ttl: 1m - - ## Path to the file containing the user's private PEM-encoded key. - ## - ## @doc cluster.etcd.ssl.keyfile - ## ValueType: File - ## Default: "{{ platform_etc_dir }}/certs/key.pem" - ssl.keyfile: "{{ platform_etc_dir }}/certs/key.pem" - - ## Path to a file containing the user certificate. - ## - ## @doc cluster.etcd.ssl.certfile - ## ValueType: File - ## Default: "{{ platform_etc_dir }}/certs/cert.pem" - ssl.certfile: "{{ platform_etc_dir }}/certs/cert.pem" - - ## Path to the file containing PEM-encoded CA certificates. The CA certificates - ## are used during server authentication and when building the client certificate chain. - ## - ## @doc cluster.etcd.ssl.cacertfile - ## ValueType: File - ## Default: "{{ platform_etc_dir }}/certs/cacert.pem" - ssl.cacertfile: "{{ platform_etc_dir }}/certs/cacert.pem" - } - - ##---------------------------------------------------------------- - ## Cluster using Kubernetes - ##---------------------------------------------------------------- - k8s { - ## Kubernetes API server list, seperated by ','. - ## - ## @doc cluster.k8s.apiserver - ## ValueType: URL - ## Required: true - apiserver: "http://10.110.111.204:8080" - - ## The service name helps lookup EMQ nodes in the cluster. - ## - ## @doc cluster.k8s.service_name - ## ValueType: String - ## Default: emqx - service_name: emqx - - ## The address type is used to extract host from k8s service. - ## - ## @doc cluster.k8s.address_type - ## ValueType: ip | dns | hostname - ## Default: ip - address_type: ip - - ## The app name helps build 'node.name'. - ## - ## @doc cluster.k8s.app_name - ## ValueType: String - ## Default: emqx - app_name: emqx - - ## The suffix added to dns and hostname get from k8s service - ## - ## @doc cluster.k8s.suffix - ## ValueType: String - ## Default: "pod.local" - suffix: "pod.local" - - ## Kubernetes Namespace - ## - ## @doc cluster.k8s.namespace - ## ValueType: String - ## Default: default - namespace: default - } - - db_backend: mnesia - - rlog: { - # role: core - # core_nodes: [] - } - -} - -##================================================================== -## Log -##================================================================== -log { - ## The primary log level - ## - ## - all the log messages with levels lower than this level will - ## be dropped. - ## - all the log messages with levels higher than this level will - ## go into the log handlers. The handlers then decide to log it - ## out or drop it according to the level setting of the handler. - ## - ## Note: Only the messages with severity level higher than or - ## equal to this level will be logged. - ## - ## @doc log.primary_level - ## ValueType: debug | info | notice | warning | error | critical | alert | emergency - ## Default: warning - primary_level: warning - - ##---------------------------------------------------------------- - ## The console log handler send log messages to emqx console - ##---------------------------------------------------------------- - ## Log to single line - ## @doc log.console_handler.enable - ## ValueType: Boolean - ## Default: false - console_handler.enable: false - - ## The log level of this handler - ## All the log messages with levels lower than this level will - ## be dropped. - ## - ## @doc log.console_handler.level - ## ValueType: debug | info | notice | warning | error | critical | alert | emergency - ## Default: warning - console_handler.level: warning - - ##---------------------------------------------------------------- - ## The file log handlers send log messages to files - ##---------------------------------------------------------------- - ## file_handlers. - file_handlers.emqx_log: { - ## The log level filter of this handler - ## All the log messages with levels lower than this level will - ## be dropped. - ## - ## @doc log.file_handlers..level - ## ValueType: debug | info | notice | warning | error | critical | alert | emergency - ## Default: warning - level: warning - - ## The log file for specified level. - ## - ## If `rotation` is disabled, this is the file of the log files. - ## - ## If `rotation` is enabled, this is the base name of the files. - ## Each file in a rotated log is named .N, where N is an integer. - ## - ## Note: Log files for a specific log level will only contain all the logs - ## that higher than or equal to that level - ## - ## @doc log.file_handlers..file - ## ValueType: File - ## Required: true - file: "{{ platform_log_dir }}/emqx.log" - - ## Enables the log rotation. - ## With this enabled, new log files will be created when the current - ## log file is full, max to `rotation_count` files will be created. - ## - ## @doc log.file_handlers..rotation.enable - ## ValueType: Boolean - ## Default: true - rotation.enable: true - - ## Maximum rotation count of log files. - ## - ## @doc log.file_handlers..rotation.count - ## ValueType: Integer - ## Range: [1, 2048] - ## Default: 10 - rotation.count: 10 - - ## Maximum size of each log file. - ## - ## If the max_size reached and `rotation` is disabled, the handler - ## will stop sending log messages, if the `rotation` is enabled, - ## the file rotates. - ## - ## @doc log.file_handlers..max_size - ## ValueType: Size | infinity - ## Default: 10MB - max_size: 10MB - } - - ## file_handlers. - ## - ## You could also create multiple file handlers for different - ## log level for example: - file_handlers.emqx_error_log: { - level: error - file: "{{ platform_log_dir }}/error.log" - } - - ## Timezone offset to display in logs - ## - ## @doc log.time_offset - ## ValueType: system | utc | String - ## - "system" use system zone - ## - "utc" for Universal Coordinated Time (UTC) - ## - "+hh:mm" or "-hh:mm" for a specified offset - ## Default: system - time_offset: system - - ## Limits the total number of characters printed for each log event. - ## - ## @doc log.chars_limit - ## ValueType: Integer | infinity - ## Range: [0, infinity) - ## Default: infinity - chars_limit: infinity - - ## Maximum depth for Erlang term log formatting - ## and Erlang process message queue inspection. - ## - ## @doc log.max_depth - ## ValueType: Integer | infinity - ## Default: 80 - max_depth: 80 - - ## Log formatter - ## @doc log.formatter - ## ValueType: text | json - ## Default: text - formatter: text - - ## Log to single line - ## @doc log.single_line - ## ValueType: Boolean - ## Default: true - single_line: true - - ## The max allowed queue length before switching to sync mode. - ## - ## Log overload protection parameter. If the message queue grows - ## larger than this value the handler switches from anync to sync mode. - ## - ## @doc log.sync_mode_qlen - ## ValueType: Integer - ## Range: [0, ${log.drop_mode_qlen}] - ## Default: 100 - sync_mode_qlen: 100 - - ## The max allowed queue length before switching to drop mode. - ## - ## Log overload protection parameter. When the message queue grows - ## larger than this threshold, the handler switches to a mode in which - ## it drops all new events that senders want to log. - ## - ## @doc log.drop_mode_qlen - ## ValueType: Integer - ## Range: [${log.sync_mode_qlen}, ${log.flush_qlen}] - ## Default: 3000 - drop_mode_qlen: 3000 - - ## The max allowed queue length before switching to flush mode. - ## - ## Log overload protection parameter. If the length of the message queue - ## grows larger than this threshold, a flush (delete) operation takes place. - ## To flush events, the handler discards the messages in the message queue - ## by receiving them in a loop without logging. - ## - ## @doc log.flush_qlen - ## ValueType: Integer - ## Range: [${log.drop_mode_qlen}, infinity) - ## Default: 8000 - flush_qlen: 8000 - - ## Kill the log handler when it gets overloaded. - ## - ## Log overload protection parameter. It is possible that a handler, - ## even if it can successfully manage peaks of high load without crashing, - ## can build up a large message queue, or use a large amount of memory. - ## We could kill the log handler in these cases and restart it after a - ## few seconds. - ## - ## @doc log.overload_kill.enable - ## ValueType: Boolean - ## Default: true - overload_kill.enable: true - - ## The max allowed queue length before killing the log hanlder. - ## - ## Log overload protection parameter. This is the maximum allowed queue - ## length. If the message queue grows larger than this, the handler - ## process is terminated. - ## - ## @doc log.overload_kill.qlen - ## ValueType: Integer - ## Range: [0, 1048576] - ## Default: 20000 - overload_kill.qlen: 20000 - - ## The max allowed memory size before killing the log hanlder. - ## - ## Log overload protection parameter. This is the maximum memory size - ## that the handler process is allowed to use. If the handler grows - ## larger than this, the process is terminated. - ## - ## @doc log.overload_kill.mem_size - ## ValueType: Size - ## Default: 30MB - overload_kill.mem_size: 30MB - - ## Restart the log hanlder after some seconds. - ## - ## Log overload protection parameter. If the handler is terminated, - ## it restarts automatically after a delay specified in seconds. - ## - ## @doc log.overload_kill.restart_after - ## ValueType: Duration - ## Default: 5s - overload_kill.restart_after: 5s - - ## Controlling Bursts of Log Requests. - ## - ## Log overload protection parameter. Large bursts of log events - many - ## events received by the handler under a short period of time - can - ## potentially cause problems. By specifying the maximum number of events - ## to be handled within a certain time frame, the handler can avoid - ## choking the log with massive amounts of printouts. - ## - ## Note that there would be no warning if any messages were - ## dropped because of burst control. - ## - ## @doc log.burst_limit.enable - ## ValueType: Boolean - ## Default: false - burst_limit.enable: false - - ## This config controls the maximum number of events to handle within - ## a time frame. After the limit is reached, successive events are - ## dropped until the end of the time frame defined by `window_time`. - ## - ## @doc log.burst_limit.max_count - ## ValueType: Integer - ## Default: 10000 - burst_limit.max_count: 10000 - - ## See the previous description of burst_limit_max_count. - ## - ## @doc log.burst_limit.window_time - ## ValueType: duration - ## Default: 1s - burst_limit.window_time: 1s -} - -##================================================================== -## RPC -##================================================================== -rpc { - ## RPC Mode. - ## - ## @doc rpc.mode - ## ValueType: sync | async - ## Default: async - mode: async - - ## Max batch size of async RPC requests. - ## - ## NOTE: RPC batch won't work when rpc.mode = sync - ## Zero value disables rpc batching. - ## - ## @doc rpc.async_batch_size - ## ValueType: Integer - ## Range: [0, 1048576] - ## Default: 0 - async_batch_size: 256 - - ## RPC port discovery - ## - ## The strategy for discovering the RPC listening port of - ## other nodes. - ## - ## @doc cluster.discovery_strategy - ## ValueType: manual | stateless - ## - manual: discover ports by `tcp_server_port`. - ## - stateless: discover ports in a stateless manner. - ## If node name is `emqx@127.0.0.1`, where the `` is - ## an integer, then the listening port will be `5370 + ` - ## - ## Default: `stateless`. - port_discovery: stateless - - ## TCP server port for RPC. - ## - ## Only takes effect when `rpc.port_discovery` = `manual`. - ## - ## @doc rpc.tcp_server_port - ## ValueType: Integer - ## Range: [1024-65535] - ## Defaults: 5369 - tcp_server_port: 5369 - - ## Number of outgoing RPC connections. - ## - ## Set this to 1 to keep the message order sent from the same - ## client. - ## - ## @doc rpc.tcp_client_num - ## ValueType: Integer - ## Range: [1, 256] - ## Defaults: 1 - tcp_client_num: 1 - - ## RCP Client connect timeout. - ## - ## @doc rpc.connect_timeout - ## ValueType: Duration - ## Default: 5s - connect_timeout: 5s - - ## TCP send timeout of RPC client and server. - ## - ## @doc rpc.send_timeout - ## ValueType: Duration - ## Default: 5s - send_timeout: 5s - - ## Authentication timeout - ## - ## @doc rpc.authentication_timeout - ## ValueType: Duration - ## Default: 5s - authentication_timeout: 5s - - ## Default receive timeout for call() functions - ## - ## @doc rpc.call_receive_timeout - ## ValueType: Duration - ## Default: 15s - call_receive_timeout: 15s - - ## Socket idle keepalive. - ## - ## @doc rpc.socket_keepalive_idle - ## ValueType: Duration - ## Default: 900s - socket_keepalive_idle: 900s - - ## TCP Keepalive probes interval. - ## - ## @doc rpc.socket_keepalive_interval - ## ValueType: Duration - ## Default: 75s - socket_keepalive_interval: 75s - - ## Probes lost to close the connection - ## - ## @doc rpc.socket_keepalive_count - ## ValueType: Integer - ## Default: 9 - socket_keepalive_count: 9 - - ## Size of TCP send buffer. - ## - ## @doc rpc.socket_sndbuf - ## ValueType: Size - ## Default: 1MB - socket_sndbuf: 1MB - - ## Size of TCP receive buffer. - ## - ## @doc rpc.socket_recbuf - ## ValueType: Size - ## Default: 1MB - socket_recbuf: 1MB - - ## Size of user-level software socket buffer. - ## - ## @doc rpc.socket_buffer - ## ValueType: Size - ## Default: 1MB - socket_buffer: 1MB -} - ##================================================================== ## Broker ##================================================================== diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index a35dfefc2..4e0e8f85e 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -24,7 +24,6 @@ -include_lib("typerefl/include/types.hrl"). --type log_level() :: debug | info | notice | warning | error | critical | alert | emergency | all. -type duration() :: integer(). -type duration_s() :: integer(). -type duration_ms() :: integer(). @@ -60,179 +59,18 @@ -behaviour(hocon_schema). --reflect_type([ log_level/0, duration/0, duration_s/0, duration_ms/0, +-reflect_type([ duration/0, duration_s/0, duration_ms/0, bytesize/0, wordsize/0, percent/0, file/0, comma_separated_list/0, bar_separated_list/0, ip_port/0, cipher/0, comma_separated_atoms/0]). --export([structs/0, fields/1, translations/0, translation/1]). +-export([structs/0, fields/1]). -export([t/1, t/3, t/4, ref/1]). -export([conf_get/2, conf_get/3, keys/2, filter/1]). -export([ssl/1]). -%% will be used by emqx_ct_helper to find the dependent apps --export([includes/0, extra_schema_fields/1]). - -structs() -> ["cluster", "node", "rpc", "log", - "zones", "listeners", "broker", - "plugins", "sysmon", "alarm"] - ++ ?MODULE:includes(). - --ifndef(EMQX_EXT_SCHEMAS). -includes() -> []. --else. -includes() -> - [FieldName || {FieldName, _SchemaMod} <- ?EMQX_EXT_SCHEMAS]. --endif. - -fields("cluster") -> - [ {"name", t(atom(), "ekka.cluster_name", emqxcl)} - , {"discovery_strategy", t(union([manual, static, mcast, dns, etcd, k8s]), - undefined, manual)} - , {"autoclean", t(duration(), "ekka.cluster_autoclean", "5m")} - , {"autoheal", t(boolean(), "ekka.cluster_autoheal", true)} - , {"static", ref("static")} - , {"mcast", ref("mcast")} - , {"proto_dist", t(union([inet_tcp, inet6_tcp, inet_tls]), "ekka.proto_dist", inet_tcp)} - , {"dns", ref("dns")} - , {"etcd", ref("etcd")} - , {"k8s", ref("k8s")} - , {"db_backend", t(union([mnesia, rlog]), "ekka.db_backend", mnesia)} - , {"rlog", ref("rlog")} - ]; - -fields("static") -> - [ {"seeds", t(hoconsc:array(string()), undefined, [])}]; - -fields("mcast") -> - [ {"addr", t(string(), undefined, "239.192.0.1")} - , {"ports", t(hoconsc:array(integer()), undefined, [4369, 4370])} - , {"iface", t(string(), undefined, "0.0.0.0")} - , {"ttl", t(range(0, 255), undefined, 255)} - , {"loop", t(boolean(), undefined, true)} - , {"sndbuf", t(bytesize(), undefined, "16KB")} - , {"recbuf", t(bytesize(), undefined, "16KB")} - , {"buffer", t(bytesize(), undefined, "32KB")} - ]; - -fields("dns") -> - [ {"name", t(string(), undefined, "localhost")} - , {"app", t(string(), undefined, "emqx")}]; - -fields("etcd") -> - [ {"server", t(comma_separated_list())} - , {"prefix", t(string(), undefined, "emqxcl")} - , {"node_ttl", t(duration(), undefined, "1m")} - , {"ssl", ref("etcd_ssl")} - ]; - -fields("etcd_ssl") -> - ssl(#{}); - -fields("k8s") -> - [ {"apiserver", t(string())} - , {"service_name", t(string(), undefined, "emqx")} - , {"address_type", t(union([ip, dns, hostname]))} - , {"app_name", t(string(), undefined, "emqx")} - , {"namespace", t(string(), undefined, "default")} - , {"suffix", t(string(), undefined, "pod.local")} - ]; - -fields("rlog") -> - [ {"role", t(union([core, replicant]), "ekka.node_role", core)} - , {"core_nodes", t(comma_separated_atoms(), "ekka.core_nodes", [])} - ]; - -fields("node") -> - [ {"name", hoconsc:t(string(), #{default => "emqx@127.0.0.1", - override_env => "EMQX_NODE_NAME" - })} - , {"cookie", hoconsc:t(string(), #{mapping => "vm_args.-setcookie", - default => "emqxsecretcookie", - sensitive => true, - override_env => "EMQX_NODE_COOKIE" - })} - , {"data_dir", t(string())} - , {"config_files", t(comma_separated_list())} - , {"global_gc_interval", t(duration(), undefined, "15m")} - , {"crash_dump_dir", t(file(), "vm_args.-env ERL_CRASH_DUMP", undefined)} - , {"dist_net_ticktime", t(duration(), "vm_args.-kernel net_ticktime", "2m")} - , {"dist_listen_min", t(range(1024, 65535), "kernel.inet_dist_listen_min", 6369)} - , {"dist_listen_max", t(range(1024, 65535), "kernel.inet_dist_listen_max", 6369)} - , {"backtrace_depth", t(integer(), "emqx_machine.backtrace_depth", 23)} - ]; - -fields("rpc") -> - [ {"mode", t(union(sync, async), undefined, async)} - , {"async_batch_size", t(integer(), "gen_rpc.max_batch_size", 256)} - , {"port_discovery",t(union(manual, stateless), "gen_rpc.port_discovery", stateless)} - , {"tcp_server_port", t(integer(), "gen_rpc.tcp_server_port", 5369)} - , {"tcp_client_num", t(range(1, 256), undefined, 1)} - , {"connect_timeout", t(duration(), "gen_rpc.connect_timeout", "5s")} - , {"send_timeout", t(duration(), "gen_rpc.send_timeout", "5s")} - , {"authentication_timeout", t(duration(), "gen_rpc.authentication_timeout", "5s")} - , {"call_receive_timeout", t(duration(), "gen_rpc.call_receive_timeout", "15s")} - , {"socket_keepalive_idle", t(duration_s(), "gen_rpc.socket_keepalive_idle", "7200s")} - , {"socket_keepalive_interval", t(duration_s(), "gen_rpc.socket_keepalive_interval", "75s")} - , {"socket_keepalive_count", t(integer(), "gen_rpc.socket_keepalive_count", 9)} - , {"socket_sndbuf", t(bytesize(), "gen_rpc.socket_sndbuf", "1MB")} - , {"socket_recbuf", t(bytesize(), "gen_rpc.socket_recbuf", "1MB")} - , {"socket_buffer", t(bytesize(), "gen_rpc.socket_buffer", "1MB")} - ]; - -fields("log") -> - [ {"primary_level", t(log_level(), undefined, warning)} - , {"console_handler", ref("console_handler")} - , {"file_handlers", ref("file_handlers")} - , {"time_offset", t(string(), undefined, "system")} - , {"chars_limit", maybe_infinity(range(1, inf))} - , {"supervisor_reports", t(union([error, progress]), undefined, error)} - , {"max_depth", t(union([infinity, integer()]), - "kernel.error_logger_format_depth", 80)} - , {"formatter", t(union([text, json]), undefined, text)} - , {"single_line", t(boolean(), undefined, true)} - , {"sync_mode_qlen", t(integer(), undefined, 100)} - , {"drop_mode_qlen", t(integer(), undefined, 3000)} - , {"flush_qlen", t(integer(), undefined, 8000)} - , {"overload_kill", ref("log_overload_kill")} - , {"burst_limit", ref("log_burst_limit")} - , {"error_logger", t(atom(), "kernel.error_logger", silent)} - ]; - -fields("console_handler") -> - [ {"enable", t(boolean(), undefined, false)} - , {"level", t(log_level(), undefined, warning)} - ]; - -fields("file_handlers") -> - [ {"$name", ref("log_file_handler")} - ]; - -fields("log_file_handler") -> - [ {"level", t(log_level(), undefined, warning)} - , {"file", t(file(), undefined, undefined)} - , {"rotation", ref("log_rotation")} - , {"max_size", maybe_infinity(bytesize(), "10MB")} - ]; - -fields("log_rotation") -> - [ {"enable", t(boolean(), undefined, true)} - , {"count", t(range(1, 2048), undefined, 10)} - ]; - -fields("log_overload_kill") -> - [ {"enable", t(boolean(), undefined, true)} - , {"mem_size", t(bytesize(), undefined, "30MB")} - , {"qlen", t(integer(), undefined, 20000)} - , {"restart_after", t(union(duration(), infinity), undefined, "5s")} - ]; - -fields("log_burst_limit") -> - [ {"enable", t(boolean(), undefined, true)} - , {"max_count", t(integer(), undefined, 10000)} - , {"window_time", t(duration(), undefined, "1s")} - ]; +structs() -> ["zones", "listeners", "broker", "plugins", "sysmon", "alarm"]. fields("stats") -> [ {"enable", t(boolean(), undefined, true)} @@ -480,20 +318,7 @@ fields("alarm") -> [ {"actions", t(hoconsc:array(atom()), undefined, [log, publish])} , {"size_limit", t(integer(), undefined, 1000)} , {"validity_period", t(duration(), undefined, "24h")} - ]; - -fields(FieldName) -> - ?MODULE:extra_schema_fields(FieldName). - --ifndef(EMQX_EXT_SCHEMAS). -%% Function extra_schema_fields/1 only terminates with explicit exception --dialyzer([{nowarn_function, [extra_schema_fields/1]}]). -extra_schema_fields(FieldName) -> error({unknown_field, FieldName}). --else. -extra_schema_fields(FieldName) -> - {_, Mod} = lists:keyfind(FieldName, 1, ?EMQX_EXT_SCHEMAS), - Mod:fields(FieldName). --endif. + ]. mqtt_listener() -> base_listener() ++ @@ -509,117 +334,6 @@ base_listener() -> , {"rate_limit", ref("rate_limit")} ]. -translations() -> ["ekka", "kernel", "emqx"]. - -translation("ekka") -> - [ {"cluster_discovery", fun tr_cluster__discovery/1}]; - -translation("kernel") -> - [ {"logger_level", fun tr_logger_level/1} - , {"logger", fun tr_logger/1}]; - -translation("emqx") -> - [ {"config_files", fun tr_config_files/1} - ]. - -tr_config_files(Conf) -> - case conf_get("emqx.config_files", Conf) of - [_ | _] = Files -> - Files; - _ -> - case os:getenv("RUNNER_ETC_DIR") of - false -> - [filename:join([code:lib_dir(emqx), "etc", "emqx.conf"])]; - Dir -> - [filename:join([Dir, "emqx.conf"])] - end - end. - -tr_cluster__discovery(Conf) -> - Strategy = conf_get("cluster.discovery_strategy", Conf), - {Strategy, filter(options(Strategy, Conf))}. - -tr_logger_level(Conf) -> conf_get("log.primary_level", Conf). - -tr_logger(Conf) -> - CharsLimit = case conf_get("log.chars_limit", Conf) of - infinity -> unlimited; - V -> V - end, - SingleLine = conf_get("log.single_line", Conf), - FmtName = conf_get("log.formatter", Conf), - Formatter = formatter(FmtName, CharsLimit, SingleLine), - BasicConf = #{ - sync_mode_qlen => conf_get("log.sync_mode_qlen", Conf), - drop_mode_qlen => conf_get("log.drop_mode_qlen", Conf), - flush_qlen => conf_get("log.flush_qlen", Conf), - overload_kill_enable => conf_get("log.overload_kill.enable", Conf), - overload_kill_qlen => conf_get("log.overload_kill.qlen", Conf), - overload_kill_mem_size => conf_get("log.overload_kill.mem_size", Conf), - overload_kill_restart_after => conf_get("log.overload_kill.restart_after", Conf), - burst_limit_enable => conf_get("log.burst_limit.enable", Conf), - burst_limit_max_count => conf_get("log.burst_limit.max_count", Conf), - burst_limit_window_time => conf_get("log.burst_limit.window_time", Conf) - }, - Filters = case conf_get("log.supervisor_reports", Conf) of - error -> [{drop_progress_reports, {fun logger_filters:progress/2, stop}}]; - progress -> [] - end, - %% For the default logger that outputs to console - ConsoleHandler = - case conf_get("log.console_handler.enable", Conf) of - true -> - [{handler, console, logger_std_h, #{ - level => conf_get("log.console_handler.level", Conf), - config => BasicConf#{type => standard_io}, - formatter => Formatter, - filters => Filters - }}]; - false -> [] - end, - %% For the file logger - FileHandlers = - [{handler, binary_to_atom(HandlerName, latin1), logger_disk_log_h, #{ - level => conf_get("level", SubConf), - config => BasicConf#{ - type => case conf_get("rotation.enable", SubConf) of - true -> wrap; - _ -> halt - end, - file => conf_get("file", SubConf), - max_no_files => conf_get("rotation.count", SubConf), - max_no_bytes => conf_get("max_size", SubConf) - }, - formatter => Formatter, - filters => Filters, - filesync_repeat_interval => no_repeat - }} - || {HandlerName, SubConf} <- maps:to_list(conf_get("log.file_handlers", Conf, #{}))], - - [{handler, default, undefined}] ++ ConsoleHandler ++ FileHandlers. - -%% helpers -formatter(json, CharsLimit, SingleLine) -> - {emqx_logger_jsonfmt, - #{chars_limit => CharsLimit, - single_line => SingleLine - }}; -formatter(text, CharsLimit, SingleLine) -> - {emqx_logger_textfmt, - #{template => - [time," [",level,"] ", - {clientid, - [{peername, - [clientid,"@",peername," "], - [clientid, " "]}], - [{peername, - [peername," "], - []}]}, - msg,"\n"], - chars_limit => CharsLimit, - single_line => SingleLine - }}. - %% utils -spec(conf_get(string() | [string()], hocon:config()) -> term()). conf_get(Key, Conf) -> @@ -740,8 +454,7 @@ t(Type, Mapping, Default, OverrideEnv, Validator) -> , validator => Validator }). -ref(Field) -> - fun (type) -> Field; (_) -> undefined end. +ref(Field) -> hoconsc:t(hoconsc:ref(?MODULE, Field)). maybe_disabled(T) -> maybe_sth(disabled, T, disabled). @@ -817,37 +530,6 @@ to_erl_cipher_suite(Str) -> Cipher -> Cipher end. -options(static, Conf) -> - [{seeds, [to_atom(S) || S <- conf_get("cluster.static.seeds", Conf, [])]}]; -options(mcast, Conf) -> - {ok, Addr} = inet:parse_address(conf_get("cluster.mcast.addr", Conf)), - {ok, Iface} = inet:parse_address(conf_get("cluster.mcast.iface", Conf)), - Ports = conf_get("cluster.mcast.ports", Conf), - [{addr, Addr}, {ports, Ports}, {iface, Iface}, - {ttl, conf_get("cluster.mcast.ttl", Conf, 1)}, - {loop, conf_get("cluster.mcast.loop", Conf, true)}]; -options(dns, Conf) -> - [{name, conf_get("cluster.dns.name", Conf)}, - {app, conf_get("cluster.dns.app", Conf)}]; -options(etcd, Conf) -> - Namespace = "cluster.etcd.ssl", - SslOpts = fun(C) -> - Options = keys(Namespace, C), - lists:map(fun(Key) -> {to_atom(Key), conf_get([Namespace, Key], Conf)} end, Options) end, - [{server, conf_get("cluster.etcd.server", Conf)}, - {prefix, conf_get("cluster.etcd.prefix", Conf, "emqxcl")}, - {node_ttl, conf_get("cluster.etcd.node_ttl", Conf, 60)}, - {ssl_options, filter(SslOpts(Conf))}]; -options(k8s, Conf) -> - [{apiserver, conf_get("cluster.k8s.apiserver", Conf)}, - {service_name, conf_get("cluster.k8s.service_name", Conf)}, - {address_type, conf_get("cluster.k8s.address_type", Conf, ip)}, - {app_name, conf_get("cluster.k8s.app_name", Conf)}, - {namespace, conf_get("cluster.k8s.namespace", Conf)}, - {suffix, conf_get("cluster.k8s.suffix", Conf, "")}]; -options(manual, _Conf) -> - []. - to_atom(Atom) when is_atom(Atom) -> Atom; to_atom(Str) when is_list(Str) -> diff --git a/apps/emqx_machine/etc/emqx_machine.conf b/apps/emqx_machine/etc/emqx_machine.conf new file mode 100644 index 000000000..0797a9d70 --- /dev/null +++ b/apps/emqx_machine/etc/emqx_machine.conf @@ -0,0 +1,696 @@ +## NOTE: The configurations in this file will be overridden by +## `/data/emqx_overrides.conf` + +##================================================================== +## Node +##================================================================== +node { + ## Node name. + ## See: http://erlang.org/doc/reference_manual/distributed.html + ## + ## @doc node.name + ## ValueType: NodeName + ## Default: emqx@127.0.0.1 + name: "emqx@127.0.0.1" + + ## Cookie for distributed node communication. + ## + ## @doc node.cookie + ## ValueType: String + ## Default: emqxsecretcookie + cookie: emqxsecretcookie + + ## Data dir for the node + ## + ## @doc node.data_dir + ## ValueType: Folder + ## Default: "{{ platform_data_dir }}/" + data_dir: "{{ platform_data_dir }}/" + + ## Dir of crash dump file. + ## + ## @doc node.crash_dump_dir + ## ValueType: Folder + ## Default: "{{ platform_log_dir }}/" + crash_dump_dir: "{{ platform_log_dir }}/" + + ## Global GC Interval. + ## + ## @doc node.global_gc_interval + ## ValueType: Duration + ## Default: 15m + global_gc_interval: 15m + + ## Sets the net_kernel tick time in seconds. + ## Notice that all communicating nodes are to have the same + ## TickTime value specified. + ## + ## See: http://www.erlang.org/doc/man/kernel_app.html#net_ticktime + ## + ## @doc node.dist_net_ticktime + ## ValueType: Number + ## Default: 2m + dist_net_ticktime: 2m + + ## Sets the port range for the listener socket of a distributed + ## Erlang node. + ## Note that if there are firewalls between clustered nodes, this + ## port segment for nodes’ communication should be allowed. + ## + ## See: http://www.erlang.org/doc/man/kernel_app.html + ## + ## @doc node.dist_listen_min + ## ValueType: Integer + ## Range: [1024,65535] + ## Default: 6369 + dist_listen_min: 6369 + + ## Sets the port range for the listener socket of a distributed + ## Erlang node. + ## Note that if there are firewalls between clustered nodes, this + ## port segment for nodes’ communication should be allowed. + ## + ## See: http://www.erlang.org/doc/man/kernel_app.html + ## + ## @doc node.dist_listen_max + ## ValueType: Integer + ## Range: [1024,65535] + ## Default: 6369 + dist_listen_max: 6369 + + ## Sets the maximum depth of call stack back-traces in the exit + ## reason element of 'EXIT' tuples. + ## The flag also limits the stacktrace depth returned by + ## process_info item current_stacktrace. + ## + ## @doc node.backtrace_depth + ## ValueType: Integer + ## Range: [0,1024] + ## Default: 23 + backtrace_depth: 23 + +} + +##================================================================== +## Cluster +##================================================================== +cluster { + ## Cluster name. + ## + ## @doc cluster.name + ## ValueType: String + ## Default: emqxcl + name: emqxcl + + ## Enable cluster autoheal from network partition. + ## + ## @doc cluster.autoheal + ## ValueType: Boolean + ## Default: true + autoheal: true + + ## Autoclean down node. A down node will be removed from the cluster + ## if this value > 0. + ## + ## @doc cluster.autoclean + ## ValueType: Duration + ## Default: 5m + autoclean: 5m + + ## Node discovery strategy to join the cluster. + ## + ## @doc cluster.discovery_strategy + ## ValueType: manual | static | mcast | dns | etcd | k8s + ## - manual: Manual join command + ## - static: Static node list + ## - mcast: IP Multicast + ## - dns: DNS A Record + ## - etcd: etcd + ## - k8s: Kubernetes + ## + ## Default: manual + discovery_strategy: manual + + ##---------------------------------------------------------------- + ## Cluster using static node list + ##---------------------------------------------------------------- + static { + ## Node list of the cluster + ## + ## @doc cluster.static.seeds + ## ValueType: Array + ## Default: [] + seeds: ["emqx1@127.0.0.1", "emqx2@127.0.0.1"] + } + + ##---------------------------------------------------------------- + ## Cluster using IP Multicast + ##---------------------------------------------------------------- + mcast { + ## IP Multicast Address. + ## + ## @doc cluster.mcast.addr + ## ValueType: IPAddress + ## Default: "239.192.0.1" + addr: "239.192.0.1" + + ## Multicast Ports. + ## + ## @doc cluster.mcast.ports + ## ValueType: Array + ## Default: [4369, 4370] + ports: [4369, 4370] + + ## Multicast Iface. + ## + ## @doc cluster.mcast.iface + ## ValueType: IPAddress + ## Default: "0.0.0.0" + iface: "0.0.0.0" + + ## Multicast Ttl. + ## + ## @doc cluster.mcast.ttl + ## ValueType: Integer + ## Range: [0,255] + ## Default: 255 + ttl: 255 + + ## Multicast loop. + ## + ## @doc cluster.mcast.loop + ## ValueType: Boolean + ## Default: true + loop: true + } + + ##---------------------------------------------------------------- + ## Cluster using DNS A records + ##---------------------------------------------------------------- + dns { + ## DNS name. + ## + ## @doc cluster.dns.name + ## ValueType: String + ## Default: localhost + name: localhost + + ## The App name is used to build 'node.name' with IP address. + ## + ## @doc cluster.dns.app + ## ValueType: String + ## Default: emqx + app: emqx + } + + ##---------------------------------------------------------------- + ## Cluster using etcd + ##---------------------------------------------------------------- + etcd { + ## Etcd server list, seperated by ','. + ## + ## @doc cluster.etcd.server + ## ValueType: URL + ## Required: true + server: "http://127.0.0.1:2379" + + ## The prefix helps build nodes path in etcd. Each node in the cluster + ## will create a path in etcd: v2/keys/// + ## + ## @doc cluster.etcd.prefix + ## ValueType: String + ## Default: emqxcl + prefix: emqxcl + + ## The TTL for node's path in etcd. + ## + ## @doc cluster.etcd.node_ttl + ## ValueType: Duration + ## Default: 1m + node_ttl: 1m + + ## Path to the file containing the user's private PEM-encoded key. + ## + ## @doc cluster.etcd.ssl.keyfile + ## ValueType: File + ## Default: "{{ platform_etc_dir }}/certs/key.pem" + ssl.keyfile: "{{ platform_etc_dir }}/certs/key.pem" + + ## Path to a file containing the user certificate. + ## + ## @doc cluster.etcd.ssl.certfile + ## ValueType: File + ## Default: "{{ platform_etc_dir }}/certs/cert.pem" + ssl.certfile: "{{ platform_etc_dir }}/certs/cert.pem" + + ## Path to the file containing PEM-encoded CA certificates. The CA certificates + ## are used during server authentication and when building the client certificate chain. + ## + ## @doc cluster.etcd.ssl.cacertfile + ## ValueType: File + ## Default: "{{ platform_etc_dir }}/certs/cacert.pem" + ssl.cacertfile: "{{ platform_etc_dir }}/certs/cacert.pem" + } + + ##---------------------------------------------------------------- + ## Cluster using Kubernetes + ##---------------------------------------------------------------- + k8s { + ## Kubernetes API server list, seperated by ','. + ## + ## @doc cluster.k8s.apiserver + ## ValueType: URL + ## Required: true + apiserver: "http://10.110.111.204:8080" + + ## The service name helps lookup EMQ nodes in the cluster. + ## + ## @doc cluster.k8s.service_name + ## ValueType: String + ## Default: emqx + service_name: emqx + + ## The address type is used to extract host from k8s service. + ## + ## @doc cluster.k8s.address_type + ## ValueType: ip | dns | hostname + ## Default: ip + address_type: ip + + ## The app name helps build 'node.name'. + ## + ## @doc cluster.k8s.app_name + ## ValueType: String + ## Default: emqx + app_name: emqx + + ## The suffix added to dns and hostname get from k8s service + ## + ## @doc cluster.k8s.suffix + ## ValueType: String + ## Default: "pod.local" + suffix: "pod.local" + + ## Kubernetes Namespace + ## + ## @doc cluster.k8s.namespace + ## ValueType: String + ## Default: default + namespace: default + } + + db_backend: mnesia + + rlog: { + # role: core + # core_nodes: [] + } + +} + +##================================================================== +## Log +##================================================================== +log { + ## The primary log level + ## + ## - all the log messages with levels lower than this level will + ## be dropped. + ## - all the log messages with levels higher than this level will + ## go into the log handlers. The handlers then decide to log it + ## out or drop it according to the level setting of the handler. + ## + ## Note: Only the messages with severity level higher than or + ## equal to this level will be logged. + ## + ## @doc log.primary_level + ## ValueType: debug | info | notice | warning | error | critical | alert | emergency + ## Default: warning + primary_level: warning + + ##---------------------------------------------------------------- + ## The console log handler send log messages to emqx console + ##---------------------------------------------------------------- + ## Log to single line + ## @doc log.console_handler.enable + ## ValueType: Boolean + ## Default: false + console_handler.enable: false + + ## The log level of this handler + ## All the log messages with levels lower than this level will + ## be dropped. + ## + ## @doc log.console_handler.level + ## ValueType: debug | info | notice | warning | error | critical | alert | emergency + ## Default: warning + console_handler.level: warning + + ##---------------------------------------------------------------- + ## The file log handlers send log messages to files + ##---------------------------------------------------------------- + ## file_handlers. + file_handlers.emqx_log: { + ## The log level filter of this handler + ## All the log messages with levels lower than this level will + ## be dropped. + ## + ## @doc log.file_handlers..level + ## ValueType: debug | info | notice | warning | error | critical | alert | emergency + ## Default: warning + level: warning + + ## The log file for specified level. + ## + ## If `rotation` is disabled, this is the file of the log files. + ## + ## If `rotation` is enabled, this is the base name of the files. + ## Each file in a rotated log is named .N, where N is an integer. + ## + ## Note: Log files for a specific log level will only contain all the logs + ## that higher than or equal to that level + ## + ## @doc log.file_handlers..file + ## ValueType: File + ## Required: true + file: "{{ platform_log_dir }}/emqx.log" + + ## Enables the log rotation. + ## With this enabled, new log files will be created when the current + ## log file is full, max to `rotation_count` files will be created. + ## + ## @doc log.file_handlers..rotation.enable + ## ValueType: Boolean + ## Default: true + rotation.enable: true + + ## Maximum rotation count of log files. + ## + ## @doc log.file_handlers..rotation.count + ## ValueType: Integer + ## Range: [1, 2048] + ## Default: 10 + rotation.count: 10 + + ## Maximum size of each log file. + ## + ## If the max_size reached and `rotation` is disabled, the handler + ## will stop sending log messages, if the `rotation` is enabled, + ## the file rotates. + ## + ## @doc log.file_handlers..max_size + ## ValueType: Size | infinity + ## Default: 10MB + max_size: 10MB + } + + ## file_handlers. + ## + ## You could also create multiple file handlers for different + ## log level for example: + file_handlers.emqx_error_log: { + level: error + file: "{{ platform_log_dir }}/error.log" + } + + ## Timezone offset to display in logs + ## + ## @doc log.time_offset + ## ValueType: system | utc | String + ## - "system" use system zone + ## - "utc" for Universal Coordinated Time (UTC) + ## - "+hh:mm" or "-hh:mm" for a specified offset + ## Default: system + time_offset: system + + ## Limits the total number of characters printed for each log event. + ## + ## @doc log.chars_limit + ## ValueType: Integer | infinity + ## Range: [0, infinity) + ## Default: infinity + chars_limit: infinity + + ## Maximum depth for Erlang term log formatting + ## and Erlang process message queue inspection. + ## + ## @doc log.max_depth + ## ValueType: Integer | infinity + ## Default: 80 + max_depth: 80 + + ## Log formatter + ## @doc log.formatter + ## ValueType: text | json + ## Default: text + formatter: text + + ## Log to single line + ## @doc log.single_line + ## ValueType: Boolean + ## Default: true + single_line: true + + ## The max allowed queue length before switching to sync mode. + ## + ## Log overload protection parameter. If the message queue grows + ## larger than this value the handler switches from anync to sync mode. + ## + ## @doc log.sync_mode_qlen + ## ValueType: Integer + ## Range: [0, ${log.drop_mode_qlen}] + ## Default: 100 + sync_mode_qlen: 100 + + ## The max allowed queue length before switching to drop mode. + ## + ## Log overload protection parameter. When the message queue grows + ## larger than this threshold, the handler switches to a mode in which + ## it drops all new events that senders want to log. + ## + ## @doc log.drop_mode_qlen + ## ValueType: Integer + ## Range: [${log.sync_mode_qlen}, ${log.flush_qlen}] + ## Default: 3000 + drop_mode_qlen: 3000 + + ## The max allowed queue length before switching to flush mode. + ## + ## Log overload protection parameter. If the length of the message queue + ## grows larger than this threshold, a flush (delete) operation takes place. + ## To flush events, the handler discards the messages in the message queue + ## by receiving them in a loop without logging. + ## + ## @doc log.flush_qlen + ## ValueType: Integer + ## Range: [${log.drop_mode_qlen}, infinity) + ## Default: 8000 + flush_qlen: 8000 + + ## Kill the log handler when it gets overloaded. + ## + ## Log overload protection parameter. It is possible that a handler, + ## even if it can successfully manage peaks of high load without crashing, + ## can build up a large message queue, or use a large amount of memory. + ## We could kill the log handler in these cases and restart it after a + ## few seconds. + ## + ## @doc log.overload_kill.enable + ## ValueType: Boolean + ## Default: true + overload_kill.enable: true + + ## The max allowed queue length before killing the log hanlder. + ## + ## Log overload protection parameter. This is the maximum allowed queue + ## length. If the message queue grows larger than this, the handler + ## process is terminated. + ## + ## @doc log.overload_kill.qlen + ## ValueType: Integer + ## Range: [0, 1048576] + ## Default: 20000 + overload_kill.qlen: 20000 + + ## The max allowed memory size before killing the log hanlder. + ## + ## Log overload protection parameter. This is the maximum memory size + ## that the handler process is allowed to use. If the handler grows + ## larger than this, the process is terminated. + ## + ## @doc log.overload_kill.mem_size + ## ValueType: Size + ## Default: 30MB + overload_kill.mem_size: 30MB + + ## Restart the log hanlder after some seconds. + ## + ## Log overload protection parameter. If the handler is terminated, + ## it restarts automatically after a delay specified in seconds. + ## + ## @doc log.overload_kill.restart_after + ## ValueType: Duration + ## Default: 5s + overload_kill.restart_after: 5s + + ## Controlling Bursts of Log Requests. + ## + ## Log overload protection parameter. Large bursts of log events - many + ## events received by the handler under a short period of time - can + ## potentially cause problems. By specifying the maximum number of events + ## to be handled within a certain time frame, the handler can avoid + ## choking the log with massive amounts of printouts. + ## + ## Note that there would be no warning if any messages were + ## dropped because of burst control. + ## + ## @doc log.burst_limit.enable + ## ValueType: Boolean + ## Default: false + burst_limit.enable: false + + ## This config controls the maximum number of events to handle within + ## a time frame. After the limit is reached, successive events are + ## dropped until the end of the time frame defined by `window_time`. + ## + ## @doc log.burst_limit.max_count + ## ValueType: Integer + ## Default: 10000 + burst_limit.max_count: 10000 + + ## See the previous description of burst_limit_max_count. + ## + ## @doc log.burst_limit.window_time + ## ValueType: duration + ## Default: 1s + burst_limit.window_time: 1s +} + +##================================================================== +## RPC +##================================================================== +rpc { + ## RPC Mode. + ## + ## @doc rpc.mode + ## ValueType: sync | async + ## Default: async + mode: async + + ## Max batch size of async RPC requests. + ## + ## NOTE: RPC batch won't work when rpc.mode = sync + ## Zero value disables rpc batching. + ## + ## @doc rpc.async_batch_size + ## ValueType: Integer + ## Range: [0, 1048576] + ## Default: 0 + async_batch_size: 256 + + ## RPC port discovery + ## + ## The strategy for discovering the RPC listening port of + ## other nodes. + ## + ## @doc cluster.discovery_strategy + ## ValueType: manual | stateless + ## - manual: discover ports by `tcp_server_port`. + ## - stateless: discover ports in a stateless manner. + ## If node name is `emqx@127.0.0.1`, where the `` is + ## an integer, then the listening port will be `5370 + ` + ## + ## Default: `stateless`. + port_discovery: stateless + + ## TCP server port for RPC. + ## + ## Only takes effect when `rpc.port_discovery` = `manual`. + ## + ## @doc rpc.tcp_server_port + ## ValueType: Integer + ## Range: [1024-65535] + ## Defaults: 5369 + tcp_server_port: 5369 + + ## Number of outgoing RPC connections. + ## + ## Set this to 1 to keep the message order sent from the same + ## client. + ## + ## @doc rpc.tcp_client_num + ## ValueType: Integer + ## Range: [1, 256] + ## Defaults: 1 + tcp_client_num: 1 + + ## RCP Client connect timeout. + ## + ## @doc rpc.connect_timeout + ## ValueType: Duration + ## Default: 5s + connect_timeout: 5s + + ## TCP send timeout of RPC client and server. + ## + ## @doc rpc.send_timeout + ## ValueType: Duration + ## Default: 5s + send_timeout: 5s + + ## Authentication timeout + ## + ## @doc rpc.authentication_timeout + ## ValueType: Duration + ## Default: 5s + authentication_timeout: 5s + + ## Default receive timeout for call() functions + ## + ## @doc rpc.call_receive_timeout + ## ValueType: Duration + ## Default: 15s + call_receive_timeout: 15s + + ## Socket idle keepalive. + ## + ## @doc rpc.socket_keepalive_idle + ## ValueType: Duration + ## Default: 900s + socket_keepalive_idle: 900s + + ## TCP Keepalive probes interval. + ## + ## @doc rpc.socket_keepalive_interval + ## ValueType: Duration + ## Default: 75s + socket_keepalive_interval: 75s + + ## Probes lost to close the connection + ## + ## @doc rpc.socket_keepalive_count + ## ValueType: Integer + ## Default: 9 + socket_keepalive_count: 9 + + ## Size of TCP send buffer. + ## + ## @doc rpc.socket_sndbuf + ## ValueType: Size + ## Default: 1MB + socket_sndbuf: 1MB + + ## Size of TCP receive buffer. + ## + ## @doc rpc.socket_recbuf + ## ValueType: Size + ## Default: 1MB + socket_recbuf: 1MB + + ## Size of user-level software socket buffer. + ## + ## @doc rpc.socket_buffer + ## ValueType: Size + ## Default: 1MB + socket_buffer: 1MB +} diff --git a/apps/emqx_machine/src/emqx_machine_app.erl b/apps/emqx_machine/src/emqx_machine_app.erl index f86bb57e8..4152d385c 100644 --- a/apps/emqx_machine/src/emqx_machine_app.erl +++ b/apps/emqx_machine/src/emqx_machine_app.erl @@ -30,6 +30,8 @@ start(_Type, _Args) -> ok = print_otp_version_warning(), _ = load_modules(), + ok = load_config_files(), + {ok, _} = application:ensure_all_started(emqx), _ = emqx_plugins:load(), @@ -75,3 +77,12 @@ load_modules() -> start_modules() -> ok. -endif. + +load_config_files() -> + %% the app env 'config_files' for 'emqx` app should be set + %% in app.time.config by boot script before starting Erlang VM + ConfFiles = application:get_env(emqx, config_files, []), + %% emqx_machine_schema is a superset of emqx_schema + ok = emqx_config:init_load(emqx_machine_schema, ConfFiles), + %% to avoid config being loaded again when emqx app starts. + ok = emqx_app:set_init_config_load_done(). diff --git a/apps/emqx_machine/src/emqx_machine_schema.erl b/apps/emqx_machine/src/emqx_machine_schema.erl new file mode 100644 index 000000000..1efc3f333 --- /dev/null +++ b/apps/emqx_machine/src/emqx_machine_schema.erl @@ -0,0 +1,421 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_machine_schema). + +-dialyzer(no_return). +-dialyzer(no_match). +-dialyzer(no_contracts). +-dialyzer(no_unused). +-dialyzer(no_fail_call). + +-include_lib("typerefl/include/types.hrl"). + +-type log_level() :: debug | info | notice | warning | error | critical | alert | emergency | all. +-type file() :: string(). +-type cipher() :: map(). + +-behaviour(hocon_schema). + +-reflect_type([ log_level/0, + file/0, + cipher/0]). + +-export([structs/0, fields/1, translations/0, translation/1]). +-export([t/1, t/3, t/4, ref/1]). +-export([conf_get/2, conf_get/3, keys/2, filter/1]). + +%% Static apps which merge their configs into the merged emqx.conf +%% The list can not be made a dynamic read at run-time as it is used +%% by nodetool to generate app.