diff --git a/Makefile b/Makefile index e7bab43ba..5e5d456c9 100644 --- a/Makefile +++ b/Makefile @@ -10,7 +10,7 @@ dep_gproc = git-emqx https://github.com/uwiger/gproc 0.8.0 dep_gen_rpc = git-emqx https://github.com/emqx/gen_rpc 2.3.1 dep_esockd = git-emqx https://github.com/emqx/esockd v5.4.4 dep_ekka = git-emqx https://github.com/emqx/ekka v0.5.3 -dep_cowboy = git-emqx https://github.com/ninenines/cowboy 2.4.0 +dep_cowboy = git-emqx https://github.com/ninenines/cowboy 2.6.1 dep_replayq = git-emqx https://github.com/emqx/replayq v0.1.1 NO_AUTOPATCH = cuttlefish diff --git a/etc/emqx.conf b/etc/emqx.conf index cb2d0bb00..15ad24dd1 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -114,6 +114,22 @@ cluster.autoclean = 5m ## Default: 1m, 1 minute ## cluster.etcd.node_ttl = 1m +## Path to a file containing the client's private PEM-encoded key. +## +## Value: File +## cluster.etcd.ssl.keyfile = {{ platform_etc_dir }}/certs/client-key.pem + +## The path to a file containing the client's certificate. +## +## Value: File +## cluster.etcd.ssl.certfile = {{ platform_etc_dir }}/certs/client.pem + +## Path to the file containing PEM-encoded CA certificates. The CA certificates +## are used during server authentication and when building the client certificate chain. +## +## Value: File +## cluster.etcd.ssl.cacertfile = {{ platform_etc_dir }}/certs/ca.pem + ##-------------------------------------------------------------------- ## Cluster using Kubernates @@ -142,12 +158,6 @@ cluster.autoclean = 5m ## Value: String ## cluster.k8s.namespace = default -## Kubernates Namespace -## -## Value: String -## cluster.k8s.namespace = default - - ##-------------------------------------------------------------------- ## Node ##-------------------------------------------------------------------- @@ -1261,7 +1271,7 @@ listener.ws.external.zone = external ## The access control for the MQTT/WebSocket listener. ## -## See: listener.tcp.$name.access +## See: listener.ws.$name.access ## ## Value: ACL Rule listener.ws.external.access.1 = allow all @@ -1286,74 +1296,143 @@ listener.ws.external.verify_protocol_header = on ## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind ## HAProxy or Nginx. ## -## See: listener.tcp.$name.proxy_protocol +## See: listener.ws.$name.proxy_protocol ## ## Value: on | off ## listener.ws.external.proxy_protocol = on ## Sets the timeout for proxy protocol. ## -## See: listener.tcp.$name.proxy_protocol_timeout +## See: listener.ws.$name.proxy_protocol_timeout ## ## Value: Duration ## listener.ws.external.proxy_protocol_timeout = 3s ## The TCP backlog of external MQTT/WebSocket Listener. ## -## See: listener.tcp.$name.backlog +## See: listener.ws.$name.backlog ## ## Value: Number >= 0 listener.ws.external.backlog = 1024 ## The TCP send timeout for external MQTT/WebSocket connections. ## -## See: listener.tcp.$name.send_timeout +## See: listener.ws.$name.send_timeout ## ## Value: Duration listener.ws.external.send_timeout = 15s ## Close the MQTT/WebSocket connection if send timeout. ## -## See: listener.tcp.$name.send_timeout_close +## See: listener.ws.$name.send_timeout_close ## ## Value: on | off listener.ws.external.send_timeout_close = on ## The TCP receive buffer(os kernel) for external MQTT/WebSocket connections. ## -## See: listener.tcp.$name.recbuf +## See: listener.ws.$name.recbuf ## ## Value: Bytes ## listener.ws.external.recbuf = 2KB ## The TCP send buffer(os kernel) for external MQTT/WebSocket connections. ## -## See: listener.tcp.$name.sndbuf +## See: listener.ws.$name.sndbuf ## ## Value: Bytes ## listener.ws.external.sndbuf = 2KB ## The size of the user-level software buffer used by the driver. ## -## See: listener.tcp.$name.buffer +## See: listener.ws.$name.buffer ## ## Value: Bytes ## listener.ws.external.buffer = 2KB ## Sets the 'buffer = max(sndbuf, recbuf)' if this option is enabled. ## -## See: listener.tcp.$name.tune_buffer +## See: listener.ws.$name.tune_buffer ## ## Value: on | off ## listener.ws.external.tune_buffer = off ## The TCP_NODELAY flag for external MQTT/WebSocket connections. ## -## See: listener.tcp.$name.nodelay +## See: listener.ws.$name.nodelay ## ## Value: true | false listener.ws.external.nodelay = true +## The compress flag for external MQTT/WebSocket connections. +## +## If this Value is set true,the websocket message would be compressed +## +## Value: true | false +## listener.ws.external.compress = true + +## The level of deflate options for external MQTT/WebSocket connections. +## +## See: listener.ws.$name.deflate_opts.level +## +## Value: none | default | best_compression | best_speed +## listener.ws.external.deflate_opts.level = default + +## The mem_level of deflate options for external MQTT/WebSocket connections. +## +## See: listener.ws.$name.deflate_opts.mem_level +## +## Valid range is 1-9 +## listener.ws.external.deflate_opts.mem_level = 8 + +## The strategy of deflate options for external MQTT/WebSocket connections. +## +## See: listener.ws.$name.deflate_opts.strategy +## +## Value: default | filtered | huffman_only | rle +## listener.ws.external.deflate_opts.strategy = default + +## The deflate option for external MQTT/WebSocket connections. +## +## See: listener.ws.$name.deflate_opts.server_context_takeover +## +## Value: takeover | no_takeover +## listener.ws.external.deflate_opts.server_context_takeover = takeover + +## The deflate option for external MQTT/WebSocket connections. +## +## See: listener.ws.$name.deflate_opts.client_context_takeover +## +## Value: takeover | no_takeover +## listener.ws.external.deflate_opts.client_context_takeover = takeover + +## The deflate options for external MQTT/WebSocket connections. +## +## See: listener.ws.$name.deflate_opts.server_max_window_bits +## +## Valid range is 8-15 +## listener.ws.external.deflate_opts.server_max_window_bits = 15 + +## The deflate options for external MQTT/WebSocket connections. +## +## See: listener.ws.$name.deflate_opts.client_max_window_bits +## +## Valid range is 8-15 +## listener.ws.external.deflate_opts.client_max_window_bits = 15 + +## The idle timeout for external MQTT/WebSocket connections. +## +## See: listener.ws.$name.idle_timeout +## +## Value: Duration +## listener.ws.external.idle_timeout = 60s + +## The max frame size for external MQTT/WebSocket connections. +## +## +## Value: Number +## listener.ws.external.max_frame_size = 0 + ##-------------------------------------------------------------------- ## External WebSocket/SSL listener for MQTT Protocol @@ -1486,7 +1565,7 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G ## Note that 'listener.wss.external.ciphers' and 'listener.wss.external.psk_ciphers' cannot ## be configured at the same time. ## See 'https://tools.ietf.org/html/rfc4279#section-2'. -#listener.wss.external.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA +## listener.wss.external.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA ## See: listener.ssl.$name.secure_renegotiate ## @@ -1557,6 +1636,74 @@ listener.wss.external.send_timeout_close = on ## Value: true | false ## listener.wss.external.nodelay = true +## The compress flag for external WebSocket/SSL connections. +## +## If this Value is set true,the websocket message would be compressed +## +## Value: true | false +## listener.wss.external.compress = true + +## The level of deflate options for external WebSocket/SSL connections. +## +## See: listener.wss.$name.deflate_opts.level +## +## Value: none | default | best_compression | best_speed +## listener.wss.external.deflate_opts.level = default + +## The mem_level of deflate options for external WebSocket/SSL connections. +## +## See: listener.wss.$name.deflate_opts.mem_level +## +## Valid range is 1-9 +## listener.wss.external.deflate_opts.mem_level = 8 + +## The strategy of deflate options for external WebSocket/SSL connections. +## +## See: listener.wss.$name.deflate_opts.strategy +## +## Value: default | filtered | huffman_only | rle +## listener.wss.external.deflate_opts.strategy = default + +## The deflate option for external WebSocket/SSL connections. +## +## See: listener.wss.$name.deflate_opts.server_context_takeover +## +## Value: takeover | no_takeover +## listener.wss.external.deflate_opts.server_context_takeover = takeover + +## The deflate option for external WebSocket/SSL connections. +## +## See: listener.wss.$name.deflate_opts.client_context_takeover +## +## Value: takeover | no_takeover +## listener.wss.external.deflate_opts.client_context_takeover = takeover + +## The deflate options for external WebSocket/SSL connections. +## +## See: listener.wss.$name.deflate_opts.server_max_window_bits +## +## Valid range is 8-15 +## listener.wss.external.deflate_opts.server_max_window_bits = 15 + +## The deflate options for external WebSocket/SSL connections. +## +## See: listener.wss.$name.deflate_opts.client_max_window_bits +## +## Valid range is 8-15 +## listener.wss.external.deflate_opts.client_max_window_bits = 15 + +## The idle timeout for external WebSocket/SSL connections. +## +## See: listener.wss.$name.idle_timeout +## +## Value: Duration +## listener.wss.external.idle_timeout = 60s + +## The max frame size for external WebSocket/SSL connections. +## +## Value: Number +## listener.wss.external.max_frame_size = 0 + ##-------------------------------------------------------------------- ## Bridges ##-------------------------------------------------------------------- diff --git a/priv/emqx.schema b/priv/emqx.schema index 25589f138..e6944ba79 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -105,6 +105,18 @@ {default, "1m"} ]}. +{mapping, "cluster.etcd.ssl.keyfile", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +{mapping, "cluster.etcd.ssl.certfile", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +{mapping, "cluster.etcd.ssl.cacertfile", "ekka.cluster_discovery", [ + {datatype, string} +]}. + %%-------------------------------------------------------------------- %% Cluster on K8s @@ -149,9 +161,16 @@ [{name, cuttlefish:conf_get("cluster.dns.name", Conf)}, {app, cuttlefish:conf_get("cluster.dns.app", Conf)}]; (etcd) -> + SslOpts = fun(Conf) -> + Options = cuttlefish_variable:filter_by_prefix("cluster.etcd.ssl", Conf), + lists:map(fun({["cluster", "etcd", "ssl", Name], Value}) -> + {list_to_atom(Name), Value} + end, Options) + end, [{server, string:tokens(cuttlefish:conf_get("cluster.etcd.server", Conf), ",")}, {prefix, cuttlefish:conf_get("cluster.etcd.prefix", Conf, "emqcl")}, - {node_ttl, cuttlefish:conf_get("cluster.etcd.node_ttl", Conf, 60)}]; + {node_ttl, cuttlefish:conf_get("cluster.etcd.node_ttl", Conf, 60)}, + {ssl_options, SslOpts(Conf)}]; (k8s) -> [{apiserver, cuttlefish:conf_get("cluster.k8s.apiserver", Conf)}, {service_name, cuttlefish:conf_get("cluster.k8s.service_name", Conf)}, @@ -1238,6 +1257,57 @@ end}. hidden ]}. +{mapping, "listener.ws.$name.compress", "emqx.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.level", "emqx.listeners", [ + {datatype, {enum, [none, default, best_compression, best_speed]}}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.mem_level", "emqx.listeners", [ + {datatype, integer}, + {validators, ["range:1-9"]}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.strategy", "emqx.listeners", [ + {datatype, {enum, [default, filtered, huffman_only, rle]}}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.server_context_takeover", "emqx.listeners", [ + {datatype, {enum, [takeover, no_takeover]}}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.client_context_takeover", "emqx.listeners", [ + {datatype, {enum, [takeover, no_takeover]}}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.server_max_window_bits", "emqx.listeners", [ + {datatype, integer}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.client_max_window_bits", "emqx.listeners", [ + {datatype, integer}, + hidden +]}. + +{mapping, "listener.ws.$name.idle_timeout", "emqx.listeners", [ + {datatype, {duration, ms}}, + hidden +]}. + +{mapping, "listener.ws.$name.max_frame_size", "emqx.listeners", [ + {datatype, integer}, + hidden +]}. + %%-------------------------------------------------------------------- %% MQTT/WebSocket/SSL Listeners @@ -1393,6 +1463,61 @@ end}. {datatype, {enum, [cn, dn, crt]}} ]}. +{mapping, "listener.wss.$name.compress", "emqx.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.level", "emqx.listeners", [ + {datatype, {enum, [none, default, best_compression, best_speed]}}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.mem_level", "emqx.listeners", [ + {datatype, integer}, + {validators, ["range:1-9"]}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.strategy", "emqx.listeners", [ + {datatype, {enum, [default, filtered, huffman_only, rle]}}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.server_context_takeover", "emqx.listeners", [ + {datatype, {enum, [takeover, no_takeover]}}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.client_context_takeover", "emqx.listeners", [ + {datatype, {enum, [takeover, no_takeover]}}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.server_max_window_bits", "emqx.listeners", [ + {datatype, integer}, + {validators, ["range:8-15"]}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.client_max_window_bits", "emqx.listeners", [ + {datatype, integer}, + {validators, ["range:8-15"]}, + hidden +]}. + +{mapping, "listener.wss.$name.idle_timeout", "emqx.listeners", [ + {datatype, {duration, ms}}, + hidden +]}. + +{mapping, "listener.wss.$name.max_frame_size", "emqx.listeners", [ + {datatype, integer}, + hidden +]}. + + + {translation, "emqx.listeners", fun(Conf) -> Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, @@ -1431,19 +1556,30 @@ end}. {verify_protocol_header, cuttlefish:conf_get(Prefix ++ ".verify_protocol_header", Conf, undefined)}, {peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)}, {proxy_port_header, cuttlefish:conf_get(Prefix ++ ".proxy_port_header", Conf, undefined)}, + {compress, cuttlefish:conf_get(Prefix ++ ".compress", Conf, undefined)}, + {idle_timeout, cuttlefish:conf_get(Prefix ++ ".idle_timeout", Conf, undefined)}, + {max_frame_size, cuttlefish:conf_get(Prefix ++ ".max_frame_size", Conf, undefined)}, {proxy_address_header, cuttlefish:conf_get(Prefix ++ ".proxy_address_header", Conf, undefined)} | AccOpts(Prefix)]) end, + DeflateOpts = fun(Prefix) -> + Filter([{level, cuttlefish:conf_get(Prefix ++ ".deflate_opts.level", Conf, undefined)}, + {mem_level, cuttlefish:conf_get(Prefix ++ ".deflate_opts.mem_level", Conf, undefined)}, + {strategy, cuttlefish:conf_get(Prefix ++ ".deflate_opts.strategy", Conf, undefined)}, + {server_context_takeover, cuttlefish:conf_get(Prefix ++ ".deflate_opts.server_context_takeover", Conf, undefined)}, + {client_context_takeover, cuttlefish:conf_get(Prefix ++ ".deflate_opts.client_context_takeover", Conf, undefined)}, + {server_max_windows_bits, cuttlefish:conf_get(Prefix ++ ".deflate_opts.server_max_window_bits", Conf, undefined)}, + {client_max_windows_bits, cuttlefish:conf_get(Prefix ++ ".deflate_opts.client_max_window_bits", Conf, undefined)}]) + end, TcpOpts = fun(Prefix) -> - Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)}, - {send_timeout, cuttlefish:conf_get(Prefix ++ ".send_timeout", Conf, undefined)}, - {send_timeout_close, cuttlefish:conf_get(Prefix ++ ".send_timeout_close", Conf, undefined)}, - {recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)}, - {sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)}, - {buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)}, - {nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)}, - {reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, undefined)}]) + Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)}, + {send_timeout, cuttlefish:conf_get(Prefix ++ ".send_timeout", Conf, undefined)}, + {send_timeout_close, cuttlefish:conf_get(Prefix ++ ".send_timeout_close", Conf, undefined)}, + {recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)}, + {sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)}, + {buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)}, + {nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)}, + {reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, undefined)}]) end, - SplitFun = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end, MapPSKCiphers = fun(PSKCiphers) -> lists:map( @@ -1496,7 +1632,8 @@ end}. case cuttlefish:conf_get(Prefix, Conf, undefined) of undefined -> []; ListenOn -> - [{Atom(Type), ListenOn, [{tcp_options, TcpOpts(Prefix)} | LisOpts(Prefix)]}] + [{Atom(Type), ListenOn, [{deflate_options, DeflateOpts(Prefix)}, + {tcp_options, TcpOpts(Prefix)} | LisOpts(Prefix)]}] end end, @@ -1506,7 +1643,8 @@ end}. undefined -> []; ListenOn -> - [{Atom(Type), ListenOn, [{tcp_options, TcpOpts(Prefix)}, + [{Atom(Type), ListenOn, [{deflate_options, DeflateOpts(Prefix)}, + {tcp_options, TcpOpts(Prefix)}, {ssl_options, SslOpts(Prefix)} | LisOpts(Prefix)]}] end end, @@ -1987,4 +2125,4 @@ end}. [{check_interval, cuttlefish:conf_get("vm_mon.check_interval", Conf)}, {process_high_watermark, cuttlefish:conf_get("vm_mon.process_high_watermark", Conf)}, {process_low_watermark, cuttlefish:conf_get("vm_mon.process_low_watermark", Conf)}] -end}. +end}. \ No newline at end of file diff --git a/rebar.config b/rebar.config index 2e16b86b9..005331120 100644 --- a/rebar.config +++ b/rebar.config @@ -1,6 +1,6 @@ {deps, [{jsx, "2.9.0"}, {gproc, "0.8.0"}, - {cowboy, "2.4.0"}, + {cowboy, "2.6.1"}, {meck, "0.8.13"} %% temp workaround for version check ]}. @@ -28,4 +28,3 @@ {cover_export_enabled, true}. {plugins, [coveralls]}. - diff --git a/src/emqx.erl b/src/emqx.erl index 44dd41f7e..59725b52e 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -15,6 +15,7 @@ -module(emqx). -include("emqx.hrl"). +-include("logger.hrl"). -include("types.hrl"). %% Start/Stop the application @@ -180,7 +181,7 @@ shutdown() -> shutdown(normal). shutdown(Reason) -> - emqx_logger:error("emqx shutdown for ~s", [Reason]), + ?LOG(info, "[EMQ X] emqx shutdown for ~s", [Reason]), emqx_alarm_handler:unload(), emqx_plugins:unload(), lists:foreach(fun application:stop/1, [emqx, ekka, cowboy, ranch, esockd, gproc]). diff --git a/src/emqx_alarm_handler.erl b/src/emqx_alarm_handler.erl index 6c7d1a470..e7d74eee8 100644 --- a/src/emqx_alarm_handler.erl +++ b/src/emqx_alarm_handler.erl @@ -25,7 +25,7 @@ -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). -%% gen_server callbacks +%% gen_event callbacks -export([ init/1 , handle_event/2 , handle_call/2 @@ -93,17 +93,17 @@ init(_) -> handle_event({set_alarm, {AlarmId, AlarmDesc = #alarm{timestamp = undefined}}}, State) -> handle_event({set_alarm, {AlarmId, AlarmDesc#alarm{timestamp = os:timestamp()}}}, State); handle_event({set_alarm, Alarm = {AlarmId, AlarmDesc}}, State) -> - ?LOG(notice, "Alarm report: set ~p", [Alarm]), + ?LOG(warning, "[Alarm Handler] ~p set", [Alarm]), case encode_alarm(Alarm) of {ok, Json} -> emqx_broker:safe_publish(alarm_msg(topic(alert, maybe_to_binary(AlarmId)), Json)); {error, Reason} -> - ?LOG(error, "Failed to encode alarm: ~p", [Reason]) + ?LOG(error, "[Alarm Handler] Failed to encode alarm: ~p", [Reason]) end, set_alarm_(AlarmId, AlarmDesc), {ok, State}; handle_event({clear_alarm, AlarmId}, State) -> - ?LOG(notice, "Alarm report: clear ~p", [AlarmId]), + ?LOG(notice, "[Alarm Handler] ~p clear", [AlarmId]), emqx_broker:safe_publish(alarm_msg(topic(clear, maybe_to_binary(AlarmId)), <<"">>)), clear_alarm_(AlarmId), {ok, State}; diff --git a/src/emqx_banned.erl b/src/emqx_banned.erl index 9e7c2d4ae..ab21bce50 100644 --- a/src/emqx_banned.erl +++ b/src/emqx_banned.erl @@ -88,11 +88,11 @@ init([]) -> {ok, ensure_expiry_timer(#{expiry_timer => undefined})}. handle_call(Req, _From, State) -> - ?ERROR("[Banned] unexpected call: ~p", [Req]), + ?LOG(error, "[Banned] unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?ERROR("[Banned] unexpected msg: ~p", [Msg]), + ?LOG(error, "[Banned] unexpected msg: ~p", [Msg]), {noreply, State}. handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> @@ -100,7 +100,7 @@ handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> {noreply, ensure_expiry_timer(State), hibernate}; handle_info(Info, State) -> - ?ERROR("[Banned] unexpected info: ~p", [Info]), + ?LOG(error, "[Banned] unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #{expiry_timer := TRef}) -> diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index 5bd865ccf..740f52b98 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -298,10 +298,8 @@ standing_by({call, From}, ensure_started, State) -> [{reply, From, ok}]}; standing_by(state_timeout, do_connect, State) -> {next_state, connecting, State}; -standing_by({call, From}, _Call, _State) -> - {keep_state_and_data, [{reply, From, {error,standing_by}}]}; standing_by(info, Info, State) -> - ?LOG(info, "Bridge ~p discarded info event at state standing_by:\n~p", [name(), Info]), + ?LOG(info, "[Bridge] Bridge ~p discarded info event at state standing_by:\n~p", [name(), Info]), {keep_state_and_data, State}; standing_by(Type, Content, State) -> common(standing_by, Type, Content, State). @@ -320,7 +318,7 @@ connecting(enter, _, #{reconnect_delay_ms := Timeout, ok = subscribe_local_topics(Forwards), case ConnectFun(Subs) of {ok, ConnRef, Conn} -> - ?LOG(info, "Bridge ~p connected", [name()]), + ?LOG(info, "[Bridge] Bridge ~p connected", [name()]), Action = {state_timeout, 0, connected}, {keep_state, State#{conn_ref => ConnRef, connection => Conn}, Action}; error -> @@ -370,7 +368,7 @@ connected(info, {disconnected, ConnRef, Reason}, #{conn_ref := ConnRefCurrent} = State) -> case ConnRefCurrent =:= ConnRef of true -> - ?LOG(info, "Bridge ~p diconnected~nreason=~p", [name(), Reason]), + ?LOG(info, "[Bridge] Bridge ~p diconnected~nreason=~p", [name(), Reason]), {next_state, connecting, State#{conn_ref := undefined, connection := undefined}}; false -> @@ -382,7 +380,7 @@ connected(info, {batch_ack, Ref}, State) -> keep_state_and_data; bad_order -> %% try re-connect then re-send - ?LOG(error, "Bad order ack received by bridge ~p", [name()]), + ?LOG(error, "[Bridge] Bad order ack received by bridge ~p", [name()]), {next_state, connecting, disconnect(State)}; {ok, NewState} -> {keep_state, NewState, ?maybe_send} @@ -413,7 +411,7 @@ common(_StateName, info, {dispatch, _, Msg}, NewQ = replayq:append(Q, collect([Msg])), {keep_state, State#{replayq => NewQ}, ?maybe_send}; common(StateName, Type, Content, State) -> - ?LOG(info, "Bridge ~p discarded ~p type event at state ~p:\n~p", + ?LOG(notice, "[Bridge] Bridge ~p discarded ~p type event at state ~p:\n~p", [name(), Type, StateName, Content]), {keep_state, State}. @@ -484,7 +482,7 @@ retry_inflight(#{inflight := Inflight} = State, {ok, NewState} -> retry_inflight(NewState, T); {error, Reason} -> - ?LOG(error, "Inflight retry failed\n~p", [Reason]), + ?LOG(error, "[Bridge] Inflight retry failed\n~p", [Reason]), {error, State#{inflight := Inflight ++ Remain}} end. @@ -515,7 +513,7 @@ do_send(State = #{inflight := Inflight}, QAckRef, [_ | _] = Batch) -> batch => Batch}], {ok, State#{inflight := NewInflight}}; {error, Reason} -> - ?LOG(info, "Batch produce failed\n~p", [Reason]), + ?LOG(info, "[Bridge] Batch produce failed\n~p", [Reason]), {error, State} end. @@ -574,4 +572,3 @@ name(Id) -> list_to_atom(lists:concat([?MODULE, "_", Id])). id(Pid) when is_pid(Pid) -> Pid; id(Name) -> name(Name). - diff --git a/src/emqx_bridge_connect.erl b/src/emqx_bridge_connect.erl index 49fa2b478..37231ca88 100644 --- a/src/emqx_bridge_connect.erl +++ b/src/emqx_bridge_connect.erl @@ -54,7 +54,7 @@ start(Module, Config) -> {ok, Ref, Conn}; {error, Reason} -> Config1 = obfuscate(Config), - ?LOG(error, "Failed to connect with module=~p\n" + ?LOG(error, "[Bridge connect] Failed to connect with module=~p\n" "config=~p\nreason:~p", [Module, Config1, Reason]), error end. diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 33fc24056..5f66e5810 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -195,7 +195,7 @@ publish(Msg) when is_record(Msg, message) -> Headers = Msg#message.headers, case emqx_hooks:run_fold('message.publish', [], Msg#message{headers = Headers#{allow_publish => true}}) of #message{headers = #{allow_publish := false}} -> - ?WARN("Publishing interrupted: ~s", [emqx_message:format(Msg)]), + ?LOG(notice, "[Broker] Publishing interrupted: ~s", [emqx_message:format(Msg)]), []; #message{topic = Topic} = Msg1 -> Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)), @@ -209,7 +209,7 @@ safe_publish(Msg) when is_record(Msg, message) -> publish(Msg) catch _:Error:Stacktrace -> - ?ERROR("[Broker] publish error: ~p~n~p~n~p", [Error, Msg, Stacktrace]) + ?LOG(error, "[Broker] Publish error: ~p~n~p~n~p", [Error, Msg, Stacktrace]) after ok end. @@ -256,7 +256,7 @@ forward(Node, To, Delivery) -> %% rpc:call to ensure the delivery, but the latency:( case emqx_rpc:call(Node, ?BROKER, dispatch, [To, Delivery]) of {badrpc, Reason} -> - ?ERROR("[Broker] Failed to forward msg to ~s: ~p", [Node, Reason]), + ?LOG(error, "[Broker] Failed to forward msg to ~s: ~p", [Node, Reason]), Delivery; Delivery1 -> Delivery1 end. @@ -424,14 +424,14 @@ handle_call({subscribe, Topic, I}, _From, State) -> {reply, Ok, State}; handle_call(Req, _From, State) -> - ?ERROR("[Broker] unexpected call: ~p", [Req]), + ?LOG(error, "[Broker] Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({subscribe, Topic}, State) -> case emqx_router:do_add_route(Topic) of ok -> ok; {error, Reason} -> - ?ERROR("[Broker] Failed to add route: ~p", [Reason]) + ?LOG(error, "[Broker] Failed to add route: ~p", [Reason]) end, {noreply, State}; @@ -454,11 +454,11 @@ handle_cast({unsubscribed, Topic, I}, State) -> {noreply, State}; handle_cast(Msg, State) -> - ?ERROR("[Broker] unexpected cast: ~p", [Msg]), + ?LOG(error, "[Broker] Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info(Info, State) -> - ?ERROR("[Broker] unexpected info: ~p", [Info]), + ?LOG(error, "[Broker] Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #{pool := Pool, id := Id}) -> diff --git a/src/emqx_broker_helper.erl b/src/emqx_broker_helper.erl index cd92ec89b..8ece1805b 100644 --- a/src/emqx_broker_helper.erl +++ b/src/emqx_broker_helper.erl @@ -110,7 +110,7 @@ init([]) -> {ok, #{pmon => emqx_pmon:new()}}. handle_call(Req, _From, State) -> - ?ERROR("[BrokerHelper] unexpected call: ~p", [Req]), + ?LOG(error, "[Broker Helper] Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) -> @@ -119,7 +119,7 @@ handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) -> {noreply, State#{pmon := emqx_pmon:monitor(SubPid, PMon)}}; handle_cast(Msg, State) -> - ?ERROR("[BrokerHelper] unexpected cast: ~p", [Msg]), + ?LOG(error, "[Broker Helper] Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon}) -> @@ -130,7 +130,7 @@ handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon}) {noreply, State#{pmon := PMon1}}; handle_info(Info, State) -> - ?ERROR("[BrokerHelper] unexpected info: ~p", [Info]), + ?LOG(error, "[Broker Helper] Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 73622a2a6..75b384fcb 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -16,6 +16,7 @@ -behaviour(gen_statem). +-include("logger.hrl"). -include("types.hrl"). -include("emqx_client.hrl"). @@ -786,10 +787,10 @@ connected(cast, ?PUBREC_PACKET(PacketId), State = #state{inflight = Inflight}) - Inflight1 = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight), State#state{inflight = Inflight1}; {value, {pubrel, _Ref, _Ts}} -> - emqx_logger:warning("Duplicated PUBREC Packet: ~p", [PacketId]), + ?LOG(notice, "[Client] Duplicated PUBREC Packet: ~p", [PacketId]), State; none -> - emqx_logger:warning("Unexpected PUBREC Packet: ~p", [PacketId]), + ?LOG(warning, "[Client] Unexpected PUBREC Packet: ~p", [PacketId]), State end); @@ -804,7 +805,7 @@ connected(cast, ?PUBREL_PACKET(PacketId), false -> {keep_state, NewState} end; error -> - emqx_logger:warning("Unexpected PUBREL: ~p", [PacketId]), + ?LOG(warning, "[Client] Unexpected PUBREL: ~p", [PacketId]), keep_state_and_data end; @@ -903,33 +904,33 @@ handle_event({call, From}, stop, _StateName, _State) -> {stop_and_reply, normal, [{reply, From, ok}]}; handle_event(info, {TcpOrSsL, _Sock, Data}, _StateName, State) when TcpOrSsL =:= tcp; TcpOrSsL =:= ssl -> - emqx_logger:debug("RECV Data: ~p", [Data]), + ?LOG(debug, "[Client] RECV Data: ~p", [Data]), process_incoming(Data, [], run_sock(State)); handle_event(info, {Error, _Sock, Reason}, _StateName, State) when Error =:= tcp_error; Error =:= ssl_error -> - emqx_logger:error("[~p] ~p, Reason: ~p", [?MODULE, Error, Reason]), + ?LOG(error, "[Client] The connection error occured ~p, reason:~p", [Error, Reason]), {stop, {shutdown, Reason}, State}; handle_event(info, {Closed, _Sock}, _StateName, State) when Closed =:= tcp_closed; Closed =:= ssl_closed -> - emqx_logger:debug("[~p] ~p", [?MODULE, Closed]), + ?LOG(debug, "[Client] ~p", [Closed]), {stop, {shutdown, Closed}, State}; handle_event(info, {'EXIT', Owner, Reason}, _, State = #state{owner = Owner}) -> - emqx_logger:debug("[~p] Got EXIT from owner, Reason: ~p", [?MODULE, Reason]), + ?LOG(debug, "[Client] Got EXIT from owner, Reason: ~p", [Reason]), {stop, {shutdown, Reason}, State}; handle_event(info, {inet_reply, _Sock, ok}, _, _State) -> keep_state_and_data; handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) -> - emqx_logger:error("[~p] got tcp error: ~p", [?MODULE, Reason]), + ?LOG(error, "[Client] Got tcp error: ~p", [Reason]), {stop, {shutdown, Reason}, State}; handle_event(EventType, EventContent, StateName, _StateData) -> - emqx_logger:error("State: ~s, Unexpected Event: (~p, ~p)", - [StateName, EventType, EventContent]), + ?LOG(error, "[Client] State: ~s, Unexpected Event: (~p, ~p)", + [StateName, EventType, EventContent]), keep_state_and_data. %% Mandatory callback functions @@ -971,7 +972,7 @@ delete_inflight(?PUBACK_PACKET(PacketId, ReasonCode, Properties), properties => Properties}), State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}; none -> - emqx_logger:warning("Unexpected PUBACK: ~p", [PacketId]), + ?LOG(warning, "[Client] Unexpected PUBACK: ~p", [PacketId]), State end; delete_inflight(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), @@ -983,7 +984,7 @@ delete_inflight(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), properties => Properties}), State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}; none -> - emqx_logger:warning("Unexpected PUBCOMP Packet: ~p", [PacketId]), + ?LOG(warning, "[Client] Unexpected PUBCOMP Packet: ~p", [PacketId]), State end. @@ -1186,7 +1187,7 @@ send(Msg, State) when is_record(Msg, mqtt_msg) -> send(Packet, State = #state{socket = Sock, proto_ver = Ver}) when is_record(Packet, mqtt_packet) -> Data = emqx_frame:serialize(Packet, #{version => Ver}), - emqx_logger:debug("SEND Data: ~1000p", [Packet]), + ?LOG(debug, "[Client] SEND Data: ~1000p", [Packet]), case emqx_client_sock:send(Sock, Data) of ok -> {ok, bump_last_packet_id(State)}; Error -> Error diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 37642a17f..a915fdddb 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -159,7 +159,7 @@ init([]) -> {ok, #{conn_pmon => emqx_pmon:new()}}. handle_call(Req, _From, State) -> - ?ERROR("[CM] unexpected call: ~p", [Req]), + ?LOG(error, "[CM] Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({notify, {registered, ClientId, ConnPid}}, State = #{conn_pmon := PMon}) -> @@ -169,7 +169,7 @@ handle_cast({notify, {unregistered, ConnPid}}, State = #{conn_pmon := PMon}) -> {noreply, State#{conn_pmon := emqx_pmon:demonitor(ConnPid, PMon)}}; handle_cast(Msg, State) -> - ?ERROR("[CM] unexpected cast: ~p", [Msg]), + ?LOG(error, "[CM] Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{conn_pmon := PMon}) -> @@ -180,7 +180,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{conn_pmon := PMon} {noreply, State#{conn_pmon := PMon1}}; handle_info(Info, State) -> - ?ERROR("[CM] unexpected info: ~p", [Info]), + ?LOG(error, "[CM] Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 5e5d758f8..e9cfd6ae4 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -286,18 +286,18 @@ handle({call, From}, session, State = #state{proto_state = ProtoState}) -> reply(From, emqx_protocol:session(ProtoState), State); handle({call, From}, Req, State) -> - ?LOG(error, "unexpected call: ~p", [Req]), + ?LOG(error, "[Connection] Unexpected call: ~p", [Req]), reply(From, ignored, State); %% Handle cast handle(cast, Msg, State) -> - ?LOG(error, "unexpected cast: ~p", [Msg]), + ?LOG(error, "[Connection] Unexpected cast: ~p", [Msg]), {keep_state, State}; %% Handle Incoming handle(info, {Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> Oct = iolist_size(Data), - ?LOG(debug, "RECV ~p", [Data]), + ?LOG(debug, "[Connection] RECV ~p", [Data]), emqx_pd:update_counter(incoming_bytes, Oct), emqx_metrics:trans(inc, 'bytes/received', Oct), NState = ensure_stats_timer(maybe_gc({1, Oct}, State)), @@ -345,23 +345,23 @@ handle(info, {timeout, Timer, emit_stats}, GcState1 = emqx_gc:reset(GcState), {keep_state, NState#state{gc_state = GcState1}, hibernate}; {shutdown, Reason} -> - ?LOG(warning, "shutdown due to ~p", [Reason]), + ?LOG(error, "[Connection] Shutdown exceptionally due to ~p", [Reason]), shutdown(Reason, NState) end; handle(info, {shutdown, discard, {ClientId, ByPid}}, State) -> - ?LOG(warning, "discarded by ~s:~p", [ClientId, ByPid]), + ?LOG(error, "[Connection] Discarded by ~s:~p", [ClientId, ByPid]), shutdown(discard, State); handle(info, {shutdown, conflict, {ClientId, NewPid}}, State) -> - ?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid]), + ?LOG(warning, "[Connection] Clientid '~s' conflict with ~p", [ClientId, NewPid]), shutdown(conflict, State); handle(info, {shutdown, Reason}, State) -> shutdown(Reason, State); handle(info, Info, State) -> - ?LOG(error, "unexpected info: ~p", [Info]), + ?LOG(error, "[Connection] Unexpected info: ~p", [Info]), {keep_state, State}. code_change(_Vsn, State, Data, _Extra) -> @@ -371,7 +371,7 @@ terminate(Reason, _StateName, #state{transport = Transport, socket = Socket, keepalive = KeepAlive, proto_state = ProtoState}) -> - ?LOG(debug, "Terminated for ~p", [Reason]), + ?LOG(debug, "[Connection] Terminated for ~p", [Reason]), Transport:fast_close(Socket), emqx_keepalive:cancel(KeepAlive), case {ProtoState, Reason} of @@ -398,7 +398,7 @@ process_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> shutdown(Reason, State) catch _:Error:Stk-> - ?LOG(error, "Parse failed for ~p~nStacktrace:~p~nError data:~p", [Error, Stk, Data]), + ?LOG(error, "[Connection] Parse failed for ~p~nStacktrace:~p~nError data:~p", [Error, Stk, Data]), shutdown(Error, State) end. diff --git a/src/emqx_ctl.erl b/src/emqx_ctl.erl index 55be098ad..4746175b3 100644 --- a/src/emqx_ctl.erl +++ b/src/emqx_ctl.erl @@ -107,14 +107,14 @@ init([]) -> {ok, #state{seq = 0}}. handle_call(Req, _From, State) -> - ?ERROR("[Ctl] unexpected call: ~p", [Req]), + ?LOG(error, "[Ctl] Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({register_command, Cmd, MF, Opts}, State = #state{seq = Seq}) -> case ets:match(?TAB, {{'$1', Cmd}, '_', '_'}) of [] -> ets:insert(?TAB, {{Seq, Cmd}, MF, Opts}); [[OriginSeq] | _] -> - ?WARN("[Ctl] cmd ~s is overidden by ~p", [Cmd, MF]), + ?LOG(warning, "[Ctl] CMD ~s is overidden by ~p", [Cmd, MF]), ets:insert(?TAB, {{OriginSeq, Cmd}, MF, Opts}) end, noreply(next_seq(State)); @@ -124,11 +124,11 @@ handle_cast({unregister_command, Cmd}, State) -> noreply(State); handle_cast(Msg, State) -> - ?ERROR("[Ctl] unexpected cast: ~p", [Msg]), + ?LOG(error, "[Ctl] Unexpected cast: ~p", [Msg]), noreply(State). handle_info(Info, State) -> - ?ERROR("[Ctl] unexpected info: ~p", [Info]), + ?LOG(error, "[Ctl] Unexpected info: ~p", [Info]), noreply(State). terminate(_Reason, _State) -> diff --git a/src/emqx_hooks.erl b/src/emqx_hooks.erl index 4f8e674ba..6d448d2f3 100644 --- a/src/emqx_hooks.erl +++ b/src/emqx_hooks.erl @@ -181,7 +181,7 @@ handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, Stat {reply, Reply, State}; handle_call(Req, _From, State) -> - ?ERROR("[Hooks] unexpected call: ~p", [Req]), + ?LOG(error, "[Hooks] Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({del, HookPoint, Action}, State) -> @@ -194,11 +194,11 @@ handle_cast({del, HookPoint, Action}, State) -> {noreply, State}; handle_cast(Msg, State) -> - ?ERROR("[Hooks] unexpected msg: ~p", [Msg]), + ?LOG(error, "[Hooks] Unexpected msg: ~p", [Msg]), {noreply, State}. handle_info(Info, State) -> - ?ERROR("[Hooks] unexpected info: ~p", [Info]), + ?LOG(error, "[Hooks] Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 20be60602..104a9583a 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -64,25 +64,27 @@ start_listener(Proto, ListenOn, Options) when Proto == ssl; Proto == tls -> %% Start MQTT/WS listener start_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws -> - Dispatch = cowboy_router:compile([{'_', [{mqtt_path(Options), emqx_ws_connection, Options}]}]), - start_http_listener(fun cowboy:start_clear/3, 'mqtt:ws', ListenOn, ranch_opts(Options), Dispatch); + start_http_listener(fun cowboy:start_clear/3, 'mqtt:ws', ListenOn, ranch_opts(Options), ws_opts(Options)); %% Start MQTT/WSS listener start_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss -> - Dispatch = cowboy_router:compile([{'_', [{mqtt_path(Options), emqx_ws_connection, Options}]}]), - start_http_listener(fun cowboy:start_tls/3, 'mqtt:wss', ListenOn, ranch_opts(Options), Dispatch). + start_http_listener(fun cowboy:start_tls/3, 'mqtt:wss', ListenOn, ranch_opts(Options), ws_opts(Options)). start_mqtt_listener(Name, ListenOn, Options) -> SockOpts = esockd:parse_opt(Options), esockd:open(Name, ListenOn, merge_default(SockOpts), {emqx_connection, start_link, [Options -- SockOpts]}). -start_http_listener(Start, Name, ListenOn, RanchOpts, Dispatch) -> - Start(Name, with_port(ListenOn, RanchOpts), #{env => #{dispatch => Dispatch}}). +start_http_listener(Start, Name, ListenOn, RanchOpts, ProtoOpts) -> + Start(Name, with_port(ListenOn, RanchOpts), ProtoOpts). mqtt_path(Options) -> proplists:get_value(mqtt_path, Options, "/mqtt"). +ws_opts(Options) -> + Dispatch = cowboy_router:compile([{'_', [{mqtt_path(Options), emqx_ws_connection, Options}]}]), + #{env => #{dispatch => Dispatch}, proxy_header => proplists:get_value(proxy_protocol, Options, false)}. + ranch_opts(Options) -> NumAcceptors = proplists:get_value(acceptors, Options, 4), MaxConnections = proplists:get_value(max_connections, Options, 1024), @@ -163,4 +165,3 @@ format({Addr, Port}) when is_list(Addr) -> io_lib:format("~s:~w", [Addr, Port]); format({Addr, Port}) when is_tuple(Addr) -> io_lib:format("~s:~w", [esockd_net:ntoab(Addr), Port]). - diff --git a/src/emqx_message.erl b/src/emqx_message.erl index f7cab7f6c..d4f490ed7 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -174,7 +174,7 @@ to_list(Msg) -> to_bin_key_list(Msg) -> lists:zipwith( fun(Key, Val) -> - {bin(Key), Val} + {bin(Key), bin_key_map(Val)} end, record_info(fields, message), tl(tuple_to_list(Msg))). %% MilliSeconds @@ -192,6 +192,13 @@ format(flags, Flags) -> format(headers, Headers) -> io_lib:format("~p", [Headers]). +bin_key_map(Map) when is_map(Map) -> + maps:fold(fun(Key, Val, Acc) -> + Acc#{bin(Key) => bin_key_map(Val)} + end, #{}, Map); +bin_key_map(Data) -> + Data. + bin(Bin) when is_binary(Bin) -> Bin; bin(Atom) when is_atom(Atom) -> list_to_binary(atom_to_list(Atom)); -bin(Str) when is_list(Str) -> list_to_binary(Str). \ No newline at end of file +bin(Str) when is_list(Str) -> list_to_binary(Str). diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index 827324bf4..aeb94019f 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -318,15 +318,15 @@ init([]) -> {ok, #{}, hibernate}. handle_call(Req, _From, State) -> - ?ERROR("[Metrics] unexpected call: ~p", [Req]), + ?LOG(error, "[Metrics] Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?ERROR("[Metrics] unexpected cast: ~p", [Msg]), + ?LOG(error, "[Metrics] Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info(Info, State) -> - ?ERROR("[Metrics] unexpected info: ~p", [Info]), + ?LOG(error, "[Metrics] Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #{}) -> diff --git a/src/emqx_mod_acl_internal.erl b/src/emqx_mod_acl_internal.erl index a09d5645d..953666f31 100644 --- a/src/emqx_mod_acl_internal.erl +++ b/src/emqx_mod_acl_internal.erl @@ -99,7 +99,7 @@ rules_from_file(AclFile) -> #{publish => [Rule || Rule <- Rules, filter(publish, Rule)], subscribe => [Rule || Rule <- Rules, filter(subscribe, Rule)]}; {error, Reason} -> - ?LOG(error, "[ACL_INTERNAL] Failed to read ~s: ~p", [AclFile, Reason]), + ?LOG(alert, "[ACL_INTERNAL] Failed to read ~s: ~p", [AclFile, Reason]), #{} end. diff --git a/src/emqx_mod_presence.erl b/src/emqx_mod_presence.erl index 1d45fee2e..d2ce98abc 100644 --- a/src/emqx_mod_presence.erl +++ b/src/emqx_mod_presence.erl @@ -17,6 +17,7 @@ -behaviour(emqx_gen_mod). -include("emqx.hrl"). +-include("logger.hrl"). %% APIs -export([ on_client_connected/4 @@ -50,7 +51,7 @@ on_client_connected(#{client_id := ClientId, {ok, Payload} -> emqx:publish(message(qos(Env), topic(connected, ClientId), Payload)); {error, Reason} -> - emqx_logger:error("[Presence Module] Json error: ~p", [Reason]) + ?LOG(error, "[Presence] Encoding connected event error: ~p", [Reason]) end. on_client_disconnected(#{client_id := ClientId, username := Username}, Reason, Env) -> @@ -61,7 +62,7 @@ on_client_disconnected(#{client_id := ClientId, username := Username}, Reason, E {ok, Payload} -> emqx_broker:publish(message(qos(Env), topic(disconnected, ClientId), Payload)); {error, Reason} -> - emqx_logger:error("[Presence Module] Json error: ~p", [Reason]) + ?LOG(error, "[Presence] Encoding disconnected event error: ~p", [Reason]) end. unload(_Env) -> diff --git a/src/emqx_modules.erl b/src/emqx_modules.erl index 93110cafe..f9d74dc81 100644 --- a/src/emqx_modules.erl +++ b/src/emqx_modules.erl @@ -14,6 +14,8 @@ -module(emqx_modules). +-include("logger.hrl"). + -export([ load/0 , unload/0 ]). @@ -24,7 +26,7 @@ load() -> lists:foreach( fun({Mod, Env}) -> ok = Mod:load(Env), - logger:info("Load ~s module successfully.", [Mod]) + ?LOG(info, "[Modules] Load ~s module successfully.", [Mod]) end, emqx_config:get_env(modules, [])). -spec(unload() -> ok). diff --git a/src/emqx_mountpoint.erl b/src/emqx_mountpoint.erl index ef5d28e4d..b98348395 100644 --- a/src/emqx_mountpoint.erl +++ b/src/emqx_mountpoint.erl @@ -46,7 +46,7 @@ unmount(MountPoint, Msg = #message{topic = Topic}) -> {MountPoint, Topic1} -> Msg#message{topic = Topic1} catch _Error:Reason -> - ?LOG(error, "Unmount error : ~p", [Reason]), + ?LOG(error, "[Mountpoint] Unmount error : ~p", [Reason]), Msg end. diff --git a/src/emqx_os_mon.erl b/src/emqx_os_mon.erl index f8d8f41e1..2a8eb3ca3 100644 --- a/src/emqx_os_mon.erl +++ b/src/emqx_os_mon.erl @@ -132,7 +132,7 @@ handle_info({timeout, Timer, check}, State = #{timer := Timer, 0 -> {noreply, State#{timer := undefined}}; {error, Reason} -> - ?LOG(warning, "Failed to get cpu utilization: ~p", [Reason]), + ?LOG(error, "[OS Monitor] Failed to get cpu utilization: ~p", [Reason]), {noreply, ensure_check_timer(State)}; Busy when Busy / 100 >= CPUHighWatermark -> alarm_handler:set_alarm({cpu_high_watermark, Busy}), @@ -142,7 +142,9 @@ handle_info({timeout, Timer, check}, State = #{timer := Timer, true -> alarm_handler:clear_alarm(cpu_high_watermark); false -> ok end, - {noreply, ensure_check_timer(State#{is_cpu_alarm_set := false})} + {noreply, ensure_check_timer(State#{is_cpu_alarm_set := false})}; + _Busy -> + {noreply, ensure_check_timer(State)} end. terminate(_Reason, #{timer := Timer}) -> diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index 83d6863e8..ee12ed96a 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -15,6 +15,7 @@ -module(emqx_plugins). -include("emqx.hrl"). +-include("logger.hrl"). -export([init/0]). @@ -85,7 +86,7 @@ load_expand_plugin(PluginDir) -> end, Modules), case filelib:wildcard(Ebin ++ "/*.app") of [App|_] -> application:load(list_to_atom(filename:basename(App, ".app"))); - _ -> emqx_logger:error("App file cannot be found."), + _ -> ?LOG(alert, "[Plugins] Plugin not found."), {error, load_app_fail} end. @@ -110,7 +111,7 @@ with_loaded_file(File, SuccFun) -> {ok, Names} -> SuccFun(Names); {error, Error} -> - emqx_logger:error("[Plugins] Failed to read: ~p, error: ~p", [File, Error]), + ?LOG(alert, "[Plugins] Failed to read: ~p, error: ~p", [File, Error]), {error, Error} end. @@ -118,7 +119,7 @@ load_plugins(Names, Persistent) -> Plugins = list(), NotFound = Names -- names(Plugins), case NotFound of [] -> ok; - NotFound -> emqx_logger:error("[Plugins] Cannot find plugins: ~p", [NotFound]) + NotFound -> ?LOG(alert, "[Plugins] Cannot find plugins: ~p", [NotFound]) end, NeedToLoad = Names -- NotFound -- names(started_app), [load_plugin(find_plugin(Name, Plugins), Persistent) || Name <- NeedToLoad]. @@ -163,12 +164,12 @@ plugin(AppName) -> load(PluginName) when is_atom(PluginName) -> case lists:member(PluginName, names(started_app)) of true -> - emqx_logger:error("[Plugins] Plugin ~s is already started", [PluginName]), + ?LOG(notice, "[Plugins] Plugin ~s is already started", [PluginName]), {error, already_started}; false -> case find_plugin(PluginName) of false -> - emqx_logger:error("[Plugins] Plugin ~s not found", [PluginName]), + ?LOG(alert, "[Plugins] Plugin ~s not found", [PluginName]), {error, not_found}; Plugin -> load_plugin(Plugin, true) @@ -196,12 +197,12 @@ load_app(App) -> start_app(App, SuccFun) -> case application:ensure_all_started(App) of {ok, Started} -> - emqx_logger:info("Started Apps: ~p", [Started]), - emqx_logger:info("Load plugin ~s successfully", [App]), + ?LOG(info, "[Plugins] Started plugins: ~p", [Started]), + ?LOG(info, "[Plugins] Load plugin ~s successfully", [App]), SuccFun(App), {ok, Started}; {error, {ErrApp, Reason}} -> - emqx_logger:error("Load plugin ~s error, cannot start app ~s for ~p", [App, ErrApp, Reason]), + ?LOG(error, "[Plugins] Load plugin ~s failed, cannot start plugin ~s for ~p", [App, ErrApp, Reason]), {error, {ErrApp, Reason}} end. @@ -218,10 +219,10 @@ unload(PluginName) when is_atom(PluginName) -> {true, true} -> unload_plugin(PluginName, true); {false, _} -> - emqx_logger:error("Plugin ~s is not started", [PluginName]), + ?LOG(error, "[Plugins] Plugin ~s is not started", [PluginName]), {error, not_started}; {true, false} -> - emqx_logger:error("~s is not a plugin, cannot unload it", [PluginName]), + ?LOG(error, "[Plugins] ~s is not a plugin, cannot unload it", [PluginName]), {error, not_found} end. @@ -236,11 +237,11 @@ unload_plugin(App, Persistent) -> stop_app(App) -> case application:stop(App) of ok -> - emqx_logger:info("Stop plugin ~s successfully", [App]), ok; + ?LOG(info, "[Plugins] Stop plugin ~s successfully", [App]), ok; {error, {not_started, App}} -> - emqx_logger:error("Plugin ~s is not started", [App]), ok; + ?LOG(error, "[Plugins] Plugin ~s is not started", [App]), ok; {error, Reason} -> - emqx_logger:error("Stop plugin ~s error: ~p", [App]), {error, Reason} + ?LOG(error, "[Plugins] Stop plugin ~s error: ~p", [App]), {error, Reason} end. %%-------------------------------------------------------------------- @@ -268,7 +269,7 @@ plugin_loaded(Name, true) -> ignore end; {error, Error} -> - emqx_logger:error("Cannot read loaded plugins: ~p", [Error]) + ?LOG(error, "[Plugins] Cannot read loaded plugins: ~p", [Error]) end. plugin_unloaded(_Name, false) -> @@ -280,10 +281,10 @@ plugin_unloaded(Name, true) -> true -> write_loaded(lists:delete(Name, Names)); false -> - emqx_logger:error("Cannot find ~s in loaded_file", [Name]) + ?LOG(error, "[Plugins] Cannot find ~s in loaded_file", [Name]) end; {error, Error} -> - emqx_logger:error("Cannot read loaded_plugins: ~p", [Error]) + ?LOG(error, "[Plugins] Cannot read loaded_plugins: ~p", [Error]) end. read_loaded() -> @@ -302,7 +303,7 @@ write_loaded(AppNames) -> file:write(Fd, iolist_to_binary(io_lib:format("~s.~n", [Name]))) end, AppNames); {error, Error} -> - emqx_logger:error("Open File ~p Error: ~p", [File, Error]), + ?LOG(error, "[Plugins] Open File ~p Error: ~p", [File, Error]), {error, Error} end. diff --git a/src/emqx_pool.erl b/src/emqx_pool.erl index db5f8db35..b78479f5c 100644 --- a/src/emqx_pool.erl +++ b/src/emqx_pool.erl @@ -97,22 +97,22 @@ handle_call({submit, Task}, _From, State) -> {reply, catch run(Task), State}; handle_call(Req, _From, State) -> - ?ERROR("[Pool] unexpected call: ~p", [Req]), + ?LOG(error, "[Pool] Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({async_submit, Task}, State) -> try run(Task) catch _:Error:Stacktrace -> - ?ERROR("[Pool] error: ~p, ~p", [Error, Stacktrace]) + ?LOG(error, "[Pool] Error: ~p, ~p", [Error, Stacktrace]) end, {noreply, State}; handle_cast(Msg, State) -> - ?ERROR("[Pool] unexpected cast: ~p", [Msg]), + ?LOG(error, "[Pool] Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info(Info, State) -> - ?ERROR("[Pool] unexpected info: ~p", [Info]), + ?LOG(error, "[Pool] Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #{pool := Pool, id := Id}) -> diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index cd5756671..dba1070eb 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -57,7 +57,6 @@ will_topic, will_msg, keepalive, - mountpoint, is_bridge, enable_ban, enable_acl, @@ -101,7 +100,6 @@ init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendF clean_start = false, topic_aliases = #{}, packet_size = emqx_zone:get_env(Zone, max_packet_size), - mountpoint = emqx_zone:get_env(Zone, mountpoint), is_bridge = false, enable_ban = emqx_zone:get_env(Zone, enable_ban, false), enable_acl = emqx_zone:get_env(Zone, enable_acl), @@ -153,7 +151,6 @@ attrs(#pstate{zone = Zone, proto_ver = ProtoVer, proto_name = ProtoName, keepalive = Keepalive, - mountpoint = Mountpoint, is_bridge = IsBridge, connected_at = ConnectedAt, conn_mod = ConnMod, @@ -167,7 +164,6 @@ attrs(#pstate{zone = Zone, {proto_name, ProtoName}, {clean_start, CleanStart}, {keepalive, Keepalive}, - {mountpoint, Mountpoint}, {is_bridge, IsBridge}, {connected_at, ConnectedAt}, {conn_mod, ConnMod}, @@ -202,16 +198,27 @@ caps(#pstate{zone = Zone}) -> client_id(#pstate{client_id = ClientId}) -> ClientId. -credentials(#pstate{credentials = Credentials}) when map_size(Credentials) =/= 0 -> - Credentials; -credentials(#pstate{zone = Zone, - client_id = ClientId, - username = Username, - peername = Peername}) -> - #{zone => Zone, - client_id => ClientId, - username => Username, - peername => Peername}. +credentials(#pstate{zone = Zone, + client_id = ClientId, + username = Username, + peername = Peername, + peercert = Peercert}) -> + with_cert(#{zone => Zone, + client_id => ClientId, + username => Username, + peername => Peername, + mountpoint => emqx_zone:get_env(Zone, mountpoint)}, Peercert). + +with_cert(Credentials, undefined) -> Credentials; +with_cert(Credentials, Peercert) -> + Credentials#{dn => esockd_peercert:subject(Peercert), + cn => esockd_peercert:common_name(Peercert)}. + +keepsafety(Credentials) -> + maps:filter(fun(password, _) -> false; + (dn, _) -> false; + (cn, _) -> false; + (_, _) -> true end, Credentials). stats(#pstate{recv_stats = #{pkt := RecvPkt, msg := RecvMsg}, send_stats = #{pkt := SendPkt, msg := SendMsg}}) -> @@ -389,7 +396,7 @@ process(?CONNECT_PACKET( case try_open_session(SessAttrs, PState3) of {ok, SPid, SP} -> PState4 = PState3#pstate{session = SPid, connected = true, - credentials = maps:remove(password, Credentials0)}, + credentials = keepsafety(Credentials0)}, ok = emqx_cm:register_connection(client_id(PState4)), true = emqx_cm:set_conn_attrs(client_id(PState4), attrs(PState4)), %% Start keepalive @@ -397,11 +404,11 @@ process(?CONNECT_PACKET( %% Success {?RC_SUCCESS, SP, PState4}; {error, Error} -> - ?LOG(error, "Failed to open session: ~p", [Error]), + ?LOG(error, "[Protocol] Failed to open session: ~p", [Error]), {?RC_UNSPECIFIED_ERROR, PState1#pstate{credentials = Credentials0}} end; {error, Reason} -> - ?LOG(error, "Client ~s (Username: '~s') login failed for ~p", [NewClientId, Username, Reason]), + ?LOG(warning, "[Protocol] Client ~s (Username: '~s') login failed for ~p", [NewClientId, Username, Reason]), {emqx_reason_codes:connack_error(Reason), PState1#pstate{credentials = Credentials}} end; {error, ReasonCode} -> @@ -413,7 +420,7 @@ process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) -> ok -> do_publish(Packet, PState); {error, ReasonCode} -> - ?LOG(warning, "Cannot publish qos0 message to ~s for ~s", + ?LOG(warning, "[Protocol] Cannot publish qos0 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]), do_acl_deny_action(Packet, ReasonCode, PState) end; @@ -423,7 +430,7 @@ process(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PState) -> ok -> do_publish(Packet, PState); {error, ReasonCode} -> - ?LOG(warning, "Cannot publish qos1 message to ~s for ~s", + ?LOG(warning, "[Protocol] Cannot publish qos1 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]), case deliver({puback, PacketId, ReasonCode}, PState) of {ok, PState1} -> @@ -437,7 +444,7 @@ process(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PState) -> ok -> do_publish(Packet, PState); {error, ReasonCode} -> - ?LOG(warning, "Cannot publish qos2 message to ~s for ~s", + ?LOG(warning, "[Protocol] Cannot publish qos2 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]), case deliver({pubrec, PacketId, ReasonCode}, PState) of {ok, PState1} -> @@ -469,24 +476,12 @@ process(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) {ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState}; process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), - PState = #pstate{session = SPid, mountpoint = Mountpoint, - proto_ver = ProtoVer, is_bridge = IsBridge, - ignore_loop = IgnoreLoop}) -> - RawTopicFilters1 = if ProtoVer < ?MQTT_PROTO_V5 -> - IfIgnoreLoop = case IgnoreLoop of true -> 1; false -> 0 end, - case IsBridge of - true -> [{RawTopic, SubOpts#{rap => 1, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters]; - false -> [{RawTopic, SubOpts#{rap => 0, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters] - end; - true -> - RawTopicFilters - end, - case check_subscribe( - parse_topic_filters(?SUBSCRIBE, RawTopicFilters1), PState) of + PState = #pstate{session = SPid, credentials = Credentials}) -> + case check_subscribe(parse_topic_filters(?SUBSCRIBE, raw_topic_filters(PState, RawTopicFilters)), PState) of {ok, TopicFilters} -> - TopicFilters0 = emqx_hooks:run_fold('client.subscribe', [credentials(PState)], TopicFilters), - ok = emqx_session:subscribe(SPid, PacketId, Properties, - emqx_mountpoint:mount(Mountpoint, TopicFilters0)), + TopicFilters0 = emqx_hooks:run_fold('client.subscribe', [Credentials], TopicFilters), + TopicFilters1 = emqx_mountpoint:mount(mountpoint(Credentials), TopicFilters0), + ok = emqx_session:subscribe(SPid, PacketId, Properties, TopicFilters1), {ok, PState}; {error, TopicFilters} -> {SubTopics, ReasonCodes} = @@ -495,7 +490,7 @@ process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), ({Topic, #{rc := Code}}, {Topics, Codes}) -> {[Topic|Topics], [Code|Codes]} end, {[], []}, TopicFilters), - ?LOG(warning, "Cannot subscribe ~p for ~p", + ?LOG(warning, "[Protocol] Cannot subscribe ~p for ~p", [SubTopics, [emqx_reason_codes:text(R) || R <- ReasonCodes]]), case deliver({suback, PacketId, ReasonCodes}, PState) of {ok, PState1} -> @@ -506,11 +501,11 @@ process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), end; process(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), - PState = #pstate{session = SPid, mountpoint = MountPoint}) -> - TopicFilters = emqx_hooks:run_fold('client.unsubscribe', [credentials(PState)], + PState = #pstate{session = SPid, credentials = Credentials}) -> + TopicFilters = emqx_hooks:run_fold('client.unsubscribe', [Credentials], parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters)), ok = emqx_session:unsubscribe(SPid, PacketId, Properties, - emqx_mountpoint:mount(MountPoint, TopicFilters)), + emqx_mountpoint:mount(mountpoint(Credentials), TopicFilters)), {ok, PState}; process(?PACKET(?PINGREQ), PState) -> @@ -538,12 +533,12 @@ process(?DISCONNECT_PACKET(_), PState) -> %% ConnAck --> Client %%------------------------------------------------------------------------------ -connack({?RC_SUCCESS, SP, PState}) -> - ok = emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, attrs(PState)]), - deliver({connack, ?RC_SUCCESS, sp(SP)}, update_mountpoint(PState)); +connack({?RC_SUCCESS, SP, PState = #pstate{credentials = Credentials}}) -> + ok = emqx_hooks:run('client.connected', [Credentials, ?RC_SUCCESS, attrs(PState)]), + deliver({connack, ?RC_SUCCESS, sp(SP)}, PState); -connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) -> - ok = emqx_hooks:run('client.connected', [credentials(PState), ReasonCode, attrs(PState)]), +connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer, credentials = Credentials}}) -> + ok = emqx_hooks:run('client.connected', [Credentials, ReasonCode, attrs(PState)]), [ReasonCode1] = reason_codes_compat(connack, [ReasonCode], ProtoVer), _ = deliver({connack, ReasonCode1}, PState), {error, emqx_reason_codes:name(ReasonCode1, ProtoVer), PState}. @@ -553,9 +548,9 @@ connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) -> %%------------------------------------------------------------------------------ do_publish(Packet = ?PUBLISH_PACKET(QoS, PacketId), - PState = #pstate{session = SPid, mountpoint = MountPoint}) -> - Msg = emqx_mountpoint:mount(MountPoint, - emqx_packet:to_message(credentials(PState), Packet)), + PState = #pstate{session = SPid, credentials = Credentials}) -> + Msg = emqx_mountpoint:mount(mountpoint(Credentials), + emqx_packet:to_message(Credentials, Packet)), puback(QoS, PacketId, emqx_session:publish(SPid, PacketId, emqx_message:set_flag(dup, false, Msg)), PState). %%------------------------------------------------------------------------------ @@ -651,10 +646,10 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, deliver({connack, ReasonCode, SP}, PState) -> send(?CONNACK_PACKET(ReasonCode, SP), PState); -deliver({publish, PacketId, Msg}, PState = #pstate{mountpoint = MountPoint}) -> - Msg0 = emqx_hooks:run_fold('message.deliver', [credentials(PState)], Msg), +deliver({publish, PacketId, Msg}, PState = #pstate{credentials = Credentials}) -> + Msg0 = emqx_hooks:run_fold('message.deliver', [Credentials], Msg), Msg1 = emqx_message:update_expiry(Msg0), - Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1), + Msg2 = emqx_mountpoint:unmount(mountpoint(Credentials), Msg1), send(emqx_packet:from_message(PacketId, Msg2), PState); deliver({puback, PacketId, ReasonCode}, PState) -> @@ -818,11 +813,11 @@ check_will_topic(#mqtt_packet_connect{will_topic = WillTopic} = ConnPkt, PState) check_will_acl(_ConnPkt, #pstate{enable_acl = EnableAcl}) when not EnableAcl -> ok; -check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, PState) -> - case emqx_access_control:check_acl(credentials(PState), publish, WillTopic) of +check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, #pstate{credentials = Credentials}) -> + case emqx_access_control:check_acl(Credentials, publish, WillTopic) of allow -> ok; deny -> - ?LOG(warning, "Cannot publish will message to ~p for acl denied", [WillTopic]), + ?LOG(warning, "[Protocol] Cannot publish will message to ~p for acl denied", [WillTopic]), {error, ?RC_NOT_AUTHORIZED} end. @@ -838,8 +833,8 @@ check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Ret check_pub_acl(_Packet, #pstate{credentials = #{is_superuser := IsSuper}, enable_acl = EnableAcl}) when IsSuper orelse (not EnableAcl) -> ok; -check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, PState) -> - case emqx_access_control:check_acl(credentials(PState), publish, Topic) of +check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, #pstate{credentials = Credentials}) -> + case emqx_access_control:check_acl(Credentials, publish, Topic) of allow -> ok; deny -> {error, ?RC_NOT_AUTHORIZED} end. @@ -865,10 +860,10 @@ check_subscribe(TopicFilters, PState = #pstate{zone = Zone}) -> check_sub_acl(TopicFilters, #pstate{credentials = #{is_superuser := IsSuper}, enable_acl = EnableAcl}) when IsSuper orelse (not EnableAcl) -> {ok, TopicFilters}; -check_sub_acl(TopicFilters, PState) -> +check_sub_acl(TopicFilters, #pstate{credentials = Credentials}) -> lists:foldr( fun({Topic, SubOpts}, {Ok, Acc}) -> - case emqx_access_control:check_acl(credentials(PState), publish, Topic) of + case emqx_access_control:check_acl(Credentials, publish, Topic) of allow -> {Ok, [{Topic, SubOpts}|Acc]}; deny -> {error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]} @@ -876,9 +871,9 @@ check_sub_acl(TopicFilters, PState) -> end, {ok, []}, TopicFilters). trace(recv, Packet) -> - ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]); + ?LOG(debug, "[Protocol] RECV ~s", [emqx_packet:format(Packet)]); trace(send, Packet) -> - ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]). + ?LOG(debug, "[Protocol] SEND ~s", [emqx_packet:format(Packet)]). inc_stats(recv, Type, PState = #pstate{recv_stats = Stats}) -> PState#pstate{recv_stats = inc_stats(Type, Stats)}; @@ -900,9 +895,10 @@ terminate(conflict, _PState) -> ok; terminate(discard, _PState) -> ok; -terminate(Reason, PState) -> - ?LOG(info, "Shutdown for ~p", [Reason]), - ok = emqx_hooks:run('client.disconnected', [credentials(PState), Reason]). + +terminate(Reason, #pstate{credentials = Credentials}) -> + ?LOG(info, "[Protocol] Shutdown for ~p", [Reason]), + ok = emqx_hooks:run('client.disconnected', [Credentials, Reason]). start_keepalive(0, _PState) -> ignore; @@ -920,14 +916,6 @@ parse_topic_filters(?SUBSCRIBE, RawTopicFilters) -> parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters) -> lists:map(fun emqx_topic:parse/1, RawTopicFilters). -%%------------------------------------------------------------------------------ -%% Update mountpoint - -update_mountpoint(PState = #pstate{mountpoint = undefined}) -> - PState; -update_mountpoint(PState = #pstate{mountpoint = MountPoint}) -> - PState#pstate{mountpoint = emqx_mountpoint:replvar(MountPoint, credentials(PState))}. - sp(true) -> 1; sp(false) -> 0. @@ -974,3 +962,20 @@ reason_codes_compat(unsuback, _ReasonCodes, _ProtoVer) -> undefined; reason_codes_compat(PktType, ReasonCodes, _ProtoVer) -> [emqx_reason_codes:compat(PktType, RC) || RC <- ReasonCodes]. + +raw_topic_filters(#pstate{proto_ver = ProtoVer, + is_bridge = IsBridge, + ignore_loop = IgnoreLoop}, RawTopicFilters) -> + case ProtoVer < ?MQTT_PROTO_V5 of + true -> + IfIgnoreLoop = case IgnoreLoop of true -> 1; false -> 0 end, + case IsBridge of + true -> [{RawTopic, SubOpts#{rap => 1, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters]; + false -> [{RawTopic, SubOpts#{rap => 0, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters] + end; + false -> + RawTopicFilters + end. + +mountpoint(Credentials) -> + maps:get(mountpoint, Credentials, undefined). diff --git a/src/emqx_psk.erl b/src/emqx_psk.erl index 7510366eb..3b2407b1c 100644 --- a/src/emqx_psk.erl +++ b/src/emqx_psk.erl @@ -29,10 +29,10 @@ lookup(psk, ClientPSKID, _UserState) -> try emqx_hooks:run_fold('tls_handshake.psk_lookup', [ClientPSKID], not_found) of SharedSecret when is_binary(SharedSecret) -> {ok, SharedSecret}; Error -> - ?LOG(error, "Look PSK for PSKID ~p error: ~p", [ClientPSKID, Error]), + ?LOG(error, "[PSK] Look PSK for PSKID ~p error: ~p", [ClientPSKID, Error]), error catch Except:Error:Stacktrace -> - ?LOG(error, "Lookup PSK failed, ~p: ~p", [{Except,Error}, Stacktrace]), + ?LOG(error, "[PSK] Lookup PSK failed, ~p: ~p", [{Except,Error}, Stacktrace]), error end. \ No newline at end of file diff --git a/src/emqx_router.erl b/src/emqx_router.erl index 868094609..8eb65169c 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -198,15 +198,15 @@ handle_call({delete_route, Topic, Dest}, _From, State) -> {reply, Ok, State}; handle_call(Req, _From, State) -> - ?ERROR("[Router] unexpected call: ~p", [Req]), + ?LOG(error, "[Router] Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?ERROR("[Router] unexpected cast: ~p", [Msg]), + ?LOG(error, "[Router] Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info(Info, State) -> - ?ERROR("[Router] unexpected info: ~p", [Info]), + ?LOG(error, "[Router] Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #{pool := Pool, id := Id}) -> diff --git a/src/emqx_router_helper.erl b/src/emqx_router_helper.erl index 0e2abb731..0b31929fb 100644 --- a/src/emqx_router_helper.erl +++ b/src/emqx_router_helper.erl @@ -103,11 +103,11 @@ init([]) -> {ok, #{nodes => Nodes}, hibernate}. handle_call(Req, _From, State) -> - ?ERROR("[RouterHelper] unexpected call: ~p", [Req]), + ?LOG(error, "[Router Helper] Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?ERROR("[RouterHelper] unexpected cast: ~p", [Msg]), + ?LOG(error, "[Router Helper] Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({mnesia_table_event, {write, {?ROUTING_NODE, Node, _}, _}}, State = #{nodes := Nodes}) -> @@ -123,7 +123,7 @@ handle_info({mnesia_table_event, {delete, {?ROUTING_NODE, _Node}, _}}, State) -> {noreply, State}; handle_info({mnesia_table_event, Event}, State) -> - ?ERROR("[RouterHelper] unexpected mnesia_table_event: ~p", [Event]), + ?LOG(error, "[Router Helper] Unexpected mnesia_table_event: ~p", [Event]), {noreply, State}; handle_info({nodedown, Node}, State = #{nodes := Nodes}) -> @@ -141,7 +141,7 @@ handle_info({membership, _Event}, State) -> {noreply, State}; handle_info(Info, State) -> - ?ERROR("[RouteHelper] unexpected info: ~p", [Info]), + ?LOG(error, "[Route Helper] Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 49c63b539..3ef9107a7 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -422,11 +422,11 @@ handle_call(stats, _From, State) -> reply(stats(State), State); handle_call({discard, ByPid}, _From, State = #state{conn_pid = undefined}) -> - ?LOG(warning, "Discarded by ~p", [ByPid]), + ?LOG(warning, "[Session] Discarded by ~p", [ByPid]), {stop, {shutdown, discarded}, ok, State}; handle_call({discard, ByPid}, _From, State = #state{client_id = ClientId, conn_pid = ConnPid}) -> - ?LOG(warning, "Conn ~p is discarded by ~p", [ConnPid, ByPid]), + ?LOG(warning, "[Session] Conn ~p is discarded by ~p", [ConnPid, ByPid]), ConnPid ! {shutdown, discard, {ClientId, ByPid}}, {stop, {shutdown, discarded}, ok, State}; @@ -445,7 +445,7 @@ handle_call({register_publish_packet_id, PacketId, Ts}, _From, {ok, ensure_stats_timer(ensure_await_rel_timer(State1))} end; true -> - ?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId]), + ?LOG(warning, "[Session] Dropped qos2 packet ~w for too many awaiting_rel", [PacketId]), emqx_metrics:trans(inc, 'messages/qos2/dropped'), {{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State} end); @@ -457,7 +457,7 @@ handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = In true -> {ok, ensure_stats_timer(acked(pubrec, PacketId, State))}; false -> - ?LOG(warning, "The PUBREC PacketId ~w is not found.", [PacketId]), + ?LOG(warning, "[Session] The PUBREC PacketId ~w is not found.", [PacketId]), emqx_metrics:trans(inc, 'packets/pubrec/missed'), {{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State} end); @@ -469,7 +469,7 @@ handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel {_Ts, AwaitingRel1} -> {ok, ensure_stats_timer(State#state{awaiting_rel = AwaitingRel1})}; error -> - ?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId]), + ?LOG(warning, "[Session] The PUBREL PacketId ~w is not found", [PacketId]), emqx_metrics:trans(inc, 'packets/pubrel/missed'), {{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State} end); @@ -478,7 +478,7 @@ handle_call(close, _From, State) -> {stop, normal, ok, State}; handle_call(Req, _From, State) -> - emqx_logger:error("[Session] unexpected call: ~p", [Req]), + ?LOG(error, "[Session] Unexpected call: ~p", [Req]), {reply, ignored, State}. %% SUBSCRIBE: @@ -519,7 +519,7 @@ handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight} true -> ensure_stats_timer(dequeue(acked(puback, PacketId, State))); false -> - ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]), + ?LOG(warning, "[Session] The PUBACK PacketId ~w is not found", [PacketId]), emqx_metrics:trans(inc, 'packets/puback/missed'), State end); @@ -531,7 +531,7 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight true -> ensure_stats_timer(dequeue(acked(pubcomp, PacketId, State))); false -> - ?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId]), + ?LOG(warning, "[Session] The PUBCOMP PacketId ~w is not found", [PacketId]), emqx_metrics:trans(inc, 'packets/pubcomp/missed'), State end); @@ -549,14 +549,14 @@ handle_cast({resume, #{conn_pid := ConnPid, expiry_timer = ExpireTimer, will_delay_timer = WillDelayTimer}) -> - ?LOG(info, "Resumed by connection ~p ", [ConnPid]), + ?LOG(info, "[Session] Resumed by connection ~p ", [ConnPid]), %% Cancel Timers lists:foreach(fun emqx_misc:cancel_timer/1, [RetryTimer, AwaitTimer, ExpireTimer, WillDelayTimer]), case kick(ClientId, OldConnPid, ConnPid) of - ok -> ?LOG(warning, "Connection ~p kickout ~p", [ConnPid, OldConnPid]); + ok -> ?LOG(warning, "[Session] Connection ~p kickout ~p", [ConnPid, OldConnPid]); ignore -> ok end, @@ -587,7 +587,7 @@ handle_cast({update_expiry_interval, Interval}, State) -> {noreply, State#state{expiry_interval = Interval}}; handle_cast(Msg, State) -> - emqx_logger:error("[Session] unexpected cast: ~p", [Msg]), + ?LOG(error, "[Session] Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, message) -> @@ -623,12 +623,12 @@ handle_info({timeout, Timer, emit_stats}, GcState1 = emqx_gc:reset(GcState), {noreply, NewState#state{gc_state = GcState1}, hibernate}; {shutdown, Reason} -> - ?LOG(warning, "shutdown due to ~p", [Reason]), + ?LOG(warning, "[Session] Shutdown exceptionally due to ~p", [Reason]), shutdown(Reason, NewState) end; handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) -> - ?LOG(info, "expired, shutdown now.", []), + ?LOG(info, "[Session] Expired, shutdown now.", []), shutdown(expired, State); handle_info({timeout, Timer, will_delay}, State = #state{will_msg = WillMsg, will_delay_timer = Timer}) -> @@ -663,12 +663,12 @@ handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) -> {noreply, State#state{old_conn_pid = undefined}}; handle_info({'EXIT', Pid, Reason}, State = #state{conn_pid = ConnPid}) -> - ?LOG(error, "Unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p", + ?LOG(error, "[Session] Unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p", [ConnPid, Pid, Reason]), {noreply, State}; handle_info(Info, State) -> - emqx_logger:error("[Session] unexpected info: ~p", [Info]), + ?LOG(error, "[Session] Unexpected info: ~p", [Info]), {noreply, State}. terminate(Reason, #state{will_msg = WillMsg, @@ -792,7 +792,7 @@ expire_awaiting_rel([{PacketId, Ts} | More], Now, case (timer:now_diff(Now, Ts) div 1000) of Age when Age >= Timeout -> emqx_metrics:trans(inc, 'messages/qos2/expired'), - ?LOG(warning, "Dropped qos2 packet ~s for await_rel_timeout", [PacketId]), + ?LOG(warning, "[Session] Dropped qos2 packet ~s for await_rel_timeout", [PacketId]), expire_awaiting_rel(More, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)}); Age -> ensure_await_rel_timer(Timeout - max(0, Age), State) @@ -839,8 +839,14 @@ drain_m(Cnt, Msgs) when Cnt =< 0 -> lists:reverse(Msgs); drain_m(Cnt, Msgs) -> receive - {dispatch, Topic, Msg} -> - drain_m(Cnt-1, [{Topic, Msg}|Msgs]) + {dispatch, Topic, Msg} when is_record(Msg, message)-> + drain_m(Cnt-1, [{Topic, Msg} | Msgs]); + {dispatch, Topic, InMsgs} when is_list(InMsgs) -> + Msgs1 = lists:foldl( + fun(Msg, Acc) -> + [{Topic, Msg} | Acc] + end, Msgs, InMsgs), + drain_m(Cnt-length(InMsgs), Msgs1) after 0 -> lists:reverse(Msgs) end. @@ -990,7 +996,7 @@ acked(puback, PacketId, State = #state{client_id = ClientId, username = Username ok = emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}, Msg]), State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}; none -> - ?LOG(warning, "Duplicated PUBACK PacketId ~w", [PacketId]), + ?LOG(warning, "[Session] Duplicated PUBACK PacketId ~w", [PacketId]), State end; @@ -1000,10 +1006,10 @@ acked(pubrec, PacketId, State = #state{client_id = ClientId, username = Username ok = emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}, Msg]), State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)}; {value, {pubrel, PacketId, _Ts}} -> - ?LOG(warning, "Duplicated PUBREC PacketId ~w", [PacketId]), + ?LOG(warning, "[Session] Duplicated PUBREC PacketId ~w", [PacketId]), State; none -> - ?LOG(warning, "Unexpected PUBREC PacketId ~w", [PacketId]), + ?LOG(warning, "[Session] Unexpected PUBREC PacketId ~w", [PacketId]), State end; diff --git a/src/emqx_session_sup.erl b/src/emqx_session_sup.erl index 4672687a3..ad721d5fd 100644 --- a/src/emqx_session_sup.erl +++ b/src/emqx_session_sup.erl @@ -92,7 +92,7 @@ handle_call({start_session, SessAttrs = #{client_id := ClientId}}, _From, reply({error, Reason}, State) catch _:Error:Stk -> - ?ERROR("Failed to start session ~p: ~p, stacktrace:~n~p", + ?LOG(error, "[Session Supervisor] Failed to start session ~p: ~p, stacktrace:~n~p", [ClientId, Error, Stk]), reply({error, Error}, State) end; @@ -101,11 +101,11 @@ handle_call(count_sessions, _From, State = #state{sessions = SessMap}) -> {reply, maps:size(SessMap), State}; handle_call(Req, _From, State) -> - ?ERROR("unexpected call: ~p", [Req]), + ?LOG(error, "[Session Supervisor] Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?ERROR("unexpected cast: ~p", [Msg]), + ?LOG(error, "[Session Supervisor] Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({'EXIT', Pid, _Reason}, State = #state{sessions = SessMap, clean_down = CleanDown}) -> @@ -117,7 +117,7 @@ handle_info({'EXIT', Pid, _Reason}, State = #state{sessions = SessMap, clean_dow {noreply, State#state{sessions = SessMap1}}; handle_info(Info, State) -> - ?ERROR("unexpected info: ~p", [Info]), + ?LOG(notice, "[Session Supervisor] Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, State) -> diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index d25910e31..5f1962542 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -310,11 +310,11 @@ handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) -> {reply, ok, State}; handle_call(Req, _From, State) -> - ?ERROR("[SharedSub] unexpected call: ~p", [Req]), + ?LOG(error, "[Shared Sub] Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?ERROR("[SharedSub] unexpected cast: ~p", [Msg]), + ?LOG(error, "[Shared Sub] Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) -> @@ -329,12 +329,12 @@ handle_info({mnesia_table_event, _Event}, State) -> {noreply, State}; handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) -> - ?INFO("[SharedSub] shared subscriber down: ~p", [SubPid]), + ?LOG(info, "[Shared Sub] Shared subscriber down: ~p", [SubPid]), cleanup_down(SubPid), {noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})}; handle_info(Info, State) -> - ?ERROR("[SharedSub] unexpected info: ~p", [Info]), + ?LOG(error, "[Shared Sub] Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index 900e7ca1a..4319b6ccb 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -114,7 +114,7 @@ discard_session(ClientId, ConnPid) when is_binary(ClientId) -> try emqx_session:discard(SessPid, ConnPid) catch _:Error:_Stk -> - ?ERROR("[SM] Failed to discard ~p: ~p", [SessPid, Error]) + ?LOG(warning, "[SM] Failed to discard ~p: ~p", [SessPid, Error]) end end, lookup_session_pids(ClientId)). @@ -128,7 +128,7 @@ resume_session(ClientId, SessAttrs = #{conn_pid := ConnPid}) -> {ok, SessPid}; SessPids -> [SessPid|StalePids] = lists:reverse(SessPids), - ?ERROR("[SM] More than one session found: ~p", [SessPids]), + ?LOG(error, "[SM] More than one session found: ~p", [SessPids]), lists:foreach(fun(StalePid) -> catch emqx_session:discard(StalePid, ConnPid) end, StalePids), @@ -250,15 +250,15 @@ init([]) -> {ok, #{}}. handle_call(Req, _From, State) -> - ?ERROR("[SM] unexpected call: ~p", [Req]), + ?LOG(error, "[SM] Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?ERROR("[SM] unexpected cast: ~p", [Msg]), + ?LOG(error, "[SM] Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info(Info, State) -> - ?ERROR("[SM] unexpected info: ~p", [Info]), + ?LOG(error, "[SM] Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqx_sm_registry.erl b/src/emqx_sm_registry.erl index e49b76e2c..ccacc9907 100644 --- a/src/emqx_sm_registry.erl +++ b/src/emqx_sm_registry.erl @@ -96,11 +96,11 @@ init([]) -> {ok, #{}}. handle_call(Req, _From, State) -> - ?ERROR("[Registry] unexpected call: ~p", [Req]), + ?LOG(error, "[Registry] Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?ERROR("[Registry] unexpected cast: ~p", [Msg]), + ?LOG(error, "[Registry] Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({membership, {mnesia, down, Node}}, State) -> @@ -114,7 +114,7 @@ handle_info({membership, _Event}, State) -> {noreply, State}; handle_info(Info, State) -> - ?ERROR("[Registry] unexpected info: ~p", [Info]), + ?LOG(error, "[Registry] Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqx_stats.erl b/src/emqx_stats.erl index cef81b8c7..926db5b04 100644 --- a/src/emqx_stats.erl +++ b/src/emqx_stats.erl @@ -181,7 +181,7 @@ start_timer(#state{tick_ms = Ms} = State) -> handle_call(stop, _From, State) -> {stop, normal, _Reply = ok, State}; handle_call(Req, _From, State) -> - ?ERROR("[Stats] unexpected call: ~p", [Req]), + ?LOG(error, "[Stats] Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({setstat, Stat, MaxStat, Val}, State) -> @@ -199,7 +199,7 @@ handle_cast({setstat, Stat, MaxStat, Val}, State) -> handle_cast({update_interval, Update = #update{name = Name}}, State = #state{updates = Updates}) -> case lists:keyfind(Name, #update.name, Updates) of #update{} -> - ?ERROR("[Stats]: duplicated update: ~s", [Name]), + ?LOG(warning, "[Stats] Duplicated update: ~s", [Name]), {noreply, State}; false -> {noreply, State#state{updates = [Update | Updates]}} @@ -209,7 +209,7 @@ handle_cast({cancel_update, Name}, State = #state{updates = Updates}) -> {noreply, State#state{updates = lists:keydelete(Name, #update.name, Updates)}}; handle_cast(Msg, State) -> - ?ERROR("[Stats] unexpected cast: ~p", [Msg]), + ?LOG(error, "[Stats] Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Updates}) -> @@ -219,7 +219,7 @@ handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Update try UpFun() catch _:Error -> - ?ERROR("[Stats] update ~s error: ~p", [Name, Error]) + ?LOG(error, "[Stats] update ~s failed: ~p", [Name, Error]) end, [Update#update{countdown = I} | Acc]; (Update = #update{countdown = C}, Acc) -> @@ -228,7 +228,7 @@ handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Update {noreply, start_timer(State#state{updates = Updates1}), hibernate}; handle_info(Info, State) -> - ?ERROR("[Stats] unexpected info: ~p", [Info]), + ?LOG("error, [Stats] Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #state{timer = TRef}) -> diff --git a/src/emqx_sys.erl b/src/emqx_sys.erl index a0448fb18..e5b718b05 100644 --- a/src/emqx_sys.erl +++ b/src/emqx_sys.erl @@ -117,11 +117,11 @@ handle_call(uptime, _From, State) -> {reply, uptime(State), State}; handle_call(Req, _From, State) -> - ?ERROR("[SYS] unexpected call: ~p", [Req]), + ?LOG(error, "[SYS] Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?ERROR("[SYS] unexpected cast: ~p", [Msg]), + ?LOG(error, "[SYS] Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({timeout, TRef, heartbeat}, State = #state{heartbeat = TRef}) -> @@ -138,7 +138,7 @@ handle_info({timeout, TRef, tick}, State = #state{ticker = TRef, version = Versi {noreply, tick(State), hibernate}; handle_info(Info, State) -> - ?ERROR("[SYS] unexpected info: ~p", [Info]), + ?LOG(error, "[SYS] Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) -> diff --git a/src/emqx_sys_mon.erl b/src/emqx_sys_mon.erl index d6c894cc7..03cc95819 100644 --- a/src/emqx_sys_mon.erl +++ b/src/emqx_sys_mon.erl @@ -88,18 +88,18 @@ parse_opt([_Opt|Opts], Acc) -> parse_opt(Opts, Acc). handle_call(Req, _From, State) -> - ?ERROR("[SYSMON] unexpected call: ~p", [Req]), + ?LOG(error, "[SYSMON] Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?ERROR("[SYSMON] unexpected cast: ~p", [Msg]), + ?LOG(error, "[SYSMON] Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({monitor, Pid, long_gc, Info}, State) -> suppress({long_gc, Pid}, fun() -> WarnMsg = io_lib:format("long_gc warning: pid = ~p, info: ~p", [Pid, Info]), - ?WARN("[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]), + ?LOG(warning, "[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]), safe_publish(long_gc, WarnMsg) end, State); @@ -107,7 +107,7 @@ handle_info({monitor, Pid, long_schedule, Info}, State) when is_pid(Pid) -> suppress({long_schedule, Pid}, fun() -> WarnMsg = io_lib:format("long_schedule warning: pid = ~p, info: ~p", [Pid, Info]), - ?WARN("[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]), + ?LOG(warning, "[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]), safe_publish(long_schedule, WarnMsg) end, State); @@ -115,7 +115,7 @@ handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) -> suppress({long_schedule, Port}, fun() -> WarnMsg = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]), - ?WARN("[SYSMON] ~s~n~p", [WarnMsg, erlang:port_info(Port)]), + ?LOG(warning, "[SYSMON] ~s~n~p", [WarnMsg, erlang:port_info(Port)]), safe_publish(long_schedule, WarnMsg) end, State); @@ -123,7 +123,7 @@ handle_info({monitor, Pid, large_heap, Info}, State) -> suppress({large_heap, Pid}, fun() -> WarnMsg = io_lib:format("large_heap warning: pid = ~p, info: ~p", [Pid, Info]), - ?WARN("[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]), + ?LOG(warning, "[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]), safe_publish(large_heap, WarnMsg) end, State); @@ -131,7 +131,7 @@ handle_info({monitor, SusPid, busy_port, Port}, State) -> suppress({busy_port, Port}, fun() -> WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]), - ?WARN("[SYSMON] ~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]), + ?LOG(warning, "[SYSMON] ~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]), safe_publish(busy_port, WarnMsg) end, State); @@ -139,7 +139,7 @@ handle_info({monitor, SusPid, busy_dist_port, Port}, State) -> suppress({busy_dist_port, Port}, fun() -> WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]), - ?WARN("[SYSMON] ~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]), + ?LOG(warning, "[SYSMON] ~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]), safe_publish(busy_dist_port, WarnMsg) end, State); @@ -147,7 +147,7 @@ handle_info({timeout, _Ref, reset}, State) -> {noreply, State#{events := []}, hibernate}; handle_info(Info, State) -> - ?ERROR("[SYSMON] unexpected Info: ~p", [Info]), + ?LOG(error, "[SYSMON] Unexpected Info: ~p", [Info]), {noreply, State}. terminate(_Reason, #{timer := TRef}) -> diff --git a/src/emqx_tracer.erl b/src/emqx_tracer.erl index 051c9f6f2..464a28ad2 100644 --- a/src/emqx_tracer.erl +++ b/src/emqx_tracer.erl @@ -17,6 +17,7 @@ -behaviour(gen_server). -include("emqx.hrl"). +-include("logger.hrl"). %% APIs -export([start_link/0]). @@ -110,10 +111,10 @@ handle_call({start_trace, Who, Level, LogFile}, _From, State = #state{traces = T filters => [{meta_key_filter, {fun filter_by_meta_key/2, Who} }]}) of ok -> - emqx_logger:info("[Tracer] start trace for ~p", [Who]), + ?LOG(info, "[Tracer] Start trace for ~p", [Who]), {reply, ok, State#state{traces = maps:put(Who, {Level, LogFile}, Traces)}}; {error, Reason} -> - emqx_logger:error("[Tracer] start trace for ~p failed, error: ~p", [Who, Reason]), + ?LOG(error, "[Tracer] Start trace for ~p failed, error: ~p", [Who, Reason]), {reply, {error, Reason}, State} end; @@ -122,9 +123,9 @@ handle_call({stop_trace, Who}, _From, State = #state{traces = Traces}) -> {ok, _LogFile} -> case logger:remove_handler(handler_id(Who)) of ok -> - emqx_logger:info("[Tracer] stop trace for ~p", [Who]); + ?LOG(info, "[Tracer] Stop trace for ~p", [Who]); {error, Reason} -> - emqx_logger:error("[Tracer] stop trace for ~p failed, error: ~p", [Who, Reason]) + ?LOG(error, "[Tracer] Stop trace for ~p failed, error: ~p", [Who, Reason]) end, {reply, ok, State#state{traces = maps:remove(Who, Traces)}}; error -> @@ -135,15 +136,15 @@ handle_call(lookup_traces, _From, State = #state{traces = Traces}) -> {reply, [{Who, LogFile} || {Who, LogFile} <- maps:to_list(Traces)], State}; handle_call(Req, _From, State) -> - emqx_logger:error("[Tracer] unexpected call: ~p", [Req]), + ?LOG(error, "[Tracer] Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - emqx_logger:error("[Tracer] unexpected cast: ~p", [Msg]), + ?LOG(error, "[Tracer] Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info(Info, State) -> - emqx_logger:error("[Tracer] unexpected info: ~p", [Info]), + ?LOG(error, "[Tracer] Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/emqx_vm_mon.erl b/src/emqx_vm_mon.erl index f91fc519c..10be9d71b 100644 --- a/src/emqx_vm_mon.erl +++ b/src/emqx_vm_mon.erl @@ -109,7 +109,9 @@ handle_info({timeout, Timer, check}, State = #{timer := Timer, true -> alarm_handler:clear_alarm(too_many_processes); false -> ok end, - {noreply, ensure_check_timer(State#{is_process_alarm_set := false})} + {noreply, ensure_check_timer(State#{is_process_alarm_set := false})}; + _Precent -> + {noreply, ensure_check_timer(State)} end. terminate(_Reason, #{timer := Timer}) -> diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 6d47e16ad..e3094e11c 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -113,12 +113,23 @@ call(WSPid, Req) when is_pid(WSPid) -> %%------------------------------------------------------------------------------ init(Req, Opts) -> + IdleTimeout = proplists:get_value(idle_timeout, Opts, 60000), + DeflateOptions = maps:from_list(proplists:get_value(deflate_options, Opts, [])), + MaxFrameSize = case proplists:get_value(max_frame_size, Opts, 0) of + 0 -> infinity; + MFS -> MFS + end, + Compress = proplists:get_value(compress, Opts, false), + Options = #{compress => Compress, + deflate_opts => DeflateOptions, + max_frame_size => MaxFrameSize, + idle_timeout => IdleTimeout}, case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of undefined -> - {cowboy_websocket, Req, #state{}}; + {cowboy_websocket, Req, #state{}, Options}; [<<"mqtt", Vsn/binary>>] -> Resp = cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, <<"mqtt", Vsn/binary>>, Req), - {cowboy_websocket, Resp, #state{request = Req, options = Opts}, #{idle_timeout => 86400000}}; + {cowboy_websocket, Resp, #state{request = Req, options = Opts}, Options}; _ -> {ok, cowboy_req:reply(400, Req), #state{}} end. @@ -163,7 +174,7 @@ websocket_handle({binary, [<<>>]}, State) -> {ok, ensure_stats_timer(State)}; websocket_handle({binary, Data}, State = #state{parse_state = ParseState, proto_state = ProtoState}) -> - ?LOG(debug, "RECV ~p", [Data]), + ?LOG(debug, "[WS Connection] RECV ~p", [Data]), BinSize = iolist_size(Data), emqx_pd:update_counter(recv_oct, BinSize), emqx_metrics:trans(inc, 'bytes/received', BinSize), @@ -177,7 +188,7 @@ websocket_handle({binary, Data}, State = #state{parse_state = ParseState, {ok, ProtoState1} -> websocket_handle({binary, Rest}, reset_parser(State#state{proto_state = ProtoState1})); {error, Error} -> - ?LOG(error, "Protocol error - ~p", [Error]), + ?LOG(error, "[WS Connection] Protocol error: ~p", [Error]), shutdown(Error, State); {error, Reason, ProtoState1} -> shutdown(Reason, State#state{proto_state = ProtoState1}); @@ -185,11 +196,11 @@ websocket_handle({binary, Data}, State = #state{parse_state = ParseState, shutdown(Error, State#state{proto_state = ProtoState1}) end; {error, Error} -> - ?LOG(error, "Frame error: ~p", [Error]), + ?LOG(error, "[WS Connection] Frame error: ~p", [Error]), shutdown(Error, State) catch _:Error -> - ?LOG(error, "Frame error:~p~nFrame data: ~p", [Error, Data]), + ?LOG(error, "[WS Connection] Frame error:~p~nFrame data: ~p", [Error, Data]), shutdown(parse_error, State) end; %% Pings should be replied with pongs, cowboy does it automatically @@ -236,12 +247,12 @@ websocket_info({timeout, Timer, emit_stats}, {ok, State#state{stats_timer = undefined}, hibernate}; websocket_info({keepalive, start, Interval}, State) -> - ?LOG(debug, "Keepalive at the interval of ~p", [Interval]), + ?LOG(debug, "[WS Connection] Keepalive at the interval of ~p", [Interval]), case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of {ok, KeepAlive} -> {ok, State#state{keepalive = KeepAlive}}; {error, Error} -> - ?LOG(warning, "Keepalive error - ~p", [Error]), + ?LOG(warning, "[WS Connection] Keepalive error: ~p", [Error]), shutdown(Error, State) end; @@ -250,19 +261,19 @@ websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> {ok, KeepAlive1} -> {ok, State#state{keepalive = KeepAlive1}}; {error, timeout} -> - ?LOG(debug, "Keepalive Timeout!"), + ?LOG(debug, "[WS Connection] Keepalive Timeout!"), shutdown(keepalive_timeout, State); {error, Error} -> - ?LOG(error, "Keepalive error - ~p", [Error]), + ?LOG(error, "[WS Connection] Keepalive error: ~p", [Error]), shutdown(keepalive_error, State) end; websocket_info({shutdown, discard, {ClientId, ByPid}}, State) -> - ?LOG(warning, "discarded by ~s:~p", [ClientId, ByPid]), + ?LOG(warning, "[WS Connection] Discarded by ~s:~p", [ClientId, ByPid]), shutdown(discard, State); websocket_info({shutdown, conflict, {ClientId, NewPid}}, State) -> - ?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid]), + ?LOG(warning, "[WS Connection] Clientid '~s' conflict with ~p", [ClientId, NewPid]), shutdown(conflict, State); websocket_info({binary, Data}, State) -> @@ -272,14 +283,14 @@ websocket_info({shutdown, Reason}, State) -> shutdown(Reason, State); websocket_info(Info, State) -> - ?LOG(error, "unexpected info: ~p", [Info]), + ?LOG(error, "[WS Connection] Unexpected info: ~p", [Info]), {ok, State}. terminate(SockError, _Req, #state{keepalive = Keepalive, proto_state = ProtoState, shutdown = Shutdown}) -> - ?LOG(debug, "Terminated for ~p, sockerror: ~p", [Shutdown, SockError]), + ?LOG(debug, "[WS Connection] Terminated for ~p, sockerror: ~p", [Shutdown, SockError]), emqx_keepalive:cancel(Keepalive), case {ProtoState, Shutdown} of {undefined, _} -> ok; @@ -308,4 +319,3 @@ shutdown(Reason, State) -> wsock_stats() -> [{Key, emqx_pd:get_counter(Key)} || Key <- ?SOCK_STATS]. - diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index 8a821713b..25bace1f2 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -92,7 +92,7 @@ handle_call(force_reload, _From, State) -> {reply, ok, State}; handle_call(Req, _From, State) -> - ?ERROR("[Zone] unexpected call: ~p", [Req]), + ?LOG(error, "[Zone] Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast({set_env, Zone, Key, Val}, State) -> @@ -100,7 +100,7 @@ handle_cast({set_env, Zone, Key, Val}, State) -> {noreply, State}; handle_cast(Msg, State) -> - ?ERROR("[Zone] unexpected cast: ~p", [Msg]), + ?LOG(error, "[Zone] Unexpected cast: ~p", [Msg]), {noreply, State}. handle_info(reload, State) -> @@ -108,7 +108,7 @@ handle_info(reload, State) -> {noreply, ensure_reload_timer(State#{timer := undefined}), hibernate}; handle_info(Info, State) -> - ?ERROR("[Zone] unexpected info: ~p", [Info]), + ?LOG(error, "[Zone] Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) ->