diff --git a/apps/.gitkeep b/apps/.gitkeep deleted file mode 100644 index e69de29bb..000000000 diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index 46f81a87d..5200f5239 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -1,702 +1,3 @@ -## master-88df1713 - -## NOTE: The configurations in this file will be overridden by -## `/data/emqx_overrides.conf` - -##================================================================== -## Node -##================================================================== -node { - ## Node name. - ## See: http://erlang.org/doc/reference_manual/distributed.html - ## - ## @doc node.name - ## ValueType: NodeName - ## Default: emqx@127.0.0.1 - name: "emqx@127.0.0.1" - - ## Cookie for distributed node communication. - ## - ## @doc node.cookie - ## ValueType: String - ## Default: emqxsecretcookie - cookie: emqxsecretcookie - - ## Data dir for the node - ## - ## @doc node.data_dir - ## ValueType: Folder - ## Default: "{{ platform_data_dir }}/" - data_dir: "{{ platform_data_dir }}/" - - ## Dir of crash dump file. - ## - ## @doc node.crash_dump_dir - ## ValueType: Folder - ## Default: "{{ platform_log_dir }}/" - crash_dump_dir: "{{ platform_log_dir }}/" - - ## Global GC Interval. - ## - ## @doc node.global_gc_interval - ## ValueType: Duration - ## Default: 15m - global_gc_interval: 15m - - ## Sets the net_kernel tick time in seconds. - ## Notice that all communicating nodes are to have the same - ## TickTime value specified. - ## - ## See: http://www.erlang.org/doc/man/kernel_app.html#net_ticktime - ## - ## @doc node.dist_net_ticktime - ## ValueType: Number - ## Default: 2m - dist_net_ticktime: 2m - - ## Sets the port range for the listener socket of a distributed - ## Erlang node. - ## Note that if there are firewalls between clustered nodes, this - ## port segment for nodes’ communication should be allowed. - ## - ## See: http://www.erlang.org/doc/man/kernel_app.html - ## - ## @doc node.dist_listen_min - ## ValueType: Integer - ## Range: [1024,65535] - ## Default: 6369 - dist_listen_min: 6369 - - ## Sets the port range for the listener socket of a distributed - ## Erlang node. - ## Note that if there are firewalls between clustered nodes, this - ## port segment for nodes’ communication should be allowed. - ## - ## See: http://www.erlang.org/doc/man/kernel_app.html - ## - ## @doc node.dist_listen_max - ## ValueType: Integer - ## Range: [1024,65535] - ## Default: 6369 - dist_listen_max: 6369 - - ## Sets the maximum depth of call stack back-traces in the exit - ## reason element of 'EXIT' tuples. - ## The flag also limits the stacktrace depth returned by - ## process_info item current_stacktrace. - ## - ## @doc node.backtrace_depth - ## ValueType: Integer - ## Range: [0,1024] - ## Default: 23 - backtrace_depth: 23 - -} - -##================================================================== -## Cluster -##================================================================== -cluster { - ## Cluster name. - ## - ## @doc cluster.name - ## ValueType: String - ## Default: emqxcl - name: emqxcl - - ## Enable cluster autoheal from network partition. - ## - ## @doc cluster.autoheal - ## ValueType: Boolean - ## Default: true - autoheal: true - - ## Autoclean down node. A down node will be removed from the cluster - ## if this value > 0. - ## - ## @doc cluster.autoclean - ## ValueType: Duration - ## Default: 5m - autoclean: 5m - - ## Node discovery strategy to join the cluster. - ## - ## @doc cluster.discovery_strategy - ## ValueType: manual | static | mcast | dns | etcd | k8s - ## - manual: Manual join command - ## - static: Static node list - ## - mcast: IP Multicast - ## - dns: DNS A Record - ## - etcd: etcd - ## - k8s: Kubernetes - ## - ## Default: manual - discovery_strategy: manual - - ##---------------------------------------------------------------- - ## Cluster using static node list - ##---------------------------------------------------------------- - static { - ## Node list of the cluster - ## - ## @doc cluster.static.seeds - ## ValueType: Array - ## Default: [] - seeds: ["emqx1@127.0.0.1", "emqx2@127.0.0.1"] - } - - ##---------------------------------------------------------------- - ## Cluster using IP Multicast - ##---------------------------------------------------------------- - mcast { - ## IP Multicast Address. - ## - ## @doc cluster.mcast.addr - ## ValueType: IPAddress - ## Default: "239.192.0.1" - addr: "239.192.0.1" - - ## Multicast Ports. - ## - ## @doc cluster.mcast.ports - ## ValueType: Array - ## Default: [4369, 4370] - ports: [4369, 4370] - - ## Multicast Iface. - ## - ## @doc cluster.mcast.iface - ## ValueType: IPAddress - ## Default: "0.0.0.0" - iface: "0.0.0.0" - - ## Multicast Ttl. - ## - ## @doc cluster.mcast.ttl - ## ValueType: Integer - ## Range: [0,255] - ## Default: 255 - ttl: 255 - - ## Multicast loop. - ## - ## @doc cluster.mcast.loop - ## ValueType: Boolean - ## Default: true - loop: true - } - - ##---------------------------------------------------------------- - ## Cluster using DNS A records - ##---------------------------------------------------------------- - dns { - ## DNS name. - ## - ## @doc cluster.dns.name - ## ValueType: String - ## Default: localhost - name: localhost - - ## The App name is used to build 'node.name' with IP address. - ## - ## @doc cluster.dns.app - ## ValueType: String - ## Default: emqx - app: emqx - } - - ##---------------------------------------------------------------- - ## Cluster using etcd - ##---------------------------------------------------------------- - etcd { - ## Etcd server list, seperated by ','. - ## - ## @doc cluster.etcd.server - ## ValueType: URL - ## Required: true - server: "http://127.0.0.1:2379" - - ## The prefix helps build nodes path in etcd. Each node in the cluster - ## will create a path in etcd: v2/keys/// - ## - ## @doc cluster.etcd.prefix - ## ValueType: String - ## Default: emqxcl - prefix: emqxcl - - ## The TTL for node's path in etcd. - ## - ## @doc cluster.etcd.node_ttl - ## ValueType: Duration - ## Default: 1m - node_ttl: 1m - - ## Path to the file containing the user's private PEM-encoded key. - ## - ## @doc cluster.etcd.ssl.keyfile - ## ValueType: File - ## Default: "{{ platform_etc_dir }}/certs/key.pem" - ssl.keyfile: "{{ platform_etc_dir }}/certs/key.pem" - - ## Path to a file containing the user certificate. - ## - ## @doc cluster.etcd.ssl.certfile - ## ValueType: File - ## Default: "{{ platform_etc_dir }}/certs/cert.pem" - ssl.certfile: "{{ platform_etc_dir }}/certs/cert.pem" - - ## Path to the file containing PEM-encoded CA certificates. The CA certificates - ## are used during server authentication and when building the client certificate chain. - ## - ## @doc cluster.etcd.ssl.cacertfile - ## ValueType: File - ## Default: "{{ platform_etc_dir }}/certs/cacert.pem" - ssl.cacertfile: "{{ platform_etc_dir }}/certs/cacert.pem" - } - - ##---------------------------------------------------------------- - ## Cluster using Kubernetes - ##---------------------------------------------------------------- - k8s { - ## Kubernetes API server list, seperated by ','. - ## - ## @doc cluster.k8s.apiserver - ## ValueType: URL - ## Required: true - apiserver: "http://10.110.111.204:8080" - - ## The service name helps lookup EMQ nodes in the cluster. - ## - ## @doc cluster.k8s.service_name - ## ValueType: String - ## Default: emqx - service_name: emqx - - ## The address type is used to extract host from k8s service. - ## - ## @doc cluster.k8s.address_type - ## ValueType: ip | dns | hostname - ## Default: ip - address_type: ip - - ## The app name helps build 'node.name'. - ## - ## @doc cluster.k8s.app_name - ## ValueType: String - ## Default: emqx - app_name: emqx - - ## The suffix added to dns and hostname get from k8s service - ## - ## @doc cluster.k8s.suffix - ## ValueType: String - ## Default: "pod.local" - suffix: "pod.local" - - ## Kubernetes Namespace - ## - ## @doc cluster.k8s.namespace - ## ValueType: String - ## Default: default - namespace: default - } - - db_backend: mnesia - - rlog: { - # role: core - # core_nodes: [] - } - -} - -##================================================================== -## Log -##================================================================== -log { - ## The primary log level - ## - ## - all the log messages with levels lower than this level will - ## be dropped. - ## - all the log messages with levels higher than this level will - ## go into the log handlers. The handlers then decide to log it - ## out or drop it according to the level setting of the handler. - ## - ## Note: Only the messages with severity level higher than or - ## equal to this level will be logged. - ## - ## @doc log.primary_level - ## ValueType: debug | info | notice | warning | error | critical | alert | emergency - ## Default: warning - primary_level: warning - - ##---------------------------------------------------------------- - ## The console log handler send log messages to emqx console - ##---------------------------------------------------------------- - ## Log to single line - ## @doc log.console_handler.enable - ## ValueType: Boolean - ## Default: false - console_handler.enable: false - - ## The log level of this handler - ## All the log messages with levels lower than this level will - ## be dropped. - ## - ## @doc log.console_handler.level - ## ValueType: debug | info | notice | warning | error | critical | alert | emergency - ## Default: warning - console_handler.level: warning - - ##---------------------------------------------------------------- - ## The file log handlers send log messages to files - ##---------------------------------------------------------------- - ## file_handlers. - file_handlers.emqx_log: { - ## The log level filter of this handler - ## All the log messages with levels lower than this level will - ## be dropped. - ## - ## @doc log.file_handlers..level - ## ValueType: debug | info | notice | warning | error | critical | alert | emergency - ## Default: warning - level: warning - - ## The log file for specified level. - ## - ## If `rotation` is disabled, this is the file of the log files. - ## - ## If `rotation` is enabled, this is the base name of the files. - ## Each file in a rotated log is named .N, where N is an integer. - ## - ## Note: Log files for a specific log level will only contain all the logs - ## that higher than or equal to that level - ## - ## @doc log.file_handlers..file - ## ValueType: File - ## Required: true - file: "{{ platform_log_dir }}/emqx.log" - - ## Enables the log rotation. - ## With this enabled, new log files will be created when the current - ## log file is full, max to `rotation_count` files will be created. - ## - ## @doc log.file_handlers..rotation.enable - ## ValueType: Boolean - ## Default: true - rotation.enable: true - - ## Maximum rotation count of log files. - ## - ## @doc log.file_handlers..rotation.count - ## ValueType: Integer - ## Range: [1, 2048] - ## Default: 10 - rotation.count: 10 - - ## Maximum size of each log file. - ## - ## If the max_size reached and `rotation` is disabled, the handler - ## will stop sending log messages, if the `rotation` is enabled, - ## the file rotates. - ## - ## @doc log.file_handlers..max_size - ## ValueType: Size | infinity - ## Default: 10MB - max_size: 10MB - } - - ## file_handlers. - ## - ## You could also create multiple file handlers for different - ## log level for example: - file_handlers.emqx_error_log: { - level: error - file: "{{ platform_log_dir }}/error.log" - } - - ## Timezone offset to display in logs - ## - ## @doc log.time_offset - ## ValueType: system | utc | String - ## - "system" use system zone - ## - "utc" for Universal Coordinated Time (UTC) - ## - "+hh:mm" or "-hh:mm" for a specified offset - ## Default: system - time_offset: system - - ## Limits the total number of characters printed for each log event. - ## - ## @doc log.chars_limit - ## ValueType: Integer | infinity - ## Range: [0, infinity) - ## Default: infinity - chars_limit: infinity - - ## Maximum depth for Erlang term log formatting - ## and Erlang process message queue inspection. - ## - ## @doc log.max_depth - ## ValueType: Integer | infinity - ## Default: 80 - max_depth: 80 - - ## Log formatter - ## @doc log.formatter - ## ValueType: text | json - ## Default: text - formatter: text - - ## Log to single line - ## @doc log.single_line - ## ValueType: Boolean - ## Default: true - single_line: true - - ## The max allowed queue length before switching to sync mode. - ## - ## Log overload protection parameter. If the message queue grows - ## larger than this value the handler switches from anync to sync mode. - ## - ## @doc log.sync_mode_qlen - ## ValueType: Integer - ## Range: [0, ${log.drop_mode_qlen}] - ## Default: 100 - sync_mode_qlen: 100 - - ## The max allowed queue length before switching to drop mode. - ## - ## Log overload protection parameter. When the message queue grows - ## larger than this threshold, the handler switches to a mode in which - ## it drops all new events that senders want to log. - ## - ## @doc log.drop_mode_qlen - ## ValueType: Integer - ## Range: [${log.sync_mode_qlen}, ${log.flush_qlen}] - ## Default: 3000 - drop_mode_qlen: 3000 - - ## The max allowed queue length before switching to flush mode. - ## - ## Log overload protection parameter. If the length of the message queue - ## grows larger than this threshold, a flush (delete) operation takes place. - ## To flush events, the handler discards the messages in the message queue - ## by receiving them in a loop without logging. - ## - ## @doc log.flush_qlen - ## ValueType: Integer - ## Range: [${log.drop_mode_qlen}, infinity) - ## Default: 8000 - flush_qlen: 8000 - - ## Kill the log handler when it gets overloaded. - ## - ## Log overload protection parameter. It is possible that a handler, - ## even if it can successfully manage peaks of high load without crashing, - ## can build up a large message queue, or use a large amount of memory. - ## We could kill the log handler in these cases and restart it after a - ## few seconds. - ## - ## @doc log.overload_kill.enable - ## ValueType: Boolean - ## Default: true - overload_kill.enable: true - - ## The max allowed queue length before killing the log hanlder. - ## - ## Log overload protection parameter. This is the maximum allowed queue - ## length. If the message queue grows larger than this, the handler - ## process is terminated. - ## - ## @doc log.overload_kill.qlen - ## ValueType: Integer - ## Range: [0, 1048576] - ## Default: 20000 - overload_kill.qlen: 20000 - - ## The max allowed memory size before killing the log hanlder. - ## - ## Log overload protection parameter. This is the maximum memory size - ## that the handler process is allowed to use. If the handler grows - ## larger than this, the process is terminated. - ## - ## @doc log.overload_kill.mem_size - ## ValueType: Size - ## Default: 30MB - overload_kill.mem_size: 30MB - - ## Restart the log hanlder after some seconds. - ## - ## Log overload protection parameter. If the handler is terminated, - ## it restarts automatically after a delay specified in seconds. - ## - ## @doc log.overload_kill.restart_after - ## ValueType: Duration - ## Default: 5s - overload_kill.restart_after: 5s - - ## Controlling Bursts of Log Requests. - ## - ## Log overload protection parameter. Large bursts of log events - many - ## events received by the handler under a short period of time - can - ## potentially cause problems. By specifying the maximum number of events - ## to be handled within a certain time frame, the handler can avoid - ## choking the log with massive amounts of printouts. - ## - ## Note that there would be no warning if any messages were - ## dropped because of burst control. - ## - ## @doc log.burst_limit.enable - ## ValueType: Boolean - ## Default: false - burst_limit.enable: false - - ## This config controls the maximum number of events to handle within - ## a time frame. After the limit is reached, successive events are - ## dropped until the end of the time frame defined by `window_time`. - ## - ## @doc log.burst_limit.max_count - ## ValueType: Integer - ## Default: 10000 - burst_limit.max_count: 10000 - - ## See the previous description of burst_limit_max_count. - ## - ## @doc log.burst_limit.window_time - ## ValueType: duration - ## Default: 1s - burst_limit.window_time: 1s -} - -##================================================================== -## RPC -##================================================================== -rpc { - ## RPC Mode. - ## - ## @doc rpc.mode - ## ValueType: sync | async - ## Default: async - mode: async - - ## Max batch size of async RPC requests. - ## - ## NOTE: RPC batch won't work when rpc.mode = sync - ## Zero value disables rpc batching. - ## - ## @doc rpc.async_batch_size - ## ValueType: Integer - ## Range: [0, 1048576] - ## Default: 0 - async_batch_size: 256 - - ## RPC port discovery - ## - ## The strategy for discovering the RPC listening port of - ## other nodes. - ## - ## @doc cluster.discovery_strategy - ## ValueType: manual | stateless - ## - manual: discover ports by `tcp_server_port`. - ## - stateless: discover ports in a stateless manner. - ## If node name is `emqx@127.0.0.1`, where the `` is - ## an integer, then the listening port will be `5370 + ` - ## - ## Default: `stateless`. - port_discovery: stateless - - ## TCP server port for RPC. - ## - ## Only takes effect when `rpc.port_discovery` = `manual`. - ## - ## @doc rpc.tcp_server_port - ## ValueType: Integer - ## Range: [1024-65535] - ## Defaults: 5369 - tcp_server_port: 5369 - - ## Number of outgoing RPC connections. - ## - ## Set this to 1 to keep the message order sent from the same - ## client. - ## - ## @doc rpc.tcp_client_num - ## ValueType: Integer - ## Range: [1, 256] - ## Defaults: 1 - tcp_client_num: 1 - - ## RCP Client connect timeout. - ## - ## @doc rpc.connect_timeout - ## ValueType: Duration - ## Default: 5s - connect_timeout: 5s - - ## TCP send timeout of RPC client and server. - ## - ## @doc rpc.send_timeout - ## ValueType: Duration - ## Default: 5s - send_timeout: 5s - - ## Authentication timeout - ## - ## @doc rpc.authentication_timeout - ## ValueType: Duration - ## Default: 5s - authentication_timeout: 5s - - ## Default receive timeout for call() functions - ## - ## @doc rpc.call_receive_timeout - ## ValueType: Duration - ## Default: 15s - call_receive_timeout: 15s - - ## Socket idle keepalive. - ## - ## @doc rpc.socket_keepalive_idle - ## ValueType: Duration - ## Default: 900s - socket_keepalive_idle: 900s - - ## TCP Keepalive probes interval. - ## - ## @doc rpc.socket_keepalive_interval - ## ValueType: Duration - ## Default: 75s - socket_keepalive_interval: 75s - - ## Probes lost to close the connection - ## - ## @doc rpc.socket_keepalive_count - ## ValueType: Integer - ## Default: 9 - socket_keepalive_count: 9 - - ## Size of TCP send buffer. - ## - ## @doc rpc.socket_sndbuf - ## ValueType: Size - ## Default: 1MB - socket_sndbuf: 1MB - - ## Size of TCP receive buffer. - ## - ## @doc rpc.socket_recbuf - ## ValueType: Size - ## Default: 1MB - socket_recbuf: 1MB - - ## Size of user-level software socket buffer. - ## - ## @doc rpc.socket_buffer - ## ValueType: Size - ## Default: 1MB - socket_buffer: 1MB -} - ##================================================================== ## Broker ##================================================================== diff --git a/apps/emqx/src/emqx.app.src b/apps/emqx/src/emqx.app.src index 546b70f14..57f7cd57f 100644 --- a/apps/emqx/src/emqx.app.src +++ b/apps/emqx/src/emqx.app.src @@ -1,6 +1,6 @@ {application, emqx, [{id, "emqx"}, - {description, "EMQ X"}, + {description, "EMQ X Core"}, {vsn, "5.0.0"}, % strict semver, bump manually! {modules, []}, {registered, []}, diff --git a/apps/emqx/src/emqx.erl b/apps/emqx/src/emqx.erl index b1c976cc1..3a5935a8b 100644 --- a/apps/emqx/src/emqx.erl +++ b/apps/emqx/src/emqx.erl @@ -23,7 +23,6 @@ %% Start/Stop the application -export([ start/0 - , restart/1 , is_running/1 , stop/0 ]). @@ -52,12 +51,6 @@ , run_fold_hook/3 ]). -%% Shutdown and reboot --export([ shutdown/0 - , shutdown/1 - , reboot/0 - ]). - %% Troubleshooting -export([ set_debug_secret/1 ]). @@ -94,19 +87,8 @@ set_debug_secret(PathToSecretFile) -> %% @doc Start emqx application -spec(start() -> {ok, list(atom())} | {error, term()}). start() -> - %% Check OS - %% Check VM - %% Check Mnesia application:ensure_all_started(?APP). --spec(restart(string()) -> ok). -restart(ConfFile) -> - reload_config(ConfFile), - shutdown(), - ok = application:stop(mnesia), - _ = application:start(mnesia), - reboot(). - %% @doc Stop emqx application. -spec(stop() -> ok | {error, term()}). stop() -> @@ -202,40 +184,3 @@ run_hook(HookPoint, Args) -> -spec(run_fold_hook(emqx_hooks:hookpoint(), list(any()), any()) -> any()). run_fold_hook(HookPoint, Args, Acc) -> emqx_hooks:run_fold(HookPoint, Args, Acc). - -%%-------------------------------------------------------------------- -%% Shutdown and reboot -%%-------------------------------------------------------------------- - -shutdown() -> - shutdown(normal). - -shutdown(Reason) -> - ?LOG(critical, "emqx shutdown for ~s", [Reason]), - _ = emqx_alarm_handler:unload(), - lists:foreach(fun application:stop/1 - , lists:reverse(default_started_applications()) - ). - -reboot() -> - lists:foreach(fun application:start/1 , default_started_applications()). - -default_started_applications() -> - [gproc, esockd, ranch, cowboy, ekka, quicer, emqx] ++ emqx_feature(). - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -reload_config(ConfFile) -> - {ok, [Conf]} = file:consult(ConfFile), - lists:foreach(fun({App, Vals}) -> - [application:set_env(App, Par, Val) || {Par, Val} <- Vals] - end, Conf). - --ifndef(EMQX_DEP_APPS). -emqx_feature() -> []. --else. -emqx_feature() -> - ?EMQX_DEP_APPS. --endif. diff --git a/apps/emqx/src/emqx_app.erl b/apps/emqx/src/emqx_app.erl index a35c90747..800ed3a18 100644 --- a/apps/emqx/src/emqx_app.erl +++ b/apps/emqx/src/emqx_app.erl @@ -24,6 +24,7 @@ , get_description/0 , get_release/0 , set_init_config_load_done/0 + , set_override_conf_file/1 ]). -include("emqx.hrl"). @@ -46,24 +47,14 @@ start(_Type, _Args) -> ok = maybe_load_config(), - ok = set_backtrace_depth(), - print_otp_version_warning(), - print_banner(), %% Load application first for ekka_mnesia scanner - _ = load_ce_modules(), ekka:start(), ok = ekka_rlog:wait_for_shards(?EMQX_SHARDS, infinity), - false == os:getenv("EMQX_NO_QUIC") - andalso application:ensure_all_started(quicer), + ok = maybe_start_quicer(), {ok, Sup} = emqx_sup:start_link(), - ok = start_autocluster(), - % ok = emqx_plugins:init(), - _ = emqx_plugins:load(), - _ = start_ce_modules(), - emqx_boot:is_enabled(listeners) andalso (ok = emqx_listeners:start()), - register(emqx, self()), + ok = maybe_start_listeners(), ok = emqx_alarm_handler:load(), - print_vsn(), + register(emqx, self()), {ok, Sup}. prep_stop(_State) -> @@ -79,6 +70,13 @@ stop(_State) -> ok. set_init_config_load_done() -> application:set_env(emqx, init_config_load_done, true). +%% @doc This API is mostly for testing. +%% The override config file is typically located in the 'data' dir when +%% it is a emqx release, but emqx app should not have to konw where the +%% 'data' dir is located. +set_override_conf_file(File) -> + application:set_env(emqx, override_conf_file, File). + maybe_load_config() -> case application:get_env(emqx, init_config_load_done, false) of true -> @@ -89,52 +87,22 @@ maybe_load_config() -> emqx_config:init_load(emqx_schema, ConfFiles) end. -set_backtrace_depth() -> - Depth = emqx_config:get([node, backtrace_depth]), - _ = erlang:system_flag(backtrace_depth, Depth), - ok. +maybe_start_listeners() -> + case emqx_boot:is_enabled(listeners) of + true -> + ok = emqx_listeners:start(); + false -> + ok + end. --ifndef(EMQX_ENTERPRISE). -load_ce_modules() -> - application:load(emqx_modules). -start_ce_modules() -> - application:ensure_all_started(emqx_modules). --else. -load_ce_modules() -> - ok. -start_ce_modules() -> - ok. --endif. - -%%-------------------------------------------------------------------- -%% Print Banner -%%-------------------------------------------------------------------- - --if(?OTP_RELEASE> 22). -print_otp_version_warning() -> ok. --else. -print_otp_version_warning() -> - ?ULOG("WARNING: Running on Erlang/OTP version ~p. Recommended: 23~n", - [?OTP_RELEASE]). --endif. % OTP_RELEASE - --ifndef(TEST). - -print_banner() -> - ?ULOG("Starting ~s on node ~s~n", [?APP, node()]). - -print_vsn() -> - ?ULOG("~s ~s is running now!~n", [get_description(), get_release()]). - --else. % TEST - -print_vsn() -> - ok. - -print_banner() -> - ok. - --endif. % TEST +maybe_start_quicer() -> + case os:getenv("EMQX_NO_QUIC") of + X when X =:= "1" orelse X =:= "true" -> + ok; + _ -> + {ok, _} = application:ensure_all_started(quicer), + ok + end. get_description() -> {ok, Descr0} = application:get_key(?APP, description), @@ -163,12 +131,3 @@ get_release() -> release_in_macro() -> element(2, ?EMQX_RELEASE). - -%%-------------------------------------------------------------------- -%% Autocluster -%%-------------------------------------------------------------------- -start_autocluster() -> - ekka:callback(prepare, fun emqx:shutdown/1), - ekka:callback(reboot, fun emqx:reboot/0), - _ = ekka:autocluster(?APP), %% returns 'ok' or a pid or 'any()' as in spec - ok. diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index 91341204b..e2d2c8207 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -260,7 +260,7 @@ load_hocon_file(FileName, LoadType) -> end. emqx_override_conf_name() -> - filename:join([?MODULE:get([node, data_dir]), "emqx_override.conf"]). + application:get_env(emqx, override_conf_file, "emqx_override.conf"). bin(Bin) when is_binary(Bin) -> Bin; bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). diff --git a/apps/emqx/src/emqx_kernel_sup.erl b/apps/emqx/src/emqx_kernel_sup.erl index 5ca283481..defe96182 100644 --- a/apps/emqx/src/emqx_kernel_sup.erl +++ b/apps/emqx/src/emqx_kernel_sup.erl @@ -29,7 +29,6 @@ init([]) -> {ok, {{one_for_one, 10, 100}, %% always start emqx_config_handler first to load the emqx.conf to emqx_config [ child_spec(emqx_config_handler, worker) - , child_spec(emqx_global_gc, worker) , child_spec(emqx_pool_sup, supervisor) , child_spec(emqx_hooks, worker) , child_spec(emqx_stats, worker) diff --git a/apps/emqx/src/emqx_logger_textfmt.erl b/apps/emqx/src/emqx_logger_textfmt.erl index 97862a72a..cf419f381 100644 --- a/apps/emqx/src/emqx_logger_textfmt.erl +++ b/apps/emqx/src/emqx_logger_textfmt.erl @@ -36,7 +36,4 @@ enrich_fmt(Fmt, Args, #{mfa := Mfa, line := Line}) -> enrich_fmt(Fmt, Args, _) -> {Fmt, Args}. -mfa({M, F, A}) -> - <<(atom_to_binary(M, utf8))/binary, $:, - (atom_to_binary(F, utf8))/binary, $/, - (integer_to_binary(A))/binary>>. +mfa({M, F, A}) -> atom_to_list(M) ++ ":" ++ atom_to_list(F) ++ "/" ++ integer_to_list(A). diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 3fd060d9f..4e0e8f85e 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -24,7 +24,6 @@ -include_lib("typerefl/include/types.hrl"). --type log_level() :: debug | info | notice | warning | error | critical | alert | emergency | all. -type duration() :: integer(). -type duration_s() :: integer(). -type duration_ms() :: integer(). @@ -60,179 +59,18 @@ -behaviour(hocon_schema). --reflect_type([ log_level/0, duration/0, duration_s/0, duration_ms/0, +-reflect_type([ duration/0, duration_s/0, duration_ms/0, bytesize/0, wordsize/0, percent/0, file/0, comma_separated_list/0, bar_separated_list/0, ip_port/0, cipher/0, comma_separated_atoms/0]). --export([structs/0, fields/1, translations/0, translation/1]). +-export([structs/0, fields/1]). -export([t/1, t/3, t/4, ref/1]). -export([conf_get/2, conf_get/3, keys/2, filter/1]). -export([ssl/1]). -%% will be used by emqx_ct_helper to find the dependent apps --export([includes/0, extra_schema_fields/1]). - -structs() -> ["cluster", "node", "rpc", "log", - "zones", "listeners", "broker", - "plugins", "sysmon", "alarm"] - ++ ?MODULE:includes(). - --ifndef(EMQX_EXT_SCHEMAS). -includes() -> []. --else. -includes() -> - [FieldName || {FieldName, _SchemaMod} <- ?EMQX_EXT_SCHEMAS]. --endif. - -fields("cluster") -> - [ {"name", t(atom(), "ekka.cluster_name", emqxcl)} - , {"discovery_strategy", t(union([manual, static, mcast, dns, etcd, k8s]), - undefined, manual)} - , {"autoclean", t(duration(), "ekka.cluster_autoclean", "5m")} - , {"autoheal", t(boolean(), "ekka.cluster_autoheal", true)} - , {"static", ref("static")} - , {"mcast", ref("mcast")} - , {"proto_dist", t(union([inet_tcp, inet6_tcp, inet_tls]), "ekka.proto_dist", inet_tcp)} - , {"dns", ref("dns")} - , {"etcd", ref("etcd")} - , {"k8s", ref("k8s")} - , {"db_backend", t(union([mnesia, rlog]), "ekka.db_backend", mnesia)} - , {"rlog", ref("rlog")} - ]; - -fields("static") -> - [ {"seeds", t(hoconsc:array(string()), undefined, [])}]; - -fields("mcast") -> - [ {"addr", t(string(), undefined, "239.192.0.1")} - , {"ports", t(hoconsc:array(integer()), undefined, [4369, 4370])} - , {"iface", t(string(), undefined, "0.0.0.0")} - , {"ttl", t(range(0, 255), undefined, 255)} - , {"loop", t(boolean(), undefined, true)} - , {"sndbuf", t(bytesize(), undefined, "16KB")} - , {"recbuf", t(bytesize(), undefined, "16KB")} - , {"buffer", t(bytesize(), undefined, "32KB")} - ]; - -fields("dns") -> - [ {"name", t(string(), undefined, "localhost")} - , {"app", t(string(), undefined, "emqx")}]; - -fields("etcd") -> - [ {"server", t(comma_separated_list())} - , {"prefix", t(string(), undefined, "emqxcl")} - , {"node_ttl", t(duration(), undefined, "1m")} - , {"ssl", ref("etcd_ssl")} - ]; - -fields("etcd_ssl") -> - ssl(#{}); - -fields("k8s") -> - [ {"apiserver", t(string())} - , {"service_name", t(string(), undefined, "emqx")} - , {"address_type", t(union([ip, dns, hostname]))} - , {"app_name", t(string(), undefined, "emqx")} - , {"namespace", t(string(), undefined, "default")} - , {"suffix", t(string(), undefined, "pod.local")} - ]; - -fields("rlog") -> - [ {"role", t(union([core, replicant]), "ekka.node_role", core)} - , {"core_nodes", t(comma_separated_atoms(), "ekka.core_nodes", [])} - ]; - -fields("node") -> - [ {"name", hoconsc:t(string(), #{default => "emqx@127.0.0.1", - override_env => "EMQX_NODE_NAME" - })} - , {"cookie", hoconsc:t(string(), #{mapping => "vm_args.-setcookie", - default => "emqxsecretcookie", - sensitive => true, - override_env => "EMQX_NODE_COOKIE" - })} - , {"data_dir", t(string())} - , {"config_files", t(comma_separated_list())} - , {"global_gc_interval", t(duration(), undefined, "15m")} - , {"crash_dump_dir", t(file(), "vm_args.-env ERL_CRASH_DUMP", undefined)} - , {"dist_net_ticktime", t(duration(), "vm_args.-kernel net_ticktime", "2m")} - , {"dist_listen_min", t(range(1024, 65535), "kernel.inet_dist_listen_min", 6369)} - , {"dist_listen_max", t(range(1024, 65535), "kernel.inet_dist_listen_max", 6369)} - , {"backtrace_depth", t(integer(), undefined, 23)} - ]; - -fields("rpc") -> - [ {"mode", t(union(sync, async), undefined, async)} - , {"async_batch_size", t(integer(), "gen_rpc.max_batch_size", 256)} - , {"port_discovery",t(union(manual, stateless), "gen_rpc.port_discovery", stateless)} - , {"tcp_server_port", t(integer(), "gen_rpc.tcp_server_port", 5369)} - , {"tcp_client_num", t(range(1, 256), undefined, 1)} - , {"connect_timeout", t(duration(), "gen_rpc.connect_timeout", "5s")} - , {"send_timeout", t(duration(), "gen_rpc.send_timeout", "5s")} - , {"authentication_timeout", t(duration(), "gen_rpc.authentication_timeout", "5s")} - , {"call_receive_timeout", t(duration(), "gen_rpc.call_receive_timeout", "15s")} - , {"socket_keepalive_idle", t(duration_s(), "gen_rpc.socket_keepalive_idle", "7200s")} - , {"socket_keepalive_interval", t(duration_s(), "gen_rpc.socket_keepalive_interval", "75s")} - , {"socket_keepalive_count", t(integer(), "gen_rpc.socket_keepalive_count", 9)} - , {"socket_sndbuf", t(bytesize(), "gen_rpc.socket_sndbuf", "1MB")} - , {"socket_recbuf", t(bytesize(), "gen_rpc.socket_recbuf", "1MB")} - , {"socket_buffer", t(bytesize(), "gen_rpc.socket_buffer", "1MB")} - ]; - -fields("log") -> - [ {"primary_level", t(log_level(), undefined, warning)} - , {"console_handler", ref("console_handler")} - , {"file_handlers", ref("file_handlers")} - , {"time_offset", t(string(), undefined, "system")} - , {"chars_limit", maybe_infinity(range(1, inf))} - , {"supervisor_reports", t(union([error, progress]), undefined, error)} - , {"max_depth", t(union([infinity, integer()]), - "kernel.error_logger_format_depth", 80)} - , {"formatter", t(union([text, json]), undefined, text)} - , {"single_line", t(boolean(), undefined, true)} - , {"sync_mode_qlen", t(integer(), undefined, 100)} - , {"drop_mode_qlen", t(integer(), undefined, 3000)} - , {"flush_qlen", t(integer(), undefined, 8000)} - , {"overload_kill", ref("log_overload_kill")} - , {"burst_limit", ref("log_burst_limit")} - , {"error_logger", t(atom(), "kernel.error_logger", silent)} - ]; - -fields("console_handler") -> - [ {"enable", t(boolean(), undefined, false)} - , {"level", t(log_level(), undefined, warning)} - ]; - -fields("file_handlers") -> - [ {"$name", ref("log_file_handler")} - ]; - -fields("log_file_handler") -> - [ {"level", t(log_level(), undefined, warning)} - , {"file", t(file(), undefined, undefined)} - , {"rotation", ref("log_rotation")} - , {"max_size", maybe_infinity(bytesize(), "10MB")} - ]; - -fields("log_rotation") -> - [ {"enable", t(boolean(), undefined, true)} - , {"count", t(range(1, 2048), undefined, 10)} - ]; - -fields("log_overload_kill") -> - [ {"enable", t(boolean(), undefined, true)} - , {"mem_size", t(bytesize(), undefined, "30MB")} - , {"qlen", t(integer(), undefined, 20000)} - , {"restart_after", t(union(duration(), infinity), undefined, "5s")} - ]; - -fields("log_burst_limit") -> - [ {"enable", t(boolean(), undefined, true)} - , {"max_count", t(integer(), undefined, 10000)} - , {"window_time", t(duration(), undefined, "1s")} - ]; +structs() -> ["zones", "listeners", "broker", "plugins", "sysmon", "alarm"]. fields("stats") -> [ {"enable", t(boolean(), undefined, true)} @@ -480,20 +318,7 @@ fields("alarm") -> [ {"actions", t(hoconsc:array(atom()), undefined, [log, publish])} , {"size_limit", t(integer(), undefined, 1000)} , {"validity_period", t(duration(), undefined, "24h")} - ]; - -fields(FieldName) -> - ?MODULE:extra_schema_fields(FieldName). - --ifndef(EMQX_EXT_SCHEMAS). -%% Function extra_schema_fields/1 only terminates with explicit exception --dialyzer([{nowarn_function, [extra_schema_fields/1]}]). -extra_schema_fields(FieldName) -> error({unknown_field, FieldName}). --else. -extra_schema_fields(FieldName) -> - {_, Mod} = lists:keyfind(FieldName, 1, ?EMQX_EXT_SCHEMAS), - Mod:fields(FieldName). --endif. + ]. mqtt_listener() -> base_listener() ++ @@ -509,117 +334,6 @@ base_listener() -> , {"rate_limit", ref("rate_limit")} ]. -translations() -> ["ekka", "kernel", "emqx"]. - -translation("ekka") -> - [ {"cluster_discovery", fun tr_cluster__discovery/1}]; - -translation("kernel") -> - [ {"logger_level", fun tr_logger_level/1} - , {"logger", fun tr_logger/1}]; - -translation("emqx") -> - [ {"config_files", fun tr_config_files/1} - ]. - -tr_config_files(Conf) -> - case conf_get("emqx.config_files", Conf) of - [_ | _] = Files -> - Files; - _ -> - case os:getenv("RUNNER_ETC_DIR") of - false -> - [filename:join([code:lib_dir(emqx), "etc", "emqx.conf"])]; - Dir -> - [filename:join([Dir, "emqx.conf"])] - end - end. - -tr_cluster__discovery(Conf) -> - Strategy = conf_get("cluster.discovery_strategy", Conf), - {Strategy, filter(options(Strategy, Conf))}. - -tr_logger_level(Conf) -> conf_get("log.primary_level", Conf). - -tr_logger(Conf) -> - CharsLimit = case conf_get("log.chars_limit", Conf) of - infinity -> unlimited; - V -> V - end, - SingleLine = conf_get("log.single_line", Conf), - FmtName = conf_get("log.formatter", Conf), - Formatter = formatter(FmtName, CharsLimit, SingleLine), - BasicConf = #{ - sync_mode_qlen => conf_get("log.sync_mode_qlen", Conf), - drop_mode_qlen => conf_get("log.drop_mode_qlen", Conf), - flush_qlen => conf_get("log.flush_qlen", Conf), - overload_kill_enable => conf_get("log.overload_kill.enable", Conf), - overload_kill_qlen => conf_get("log.overload_kill.qlen", Conf), - overload_kill_mem_size => conf_get("log.overload_kill.mem_size", Conf), - overload_kill_restart_after => conf_get("log.overload_kill.restart_after", Conf), - burst_limit_enable => conf_get("log.burst_limit.enable", Conf), - burst_limit_max_count => conf_get("log.burst_limit.max_count", Conf), - burst_limit_window_time => conf_get("log.burst_limit.window_time", Conf) - }, - Filters = case conf_get("log.supervisor_reports", Conf) of - error -> [{drop_progress_reports, {fun logger_filters:progress/2, stop}}]; - progress -> [] - end, - %% For the default logger that outputs to console - ConsoleHandler = - case conf_get("log.console_handler.enable", Conf) of - true -> - [{handler, console, logger_std_h, #{ - level => conf_get("log.console_handler.level", Conf), - config => BasicConf#{type => standard_io}, - formatter => Formatter, - filters => Filters - }}]; - false -> [] - end, - %% For the file logger - FileHandlers = - [{handler, binary_to_atom(HandlerName, latin1), logger_disk_log_h, #{ - level => conf_get("level", SubConf), - config => BasicConf#{ - type => case conf_get("rotation.enable", SubConf) of - true -> wrap; - _ -> halt - end, - file => conf_get("file", SubConf), - max_no_files => conf_get("rotation.count", SubConf), - max_no_bytes => conf_get("max_size", SubConf) - }, - formatter => Formatter, - filters => Filters, - filesync_repeat_interval => no_repeat - }} - || {HandlerName, SubConf} <- maps:to_list(conf_get("log.file_handlers", Conf, #{}))], - - [{handler, default, undefined}] ++ ConsoleHandler ++ FileHandlers. - -%% helpers -formatter(json, CharsLimit, SingleLine) -> - {emqx_logger_jsonfmt, - #{chars_limit => CharsLimit, - single_line => SingleLine - }}; -formatter(text, CharsLimit, SingleLine) -> - {emqx_logger_textfmt, - #{template => - [time," [",level,"] ", - {clientid, - [{peername, - [clientid,"@",peername," "], - [clientid, " "]}], - [{peername, - [peername," "], - []}]}, - msg,"\n"], - chars_limit => CharsLimit, - single_line => SingleLine - }}. - %% utils -spec(conf_get(string() | [string()], hocon:config()) -> term()). conf_get(Key, Conf) -> @@ -740,8 +454,7 @@ t(Type, Mapping, Default, OverrideEnv, Validator) -> , validator => Validator }). -ref(Field) -> - fun (type) -> Field; (_) -> undefined end. +ref(Field) -> hoconsc:t(hoconsc:ref(?MODULE, Field)). maybe_disabled(T) -> maybe_sth(disabled, T, disabled). @@ -817,37 +530,6 @@ to_erl_cipher_suite(Str) -> Cipher -> Cipher end. -options(static, Conf) -> - [{seeds, [to_atom(S) || S <- conf_get("cluster.static.seeds", Conf, [])]}]; -options(mcast, Conf) -> - {ok, Addr} = inet:parse_address(conf_get("cluster.mcast.addr", Conf)), - {ok, Iface} = inet:parse_address(conf_get("cluster.mcast.iface", Conf)), - Ports = conf_get("cluster.mcast.ports", Conf), - [{addr, Addr}, {ports, Ports}, {iface, Iface}, - {ttl, conf_get("cluster.mcast.ttl", Conf, 1)}, - {loop, conf_get("cluster.mcast.loop", Conf, true)}]; -options(dns, Conf) -> - [{name, conf_get("cluster.dns.name", Conf)}, - {app, conf_get("cluster.dns.app", Conf)}]; -options(etcd, Conf) -> - Namespace = "cluster.etcd.ssl", - SslOpts = fun(C) -> - Options = keys(Namespace, C), - lists:map(fun(Key) -> {to_atom(Key), conf_get([Namespace, Key], Conf)} end, Options) end, - [{server, conf_get("cluster.etcd.server", Conf)}, - {prefix, conf_get("cluster.etcd.prefix", Conf, "emqxcl")}, - {node_ttl, conf_get("cluster.etcd.node_ttl", Conf, 60)}, - {ssl_options, filter(SslOpts(Conf))}]; -options(k8s, Conf) -> - [{apiserver, conf_get("cluster.k8s.apiserver", Conf)}, - {service_name, conf_get("cluster.k8s.service_name", Conf)}, - {address_type, conf_get("cluster.k8s.address_type", Conf, ip)}, - {app_name, conf_get("cluster.k8s.app_name", Conf)}, - {namespace, conf_get("cluster.k8s.namespace", Conf)}, - {suffix, conf_get("cluster.k8s.suffix", Conf, "")}]; -options(manual, _Conf) -> - []. - to_atom(Atom) when is_atom(Atom) -> Atom; to_atom(Str) when is_list(Str) -> diff --git a/apps/emqx/src/emqx_sup.erl b/apps/emqx/src/emqx_sup.erl index 3bddac1ed..9338b7390 100644 --- a/apps/emqx/src/emqx_sup.erl +++ b/apps/emqx/src/emqx_sup.erl @@ -67,16 +67,16 @@ init([]) -> BrokerSup = child_spec(emqx_broker_sup, supervisor), CMSup = child_spec(emqx_cm_sup, supervisor), SysSup = child_spec(emqx_sys_sup, supervisor), - Childs = [KernelSup] ++ - [RouterSup || emqx_boot:is_enabled(router)] ++ - [BrokerSup || emqx_boot:is_enabled(broker)] ++ - [CMSup || emqx_boot:is_enabled(broker)] ++ - [SysSup], + Children = [KernelSup] ++ + [RouterSup || emqx_boot:is_enabled(router)] ++ + [BrokerSup || emqx_boot:is_enabled(broker)] ++ + [CMSup || emqx_boot:is_enabled(broker)] ++ + [SysSup], SupFlags = #{strategy => one_for_all, intensity => 0, period => 1 }, - {ok, {SupFlags, Childs}}. + {ok, {SupFlags, Children}}. %%-------------------------------------------------------------------- %% Internal functions diff --git a/apps/emqx/test/emqx_SUITE.erl b/apps/emqx/test/emqx_SUITE.erl index 9614822ba..158dce848 100644 --- a/apps/emqx/test/emqx_SUITE.erl +++ b/apps/emqx/test/emqx_SUITE.erl @@ -32,25 +32,6 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). -t_restart(_) -> - ConfFile = "test.config", - Data = "[{emqx_statsd,[{interval,15000},{push_gateway,\"http://127.0.0.1:9091\"}]}].", - file:write_file(ConfFile, list_to_binary(Data)), - emqx:restart(ConfFile), - file:delete(ConfFile). - -t_stop_start(_) -> - emqx:stop(), - false = emqx:is_running(node()), - emqx:start(), - true = emqx:is_running(node()), - ok = emqx:shutdown(), - false = emqx:is_running(node()), - ok = emqx:reboot(), - true = emqx:is_running(node()), - ok = emqx:shutdown(for_test), - false = emqx:is_running(node()). - t_emqx_pubsub_api(_) -> true = emqx:is_running(node()), {ok, C} = emqtt:start_link([{host, "localhost"}, {clientid, "myclient"}]), diff --git a/apps/emqx_authz/test/emqx_authz_api_SUITE.erl b/apps/emqx_authz/test/emqx_authz_api_SUITE.erl index 27c571d1c..0bb1b2132 100644 --- a/apps/emqx_authz/test/emqx_authz_api_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_api_SUITE.erl @@ -57,8 +57,9 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> - %delete_default_app(), - emqx_ct_helpers:stop_apps([emqx_authz]). + ok = emqx_authz:update(replace, []), + emqx_ct_helpers:stop_apps([emqx_authz]), + ok. % set_special_configs(emqx) -> % application:set_env(emqx, allow_anonymous, true), diff --git a/apps/emqx_authz/test/emqx_authz_http_SUITE.erl b/apps/emqx_authz/test/emqx_authz_http_SUITE.erl index f387be77a..3d6d918eb 100644 --- a/apps/emqx_authz/test/emqx_authz_http_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_http_SUITE.erl @@ -49,8 +49,10 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> + emqx_authz:update(replace, []), emqx_ct_helpers:stop_apps([emqx_authz, emqx_resource]), - meck:unload(emqx_resource). + meck:unload(emqx_resource), + ok. %%------------------------------------------------------------------------------ %% Testcases diff --git a/apps/emqx_authz/test/emqx_authz_mongo_SUITE.erl b/apps/emqx_authz/test/emqx_authz_mongo_SUITE.erl index a5f59ac64..9ba6aa843 100644 --- a/apps/emqx_authz/test/emqx_authz_mongo_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_mongo_SUITE.erl @@ -50,8 +50,10 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> + emqx_authz:update(replace, []), emqx_ct_helpers:stop_apps([emqx_authz, emqx_resource]), - meck:unload(emqx_resource). + meck:unload(emqx_resource), + ok. -define(RULE1,[#{<<"topics">> => [<<"#">>], <<"permission">> => <<"deny">>, diff --git a/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl b/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl index 050234478..54aa7d8fc 100644 --- a/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl @@ -52,6 +52,7 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> + ok = emqx_authz:update(replace, []), emqx_ct_helpers:stop_apps([emqx_authz, emqx_resource]), meck:unload(emqx_resource). diff --git a/apps/emqx_authz/test/emqx_authz_pgsql_SUITE.erl b/apps/emqx_authz/test/emqx_authz_pgsql_SUITE.erl index 439dce14f..66a6581a8 100644 --- a/apps/emqx_authz/test/emqx_authz_pgsql_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_pgsql_SUITE.erl @@ -51,6 +51,7 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> + ok = emqx_authz:update(replace, []), emqx_ct_helpers:stop_apps([emqx_authz, emqx_resource]), meck:unload(emqx_resource). diff --git a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl index 7c7990dbf..0eb42bdb8 100644 --- a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl @@ -50,6 +50,7 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> + ok = emqx_authz:update(replace, []), emqx_ct_helpers:stop_apps([emqx_authz, emqx_resource]), meck:unload(emqx_resource). diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src index 385c89965..afac20404 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src @@ -3,7 +3,7 @@ {vsn, "5.0.0"}, % strict semver, bump manually! {modules, []}, {registered, []}, - {applications, [kernel,stdlib,replayq,emqtt]}, + {applications, [kernel,stdlib,replayq,emqtt,emqx]}, {mod, {emqx_bridge_mqtt_app, []}}, {env, []}, {licenses, ["Apache-2.0"]}, diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src index 0b8717a0f..5e1ca2ca8 100644 --- a/apps/emqx_connector/src/emqx_connector.app.src +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -12,7 +12,8 @@ eredis, epgsql, mysql, - mongodb + mongodb, + emqx ]}, {env,[]}, {modules, []}, diff --git a/apps/emqx_dashboard/src/emqx_dashboard.app.src b/apps/emqx_dashboard/src/emqx_dashboard.app.src index 788677a33..425180af4 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard.app.src +++ b/apps/emqx_dashboard/src/emqx_dashboard.app.src @@ -3,7 +3,7 @@ {vsn, "5.0.0"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_dashboard_sup]}, - {applications, [kernel,stdlib,mnesia,minirest]}, + {applications, [kernel,stdlib,mnesia,minirest,emqx]}, {mod, {emqx_dashboard_app,[]}}, {env, []}, {licenses, ["Apache-2.0"]}, diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge.app.src b/apps/emqx_data_bridge/src/emqx_data_bridge.app.src index 360511d9b..84486da19 100644 --- a/apps/emqx_data_bridge/src/emqx_data_bridge.app.src +++ b/apps/emqx_data_bridge/src/emqx_data_bridge.app.src @@ -5,7 +5,8 @@ {mod, {emqx_data_bridge_app, []}}, {applications, [kernel, - stdlib + stdlib, + emqx ]}, {env,[]}, {modules, []}, diff --git a/apps/emqx_exhook/src/emqx_exhook.app.src b/apps/emqx_exhook/src/emqx_exhook.app.src index cca972eeb..c306a5ea4 100644 --- a/apps/emqx_exhook/src/emqx_exhook.app.src +++ b/apps/emqx_exhook/src/emqx_exhook.app.src @@ -4,7 +4,7 @@ {modules, []}, {registered, []}, {mod, {emqx_exhook_app, []}}, - {applications, [kernel,stdlib,grpc]}, + {applications, [kernel,stdlib,grpc,emqx]}, {env,[]}, {licenses, ["Apache-2.0"]}, {maintainers, ["EMQ X Team "]}, diff --git a/apps/emqx_gateway/src/emqx_gateway.app.src b/apps/emqx_gateway/src/emqx_gateway.app.src index e84ae0f7f..2fc329711 100644 --- a/apps/emqx_gateway/src/emqx_gateway.app.src +++ b/apps/emqx_gateway/src/emqx_gateway.app.src @@ -3,7 +3,7 @@ {vsn, "0.1.0"}, {registered, []}, {mod, {emqx_gateway_app, []}}, - {applications, [kernel, stdlib, grpc, lwm2m_coap]}, + {applications, [kernel, stdlib, grpc, lwm2m_coap, emqx]}, {env, []}, {modules, []}, {licenses, ["Apache 2.0"]}, diff --git a/apps/emqx_machine/etc/emqx_machine.conf b/apps/emqx_machine/etc/emqx_machine.conf new file mode 100644 index 000000000..0797a9d70 --- /dev/null +++ b/apps/emqx_machine/etc/emqx_machine.conf @@ -0,0 +1,696 @@ +## NOTE: The configurations in this file will be overridden by +## `/data/emqx_overrides.conf` + +##================================================================== +## Node +##================================================================== +node { + ## Node name. + ## See: http://erlang.org/doc/reference_manual/distributed.html + ## + ## @doc node.name + ## ValueType: NodeName + ## Default: emqx@127.0.0.1 + name: "emqx@127.0.0.1" + + ## Cookie for distributed node communication. + ## + ## @doc node.cookie + ## ValueType: String + ## Default: emqxsecretcookie + cookie: emqxsecretcookie + + ## Data dir for the node + ## + ## @doc node.data_dir + ## ValueType: Folder + ## Default: "{{ platform_data_dir }}/" + data_dir: "{{ platform_data_dir }}/" + + ## Dir of crash dump file. + ## + ## @doc node.crash_dump_dir + ## ValueType: Folder + ## Default: "{{ platform_log_dir }}/" + crash_dump_dir: "{{ platform_log_dir }}/" + + ## Global GC Interval. + ## + ## @doc node.global_gc_interval + ## ValueType: Duration + ## Default: 15m + global_gc_interval: 15m + + ## Sets the net_kernel tick time in seconds. + ## Notice that all communicating nodes are to have the same + ## TickTime value specified. + ## + ## See: http://www.erlang.org/doc/man/kernel_app.html#net_ticktime + ## + ## @doc node.dist_net_ticktime + ## ValueType: Number + ## Default: 2m + dist_net_ticktime: 2m + + ## Sets the port range for the listener socket of a distributed + ## Erlang node. + ## Note that if there are firewalls between clustered nodes, this + ## port segment for nodes’ communication should be allowed. + ## + ## See: http://www.erlang.org/doc/man/kernel_app.html + ## + ## @doc node.dist_listen_min + ## ValueType: Integer + ## Range: [1024,65535] + ## Default: 6369 + dist_listen_min: 6369 + + ## Sets the port range for the listener socket of a distributed + ## Erlang node. + ## Note that if there are firewalls between clustered nodes, this + ## port segment for nodes’ communication should be allowed. + ## + ## See: http://www.erlang.org/doc/man/kernel_app.html + ## + ## @doc node.dist_listen_max + ## ValueType: Integer + ## Range: [1024,65535] + ## Default: 6369 + dist_listen_max: 6369 + + ## Sets the maximum depth of call stack back-traces in the exit + ## reason element of 'EXIT' tuples. + ## The flag also limits the stacktrace depth returned by + ## process_info item current_stacktrace. + ## + ## @doc node.backtrace_depth + ## ValueType: Integer + ## Range: [0,1024] + ## Default: 23 + backtrace_depth: 23 + +} + +##================================================================== +## Cluster +##================================================================== +cluster { + ## Cluster name. + ## + ## @doc cluster.name + ## ValueType: String + ## Default: emqxcl + name: emqxcl + + ## Enable cluster autoheal from network partition. + ## + ## @doc cluster.autoheal + ## ValueType: Boolean + ## Default: true + autoheal: true + + ## Autoclean down node. A down node will be removed from the cluster + ## if this value > 0. + ## + ## @doc cluster.autoclean + ## ValueType: Duration + ## Default: 5m + autoclean: 5m + + ## Node discovery strategy to join the cluster. + ## + ## @doc cluster.discovery_strategy + ## ValueType: manual | static | mcast | dns | etcd | k8s + ## - manual: Manual join command + ## - static: Static node list + ## - mcast: IP Multicast + ## - dns: DNS A Record + ## - etcd: etcd + ## - k8s: Kubernetes + ## + ## Default: manual + discovery_strategy: manual + + ##---------------------------------------------------------------- + ## Cluster using static node list + ##---------------------------------------------------------------- + static { + ## Node list of the cluster + ## + ## @doc cluster.static.seeds + ## ValueType: Array + ## Default: [] + seeds: ["emqx1@127.0.0.1", "emqx2@127.0.0.1"] + } + + ##---------------------------------------------------------------- + ## Cluster using IP Multicast + ##---------------------------------------------------------------- + mcast { + ## IP Multicast Address. + ## + ## @doc cluster.mcast.addr + ## ValueType: IPAddress + ## Default: "239.192.0.1" + addr: "239.192.0.1" + + ## Multicast Ports. + ## + ## @doc cluster.mcast.ports + ## ValueType: Array + ## Default: [4369, 4370] + ports: [4369, 4370] + + ## Multicast Iface. + ## + ## @doc cluster.mcast.iface + ## ValueType: IPAddress + ## Default: "0.0.0.0" + iface: "0.0.0.0" + + ## Multicast Ttl. + ## + ## @doc cluster.mcast.ttl + ## ValueType: Integer + ## Range: [0,255] + ## Default: 255 + ttl: 255 + + ## Multicast loop. + ## + ## @doc cluster.mcast.loop + ## ValueType: Boolean + ## Default: true + loop: true + } + + ##---------------------------------------------------------------- + ## Cluster using DNS A records + ##---------------------------------------------------------------- + dns { + ## DNS name. + ## + ## @doc cluster.dns.name + ## ValueType: String + ## Default: localhost + name: localhost + + ## The App name is used to build 'node.name' with IP address. + ## + ## @doc cluster.dns.app + ## ValueType: String + ## Default: emqx + app: emqx + } + + ##---------------------------------------------------------------- + ## Cluster using etcd + ##---------------------------------------------------------------- + etcd { + ## Etcd server list, seperated by ','. + ## + ## @doc cluster.etcd.server + ## ValueType: URL + ## Required: true + server: "http://127.0.0.1:2379" + + ## The prefix helps build nodes path in etcd. Each node in the cluster + ## will create a path in etcd: v2/keys/// + ## + ## @doc cluster.etcd.prefix + ## ValueType: String + ## Default: emqxcl + prefix: emqxcl + + ## The TTL for node's path in etcd. + ## + ## @doc cluster.etcd.node_ttl + ## ValueType: Duration + ## Default: 1m + node_ttl: 1m + + ## Path to the file containing the user's private PEM-encoded key. + ## + ## @doc cluster.etcd.ssl.keyfile + ## ValueType: File + ## Default: "{{ platform_etc_dir }}/certs/key.pem" + ssl.keyfile: "{{ platform_etc_dir }}/certs/key.pem" + + ## Path to a file containing the user certificate. + ## + ## @doc cluster.etcd.ssl.certfile + ## ValueType: File + ## Default: "{{ platform_etc_dir }}/certs/cert.pem" + ssl.certfile: "{{ platform_etc_dir }}/certs/cert.pem" + + ## Path to the file containing PEM-encoded CA certificates. The CA certificates + ## are used during server authentication and when building the client certificate chain. + ## + ## @doc cluster.etcd.ssl.cacertfile + ## ValueType: File + ## Default: "{{ platform_etc_dir }}/certs/cacert.pem" + ssl.cacertfile: "{{ platform_etc_dir }}/certs/cacert.pem" + } + + ##---------------------------------------------------------------- + ## Cluster using Kubernetes + ##---------------------------------------------------------------- + k8s { + ## Kubernetes API server list, seperated by ','. + ## + ## @doc cluster.k8s.apiserver + ## ValueType: URL + ## Required: true + apiserver: "http://10.110.111.204:8080" + + ## The service name helps lookup EMQ nodes in the cluster. + ## + ## @doc cluster.k8s.service_name + ## ValueType: String + ## Default: emqx + service_name: emqx + + ## The address type is used to extract host from k8s service. + ## + ## @doc cluster.k8s.address_type + ## ValueType: ip | dns | hostname + ## Default: ip + address_type: ip + + ## The app name helps build 'node.name'. + ## + ## @doc cluster.k8s.app_name + ## ValueType: String + ## Default: emqx + app_name: emqx + + ## The suffix added to dns and hostname get from k8s service + ## + ## @doc cluster.k8s.suffix + ## ValueType: String + ## Default: "pod.local" + suffix: "pod.local" + + ## Kubernetes Namespace + ## + ## @doc cluster.k8s.namespace + ## ValueType: String + ## Default: default + namespace: default + } + + db_backend: mnesia + + rlog: { + # role: core + # core_nodes: [] + } + +} + +##================================================================== +## Log +##================================================================== +log { + ## The primary log level + ## + ## - all the log messages with levels lower than this level will + ## be dropped. + ## - all the log messages with levels higher than this level will + ## go into the log handlers. The handlers then decide to log it + ## out or drop it according to the level setting of the handler. + ## + ## Note: Only the messages with severity level higher than or + ## equal to this level will be logged. + ## + ## @doc log.primary_level + ## ValueType: debug | info | notice | warning | error | critical | alert | emergency + ## Default: warning + primary_level: warning + + ##---------------------------------------------------------------- + ## The console log handler send log messages to emqx console + ##---------------------------------------------------------------- + ## Log to single line + ## @doc log.console_handler.enable + ## ValueType: Boolean + ## Default: false + console_handler.enable: false + + ## The log level of this handler + ## All the log messages with levels lower than this level will + ## be dropped. + ## + ## @doc log.console_handler.level + ## ValueType: debug | info | notice | warning | error | critical | alert | emergency + ## Default: warning + console_handler.level: warning + + ##---------------------------------------------------------------- + ## The file log handlers send log messages to files + ##---------------------------------------------------------------- + ## file_handlers. + file_handlers.emqx_log: { + ## The log level filter of this handler + ## All the log messages with levels lower than this level will + ## be dropped. + ## + ## @doc log.file_handlers..level + ## ValueType: debug | info | notice | warning | error | critical | alert | emergency + ## Default: warning + level: warning + + ## The log file for specified level. + ## + ## If `rotation` is disabled, this is the file of the log files. + ## + ## If `rotation` is enabled, this is the base name of the files. + ## Each file in a rotated log is named .N, where N is an integer. + ## + ## Note: Log files for a specific log level will only contain all the logs + ## that higher than or equal to that level + ## + ## @doc log.file_handlers..file + ## ValueType: File + ## Required: true + file: "{{ platform_log_dir }}/emqx.log" + + ## Enables the log rotation. + ## With this enabled, new log files will be created when the current + ## log file is full, max to `rotation_count` files will be created. + ## + ## @doc log.file_handlers..rotation.enable + ## ValueType: Boolean + ## Default: true + rotation.enable: true + + ## Maximum rotation count of log files. + ## + ## @doc log.file_handlers..rotation.count + ## ValueType: Integer + ## Range: [1, 2048] + ## Default: 10 + rotation.count: 10 + + ## Maximum size of each log file. + ## + ## If the max_size reached and `rotation` is disabled, the handler + ## will stop sending log messages, if the `rotation` is enabled, + ## the file rotates. + ## + ## @doc log.file_handlers..max_size + ## ValueType: Size | infinity + ## Default: 10MB + max_size: 10MB + } + + ## file_handlers. + ## + ## You could also create multiple file handlers for different + ## log level for example: + file_handlers.emqx_error_log: { + level: error + file: "{{ platform_log_dir }}/error.log" + } + + ## Timezone offset to display in logs + ## + ## @doc log.time_offset + ## ValueType: system | utc | String + ## - "system" use system zone + ## - "utc" for Universal Coordinated Time (UTC) + ## - "+hh:mm" or "-hh:mm" for a specified offset + ## Default: system + time_offset: system + + ## Limits the total number of characters printed for each log event. + ## + ## @doc log.chars_limit + ## ValueType: Integer | infinity + ## Range: [0, infinity) + ## Default: infinity + chars_limit: infinity + + ## Maximum depth for Erlang term log formatting + ## and Erlang process message queue inspection. + ## + ## @doc log.max_depth + ## ValueType: Integer | infinity + ## Default: 80 + max_depth: 80 + + ## Log formatter + ## @doc log.formatter + ## ValueType: text | json + ## Default: text + formatter: text + + ## Log to single line + ## @doc log.single_line + ## ValueType: Boolean + ## Default: true + single_line: true + + ## The max allowed queue length before switching to sync mode. + ## + ## Log overload protection parameter. If the message queue grows + ## larger than this value the handler switches from anync to sync mode. + ## + ## @doc log.sync_mode_qlen + ## ValueType: Integer + ## Range: [0, ${log.drop_mode_qlen}] + ## Default: 100 + sync_mode_qlen: 100 + + ## The max allowed queue length before switching to drop mode. + ## + ## Log overload protection parameter. When the message queue grows + ## larger than this threshold, the handler switches to a mode in which + ## it drops all new events that senders want to log. + ## + ## @doc log.drop_mode_qlen + ## ValueType: Integer + ## Range: [${log.sync_mode_qlen}, ${log.flush_qlen}] + ## Default: 3000 + drop_mode_qlen: 3000 + + ## The max allowed queue length before switching to flush mode. + ## + ## Log overload protection parameter. If the length of the message queue + ## grows larger than this threshold, a flush (delete) operation takes place. + ## To flush events, the handler discards the messages in the message queue + ## by receiving them in a loop without logging. + ## + ## @doc log.flush_qlen + ## ValueType: Integer + ## Range: [${log.drop_mode_qlen}, infinity) + ## Default: 8000 + flush_qlen: 8000 + + ## Kill the log handler when it gets overloaded. + ## + ## Log overload protection parameter. It is possible that a handler, + ## even if it can successfully manage peaks of high load without crashing, + ## can build up a large message queue, or use a large amount of memory. + ## We could kill the log handler in these cases and restart it after a + ## few seconds. + ## + ## @doc log.overload_kill.enable + ## ValueType: Boolean + ## Default: true + overload_kill.enable: true + + ## The max allowed queue length before killing the log hanlder. + ## + ## Log overload protection parameter. This is the maximum allowed queue + ## length. If the message queue grows larger than this, the handler + ## process is terminated. + ## + ## @doc log.overload_kill.qlen + ## ValueType: Integer + ## Range: [0, 1048576] + ## Default: 20000 + overload_kill.qlen: 20000 + + ## The max allowed memory size before killing the log hanlder. + ## + ## Log overload protection parameter. This is the maximum memory size + ## that the handler process is allowed to use. If the handler grows + ## larger than this, the process is terminated. + ## + ## @doc log.overload_kill.mem_size + ## ValueType: Size + ## Default: 30MB + overload_kill.mem_size: 30MB + + ## Restart the log hanlder after some seconds. + ## + ## Log overload protection parameter. If the handler is terminated, + ## it restarts automatically after a delay specified in seconds. + ## + ## @doc log.overload_kill.restart_after + ## ValueType: Duration + ## Default: 5s + overload_kill.restart_after: 5s + + ## Controlling Bursts of Log Requests. + ## + ## Log overload protection parameter. Large bursts of log events - many + ## events received by the handler under a short period of time - can + ## potentially cause problems. By specifying the maximum number of events + ## to be handled within a certain time frame, the handler can avoid + ## choking the log with massive amounts of printouts. + ## + ## Note that there would be no warning if any messages were + ## dropped because of burst control. + ## + ## @doc log.burst_limit.enable + ## ValueType: Boolean + ## Default: false + burst_limit.enable: false + + ## This config controls the maximum number of events to handle within + ## a time frame. After the limit is reached, successive events are + ## dropped until the end of the time frame defined by `window_time`. + ## + ## @doc log.burst_limit.max_count + ## ValueType: Integer + ## Default: 10000 + burst_limit.max_count: 10000 + + ## See the previous description of burst_limit_max_count. + ## + ## @doc log.burst_limit.window_time + ## ValueType: duration + ## Default: 1s + burst_limit.window_time: 1s +} + +##================================================================== +## RPC +##================================================================== +rpc { + ## RPC Mode. + ## + ## @doc rpc.mode + ## ValueType: sync | async + ## Default: async + mode: async + + ## Max batch size of async RPC requests. + ## + ## NOTE: RPC batch won't work when rpc.mode = sync + ## Zero value disables rpc batching. + ## + ## @doc rpc.async_batch_size + ## ValueType: Integer + ## Range: [0, 1048576] + ## Default: 0 + async_batch_size: 256 + + ## RPC port discovery + ## + ## The strategy for discovering the RPC listening port of + ## other nodes. + ## + ## @doc cluster.discovery_strategy + ## ValueType: manual | stateless + ## - manual: discover ports by `tcp_server_port`. + ## - stateless: discover ports in a stateless manner. + ## If node name is `emqx@127.0.0.1`, where the `` is + ## an integer, then the listening port will be `5370 + ` + ## + ## Default: `stateless`. + port_discovery: stateless + + ## TCP server port for RPC. + ## + ## Only takes effect when `rpc.port_discovery` = `manual`. + ## + ## @doc rpc.tcp_server_port + ## ValueType: Integer + ## Range: [1024-65535] + ## Defaults: 5369 + tcp_server_port: 5369 + + ## Number of outgoing RPC connections. + ## + ## Set this to 1 to keep the message order sent from the same + ## client. + ## + ## @doc rpc.tcp_client_num + ## ValueType: Integer + ## Range: [1, 256] + ## Defaults: 1 + tcp_client_num: 1 + + ## RCP Client connect timeout. + ## + ## @doc rpc.connect_timeout + ## ValueType: Duration + ## Default: 5s + connect_timeout: 5s + + ## TCP send timeout of RPC client and server. + ## + ## @doc rpc.send_timeout + ## ValueType: Duration + ## Default: 5s + send_timeout: 5s + + ## Authentication timeout + ## + ## @doc rpc.authentication_timeout + ## ValueType: Duration + ## Default: 5s + authentication_timeout: 5s + + ## Default receive timeout for call() functions + ## + ## @doc rpc.call_receive_timeout + ## ValueType: Duration + ## Default: 15s + call_receive_timeout: 15s + + ## Socket idle keepalive. + ## + ## @doc rpc.socket_keepalive_idle + ## ValueType: Duration + ## Default: 900s + socket_keepalive_idle: 900s + + ## TCP Keepalive probes interval. + ## + ## @doc rpc.socket_keepalive_interval + ## ValueType: Duration + ## Default: 75s + socket_keepalive_interval: 75s + + ## Probes lost to close the connection + ## + ## @doc rpc.socket_keepalive_count + ## ValueType: Integer + ## Default: 9 + socket_keepalive_count: 9 + + ## Size of TCP send buffer. + ## + ## @doc rpc.socket_sndbuf + ## ValueType: Size + ## Default: 1MB + socket_sndbuf: 1MB + + ## Size of TCP receive buffer. + ## + ## @doc rpc.socket_recbuf + ## ValueType: Size + ## Default: 1MB + socket_recbuf: 1MB + + ## Size of user-level software socket buffer. + ## + ## @doc rpc.socket_buffer + ## ValueType: Size + ## Default: 1MB + socket_buffer: 1MB +} diff --git a/apps/emqx_machine/src/emqx_machine.app.src b/apps/emqx_machine/src/emqx_machine.app.src new file mode 100644 index 000000000..21da4a4c8 --- /dev/null +++ b/apps/emqx_machine/src/emqx_machine.app.src @@ -0,0 +1,15 @@ +{application, emqx_machine, + [{id, "emqx_machine"}, + {description, "The EMQ X Machine"}, + {vsn, "0.1.0"}, % strict semver, bump manually! + {modules, []}, + {registered, []}, + {applications, [kernel,stdlib]}, + {mod, {emqx_machine_app,[]}}, + {env, []}, + {licenses, ["Apache-2.0"]}, + {maintainers, ["EMQ X Team "]}, + {links, [{"Homepage", "https://emqx.io/"}, + {"Github", "https://github.com/emqx/emqx"} + ]} +]}. diff --git a/apps/emqx_machine/src/emqx_machine_app.erl b/apps/emqx_machine/src/emqx_machine_app.erl new file mode 100644 index 000000000..d07a63d81 --- /dev/null +++ b/apps/emqx_machine/src/emqx_machine_app.erl @@ -0,0 +1,176 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_machine_app). + +-export([ start/2 + , stop/1 + , prep_stop/1 + ]). + +%% Shutdown and reboot +-export([ shutdown/1 + , ensure_apps_started/0 + ]). + +-export([sorted_reboot_apps/0]). + +-behaviour(application). + +-include_lib("emqx/include/logger.hrl"). + +start(_Type, _Args) -> + ok = set_backtrace_depth(), + ok = print_otp_version_warning(), + + %% need to load some app envs + %% TODO delete it once emqx boot does not depend on modules envs + _ = load_modules(), + ok = load_config_files(), + + {ok, RootSupPid} = emqx_machine_sup:start_link(), + + ok = ensure_apps_started(), + + _ = emqx_plugins:load(), + + ok = print_vsn(), + + ok = start_autocluster(), + {ok, RootSupPid}. + +prep_stop(_State) -> + application:stop(emqx). + +stop(_State) -> + ok. + +set_backtrace_depth() -> + {ok, Depth} = application:get_env(emqx_machine, backtrace_depth), + _ = erlang:system_flag(backtrace_depth, Depth), + ok. + +-if(?OTP_RELEASE > 22). +print_otp_version_warning() -> ok. +-else. +print_otp_version_warning() -> + ?ULOG("WARNING: Running on Erlang/OTP version ~p. Recommended: 23~n", + [?OTP_RELEASE]). +-endif. % OTP_RELEASE > 22 + +-ifdef(TEST). +print_vsn() -> ok. +-else. % TEST +print_vsn() -> + ?ULOG("~s ~s is running now!~n", [emqx_app:get_description(), emqx_app:get_release()]). +-endif. % TEST + +-ifndef(EMQX_ENTERPRISE). +load_modules() -> + application:load(emqx_modules). +-else. +load_modules() -> + ok. +-endif. + +load_config_files() -> + %% the app env 'config_files' for 'emqx` app should be set + %% in app.time.config by boot script before starting Erlang VM + ConfFiles = application:get_env(emqx, config_files, []), + %% emqx_machine_schema is a superset of emqx_schema + ok = emqx_config:init_load(emqx_machine_schema, ConfFiles), + %% to avoid config being loaded again when emqx app starts. + ok = emqx_app:set_init_config_load_done(). + +start_autocluster() -> + ekka:callback(prepare, fun ?MODULE:shutdown/1), + ekka:callback(reboot, fun ?MODULE:ensure_apps_started/0), + _ = ekka:autocluster(emqx), %% returns 'ok' or a pid or 'any()' as in spec + ok. + +shutdown(Reason) -> + ?SLOG(critical, #{msg => "stopping_apps", reason => Reason}), + _ = emqx_alarm_handler:unload(), + lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())). + +stop_one_app(App) -> + ?SLOG(debug, #{msg => "stopping_app", app => App}), + application:stop(App). + +ensure_apps_started() -> + lists:foreach(fun start_one_app/1, sorted_reboot_apps()). + +start_one_app(App) -> + ?SLOG(debug, #{msg => "starting_app", app => App}), + case application:ensure_all_started(App) of + {ok, Apps} -> + ?SLOG(debug, #{msg => "started_apps", apps => [App | Apps]}); + {error, Reason} -> + ?SLOG(critical, #{msg => "failed_to_start_app", app => App, reason => Reason}), + error({faile_to_start_app, App, Reason}) + end. + +%% list of app names which should be rebooted when: +%% 1. due to static static config change +%% 2. after join a cluster +reboot_apps() -> + [gproc, esockd, ranch, cowboy, ekka, quicer, emqx | ?EMQX_DEP_APPS]. + +%% quicer can not be added to emqx's .app because it might be opted out at build time +implicit_deps() -> + [{emqx, [quicer]}]. + +sorted_reboot_apps() -> + Apps = [{App, app_deps(App)} || App <- reboot_apps()], + sorted_reboot_apps(Apps ++ implicit_deps()). + +app_deps(App) -> + case application:get_key(App, applications) of + undefined -> []; + {ok, List} -> lists:filter(fun(A) -> lists:member(A, reboot_apps()) end, List) + end. + +sorted_reboot_apps(Apps) -> + G = digraph:new(), + lists:foreach(fun({App, Deps}) -> add_app(G, App, Deps) end, Apps), + case digraph_utils:topsort(G) of + Sorted when is_list(Sorted) -> + Sorted; + false -> + Loops = find_loops(G), + error({circular_application_dependency, Loops}) + end. + +add_app(G, App, undefined) -> + ?SLOG(debug, #{msg => "app_is_not_loaded", app => App}), + %% not loaded + add_app(G, App, []); +add_app(_G, _App, []) -> + ok; +add_app(G, App, [Dep | Deps]) -> + digraph:add_vertex(G, App), + digraph:add_vertex(G, Dep), + digraph:add_edge(G, Dep, App), %% dep -> app as dependency + add_app(G, App, Deps). + +find_loops(G) -> + lists:filtermap( + fun (App) -> + case digraph:get_short_cycle(G, App) of + false -> false; + Apps -> {true, Apps} + end + end, digraph:vertices(G)). diff --git a/apps/emqx_machine/src/emqx_machine_schema.erl b/apps/emqx_machine/src/emqx_machine_schema.erl new file mode 100644 index 000000000..7e3843081 --- /dev/null +++ b/apps/emqx_machine/src/emqx_machine_schema.erl @@ -0,0 +1,426 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_machine_schema). + +-dialyzer(no_return). +-dialyzer(no_match). +-dialyzer(no_contracts). +-dialyzer(no_unused). +-dialyzer(no_fail_call). + +-include_lib("typerefl/include/types.hrl"). + +-type log_level() :: debug | info | notice | warning | error | critical | alert | emergency | all. +-type file() :: string(). +-type cipher() :: map(). + +-behaviour(hocon_schema). + +-reflect_type([ log_level/0, + file/0, + cipher/0]). + +-export([structs/0, fields/1, translations/0, translation/1]). +-export([t/1, t/3, t/4, ref/1]). +-export([conf_get/2, conf_get/3, keys/2, filter/1]). + +%% Static apps which merge their configs into the merged emqx.conf +%% The list can not be made a dynamic read at run-time as it is used +%% by nodetool to generate app.