Merge remote-tracking branch 'origin/develop'

This commit is contained in:
zhanghongtong 2019-03-28 13:38:36 +08:00
commit 5a4adefb16
42 changed files with 617 additions and 296 deletions

View File

@ -10,7 +10,7 @@ dep_gproc = git-emqx https://github.com/uwiger/gproc 0.8.0
dep_gen_rpc = git-emqx https://github.com/emqx/gen_rpc 2.3.1
dep_esockd = git-emqx https://github.com/emqx/esockd v5.4.4
dep_ekka = git-emqx https://github.com/emqx/ekka v0.5.3
dep_cowboy = git-emqx https://github.com/ninenines/cowboy 2.4.0
dep_cowboy = git-emqx https://github.com/ninenines/cowboy 2.6.1
dep_replayq = git-emqx https://github.com/emqx/replayq v0.1.1
NO_AUTOPATCH = cuttlefish

View File

@ -114,6 +114,22 @@ cluster.autoclean = 5m
## Default: 1m, 1 minute
## cluster.etcd.node_ttl = 1m
## Path to a file containing the client's private PEM-encoded key.
##
## Value: File
## cluster.etcd.ssl.keyfile = {{ platform_etc_dir }}/certs/client-key.pem
## The path to a file containing the client's certificate.
##
## Value: File
## cluster.etcd.ssl.certfile = {{ platform_etc_dir }}/certs/client.pem
## Path to the file containing PEM-encoded CA certificates. The CA certificates
## are used during server authentication and when building the client certificate chain.
##
## Value: File
## cluster.etcd.ssl.cacertfile = {{ platform_etc_dir }}/certs/ca.pem
##--------------------------------------------------------------------
## Cluster using Kubernates
@ -142,12 +158,6 @@ cluster.autoclean = 5m
## Value: String
## cluster.k8s.namespace = default
## Kubernates Namespace
##
## Value: String
## cluster.k8s.namespace = default
##--------------------------------------------------------------------
## Node
##--------------------------------------------------------------------
@ -1261,7 +1271,7 @@ listener.ws.external.zone = external
## The access control for the MQTT/WebSocket listener.
##
## See: listener.tcp.$name.access
## See: listener.ws.$name.access
##
## Value: ACL Rule
listener.ws.external.access.1 = allow all
@ -1286,74 +1296,143 @@ listener.ws.external.verify_protocol_header = on
## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind
## HAProxy or Nginx.
##
## See: listener.tcp.$name.proxy_protocol
## See: listener.ws.$name.proxy_protocol
##
## Value: on | off
## listener.ws.external.proxy_protocol = on
## Sets the timeout for proxy protocol.
##
## See: listener.tcp.$name.proxy_protocol_timeout
## See: listener.ws.$name.proxy_protocol_timeout
##
## Value: Duration
## listener.ws.external.proxy_protocol_timeout = 3s
## The TCP backlog of external MQTT/WebSocket Listener.
##
## See: listener.tcp.$name.backlog
## See: listener.ws.$name.backlog
##
## Value: Number >= 0
listener.ws.external.backlog = 1024
## The TCP send timeout for external MQTT/WebSocket connections.
##
## See: listener.tcp.$name.send_timeout
## See: listener.ws.$name.send_timeout
##
## Value: Duration
listener.ws.external.send_timeout = 15s
## Close the MQTT/WebSocket connection if send timeout.
##
## See: listener.tcp.$name.send_timeout_close
## See: listener.ws.$name.send_timeout_close
##
## Value: on | off
listener.ws.external.send_timeout_close = on
## The TCP receive buffer(os kernel) for external MQTT/WebSocket connections.
##
## See: listener.tcp.$name.recbuf
## See: listener.ws.$name.recbuf
##
## Value: Bytes
## listener.ws.external.recbuf = 2KB
## The TCP send buffer(os kernel) for external MQTT/WebSocket connections.
##
## See: listener.tcp.$name.sndbuf
## See: listener.ws.$name.sndbuf
##
## Value: Bytes
## listener.ws.external.sndbuf = 2KB
## The size of the user-level software buffer used by the driver.
##
## See: listener.tcp.$name.buffer
## See: listener.ws.$name.buffer
##
## Value: Bytes
## listener.ws.external.buffer = 2KB
## Sets the 'buffer = max(sndbuf, recbuf)' if this option is enabled.
##
## See: listener.tcp.$name.tune_buffer
## See: listener.ws.$name.tune_buffer
##
## Value: on | off
## listener.ws.external.tune_buffer = off
## The TCP_NODELAY flag for external MQTT/WebSocket connections.
##
## See: listener.tcp.$name.nodelay
## See: listener.ws.$name.nodelay
##
## Value: true | false
listener.ws.external.nodelay = true
## The compress flag for external MQTT/WebSocket connections.
##
## If this Value is set true,the websocket message would be compressed
##
## Value: true | false
## listener.ws.external.compress = true
## The level of deflate options for external MQTT/WebSocket connections.
##
## See: listener.ws.$name.deflate_opts.level
##
## Value: none | default | best_compression | best_speed
## listener.ws.external.deflate_opts.level = default
## The mem_level of deflate options for external MQTT/WebSocket connections.
##
## See: listener.ws.$name.deflate_opts.mem_level
##
## Valid range is 1-9
## listener.ws.external.deflate_opts.mem_level = 8
## The strategy of deflate options for external MQTT/WebSocket connections.
##
## See: listener.ws.$name.deflate_opts.strategy
##
## Value: default | filtered | huffman_only | rle
## listener.ws.external.deflate_opts.strategy = default
## The deflate option for external MQTT/WebSocket connections.
##
## See: listener.ws.$name.deflate_opts.server_context_takeover
##
## Value: takeover | no_takeover
## listener.ws.external.deflate_opts.server_context_takeover = takeover
## The deflate option for external MQTT/WebSocket connections.
##
## See: listener.ws.$name.deflate_opts.client_context_takeover
##
## Value: takeover | no_takeover
## listener.ws.external.deflate_opts.client_context_takeover = takeover
## The deflate options for external MQTT/WebSocket connections.
##
## See: listener.ws.$name.deflate_opts.server_max_window_bits
##
## Valid range is 8-15
## listener.ws.external.deflate_opts.server_max_window_bits = 15
## The deflate options for external MQTT/WebSocket connections.
##
## See: listener.ws.$name.deflate_opts.client_max_window_bits
##
## Valid range is 8-15
## listener.ws.external.deflate_opts.client_max_window_bits = 15
## The idle timeout for external MQTT/WebSocket connections.
##
## See: listener.ws.$name.idle_timeout
##
## Value: Duration
## listener.ws.external.idle_timeout = 60s
## The max frame size for external MQTT/WebSocket connections.
##
##
## Value: Number
## listener.ws.external.max_frame_size = 0
##--------------------------------------------------------------------
## External WebSocket/SSL listener for MQTT Protocol
@ -1486,7 +1565,7 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G
## Note that 'listener.wss.external.ciphers' and 'listener.wss.external.psk_ciphers' cannot
## be configured at the same time.
## See 'https://tools.ietf.org/html/rfc4279#section-2'.
#listener.wss.external.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
## listener.wss.external.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
## See: listener.ssl.$name.secure_renegotiate
##
@ -1557,6 +1636,74 @@ listener.wss.external.send_timeout_close = on
## Value: true | false
## listener.wss.external.nodelay = true
## The compress flag for external WebSocket/SSL connections.
##
## If this Value is set true,the websocket message would be compressed
##
## Value: true | false
## listener.wss.external.compress = true
## The level of deflate options for external WebSocket/SSL connections.
##
## See: listener.wss.$name.deflate_opts.level
##
## Value: none | default | best_compression | best_speed
## listener.wss.external.deflate_opts.level = default
## The mem_level of deflate options for external WebSocket/SSL connections.
##
## See: listener.wss.$name.deflate_opts.mem_level
##
## Valid range is 1-9
## listener.wss.external.deflate_opts.mem_level = 8
## The strategy of deflate options for external WebSocket/SSL connections.
##
## See: listener.wss.$name.deflate_opts.strategy
##
## Value: default | filtered | huffman_only | rle
## listener.wss.external.deflate_opts.strategy = default
## The deflate option for external WebSocket/SSL connections.
##
## See: listener.wss.$name.deflate_opts.server_context_takeover
##
## Value: takeover | no_takeover
## listener.wss.external.deflate_opts.server_context_takeover = takeover
## The deflate option for external WebSocket/SSL connections.
##
## See: listener.wss.$name.deflate_opts.client_context_takeover
##
## Value: takeover | no_takeover
## listener.wss.external.deflate_opts.client_context_takeover = takeover
## The deflate options for external WebSocket/SSL connections.
##
## See: listener.wss.$name.deflate_opts.server_max_window_bits
##
## Valid range is 8-15
## listener.wss.external.deflate_opts.server_max_window_bits = 15
## The deflate options for external WebSocket/SSL connections.
##
## See: listener.wss.$name.deflate_opts.client_max_window_bits
##
## Valid range is 8-15
## listener.wss.external.deflate_opts.client_max_window_bits = 15
## The idle timeout for external WebSocket/SSL connections.
##
## See: listener.wss.$name.idle_timeout
##
## Value: Duration
## listener.wss.external.idle_timeout = 60s
## The max frame size for external WebSocket/SSL connections.
##
## Value: Number
## listener.wss.external.max_frame_size = 0
##--------------------------------------------------------------------
## Bridges
##--------------------------------------------------------------------

View File

@ -105,6 +105,18 @@
{default, "1m"}
]}.
{mapping, "cluster.etcd.ssl.keyfile", "ekka.cluster_discovery", [
{datatype, string}
]}.
{mapping, "cluster.etcd.ssl.certfile", "ekka.cluster_discovery", [
{datatype, string}
]}.
{mapping, "cluster.etcd.ssl.cacertfile", "ekka.cluster_discovery", [
{datatype, string}
]}.
%%--------------------------------------------------------------------
%% Cluster on K8s
@ -149,9 +161,16 @@
[{name, cuttlefish:conf_get("cluster.dns.name", Conf)},
{app, cuttlefish:conf_get("cluster.dns.app", Conf)}];
(etcd) ->
SslOpts = fun(Conf) ->
Options = cuttlefish_variable:filter_by_prefix("cluster.etcd.ssl", Conf),
lists:map(fun({["cluster", "etcd", "ssl", Name], Value}) ->
{list_to_atom(Name), Value}
end, Options)
end,
[{server, string:tokens(cuttlefish:conf_get("cluster.etcd.server", Conf), ",")},
{prefix, cuttlefish:conf_get("cluster.etcd.prefix", Conf, "emqcl")},
{node_ttl, cuttlefish:conf_get("cluster.etcd.node_ttl", Conf, 60)}];
{node_ttl, cuttlefish:conf_get("cluster.etcd.node_ttl", Conf, 60)},
{ssl_options, SslOpts(Conf)}];
(k8s) ->
[{apiserver, cuttlefish:conf_get("cluster.k8s.apiserver", Conf)},
{service_name, cuttlefish:conf_get("cluster.k8s.service_name", Conf)},
@ -1238,6 +1257,57 @@ end}.
hidden
]}.
{mapping, "listener.ws.$name.compress", "emqx.listeners", [
{datatype, {enum, [true, false]}},
hidden
]}.
{mapping, "listener.ws.$name.deflate_opts.level", "emqx.listeners", [
{datatype, {enum, [none, default, best_compression, best_speed]}},
hidden
]}.
{mapping, "listener.ws.$name.deflate_opts.mem_level", "emqx.listeners", [
{datatype, integer},
{validators, ["range:1-9"]},
hidden
]}.
{mapping, "listener.ws.$name.deflate_opts.strategy", "emqx.listeners", [
{datatype, {enum, [default, filtered, huffman_only, rle]}},
hidden
]}.
{mapping, "listener.ws.$name.deflate_opts.server_context_takeover", "emqx.listeners", [
{datatype, {enum, [takeover, no_takeover]}},
hidden
]}.
{mapping, "listener.ws.$name.deflate_opts.client_context_takeover", "emqx.listeners", [
{datatype, {enum, [takeover, no_takeover]}},
hidden
]}.
{mapping, "listener.ws.$name.deflate_opts.server_max_window_bits", "emqx.listeners", [
{datatype, integer},
hidden
]}.
{mapping, "listener.ws.$name.deflate_opts.client_max_window_bits", "emqx.listeners", [
{datatype, integer},
hidden
]}.
{mapping, "listener.ws.$name.idle_timeout", "emqx.listeners", [
{datatype, {duration, ms}},
hidden
]}.
{mapping, "listener.ws.$name.max_frame_size", "emqx.listeners", [
{datatype, integer},
hidden
]}.
%%--------------------------------------------------------------------
%% MQTT/WebSocket/SSL Listeners
@ -1393,6 +1463,61 @@ end}.
{datatype, {enum, [cn, dn, crt]}}
]}.
{mapping, "listener.wss.$name.compress", "emqx.listeners", [
{datatype, {enum, [true, false]}},
hidden
]}.
{mapping, "listener.wss.$name.deflate_opts.level", "emqx.listeners", [
{datatype, {enum, [none, default, best_compression, best_speed]}},
hidden
]}.
{mapping, "listener.wss.$name.deflate_opts.mem_level", "emqx.listeners", [
{datatype, integer},
{validators, ["range:1-9"]},
hidden
]}.
{mapping, "listener.wss.$name.deflate_opts.strategy", "emqx.listeners", [
{datatype, {enum, [default, filtered, huffman_only, rle]}},
hidden
]}.
{mapping, "listener.wss.$name.deflate_opts.server_context_takeover", "emqx.listeners", [
{datatype, {enum, [takeover, no_takeover]}},
hidden
]}.
{mapping, "listener.wss.$name.deflate_opts.client_context_takeover", "emqx.listeners", [
{datatype, {enum, [takeover, no_takeover]}},
hidden
]}.
{mapping, "listener.wss.$name.deflate_opts.server_max_window_bits", "emqx.listeners", [
{datatype, integer},
{validators, ["range:8-15"]},
hidden
]}.
{mapping, "listener.wss.$name.deflate_opts.client_max_window_bits", "emqx.listeners", [
{datatype, integer},
{validators, ["range:8-15"]},
hidden
]}.
{mapping, "listener.wss.$name.idle_timeout", "emqx.listeners", [
{datatype, {duration, ms}},
hidden
]}.
{mapping, "listener.wss.$name.max_frame_size", "emqx.listeners", [
{datatype, integer},
hidden
]}.
{translation, "emqx.listeners", fun(Conf) ->
Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end,
@ -1431,19 +1556,30 @@ end}.
{verify_protocol_header, cuttlefish:conf_get(Prefix ++ ".verify_protocol_header", Conf, undefined)},
{peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)},
{proxy_port_header, cuttlefish:conf_get(Prefix ++ ".proxy_port_header", Conf, undefined)},
{compress, cuttlefish:conf_get(Prefix ++ ".compress", Conf, undefined)},
{idle_timeout, cuttlefish:conf_get(Prefix ++ ".idle_timeout", Conf, undefined)},
{max_frame_size, cuttlefish:conf_get(Prefix ++ ".max_frame_size", Conf, undefined)},
{proxy_address_header, cuttlefish:conf_get(Prefix ++ ".proxy_address_header", Conf, undefined)} | AccOpts(Prefix)])
end,
DeflateOpts = fun(Prefix) ->
Filter([{level, cuttlefish:conf_get(Prefix ++ ".deflate_opts.level", Conf, undefined)},
{mem_level, cuttlefish:conf_get(Prefix ++ ".deflate_opts.mem_level", Conf, undefined)},
{strategy, cuttlefish:conf_get(Prefix ++ ".deflate_opts.strategy", Conf, undefined)},
{server_context_takeover, cuttlefish:conf_get(Prefix ++ ".deflate_opts.server_context_takeover", Conf, undefined)},
{client_context_takeover, cuttlefish:conf_get(Prefix ++ ".deflate_opts.client_context_takeover", Conf, undefined)},
{server_max_windows_bits, cuttlefish:conf_get(Prefix ++ ".deflate_opts.server_max_window_bits", Conf, undefined)},
{client_max_windows_bits, cuttlefish:conf_get(Prefix ++ ".deflate_opts.client_max_window_bits", Conf, undefined)}])
end,
TcpOpts = fun(Prefix) ->
Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)},
{send_timeout, cuttlefish:conf_get(Prefix ++ ".send_timeout", Conf, undefined)},
{send_timeout_close, cuttlefish:conf_get(Prefix ++ ".send_timeout_close", Conf, undefined)},
{recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)},
{sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)},
{buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)},
{nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)},
{reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, undefined)}])
Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)},
{send_timeout, cuttlefish:conf_get(Prefix ++ ".send_timeout", Conf, undefined)},
{send_timeout_close, cuttlefish:conf_get(Prefix ++ ".send_timeout_close", Conf, undefined)},
{recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)},
{sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)},
{buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)},
{nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)},
{reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, undefined)}])
end,
SplitFun = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end,
MapPSKCiphers = fun(PSKCiphers) ->
lists:map(
@ -1496,7 +1632,8 @@ end}.
case cuttlefish:conf_get(Prefix, Conf, undefined) of
undefined -> [];
ListenOn ->
[{Atom(Type), ListenOn, [{tcp_options, TcpOpts(Prefix)} | LisOpts(Prefix)]}]
[{Atom(Type), ListenOn, [{deflate_options, DeflateOpts(Prefix)},
{tcp_options, TcpOpts(Prefix)} | LisOpts(Prefix)]}]
end
end,
@ -1506,7 +1643,8 @@ end}.
undefined ->
[];
ListenOn ->
[{Atom(Type), ListenOn, [{tcp_options, TcpOpts(Prefix)},
[{Atom(Type), ListenOn, [{deflate_options, DeflateOpts(Prefix)},
{tcp_options, TcpOpts(Prefix)},
{ssl_options, SslOpts(Prefix)} | LisOpts(Prefix)]}]
end
end,
@ -1987,4 +2125,4 @@ end}.
[{check_interval, cuttlefish:conf_get("vm_mon.check_interval", Conf)},
{process_high_watermark, cuttlefish:conf_get("vm_mon.process_high_watermark", Conf)},
{process_low_watermark, cuttlefish:conf_get("vm_mon.process_low_watermark", Conf)}]
end}.
end}.

View File

@ -1,6 +1,6 @@
{deps, [{jsx, "2.9.0"},
{gproc, "0.8.0"},
{cowboy, "2.4.0"},
{cowboy, "2.6.1"},
{meck, "0.8.13"} %% temp workaround for version check
]}.
@ -28,4 +28,3 @@
{cover_export_enabled, true}.
{plugins, [coveralls]}.

View File

@ -15,6 +15,7 @@
-module(emqx).
-include("emqx.hrl").
-include("logger.hrl").
-include("types.hrl").
%% Start/Stop the application
@ -180,7 +181,7 @@ shutdown() ->
shutdown(normal).
shutdown(Reason) ->
emqx_logger:error("emqx shutdown for ~s", [Reason]),
?LOG(info, "[EMQ X] emqx shutdown for ~s", [Reason]),
emqx_alarm_handler:unload(),
emqx_plugins:unload(),
lists:foreach(fun application:stop/1, [emqx, ekka, cowboy, ranch, esockd, gproc]).

View File

@ -25,7 +25,7 @@
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
%% gen_server callbacks
%% gen_event callbacks
-export([ init/1
, handle_event/2
, handle_call/2
@ -93,17 +93,17 @@ init(_) ->
handle_event({set_alarm, {AlarmId, AlarmDesc = #alarm{timestamp = undefined}}}, State) ->
handle_event({set_alarm, {AlarmId, AlarmDesc#alarm{timestamp = os:timestamp()}}}, State);
handle_event({set_alarm, Alarm = {AlarmId, AlarmDesc}}, State) ->
?LOG(notice, "Alarm report: set ~p", [Alarm]),
?LOG(warning, "[Alarm Handler] ~p set", [Alarm]),
case encode_alarm(Alarm) of
{ok, Json} ->
emqx_broker:safe_publish(alarm_msg(topic(alert, maybe_to_binary(AlarmId)), Json));
{error, Reason} ->
?LOG(error, "Failed to encode alarm: ~p", [Reason])
?LOG(error, "[Alarm Handler] Failed to encode alarm: ~p", [Reason])
end,
set_alarm_(AlarmId, AlarmDesc),
{ok, State};
handle_event({clear_alarm, AlarmId}, State) ->
?LOG(notice, "Alarm report: clear ~p", [AlarmId]),
?LOG(notice, "[Alarm Handler] ~p clear", [AlarmId]),
emqx_broker:safe_publish(alarm_msg(topic(clear, maybe_to_binary(AlarmId)), <<"">>)),
clear_alarm_(AlarmId),
{ok, State};

View File

@ -88,11 +88,11 @@ init([]) ->
{ok, ensure_expiry_timer(#{expiry_timer => undefined})}.
handle_call(Req, _From, State) ->
?ERROR("[Banned] unexpected call: ~p", [Req]),
?LOG(error, "[Banned] unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?ERROR("[Banned] unexpected msg: ~p", [Msg]),
?LOG(error, "[Banned] unexpected msg: ~p", [Msg]),
{noreply, State}.
handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
@ -100,7 +100,7 @@ handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
{noreply, ensure_expiry_timer(State), hibernate};
handle_info(Info, State) ->
?ERROR("[Banned] unexpected info: ~p", [Info]),
?LOG(error, "[Banned] unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #{expiry_timer := TRef}) ->

View File

@ -298,10 +298,8 @@ standing_by({call, From}, ensure_started, State) ->
[{reply, From, ok}]};
standing_by(state_timeout, do_connect, State) ->
{next_state, connecting, State};
standing_by({call, From}, _Call, _State) ->
{keep_state_and_data, [{reply, From, {error,standing_by}}]};
standing_by(info, Info, State) ->
?LOG(info, "Bridge ~p discarded info event at state standing_by:\n~p", [name(), Info]),
?LOG(info, "[Bridge] Bridge ~p discarded info event at state standing_by:\n~p", [name(), Info]),
{keep_state_and_data, State};
standing_by(Type, Content, State) ->
common(standing_by, Type, Content, State).
@ -320,7 +318,7 @@ connecting(enter, _, #{reconnect_delay_ms := Timeout,
ok = subscribe_local_topics(Forwards),
case ConnectFun(Subs) of
{ok, ConnRef, Conn} ->
?LOG(info, "Bridge ~p connected", [name()]),
?LOG(info, "[Bridge] Bridge ~p connected", [name()]),
Action = {state_timeout, 0, connected},
{keep_state, State#{conn_ref => ConnRef, connection => Conn}, Action};
error ->
@ -370,7 +368,7 @@ connected(info, {disconnected, ConnRef, Reason},
#{conn_ref := ConnRefCurrent} = State) ->
case ConnRefCurrent =:= ConnRef of
true ->
?LOG(info, "Bridge ~p diconnected~nreason=~p", [name(), Reason]),
?LOG(info, "[Bridge] Bridge ~p diconnected~nreason=~p", [name(), Reason]),
{next_state, connecting,
State#{conn_ref := undefined, connection := undefined}};
false ->
@ -382,7 +380,7 @@ connected(info, {batch_ack, Ref}, State) ->
keep_state_and_data;
bad_order ->
%% try re-connect then re-send
?LOG(error, "Bad order ack received by bridge ~p", [name()]),
?LOG(error, "[Bridge] Bad order ack received by bridge ~p", [name()]),
{next_state, connecting, disconnect(State)};
{ok, NewState} ->
{keep_state, NewState, ?maybe_send}
@ -413,7 +411,7 @@ common(_StateName, info, {dispatch, _, Msg},
NewQ = replayq:append(Q, collect([Msg])),
{keep_state, State#{replayq => NewQ}, ?maybe_send};
common(StateName, Type, Content, State) ->
?LOG(info, "Bridge ~p discarded ~p type event at state ~p:\n~p",
?LOG(notice, "[Bridge] Bridge ~p discarded ~p type event at state ~p:\n~p",
[name(), Type, StateName, Content]),
{keep_state, State}.
@ -484,7 +482,7 @@ retry_inflight(#{inflight := Inflight} = State,
{ok, NewState} ->
retry_inflight(NewState, T);
{error, Reason} ->
?LOG(error, "Inflight retry failed\n~p", [Reason]),
?LOG(error, "[Bridge] Inflight retry failed\n~p", [Reason]),
{error, State#{inflight := Inflight ++ Remain}}
end.
@ -515,7 +513,7 @@ do_send(State = #{inflight := Inflight}, QAckRef, [_ | _] = Batch) ->
batch => Batch}],
{ok, State#{inflight := NewInflight}};
{error, Reason} ->
?LOG(info, "Batch produce failed\n~p", [Reason]),
?LOG(info, "[Bridge] Batch produce failed\n~p", [Reason]),
{error, State}
end.
@ -574,4 +572,3 @@ name(Id) -> list_to_atom(lists:concat([?MODULE, "_", Id])).
id(Pid) when is_pid(Pid) -> Pid;
id(Name) -> name(Name).

View File

@ -54,7 +54,7 @@ start(Module, Config) ->
{ok, Ref, Conn};
{error, Reason} ->
Config1 = obfuscate(Config),
?LOG(error, "Failed to connect with module=~p\n"
?LOG(error, "[Bridge connect] Failed to connect with module=~p\n"
"config=~p\nreason:~p", [Module, Config1, Reason]),
error
end.

View File

@ -195,7 +195,7 @@ publish(Msg) when is_record(Msg, message) ->
Headers = Msg#message.headers,
case emqx_hooks:run_fold('message.publish', [], Msg#message{headers = Headers#{allow_publish => true}}) of
#message{headers = #{allow_publish := false}} ->
?WARN("Publishing interrupted: ~s", [emqx_message:format(Msg)]),
?LOG(notice, "[Broker] Publishing interrupted: ~s", [emqx_message:format(Msg)]),
[];
#message{topic = Topic} = Msg1 ->
Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)),
@ -209,7 +209,7 @@ safe_publish(Msg) when is_record(Msg, message) ->
publish(Msg)
catch
_:Error:Stacktrace ->
?ERROR("[Broker] publish error: ~p~n~p~n~p", [Error, Msg, Stacktrace])
?LOG(error, "[Broker] Publish error: ~p~n~p~n~p", [Error, Msg, Stacktrace])
after
ok
end.
@ -256,7 +256,7 @@ forward(Node, To, Delivery) ->
%% rpc:call to ensure the delivery, but the latency:(
case emqx_rpc:call(Node, ?BROKER, dispatch, [To, Delivery]) of
{badrpc, Reason} ->
?ERROR("[Broker] Failed to forward msg to ~s: ~p", [Node, Reason]),
?LOG(error, "[Broker] Failed to forward msg to ~s: ~p", [Node, Reason]),
Delivery;
Delivery1 -> Delivery1
end.
@ -424,14 +424,14 @@ handle_call({subscribe, Topic, I}, _From, State) ->
{reply, Ok, State};
handle_call(Req, _From, State) ->
?ERROR("[Broker] unexpected call: ~p", [Req]),
?LOG(error, "[Broker] Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({subscribe, Topic}, State) ->
case emqx_router:do_add_route(Topic) of
ok -> ok;
{error, Reason} ->
?ERROR("[Broker] Failed to add route: ~p", [Reason])
?LOG(error, "[Broker] Failed to add route: ~p", [Reason])
end,
{noreply, State};
@ -454,11 +454,11 @@ handle_cast({unsubscribed, Topic, I}, State) ->
{noreply, State};
handle_cast(Msg, State) ->
?ERROR("[Broker] unexpected cast: ~p", [Msg]),
?LOG(error, "[Broker] Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
?ERROR("[Broker] unexpected info: ~p", [Info]),
?LOG(error, "[Broker] Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #{pool := Pool, id := Id}) ->

View File

@ -110,7 +110,7 @@ init([]) ->
{ok, #{pmon => emqx_pmon:new()}}.
handle_call(Req, _From, State) ->
?ERROR("[BrokerHelper] unexpected call: ~p", [Req]),
?LOG(error, "[Broker Helper] Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) ->
@ -119,7 +119,7 @@ handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) ->
{noreply, State#{pmon := emqx_pmon:monitor(SubPid, PMon)}};
handle_cast(Msg, State) ->
?ERROR("[BrokerHelper] unexpected cast: ~p", [Msg]),
?LOG(error, "[Broker Helper] Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon}) ->
@ -130,7 +130,7 @@ handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon})
{noreply, State#{pmon := PMon1}};
handle_info(Info, State) ->
?ERROR("[BrokerHelper] unexpected info: ~p", [Info]),
?LOG(error, "[Broker Helper] Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -16,6 +16,7 @@
-behaviour(gen_statem).
-include("logger.hrl").
-include("types.hrl").
-include("emqx_client.hrl").
@ -786,10 +787,10 @@ connected(cast, ?PUBREC_PACKET(PacketId), State = #state{inflight = Inflight}) -
Inflight1 = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight),
State#state{inflight = Inflight1};
{value, {pubrel, _Ref, _Ts}} ->
emqx_logger:warning("Duplicated PUBREC Packet: ~p", [PacketId]),
?LOG(notice, "[Client] Duplicated PUBREC Packet: ~p", [PacketId]),
State;
none ->
emqx_logger:warning("Unexpected PUBREC Packet: ~p", [PacketId]),
?LOG(warning, "[Client] Unexpected PUBREC Packet: ~p", [PacketId]),
State
end);
@ -804,7 +805,7 @@ connected(cast, ?PUBREL_PACKET(PacketId),
false -> {keep_state, NewState}
end;
error ->
emqx_logger:warning("Unexpected PUBREL: ~p", [PacketId]),
?LOG(warning, "[Client] Unexpected PUBREL: ~p", [PacketId]),
keep_state_and_data
end;
@ -903,33 +904,33 @@ handle_event({call, From}, stop, _StateName, _State) ->
{stop_and_reply, normal, [{reply, From, ok}]};
handle_event(info, {TcpOrSsL, _Sock, Data}, _StateName, State)
when TcpOrSsL =:= tcp; TcpOrSsL =:= ssl ->
emqx_logger:debug("RECV Data: ~p", [Data]),
?LOG(debug, "[Client] RECV Data: ~p", [Data]),
process_incoming(Data, [], run_sock(State));
handle_event(info, {Error, _Sock, Reason}, _StateName, State)
when Error =:= tcp_error; Error =:= ssl_error ->
emqx_logger:error("[~p] ~p, Reason: ~p", [?MODULE, Error, Reason]),
?LOG(error, "[Client] The connection error occured ~p, reason:~p", [Error, Reason]),
{stop, {shutdown, Reason}, State};
handle_event(info, {Closed, _Sock}, _StateName, State)
when Closed =:= tcp_closed; Closed =:= ssl_closed ->
emqx_logger:debug("[~p] ~p", [?MODULE, Closed]),
?LOG(debug, "[Client] ~p", [Closed]),
{stop, {shutdown, Closed}, State};
handle_event(info, {'EXIT', Owner, Reason}, _, State = #state{owner = Owner}) ->
emqx_logger:debug("[~p] Got EXIT from owner, Reason: ~p", [?MODULE, Reason]),
?LOG(debug, "[Client] Got EXIT from owner, Reason: ~p", [Reason]),
{stop, {shutdown, Reason}, State};
handle_event(info, {inet_reply, _Sock, ok}, _, _State) ->
keep_state_and_data;
handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) ->
emqx_logger:error("[~p] got tcp error: ~p", [?MODULE, Reason]),
?LOG(error, "[Client] Got tcp error: ~p", [Reason]),
{stop, {shutdown, Reason}, State};
handle_event(EventType, EventContent, StateName, _StateData) ->
emqx_logger:error("State: ~s, Unexpected Event: (~p, ~p)",
[StateName, EventType, EventContent]),
?LOG(error, "[Client] State: ~s, Unexpected Event: (~p, ~p)",
[StateName, EventType, EventContent]),
keep_state_and_data.
%% Mandatory callback functions
@ -971,7 +972,7 @@ delete_inflight(?PUBACK_PACKET(PacketId, ReasonCode, Properties),
properties => Properties}),
State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
none ->
emqx_logger:warning("Unexpected PUBACK: ~p", [PacketId]),
?LOG(warning, "[Client] Unexpected PUBACK: ~p", [PacketId]),
State
end;
delete_inflight(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties),
@ -983,7 +984,7 @@ delete_inflight(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties),
properties => Properties}),
State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
none ->
emqx_logger:warning("Unexpected PUBCOMP Packet: ~p", [PacketId]),
?LOG(warning, "[Client] Unexpected PUBCOMP Packet: ~p", [PacketId]),
State
end.
@ -1186,7 +1187,7 @@ send(Msg, State) when is_record(Msg, mqtt_msg) ->
send(Packet, State = #state{socket = Sock, proto_ver = Ver})
when is_record(Packet, mqtt_packet) ->
Data = emqx_frame:serialize(Packet, #{version => Ver}),
emqx_logger:debug("SEND Data: ~1000p", [Packet]),
?LOG(debug, "[Client] SEND Data: ~1000p", [Packet]),
case emqx_client_sock:send(Sock, Data) of
ok -> {ok, bump_last_packet_id(State)};
Error -> Error

View File

@ -159,7 +159,7 @@ init([]) ->
{ok, #{conn_pmon => emqx_pmon:new()}}.
handle_call(Req, _From, State) ->
?ERROR("[CM] unexpected call: ~p", [Req]),
?LOG(error, "[CM] Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({notify, {registered, ClientId, ConnPid}}, State = #{conn_pmon := PMon}) ->
@ -169,7 +169,7 @@ handle_cast({notify, {unregistered, ConnPid}}, State = #{conn_pmon := PMon}) ->
{noreply, State#{conn_pmon := emqx_pmon:demonitor(ConnPid, PMon)}};
handle_cast(Msg, State) ->
?ERROR("[CM] unexpected cast: ~p", [Msg]),
?LOG(error, "[CM] Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{conn_pmon := PMon}) ->
@ -180,7 +180,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{conn_pmon := PMon}
{noreply, State#{conn_pmon := PMon1}};
handle_info(Info, State) ->
?ERROR("[CM] unexpected info: ~p", [Info]),
?LOG(error, "[CM] Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -286,18 +286,18 @@ handle({call, From}, session, State = #state{proto_state = ProtoState}) ->
reply(From, emqx_protocol:session(ProtoState), State);
handle({call, From}, Req, State) ->
?LOG(error, "unexpected call: ~p", [Req]),
?LOG(error, "[Connection] Unexpected call: ~p", [Req]),
reply(From, ignored, State);
%% Handle cast
handle(cast, Msg, State) ->
?LOG(error, "unexpected cast: ~p", [Msg]),
?LOG(error, "[Connection] Unexpected cast: ~p", [Msg]),
{keep_state, State};
%% Handle Incoming
handle(info, {Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
Oct = iolist_size(Data),
?LOG(debug, "RECV ~p", [Data]),
?LOG(debug, "[Connection] RECV ~p", [Data]),
emqx_pd:update_counter(incoming_bytes, Oct),
emqx_metrics:trans(inc, 'bytes/received', Oct),
NState = ensure_stats_timer(maybe_gc({1, Oct}, State)),
@ -345,23 +345,23 @@ handle(info, {timeout, Timer, emit_stats},
GcState1 = emqx_gc:reset(GcState),
{keep_state, NState#state{gc_state = GcState1}, hibernate};
{shutdown, Reason} ->
?LOG(warning, "shutdown due to ~p", [Reason]),
?LOG(error, "[Connection] Shutdown exceptionally due to ~p", [Reason]),
shutdown(Reason, NState)
end;
handle(info, {shutdown, discard, {ClientId, ByPid}}, State) ->
?LOG(warning, "discarded by ~s:~p", [ClientId, ByPid]),
?LOG(error, "[Connection] Discarded by ~s:~p", [ClientId, ByPid]),
shutdown(discard, State);
handle(info, {shutdown, conflict, {ClientId, NewPid}}, State) ->
?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid]),
?LOG(warning, "[Connection] Clientid '~s' conflict with ~p", [ClientId, NewPid]),
shutdown(conflict, State);
handle(info, {shutdown, Reason}, State) ->
shutdown(Reason, State);
handle(info, Info, State) ->
?LOG(error, "unexpected info: ~p", [Info]),
?LOG(error, "[Connection] Unexpected info: ~p", [Info]),
{keep_state, State}.
code_change(_Vsn, State, Data, _Extra) ->
@ -371,7 +371,7 @@ terminate(Reason, _StateName, #state{transport = Transport,
socket = Socket,
keepalive = KeepAlive,
proto_state = ProtoState}) ->
?LOG(debug, "Terminated for ~p", [Reason]),
?LOG(debug, "[Connection] Terminated for ~p", [Reason]),
Transport:fast_close(Socket),
emqx_keepalive:cancel(KeepAlive),
case {ProtoState, Reason} of
@ -398,7 +398,7 @@ process_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
shutdown(Reason, State)
catch
_:Error:Stk->
?LOG(error, "Parse failed for ~p~nStacktrace:~p~nError data:~p", [Error, Stk, Data]),
?LOG(error, "[Connection] Parse failed for ~p~nStacktrace:~p~nError data:~p", [Error, Stk, Data]),
shutdown(Error, State)
end.

View File

@ -107,14 +107,14 @@ init([]) ->
{ok, #state{seq = 0}}.
handle_call(Req, _From, State) ->
?ERROR("[Ctl] unexpected call: ~p", [Req]),
?LOG(error, "[Ctl] Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({register_command, Cmd, MF, Opts}, State = #state{seq = Seq}) ->
case ets:match(?TAB, {{'$1', Cmd}, '_', '_'}) of
[] -> ets:insert(?TAB, {{Seq, Cmd}, MF, Opts});
[[OriginSeq] | _] ->
?WARN("[Ctl] cmd ~s is overidden by ~p", [Cmd, MF]),
?LOG(warning, "[Ctl] CMD ~s is overidden by ~p", [Cmd, MF]),
ets:insert(?TAB, {{OriginSeq, Cmd}, MF, Opts})
end,
noreply(next_seq(State));
@ -124,11 +124,11 @@ handle_cast({unregister_command, Cmd}, State) ->
noreply(State);
handle_cast(Msg, State) ->
?ERROR("[Ctl] unexpected cast: ~p", [Msg]),
?LOG(error, "[Ctl] Unexpected cast: ~p", [Msg]),
noreply(State).
handle_info(Info, State) ->
?ERROR("[Ctl] unexpected info: ~p", [Info]),
?LOG(error, "[Ctl] Unexpected info: ~p", [Info]),
noreply(State).
terminate(_Reason, _State) ->

View File

@ -181,7 +181,7 @@ handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, Stat
{reply, Reply, State};
handle_call(Req, _From, State) ->
?ERROR("[Hooks] unexpected call: ~p", [Req]),
?LOG(error, "[Hooks] Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({del, HookPoint, Action}, State) ->
@ -194,11 +194,11 @@ handle_cast({del, HookPoint, Action}, State) ->
{noreply, State};
handle_cast(Msg, State) ->
?ERROR("[Hooks] unexpected msg: ~p", [Msg]),
?LOG(error, "[Hooks] Unexpected msg: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
?ERROR("[Hooks] unexpected info: ~p", [Info]),
?LOG(error, "[Hooks] Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -64,25 +64,27 @@ start_listener(Proto, ListenOn, Options) when Proto == ssl; Proto == tls ->
%% Start MQTT/WS listener
start_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws ->
Dispatch = cowboy_router:compile([{'_', [{mqtt_path(Options), emqx_ws_connection, Options}]}]),
start_http_listener(fun cowboy:start_clear/3, 'mqtt:ws', ListenOn, ranch_opts(Options), Dispatch);
start_http_listener(fun cowboy:start_clear/3, 'mqtt:ws', ListenOn, ranch_opts(Options), ws_opts(Options));
%% Start MQTT/WSS listener
start_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss ->
Dispatch = cowboy_router:compile([{'_', [{mqtt_path(Options), emqx_ws_connection, Options}]}]),
start_http_listener(fun cowboy:start_tls/3, 'mqtt:wss', ListenOn, ranch_opts(Options), Dispatch).
start_http_listener(fun cowboy:start_tls/3, 'mqtt:wss', ListenOn, ranch_opts(Options), ws_opts(Options)).
start_mqtt_listener(Name, ListenOn, Options) ->
SockOpts = esockd:parse_opt(Options),
esockd:open(Name, ListenOn, merge_default(SockOpts),
{emqx_connection, start_link, [Options -- SockOpts]}).
start_http_listener(Start, Name, ListenOn, RanchOpts, Dispatch) ->
Start(Name, with_port(ListenOn, RanchOpts), #{env => #{dispatch => Dispatch}}).
start_http_listener(Start, Name, ListenOn, RanchOpts, ProtoOpts) ->
Start(Name, with_port(ListenOn, RanchOpts), ProtoOpts).
mqtt_path(Options) ->
proplists:get_value(mqtt_path, Options, "/mqtt").
ws_opts(Options) ->
Dispatch = cowboy_router:compile([{'_', [{mqtt_path(Options), emqx_ws_connection, Options}]}]),
#{env => #{dispatch => Dispatch}, proxy_header => proplists:get_value(proxy_protocol, Options, false)}.
ranch_opts(Options) ->
NumAcceptors = proplists:get_value(acceptors, Options, 4),
MaxConnections = proplists:get_value(max_connections, Options, 1024),
@ -163,4 +165,3 @@ format({Addr, Port}) when is_list(Addr) ->
io_lib:format("~s:~w", [Addr, Port]);
format({Addr, Port}) when is_tuple(Addr) ->
io_lib:format("~s:~w", [esockd_net:ntoab(Addr), Port]).

View File

@ -174,7 +174,7 @@ to_list(Msg) ->
to_bin_key_list(Msg) ->
lists:zipwith(
fun(Key, Val) ->
{bin(Key), Val}
{bin(Key), bin_key_map(Val)}
end, record_info(fields, message), tl(tuple_to_list(Msg))).
%% MilliSeconds
@ -192,6 +192,13 @@ format(flags, Flags) ->
format(headers, Headers) ->
io_lib:format("~p", [Headers]).
bin_key_map(Map) when is_map(Map) ->
maps:fold(fun(Key, Val, Acc) ->
Acc#{bin(Key) => bin_key_map(Val)}
end, #{}, Map);
bin_key_map(Data) ->
Data.
bin(Bin) when is_binary(Bin) -> Bin;
bin(Atom) when is_atom(Atom) -> list_to_binary(atom_to_list(Atom));
bin(Str) when is_list(Str) -> list_to_binary(Str).
bin(Str) when is_list(Str) -> list_to_binary(Str).

View File

@ -318,15 +318,15 @@ init([]) ->
{ok, #{}, hibernate}.
handle_call(Req, _From, State) ->
?ERROR("[Metrics] unexpected call: ~p", [Req]),
?LOG(error, "[Metrics] Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?ERROR("[Metrics] unexpected cast: ~p", [Msg]),
?LOG(error, "[Metrics] Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
?ERROR("[Metrics] unexpected info: ~p", [Info]),
?LOG(error, "[Metrics] Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #{}) ->

View File

@ -99,7 +99,7 @@ rules_from_file(AclFile) ->
#{publish => [Rule || Rule <- Rules, filter(publish, Rule)],
subscribe => [Rule || Rule <- Rules, filter(subscribe, Rule)]};
{error, Reason} ->
?LOG(error, "[ACL_INTERNAL] Failed to read ~s: ~p", [AclFile, Reason]),
?LOG(alert, "[ACL_INTERNAL] Failed to read ~s: ~p", [AclFile, Reason]),
#{}
end.

View File

@ -17,6 +17,7 @@
-behaviour(emqx_gen_mod).
-include("emqx.hrl").
-include("logger.hrl").
%% APIs
-export([ on_client_connected/4
@ -50,7 +51,7 @@ on_client_connected(#{client_id := ClientId,
{ok, Payload} ->
emqx:publish(message(qos(Env), topic(connected, ClientId), Payload));
{error, Reason} ->
emqx_logger:error("[Presence Module] Json error: ~p", [Reason])
?LOG(error, "[Presence] Encoding connected event error: ~p", [Reason])
end.
on_client_disconnected(#{client_id := ClientId, username := Username}, Reason, Env) ->
@ -61,7 +62,7 @@ on_client_disconnected(#{client_id := ClientId, username := Username}, Reason, E
{ok, Payload} ->
emqx_broker:publish(message(qos(Env), topic(disconnected, ClientId), Payload));
{error, Reason} ->
emqx_logger:error("[Presence Module] Json error: ~p", [Reason])
?LOG(error, "[Presence] Encoding disconnected event error: ~p", [Reason])
end.
unload(_Env) ->

View File

@ -14,6 +14,8 @@
-module(emqx_modules).
-include("logger.hrl").
-export([ load/0
, unload/0
]).
@ -24,7 +26,7 @@ load() ->
lists:foreach(
fun({Mod, Env}) ->
ok = Mod:load(Env),
logger:info("Load ~s module successfully.", [Mod])
?LOG(info, "[Modules] Load ~s module successfully.", [Mod])
end, emqx_config:get_env(modules, [])).
-spec(unload() -> ok).

View File

@ -46,7 +46,7 @@ unmount(MountPoint, Msg = #message{topic = Topic}) ->
{MountPoint, Topic1} -> Msg#message{topic = Topic1}
catch
_Error:Reason ->
?LOG(error, "Unmount error : ~p", [Reason]),
?LOG(error, "[Mountpoint] Unmount error : ~p", [Reason]),
Msg
end.

View File

@ -132,7 +132,7 @@ handle_info({timeout, Timer, check}, State = #{timer := Timer,
0 ->
{noreply, State#{timer := undefined}};
{error, Reason} ->
?LOG(warning, "Failed to get cpu utilization: ~p", [Reason]),
?LOG(error, "[OS Monitor] Failed to get cpu utilization: ~p", [Reason]),
{noreply, ensure_check_timer(State)};
Busy when Busy / 100 >= CPUHighWatermark ->
alarm_handler:set_alarm({cpu_high_watermark, Busy}),
@ -142,7 +142,9 @@ handle_info({timeout, Timer, check}, State = #{timer := Timer,
true -> alarm_handler:clear_alarm(cpu_high_watermark);
false -> ok
end,
{noreply, ensure_check_timer(State#{is_cpu_alarm_set := false})}
{noreply, ensure_check_timer(State#{is_cpu_alarm_set := false})};
_Busy ->
{noreply, ensure_check_timer(State)}
end.
terminate(_Reason, #{timer := Timer}) ->

View File

@ -15,6 +15,7 @@
-module(emqx_plugins).
-include("emqx.hrl").
-include("logger.hrl").
-export([init/0]).
@ -85,7 +86,7 @@ load_expand_plugin(PluginDir) ->
end, Modules),
case filelib:wildcard(Ebin ++ "/*.app") of
[App|_] -> application:load(list_to_atom(filename:basename(App, ".app")));
_ -> emqx_logger:error("App file cannot be found."),
_ -> ?LOG(alert, "[Plugins] Plugin not found."),
{error, load_app_fail}
end.
@ -110,7 +111,7 @@ with_loaded_file(File, SuccFun) ->
{ok, Names} ->
SuccFun(Names);
{error, Error} ->
emqx_logger:error("[Plugins] Failed to read: ~p, error: ~p", [File, Error]),
?LOG(alert, "[Plugins] Failed to read: ~p, error: ~p", [File, Error]),
{error, Error}
end.
@ -118,7 +119,7 @@ load_plugins(Names, Persistent) ->
Plugins = list(), NotFound = Names -- names(Plugins),
case NotFound of
[] -> ok;
NotFound -> emqx_logger:error("[Plugins] Cannot find plugins: ~p", [NotFound])
NotFound -> ?LOG(alert, "[Plugins] Cannot find plugins: ~p", [NotFound])
end,
NeedToLoad = Names -- NotFound -- names(started_app),
[load_plugin(find_plugin(Name, Plugins), Persistent) || Name <- NeedToLoad].
@ -163,12 +164,12 @@ plugin(AppName) ->
load(PluginName) when is_atom(PluginName) ->
case lists:member(PluginName, names(started_app)) of
true ->
emqx_logger:error("[Plugins] Plugin ~s is already started", [PluginName]),
?LOG(notice, "[Plugins] Plugin ~s is already started", [PluginName]),
{error, already_started};
false ->
case find_plugin(PluginName) of
false ->
emqx_logger:error("[Plugins] Plugin ~s not found", [PluginName]),
?LOG(alert, "[Plugins] Plugin ~s not found", [PluginName]),
{error, not_found};
Plugin ->
load_plugin(Plugin, true)
@ -196,12 +197,12 @@ load_app(App) ->
start_app(App, SuccFun) ->
case application:ensure_all_started(App) of
{ok, Started} ->
emqx_logger:info("Started Apps: ~p", [Started]),
emqx_logger:info("Load plugin ~s successfully", [App]),
?LOG(info, "[Plugins] Started plugins: ~p", [Started]),
?LOG(info, "[Plugins] Load plugin ~s successfully", [App]),
SuccFun(App),
{ok, Started};
{error, {ErrApp, Reason}} ->
emqx_logger:error("Load plugin ~s error, cannot start app ~s for ~p", [App, ErrApp, Reason]),
?LOG(error, "[Plugins] Load plugin ~s failed, cannot start plugin ~s for ~p", [App, ErrApp, Reason]),
{error, {ErrApp, Reason}}
end.
@ -218,10 +219,10 @@ unload(PluginName) when is_atom(PluginName) ->
{true, true} ->
unload_plugin(PluginName, true);
{false, _} ->
emqx_logger:error("Plugin ~s is not started", [PluginName]),
?LOG(error, "[Plugins] Plugin ~s is not started", [PluginName]),
{error, not_started};
{true, false} ->
emqx_logger:error("~s is not a plugin, cannot unload it", [PluginName]),
?LOG(error, "[Plugins] ~s is not a plugin, cannot unload it", [PluginName]),
{error, not_found}
end.
@ -236,11 +237,11 @@ unload_plugin(App, Persistent) ->
stop_app(App) ->
case application:stop(App) of
ok ->
emqx_logger:info("Stop plugin ~s successfully", [App]), ok;
?LOG(info, "[Plugins] Stop plugin ~s successfully", [App]), ok;
{error, {not_started, App}} ->
emqx_logger:error("Plugin ~s is not started", [App]), ok;
?LOG(error, "[Plugins] Plugin ~s is not started", [App]), ok;
{error, Reason} ->
emqx_logger:error("Stop plugin ~s error: ~p", [App]), {error, Reason}
?LOG(error, "[Plugins] Stop plugin ~s error: ~p", [App]), {error, Reason}
end.
%%--------------------------------------------------------------------
@ -268,7 +269,7 @@ plugin_loaded(Name, true) ->
ignore
end;
{error, Error} ->
emqx_logger:error("Cannot read loaded plugins: ~p", [Error])
?LOG(error, "[Plugins] Cannot read loaded plugins: ~p", [Error])
end.
plugin_unloaded(_Name, false) ->
@ -280,10 +281,10 @@ plugin_unloaded(Name, true) ->
true ->
write_loaded(lists:delete(Name, Names));
false ->
emqx_logger:error("Cannot find ~s in loaded_file", [Name])
?LOG(error, "[Plugins] Cannot find ~s in loaded_file", [Name])
end;
{error, Error} ->
emqx_logger:error("Cannot read loaded_plugins: ~p", [Error])
?LOG(error, "[Plugins] Cannot read loaded_plugins: ~p", [Error])
end.
read_loaded() ->
@ -302,7 +303,7 @@ write_loaded(AppNames) ->
file:write(Fd, iolist_to_binary(io_lib:format("~s.~n", [Name])))
end, AppNames);
{error, Error} ->
emqx_logger:error("Open File ~p Error: ~p", [File, Error]),
?LOG(error, "[Plugins] Open File ~p Error: ~p", [File, Error]),
{error, Error}
end.

View File

@ -97,22 +97,22 @@ handle_call({submit, Task}, _From, State) ->
{reply, catch run(Task), State};
handle_call(Req, _From, State) ->
?ERROR("[Pool] unexpected call: ~p", [Req]),
?LOG(error, "[Pool] Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({async_submit, Task}, State) ->
try run(Task)
catch _:Error:Stacktrace ->
?ERROR("[Pool] error: ~p, ~p", [Error, Stacktrace])
?LOG(error, "[Pool] Error: ~p, ~p", [Error, Stacktrace])
end,
{noreply, State};
handle_cast(Msg, State) ->
?ERROR("[Pool] unexpected cast: ~p", [Msg]),
?LOG(error, "[Pool] Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
?ERROR("[Pool] unexpected info: ~p", [Info]),
?LOG(error, "[Pool] Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #{pool := Pool, id := Id}) ->

View File

@ -57,7 +57,6 @@
will_topic,
will_msg,
keepalive,
mountpoint,
is_bridge,
enable_ban,
enable_acl,
@ -101,7 +100,6 @@ init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendF
clean_start = false,
topic_aliases = #{},
packet_size = emqx_zone:get_env(Zone, max_packet_size),
mountpoint = emqx_zone:get_env(Zone, mountpoint),
is_bridge = false,
enable_ban = emqx_zone:get_env(Zone, enable_ban, false),
enable_acl = emqx_zone:get_env(Zone, enable_acl),
@ -153,7 +151,6 @@ attrs(#pstate{zone = Zone,
proto_ver = ProtoVer,
proto_name = ProtoName,
keepalive = Keepalive,
mountpoint = Mountpoint,
is_bridge = IsBridge,
connected_at = ConnectedAt,
conn_mod = ConnMod,
@ -167,7 +164,6 @@ attrs(#pstate{zone = Zone,
{proto_name, ProtoName},
{clean_start, CleanStart},
{keepalive, Keepalive},
{mountpoint, Mountpoint},
{is_bridge, IsBridge},
{connected_at, ConnectedAt},
{conn_mod, ConnMod},
@ -202,16 +198,27 @@ caps(#pstate{zone = Zone}) ->
client_id(#pstate{client_id = ClientId}) ->
ClientId.
credentials(#pstate{credentials = Credentials}) when map_size(Credentials) =/= 0 ->
Credentials;
credentials(#pstate{zone = Zone,
client_id = ClientId,
username = Username,
peername = Peername}) ->
#{zone => Zone,
client_id => ClientId,
username => Username,
peername => Peername}.
credentials(#pstate{zone = Zone,
client_id = ClientId,
username = Username,
peername = Peername,
peercert = Peercert}) ->
with_cert(#{zone => Zone,
client_id => ClientId,
username => Username,
peername => Peername,
mountpoint => emqx_zone:get_env(Zone, mountpoint)}, Peercert).
with_cert(Credentials, undefined) -> Credentials;
with_cert(Credentials, Peercert) ->
Credentials#{dn => esockd_peercert:subject(Peercert),
cn => esockd_peercert:common_name(Peercert)}.
keepsafety(Credentials) ->
maps:filter(fun(password, _) -> false;
(dn, _) -> false;
(cn, _) -> false;
(_, _) -> true end, Credentials).
stats(#pstate{recv_stats = #{pkt := RecvPkt, msg := RecvMsg},
send_stats = #{pkt := SendPkt, msg := SendMsg}}) ->
@ -389,7 +396,7 @@ process(?CONNECT_PACKET(
case try_open_session(SessAttrs, PState3) of
{ok, SPid, SP} ->
PState4 = PState3#pstate{session = SPid, connected = true,
credentials = maps:remove(password, Credentials0)},
credentials = keepsafety(Credentials0)},
ok = emqx_cm:register_connection(client_id(PState4)),
true = emqx_cm:set_conn_attrs(client_id(PState4), attrs(PState4)),
%% Start keepalive
@ -397,11 +404,11 @@ process(?CONNECT_PACKET(
%% Success
{?RC_SUCCESS, SP, PState4};
{error, Error} ->
?LOG(error, "Failed to open session: ~p", [Error]),
?LOG(error, "[Protocol] Failed to open session: ~p", [Error]),
{?RC_UNSPECIFIED_ERROR, PState1#pstate{credentials = Credentials0}}
end;
{error, Reason} ->
?LOG(error, "Client ~s (Username: '~s') login failed for ~p", [NewClientId, Username, Reason]),
?LOG(warning, "[Protocol] Client ~s (Username: '~s') login failed for ~p", [NewClientId, Username, Reason]),
{emqx_reason_codes:connack_error(Reason), PState1#pstate{credentials = Credentials}}
end;
{error, ReasonCode} ->
@ -413,7 +420,7 @@ process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) ->
ok ->
do_publish(Packet, PState);
{error, ReasonCode} ->
?LOG(warning, "Cannot publish qos0 message to ~s for ~s",
?LOG(warning, "[Protocol] Cannot publish qos0 message to ~s for ~s",
[Topic, emqx_reason_codes:text(ReasonCode)]),
do_acl_deny_action(Packet, ReasonCode, PState)
end;
@ -423,7 +430,7 @@ process(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PState) ->
ok ->
do_publish(Packet, PState);
{error, ReasonCode} ->
?LOG(warning, "Cannot publish qos1 message to ~s for ~s",
?LOG(warning, "[Protocol] Cannot publish qos1 message to ~s for ~s",
[Topic, emqx_reason_codes:text(ReasonCode)]),
case deliver({puback, PacketId, ReasonCode}, PState) of
{ok, PState1} ->
@ -437,7 +444,7 @@ process(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PState) ->
ok ->
do_publish(Packet, PState);
{error, ReasonCode} ->
?LOG(warning, "Cannot publish qos2 message to ~s for ~s",
?LOG(warning, "[Protocol] Cannot publish qos2 message to ~s for ~s",
[Topic, emqx_reason_codes:text(ReasonCode)]),
case deliver({pubrec, PacketId, ReasonCode}, PState) of
{ok, PState1} ->
@ -469,24 +476,12 @@ process(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid})
{ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState};
process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
PState = #pstate{session = SPid, mountpoint = Mountpoint,
proto_ver = ProtoVer, is_bridge = IsBridge,
ignore_loop = IgnoreLoop}) ->
RawTopicFilters1 = if ProtoVer < ?MQTT_PROTO_V5 ->
IfIgnoreLoop = case IgnoreLoop of true -> 1; false -> 0 end,
case IsBridge of
true -> [{RawTopic, SubOpts#{rap => 1, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters];
false -> [{RawTopic, SubOpts#{rap => 0, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters]
end;
true ->
RawTopicFilters
end,
case check_subscribe(
parse_topic_filters(?SUBSCRIBE, RawTopicFilters1), PState) of
PState = #pstate{session = SPid, credentials = Credentials}) ->
case check_subscribe(parse_topic_filters(?SUBSCRIBE, raw_topic_filters(PState, RawTopicFilters)), PState) of
{ok, TopicFilters} ->
TopicFilters0 = emqx_hooks:run_fold('client.subscribe', [credentials(PState)], TopicFilters),
ok = emqx_session:subscribe(SPid, PacketId, Properties,
emqx_mountpoint:mount(Mountpoint, TopicFilters0)),
TopicFilters0 = emqx_hooks:run_fold('client.subscribe', [Credentials], TopicFilters),
TopicFilters1 = emqx_mountpoint:mount(mountpoint(Credentials), TopicFilters0),
ok = emqx_session:subscribe(SPid, PacketId, Properties, TopicFilters1),
{ok, PState};
{error, TopicFilters} ->
{SubTopics, ReasonCodes} =
@ -495,7 +490,7 @@ process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
({Topic, #{rc := Code}}, {Topics, Codes}) ->
{[Topic|Topics], [Code|Codes]}
end, {[], []}, TopicFilters),
?LOG(warning, "Cannot subscribe ~p for ~p",
?LOG(warning, "[Protocol] Cannot subscribe ~p for ~p",
[SubTopics, [emqx_reason_codes:text(R) || R <- ReasonCodes]]),
case deliver({suback, PacketId, ReasonCodes}, PState) of
{ok, PState1} ->
@ -506,11 +501,11 @@ process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
end;
process(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
PState = #pstate{session = SPid, mountpoint = MountPoint}) ->
TopicFilters = emqx_hooks:run_fold('client.unsubscribe', [credentials(PState)],
PState = #pstate{session = SPid, credentials = Credentials}) ->
TopicFilters = emqx_hooks:run_fold('client.unsubscribe', [Credentials],
parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters)),
ok = emqx_session:unsubscribe(SPid, PacketId, Properties,
emqx_mountpoint:mount(MountPoint, TopicFilters)),
emqx_mountpoint:mount(mountpoint(Credentials), TopicFilters)),
{ok, PState};
process(?PACKET(?PINGREQ), PState) ->
@ -538,12 +533,12 @@ process(?DISCONNECT_PACKET(_), PState) ->
%% ConnAck --> Client
%%------------------------------------------------------------------------------
connack({?RC_SUCCESS, SP, PState}) ->
ok = emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, attrs(PState)]),
deliver({connack, ?RC_SUCCESS, sp(SP)}, update_mountpoint(PState));
connack({?RC_SUCCESS, SP, PState = #pstate{credentials = Credentials}}) ->
ok = emqx_hooks:run('client.connected', [Credentials, ?RC_SUCCESS, attrs(PState)]),
deliver({connack, ?RC_SUCCESS, sp(SP)}, PState);
connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) ->
ok = emqx_hooks:run('client.connected', [credentials(PState), ReasonCode, attrs(PState)]),
connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer, credentials = Credentials}}) ->
ok = emqx_hooks:run('client.connected', [Credentials, ReasonCode, attrs(PState)]),
[ReasonCode1] = reason_codes_compat(connack, [ReasonCode], ProtoVer),
_ = deliver({connack, ReasonCode1}, PState),
{error, emqx_reason_codes:name(ReasonCode1, ProtoVer), PState}.
@ -553,9 +548,9 @@ connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) ->
%%------------------------------------------------------------------------------
do_publish(Packet = ?PUBLISH_PACKET(QoS, PacketId),
PState = #pstate{session = SPid, mountpoint = MountPoint}) ->
Msg = emqx_mountpoint:mount(MountPoint,
emqx_packet:to_message(credentials(PState), Packet)),
PState = #pstate{session = SPid, credentials = Credentials}) ->
Msg = emqx_mountpoint:mount(mountpoint(Credentials),
emqx_packet:to_message(Credentials, Packet)),
puback(QoS, PacketId, emqx_session:publish(SPid, PacketId, emqx_message:set_flag(dup, false, Msg)), PState).
%%------------------------------------------------------------------------------
@ -651,10 +646,10 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone,
deliver({connack, ReasonCode, SP}, PState) ->
send(?CONNACK_PACKET(ReasonCode, SP), PState);
deliver({publish, PacketId, Msg}, PState = #pstate{mountpoint = MountPoint}) ->
Msg0 = emqx_hooks:run_fold('message.deliver', [credentials(PState)], Msg),
deliver({publish, PacketId, Msg}, PState = #pstate{credentials = Credentials}) ->
Msg0 = emqx_hooks:run_fold('message.deliver', [Credentials], Msg),
Msg1 = emqx_message:update_expiry(Msg0),
Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1),
Msg2 = emqx_mountpoint:unmount(mountpoint(Credentials), Msg1),
send(emqx_packet:from_message(PacketId, Msg2), PState);
deliver({puback, PacketId, ReasonCode}, PState) ->
@ -818,11 +813,11 @@ check_will_topic(#mqtt_packet_connect{will_topic = WillTopic} = ConnPkt, PState)
check_will_acl(_ConnPkt, #pstate{enable_acl = EnableAcl}) when not EnableAcl ->
ok;
check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, PState) ->
case emqx_access_control:check_acl(credentials(PState), publish, WillTopic) of
check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, #pstate{credentials = Credentials}) ->
case emqx_access_control:check_acl(Credentials, publish, WillTopic) of
allow -> ok;
deny ->
?LOG(warning, "Cannot publish will message to ~p for acl denied", [WillTopic]),
?LOG(warning, "[Protocol] Cannot publish will message to ~p for acl denied", [WillTopic]),
{error, ?RC_NOT_AUTHORIZED}
end.
@ -838,8 +833,8 @@ check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Ret
check_pub_acl(_Packet, #pstate{credentials = #{is_superuser := IsSuper}, enable_acl = EnableAcl})
when IsSuper orelse (not EnableAcl) ->
ok;
check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, PState) ->
case emqx_access_control:check_acl(credentials(PState), publish, Topic) of
check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, #pstate{credentials = Credentials}) ->
case emqx_access_control:check_acl(Credentials, publish, Topic) of
allow -> ok;
deny -> {error, ?RC_NOT_AUTHORIZED}
end.
@ -865,10 +860,10 @@ check_subscribe(TopicFilters, PState = #pstate{zone = Zone}) ->
check_sub_acl(TopicFilters, #pstate{credentials = #{is_superuser := IsSuper}, enable_acl = EnableAcl})
when IsSuper orelse (not EnableAcl) ->
{ok, TopicFilters};
check_sub_acl(TopicFilters, PState) ->
check_sub_acl(TopicFilters, #pstate{credentials = Credentials}) ->
lists:foldr(
fun({Topic, SubOpts}, {Ok, Acc}) ->
case emqx_access_control:check_acl(credentials(PState), publish, Topic) of
case emqx_access_control:check_acl(Credentials, publish, Topic) of
allow -> {Ok, [{Topic, SubOpts}|Acc]};
deny ->
{error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]}
@ -876,9 +871,9 @@ check_sub_acl(TopicFilters, PState) ->
end, {ok, []}, TopicFilters).
trace(recv, Packet) ->
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]);
?LOG(debug, "[Protocol] RECV ~s", [emqx_packet:format(Packet)]);
trace(send, Packet) ->
?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]).
?LOG(debug, "[Protocol] SEND ~s", [emqx_packet:format(Packet)]).
inc_stats(recv, Type, PState = #pstate{recv_stats = Stats}) ->
PState#pstate{recv_stats = inc_stats(Type, Stats)};
@ -900,9 +895,10 @@ terminate(conflict, _PState) ->
ok;
terminate(discard, _PState) ->
ok;
terminate(Reason, PState) ->
?LOG(info, "Shutdown for ~p", [Reason]),
ok = emqx_hooks:run('client.disconnected', [credentials(PState), Reason]).
terminate(Reason, #pstate{credentials = Credentials}) ->
?LOG(info, "[Protocol] Shutdown for ~p", [Reason]),
ok = emqx_hooks:run('client.disconnected', [Credentials, Reason]).
start_keepalive(0, _PState) ->
ignore;
@ -920,14 +916,6 @@ parse_topic_filters(?SUBSCRIBE, RawTopicFilters) ->
parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters) ->
lists:map(fun emqx_topic:parse/1, RawTopicFilters).
%%------------------------------------------------------------------------------
%% Update mountpoint
update_mountpoint(PState = #pstate{mountpoint = undefined}) ->
PState;
update_mountpoint(PState = #pstate{mountpoint = MountPoint}) ->
PState#pstate{mountpoint = emqx_mountpoint:replvar(MountPoint, credentials(PState))}.
sp(true) -> 1;
sp(false) -> 0.
@ -974,3 +962,20 @@ reason_codes_compat(unsuback, _ReasonCodes, _ProtoVer) ->
undefined;
reason_codes_compat(PktType, ReasonCodes, _ProtoVer) ->
[emqx_reason_codes:compat(PktType, RC) || RC <- ReasonCodes].
raw_topic_filters(#pstate{proto_ver = ProtoVer,
is_bridge = IsBridge,
ignore_loop = IgnoreLoop}, RawTopicFilters) ->
case ProtoVer < ?MQTT_PROTO_V5 of
true ->
IfIgnoreLoop = case IgnoreLoop of true -> 1; false -> 0 end,
case IsBridge of
true -> [{RawTopic, SubOpts#{rap => 1, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters];
false -> [{RawTopic, SubOpts#{rap => 0, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters]
end;
false ->
RawTopicFilters
end.
mountpoint(Credentials) ->
maps:get(mountpoint, Credentials, undefined).

View File

@ -29,10 +29,10 @@ lookup(psk, ClientPSKID, _UserState) ->
try emqx_hooks:run_fold('tls_handshake.psk_lookup', [ClientPSKID], not_found) of
SharedSecret when is_binary(SharedSecret) -> {ok, SharedSecret};
Error ->
?LOG(error, "Look PSK for PSKID ~p error: ~p", [ClientPSKID, Error]),
?LOG(error, "[PSK] Look PSK for PSKID ~p error: ~p", [ClientPSKID, Error]),
error
catch
Except:Error:Stacktrace ->
?LOG(error, "Lookup PSK failed, ~p: ~p", [{Except,Error}, Stacktrace]),
?LOG(error, "[PSK] Lookup PSK failed, ~p: ~p", [{Except,Error}, Stacktrace]),
error
end.

View File

@ -198,15 +198,15 @@ handle_call({delete_route, Topic, Dest}, _From, State) ->
{reply, Ok, State};
handle_call(Req, _From, State) ->
?ERROR("[Router] unexpected call: ~p", [Req]),
?LOG(error, "[Router] Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?ERROR("[Router] unexpected cast: ~p", [Msg]),
?LOG(error, "[Router] Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
?ERROR("[Router] unexpected info: ~p", [Info]),
?LOG(error, "[Router] Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #{pool := Pool, id := Id}) ->

View File

@ -103,11 +103,11 @@ init([]) ->
{ok, #{nodes => Nodes}, hibernate}.
handle_call(Req, _From, State) ->
?ERROR("[RouterHelper] unexpected call: ~p", [Req]),
?LOG(error, "[Router Helper] Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?ERROR("[RouterHelper] unexpected cast: ~p", [Msg]),
?LOG(error, "[Router Helper] Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({mnesia_table_event, {write, {?ROUTING_NODE, Node, _}, _}}, State = #{nodes := Nodes}) ->
@ -123,7 +123,7 @@ handle_info({mnesia_table_event, {delete, {?ROUTING_NODE, _Node}, _}}, State) ->
{noreply, State};
handle_info({mnesia_table_event, Event}, State) ->
?ERROR("[RouterHelper] unexpected mnesia_table_event: ~p", [Event]),
?LOG(error, "[Router Helper] Unexpected mnesia_table_event: ~p", [Event]),
{noreply, State};
handle_info({nodedown, Node}, State = #{nodes := Nodes}) ->
@ -141,7 +141,7 @@ handle_info({membership, _Event}, State) ->
{noreply, State};
handle_info(Info, State) ->
?ERROR("[RouteHelper] unexpected info: ~p", [Info]),
?LOG(error, "[Route Helper] Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -422,11 +422,11 @@ handle_call(stats, _From, State) ->
reply(stats(State), State);
handle_call({discard, ByPid}, _From, State = #state{conn_pid = undefined}) ->
?LOG(warning, "Discarded by ~p", [ByPid]),
?LOG(warning, "[Session] Discarded by ~p", [ByPid]),
{stop, {shutdown, discarded}, ok, State};
handle_call({discard, ByPid}, _From, State = #state{client_id = ClientId, conn_pid = ConnPid}) ->
?LOG(warning, "Conn ~p is discarded by ~p", [ConnPid, ByPid]),
?LOG(warning, "[Session] Conn ~p is discarded by ~p", [ConnPid, ByPid]),
ConnPid ! {shutdown, discard, {ClientId, ByPid}},
{stop, {shutdown, discarded}, ok, State};
@ -445,7 +445,7 @@ handle_call({register_publish_packet_id, PacketId, Ts}, _From,
{ok, ensure_stats_timer(ensure_await_rel_timer(State1))}
end;
true ->
?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId]),
?LOG(warning, "[Session] Dropped qos2 packet ~w for too many awaiting_rel", [PacketId]),
emqx_metrics:trans(inc, 'messages/qos2/dropped'),
{{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State}
end);
@ -457,7 +457,7 @@ handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = In
true ->
{ok, ensure_stats_timer(acked(pubrec, PacketId, State))};
false ->
?LOG(warning, "The PUBREC PacketId ~w is not found.", [PacketId]),
?LOG(warning, "[Session] The PUBREC PacketId ~w is not found.", [PacketId]),
emqx_metrics:trans(inc, 'packets/pubrec/missed'),
{{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
end);
@ -469,7 +469,7 @@ handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel
{_Ts, AwaitingRel1} ->
{ok, ensure_stats_timer(State#state{awaiting_rel = AwaitingRel1})};
error ->
?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId]),
?LOG(warning, "[Session] The PUBREL PacketId ~w is not found", [PacketId]),
emqx_metrics:trans(inc, 'packets/pubrel/missed'),
{{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
end);
@ -478,7 +478,7 @@ handle_call(close, _From, State) ->
{stop, normal, ok, State};
handle_call(Req, _From, State) ->
emqx_logger:error("[Session] unexpected call: ~p", [Req]),
?LOG(error, "[Session] Unexpected call: ~p", [Req]),
{reply, ignored, State}.
%% SUBSCRIBE:
@ -519,7 +519,7 @@ handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}
true ->
ensure_stats_timer(dequeue(acked(puback, PacketId, State)));
false ->
?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]),
?LOG(warning, "[Session] The PUBACK PacketId ~w is not found", [PacketId]),
emqx_metrics:trans(inc, 'packets/puback/missed'),
State
end);
@ -531,7 +531,7 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight
true ->
ensure_stats_timer(dequeue(acked(pubcomp, PacketId, State)));
false ->
?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId]),
?LOG(warning, "[Session] The PUBCOMP PacketId ~w is not found", [PacketId]),
emqx_metrics:trans(inc, 'packets/pubcomp/missed'),
State
end);
@ -549,14 +549,14 @@ handle_cast({resume, #{conn_pid := ConnPid,
expiry_timer = ExpireTimer,
will_delay_timer = WillDelayTimer}) ->
?LOG(info, "Resumed by connection ~p ", [ConnPid]),
?LOG(info, "[Session] Resumed by connection ~p ", [ConnPid]),
%% Cancel Timers
lists:foreach(fun emqx_misc:cancel_timer/1,
[RetryTimer, AwaitTimer, ExpireTimer, WillDelayTimer]),
case kick(ClientId, OldConnPid, ConnPid) of
ok -> ?LOG(warning, "Connection ~p kickout ~p", [ConnPid, OldConnPid]);
ok -> ?LOG(warning, "[Session] Connection ~p kickout ~p", [ConnPid, OldConnPid]);
ignore -> ok
end,
@ -587,7 +587,7 @@ handle_cast({update_expiry_interval, Interval}, State) ->
{noreply, State#state{expiry_interval = Interval}};
handle_cast(Msg, State) ->
emqx_logger:error("[Session] unexpected cast: ~p", [Msg]),
?LOG(error, "[Session] Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, message) ->
@ -623,12 +623,12 @@ handle_info({timeout, Timer, emit_stats},
GcState1 = emqx_gc:reset(GcState),
{noreply, NewState#state{gc_state = GcState1}, hibernate};
{shutdown, Reason} ->
?LOG(warning, "shutdown due to ~p", [Reason]),
?LOG(warning, "[Session] Shutdown exceptionally due to ~p", [Reason]),
shutdown(Reason, NewState)
end;
handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) ->
?LOG(info, "expired, shutdown now.", []),
?LOG(info, "[Session] Expired, shutdown now.", []),
shutdown(expired, State);
handle_info({timeout, Timer, will_delay}, State = #state{will_msg = WillMsg, will_delay_timer = Timer}) ->
@ -663,12 +663,12 @@ handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) ->
{noreply, State#state{old_conn_pid = undefined}};
handle_info({'EXIT', Pid, Reason}, State = #state{conn_pid = ConnPid}) ->
?LOG(error, "Unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p",
?LOG(error, "[Session] Unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p",
[ConnPid, Pid, Reason]),
{noreply, State};
handle_info(Info, State) ->
emqx_logger:error("[Session] unexpected info: ~p", [Info]),
?LOG(error, "[Session] Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(Reason, #state{will_msg = WillMsg,
@ -792,7 +792,7 @@ expire_awaiting_rel([{PacketId, Ts} | More], Now,
case (timer:now_diff(Now, Ts) div 1000) of
Age when Age >= Timeout ->
emqx_metrics:trans(inc, 'messages/qos2/expired'),
?LOG(warning, "Dropped qos2 packet ~s for await_rel_timeout", [PacketId]),
?LOG(warning, "[Session] Dropped qos2 packet ~s for await_rel_timeout", [PacketId]),
expire_awaiting_rel(More, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)});
Age ->
ensure_await_rel_timer(Timeout - max(0, Age), State)
@ -839,8 +839,14 @@ drain_m(Cnt, Msgs) when Cnt =< 0 ->
lists:reverse(Msgs);
drain_m(Cnt, Msgs) ->
receive
{dispatch, Topic, Msg} ->
drain_m(Cnt-1, [{Topic, Msg}|Msgs])
{dispatch, Topic, Msg} when is_record(Msg, message)->
drain_m(Cnt-1, [{Topic, Msg} | Msgs]);
{dispatch, Topic, InMsgs} when is_list(InMsgs) ->
Msgs1 = lists:foldl(
fun(Msg, Acc) ->
[{Topic, Msg} | Acc]
end, Msgs, InMsgs),
drain_m(Cnt-length(InMsgs), Msgs1)
after 0 ->
lists:reverse(Msgs)
end.
@ -990,7 +996,7 @@ acked(puback, PacketId, State = #state{client_id = ClientId, username = Username
ok = emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}, Msg]),
State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
none ->
?LOG(warning, "Duplicated PUBACK PacketId ~w", [PacketId]),
?LOG(warning, "[Session] Duplicated PUBACK PacketId ~w", [PacketId]),
State
end;
@ -1000,10 +1006,10 @@ acked(pubrec, PacketId, State = #state{client_id = ClientId, username = Username
ok = emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}, Msg]),
State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)};
{value, {pubrel, PacketId, _Ts}} ->
?LOG(warning, "Duplicated PUBREC PacketId ~w", [PacketId]),
?LOG(warning, "[Session] Duplicated PUBREC PacketId ~w", [PacketId]),
State;
none ->
?LOG(warning, "Unexpected PUBREC PacketId ~w", [PacketId]),
?LOG(warning, "[Session] Unexpected PUBREC PacketId ~w", [PacketId]),
State
end;

View File

@ -92,7 +92,7 @@ handle_call({start_session, SessAttrs = #{client_id := ClientId}}, _From,
reply({error, Reason}, State)
catch
_:Error:Stk ->
?ERROR("Failed to start session ~p: ~p, stacktrace:~n~p",
?LOG(error, "[Session Supervisor] Failed to start session ~p: ~p, stacktrace:~n~p",
[ClientId, Error, Stk]),
reply({error, Error}, State)
end;
@ -101,11 +101,11 @@ handle_call(count_sessions, _From, State = #state{sessions = SessMap}) ->
{reply, maps:size(SessMap), State};
handle_call(Req, _From, State) ->
?ERROR("unexpected call: ~p", [Req]),
?LOG(error, "[Session Supervisor] Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?ERROR("unexpected cast: ~p", [Msg]),
?LOG(error, "[Session Supervisor] Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({'EXIT', Pid, _Reason}, State = #state{sessions = SessMap, clean_down = CleanDown}) ->
@ -117,7 +117,7 @@ handle_info({'EXIT', Pid, _Reason}, State = #state{sessions = SessMap, clean_dow
{noreply, State#state{sessions = SessMap1}};
handle_info(Info, State) ->
?ERROR("unexpected info: ~p", [Info]),
?LOG(notice, "[Session Supervisor] Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, State) ->

View File

@ -310,11 +310,11 @@ handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) ->
{reply, ok, State};
handle_call(Req, _From, State) ->
?ERROR("[SharedSub] unexpected call: ~p", [Req]),
?LOG(error, "[Shared Sub] Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?ERROR("[SharedSub] unexpected cast: ~p", [Msg]),
?LOG(error, "[Shared Sub] Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) ->
@ -329,12 +329,12 @@ handle_info({mnesia_table_event, _Event}, State) ->
{noreply, State};
handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) ->
?INFO("[SharedSub] shared subscriber down: ~p", [SubPid]),
?LOG(info, "[Shared Sub] Shared subscriber down: ~p", [SubPid]),
cleanup_down(SubPid),
{noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})};
handle_info(Info, State) ->
?ERROR("[SharedSub] unexpected info: ~p", [Info]),
?LOG(error, "[Shared Sub] Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -114,7 +114,7 @@ discard_session(ClientId, ConnPid) when is_binary(ClientId) ->
try emqx_session:discard(SessPid, ConnPid)
catch
_:Error:_Stk ->
?ERROR("[SM] Failed to discard ~p: ~p", [SessPid, Error])
?LOG(warning, "[SM] Failed to discard ~p: ~p", [SessPid, Error])
end
end, lookup_session_pids(ClientId)).
@ -128,7 +128,7 @@ resume_session(ClientId, SessAttrs = #{conn_pid := ConnPid}) ->
{ok, SessPid};
SessPids ->
[SessPid|StalePids] = lists:reverse(SessPids),
?ERROR("[SM] More than one session found: ~p", [SessPids]),
?LOG(error, "[SM] More than one session found: ~p", [SessPids]),
lists:foreach(fun(StalePid) ->
catch emqx_session:discard(StalePid, ConnPid)
end, StalePids),
@ -250,15 +250,15 @@ init([]) ->
{ok, #{}}.
handle_call(Req, _From, State) ->
?ERROR("[SM] unexpected call: ~p", [Req]),
?LOG(error, "[SM] Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?ERROR("[SM] unexpected cast: ~p", [Msg]),
?LOG(error, "[SM] Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
?ERROR("[SM] unexpected info: ~p", [Info]),
?LOG(error, "[SM] Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -96,11 +96,11 @@ init([]) ->
{ok, #{}}.
handle_call(Req, _From, State) ->
?ERROR("[Registry] unexpected call: ~p", [Req]),
?LOG(error, "[Registry] Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?ERROR("[Registry] unexpected cast: ~p", [Msg]),
?LOG(error, "[Registry] Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({membership, {mnesia, down, Node}}, State) ->
@ -114,7 +114,7 @@ handle_info({membership, _Event}, State) ->
{noreply, State};
handle_info(Info, State) ->
?ERROR("[Registry] unexpected info: ~p", [Info]),
?LOG(error, "[Registry] Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -181,7 +181,7 @@ start_timer(#state{tick_ms = Ms} = State) ->
handle_call(stop, _From, State) ->
{stop, normal, _Reply = ok, State};
handle_call(Req, _From, State) ->
?ERROR("[Stats] unexpected call: ~p", [Req]),
?LOG(error, "[Stats] Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({setstat, Stat, MaxStat, Val}, State) ->
@ -199,7 +199,7 @@ handle_cast({setstat, Stat, MaxStat, Val}, State) ->
handle_cast({update_interval, Update = #update{name = Name}}, State = #state{updates = Updates}) ->
case lists:keyfind(Name, #update.name, Updates) of
#update{} ->
?ERROR("[Stats]: duplicated update: ~s", [Name]),
?LOG(warning, "[Stats] Duplicated update: ~s", [Name]),
{noreply, State};
false ->
{noreply, State#state{updates = [Update | Updates]}}
@ -209,7 +209,7 @@ handle_cast({cancel_update, Name}, State = #state{updates = Updates}) ->
{noreply, State#state{updates = lists:keydelete(Name, #update.name, Updates)}};
handle_cast(Msg, State) ->
?ERROR("[Stats] unexpected cast: ~p", [Msg]),
?LOG(error, "[Stats] Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Updates}) ->
@ -219,7 +219,7 @@ handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Update
try UpFun()
catch
_:Error ->
?ERROR("[Stats] update ~s error: ~p", [Name, Error])
?LOG(error, "[Stats] update ~s failed: ~p", [Name, Error])
end,
[Update#update{countdown = I} | Acc];
(Update = #update{countdown = C}, Acc) ->
@ -228,7 +228,7 @@ handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Update
{noreply, start_timer(State#state{updates = Updates1}), hibernate};
handle_info(Info, State) ->
?ERROR("[Stats] unexpected info: ~p", [Info]),
?LOG("error, [Stats] Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #state{timer = TRef}) ->

View File

@ -117,11 +117,11 @@ handle_call(uptime, _From, State) ->
{reply, uptime(State), State};
handle_call(Req, _From, State) ->
?ERROR("[SYS] unexpected call: ~p", [Req]),
?LOG(error, "[SYS] Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?ERROR("[SYS] unexpected cast: ~p", [Msg]),
?LOG(error, "[SYS] Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({timeout, TRef, heartbeat}, State = #state{heartbeat = TRef}) ->
@ -138,7 +138,7 @@ handle_info({timeout, TRef, tick}, State = #state{ticker = TRef, version = Versi
{noreply, tick(State), hibernate};
handle_info(Info, State) ->
?ERROR("[SYS] unexpected info: ~p", [Info]),
?LOG(error, "[SYS] Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) ->

View File

@ -88,18 +88,18 @@ parse_opt([_Opt|Opts], Acc) ->
parse_opt(Opts, Acc).
handle_call(Req, _From, State) ->
?ERROR("[SYSMON] unexpected call: ~p", [Req]),
?LOG(error, "[SYSMON] Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?ERROR("[SYSMON] unexpected cast: ~p", [Msg]),
?LOG(error, "[SYSMON] Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({monitor, Pid, long_gc, Info}, State) ->
suppress({long_gc, Pid},
fun() ->
WarnMsg = io_lib:format("long_gc warning: pid = ~p, info: ~p", [Pid, Info]),
?WARN("[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]),
?LOG(warning, "[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]),
safe_publish(long_gc, WarnMsg)
end, State);
@ -107,7 +107,7 @@ handle_info({monitor, Pid, long_schedule, Info}, State) when is_pid(Pid) ->
suppress({long_schedule, Pid},
fun() ->
WarnMsg = io_lib:format("long_schedule warning: pid = ~p, info: ~p", [Pid, Info]),
?WARN("[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]),
?LOG(warning, "[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]),
safe_publish(long_schedule, WarnMsg)
end, State);
@ -115,7 +115,7 @@ handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) ->
suppress({long_schedule, Port},
fun() ->
WarnMsg = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]),
?WARN("[SYSMON] ~s~n~p", [WarnMsg, erlang:port_info(Port)]),
?LOG(warning, "[SYSMON] ~s~n~p", [WarnMsg, erlang:port_info(Port)]),
safe_publish(long_schedule, WarnMsg)
end, State);
@ -123,7 +123,7 @@ handle_info({monitor, Pid, large_heap, Info}, State) ->
suppress({large_heap, Pid},
fun() ->
WarnMsg = io_lib:format("large_heap warning: pid = ~p, info: ~p", [Pid, Info]),
?WARN("[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]),
?LOG(warning, "[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]),
safe_publish(large_heap, WarnMsg)
end, State);
@ -131,7 +131,7 @@ handle_info({monitor, SusPid, busy_port, Port}, State) ->
suppress({busy_port, Port},
fun() ->
WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
?WARN("[SYSMON] ~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]),
?LOG(warning, "[SYSMON] ~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]),
safe_publish(busy_port, WarnMsg)
end, State);
@ -139,7 +139,7 @@ handle_info({monitor, SusPid, busy_dist_port, Port}, State) ->
suppress({busy_dist_port, Port},
fun() ->
WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
?WARN("[SYSMON] ~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]),
?LOG(warning, "[SYSMON] ~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]),
safe_publish(busy_dist_port, WarnMsg)
end, State);
@ -147,7 +147,7 @@ handle_info({timeout, _Ref, reset}, State) ->
{noreply, State#{events := []}, hibernate};
handle_info(Info, State) ->
?ERROR("[SYSMON] unexpected Info: ~p", [Info]),
?LOG(error, "[SYSMON] Unexpected Info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #{timer := TRef}) ->

View File

@ -17,6 +17,7 @@
-behaviour(gen_server).
-include("emqx.hrl").
-include("logger.hrl").
%% APIs
-export([start_link/0]).
@ -110,10 +111,10 @@ handle_call({start_trace, Who, Level, LogFile}, _From, State = #state{traces = T
filters => [{meta_key_filter,
{fun filter_by_meta_key/2, Who} }]}) of
ok ->
emqx_logger:info("[Tracer] start trace for ~p", [Who]),
?LOG(info, "[Tracer] Start trace for ~p", [Who]),
{reply, ok, State#state{traces = maps:put(Who, {Level, LogFile}, Traces)}};
{error, Reason} ->
emqx_logger:error("[Tracer] start trace for ~p failed, error: ~p", [Who, Reason]),
?LOG(error, "[Tracer] Start trace for ~p failed, error: ~p", [Who, Reason]),
{reply, {error, Reason}, State}
end;
@ -122,9 +123,9 @@ handle_call({stop_trace, Who}, _From, State = #state{traces = Traces}) ->
{ok, _LogFile} ->
case logger:remove_handler(handler_id(Who)) of
ok ->
emqx_logger:info("[Tracer] stop trace for ~p", [Who]);
?LOG(info, "[Tracer] Stop trace for ~p", [Who]);
{error, Reason} ->
emqx_logger:error("[Tracer] stop trace for ~p failed, error: ~p", [Who, Reason])
?LOG(error, "[Tracer] Stop trace for ~p failed, error: ~p", [Who, Reason])
end,
{reply, ok, State#state{traces = maps:remove(Who, Traces)}};
error ->
@ -135,15 +136,15 @@ handle_call(lookup_traces, _From, State = #state{traces = Traces}) ->
{reply, [{Who, LogFile} || {Who, LogFile} <- maps:to_list(Traces)], State};
handle_call(Req, _From, State) ->
emqx_logger:error("[Tracer] unexpected call: ~p", [Req]),
?LOG(error, "[Tracer] Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
emqx_logger:error("[Tracer] unexpected cast: ~p", [Msg]),
?LOG(error, "[Tracer] Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
emqx_logger:error("[Tracer] unexpected info: ~p", [Info]),
?LOG(error, "[Tracer] Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -109,7 +109,9 @@ handle_info({timeout, Timer, check}, State = #{timer := Timer,
true -> alarm_handler:clear_alarm(too_many_processes);
false -> ok
end,
{noreply, ensure_check_timer(State#{is_process_alarm_set := false})}
{noreply, ensure_check_timer(State#{is_process_alarm_set := false})};
_Precent ->
{noreply, ensure_check_timer(State)}
end.
terminate(_Reason, #{timer := Timer}) ->

View File

@ -113,12 +113,23 @@ call(WSPid, Req) when is_pid(WSPid) ->
%%------------------------------------------------------------------------------
init(Req, Opts) ->
IdleTimeout = proplists:get_value(idle_timeout, Opts, 60000),
DeflateOptions = maps:from_list(proplists:get_value(deflate_options, Opts, [])),
MaxFrameSize = case proplists:get_value(max_frame_size, Opts, 0) of
0 -> infinity;
MFS -> MFS
end,
Compress = proplists:get_value(compress, Opts, false),
Options = #{compress => Compress,
deflate_opts => DeflateOptions,
max_frame_size => MaxFrameSize,
idle_timeout => IdleTimeout},
case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of
undefined ->
{cowboy_websocket, Req, #state{}};
{cowboy_websocket, Req, #state{}, Options};
[<<"mqtt", Vsn/binary>>] ->
Resp = cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, <<"mqtt", Vsn/binary>>, Req),
{cowboy_websocket, Resp, #state{request = Req, options = Opts}, #{idle_timeout => 86400000}};
{cowboy_websocket, Resp, #state{request = Req, options = Opts}, Options};
_ ->
{ok, cowboy_req:reply(400, Req), #state{}}
end.
@ -163,7 +174,7 @@ websocket_handle({binary, [<<>>]}, State) ->
{ok, ensure_stats_timer(State)};
websocket_handle({binary, Data}, State = #state{parse_state = ParseState,
proto_state = ProtoState}) ->
?LOG(debug, "RECV ~p", [Data]),
?LOG(debug, "[WS Connection] RECV ~p", [Data]),
BinSize = iolist_size(Data),
emqx_pd:update_counter(recv_oct, BinSize),
emqx_metrics:trans(inc, 'bytes/received', BinSize),
@ -177,7 +188,7 @@ websocket_handle({binary, Data}, State = #state{parse_state = ParseState,
{ok, ProtoState1} ->
websocket_handle({binary, Rest}, reset_parser(State#state{proto_state = ProtoState1}));
{error, Error} ->
?LOG(error, "Protocol error - ~p", [Error]),
?LOG(error, "[WS Connection] Protocol error: ~p", [Error]),
shutdown(Error, State);
{error, Reason, ProtoState1} ->
shutdown(Reason, State#state{proto_state = ProtoState1});
@ -185,11 +196,11 @@ websocket_handle({binary, Data}, State = #state{parse_state = ParseState,
shutdown(Error, State#state{proto_state = ProtoState1})
end;
{error, Error} ->
?LOG(error, "Frame error: ~p", [Error]),
?LOG(error, "[WS Connection] Frame error: ~p", [Error]),
shutdown(Error, State)
catch
_:Error ->
?LOG(error, "Frame error:~p~nFrame data: ~p", [Error, Data]),
?LOG(error, "[WS Connection] Frame error:~p~nFrame data: ~p", [Error, Data]),
shutdown(parse_error, State)
end;
%% Pings should be replied with pongs, cowboy does it automatically
@ -236,12 +247,12 @@ websocket_info({timeout, Timer, emit_stats},
{ok, State#state{stats_timer = undefined}, hibernate};
websocket_info({keepalive, start, Interval}, State) ->
?LOG(debug, "Keepalive at the interval of ~p", [Interval]),
?LOG(debug, "[WS Connection] Keepalive at the interval of ~p", [Interval]),
case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of
{ok, KeepAlive} ->
{ok, State#state{keepalive = KeepAlive}};
{error, Error} ->
?LOG(warning, "Keepalive error - ~p", [Error]),
?LOG(warning, "[WS Connection] Keepalive error: ~p", [Error]),
shutdown(Error, State)
end;
@ -250,19 +261,19 @@ websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) ->
{ok, KeepAlive1} ->
{ok, State#state{keepalive = KeepAlive1}};
{error, timeout} ->
?LOG(debug, "Keepalive Timeout!"),
?LOG(debug, "[WS Connection] Keepalive Timeout!"),
shutdown(keepalive_timeout, State);
{error, Error} ->
?LOG(error, "Keepalive error - ~p", [Error]),
?LOG(error, "[WS Connection] Keepalive error: ~p", [Error]),
shutdown(keepalive_error, State)
end;
websocket_info({shutdown, discard, {ClientId, ByPid}}, State) ->
?LOG(warning, "discarded by ~s:~p", [ClientId, ByPid]),
?LOG(warning, "[WS Connection] Discarded by ~s:~p", [ClientId, ByPid]),
shutdown(discard, State);
websocket_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid]),
?LOG(warning, "[WS Connection] Clientid '~s' conflict with ~p", [ClientId, NewPid]),
shutdown(conflict, State);
websocket_info({binary, Data}, State) ->
@ -272,14 +283,14 @@ websocket_info({shutdown, Reason}, State) ->
shutdown(Reason, State);
websocket_info(Info, State) ->
?LOG(error, "unexpected info: ~p", [Info]),
?LOG(error, "[WS Connection] Unexpected info: ~p", [Info]),
{ok, State}.
terminate(SockError, _Req, #state{keepalive = Keepalive,
proto_state = ProtoState,
shutdown = Shutdown}) ->
?LOG(debug, "Terminated for ~p, sockerror: ~p", [Shutdown, SockError]),
?LOG(debug, "[WS Connection] Terminated for ~p, sockerror: ~p", [Shutdown, SockError]),
emqx_keepalive:cancel(Keepalive),
case {ProtoState, Shutdown} of
{undefined, _} -> ok;
@ -308,4 +319,3 @@ shutdown(Reason, State) ->
wsock_stats() ->
[{Key, emqx_pd:get_counter(Key)} || Key <- ?SOCK_STATS].

View File

@ -92,7 +92,7 @@ handle_call(force_reload, _From, State) ->
{reply, ok, State};
handle_call(Req, _From, State) ->
?ERROR("[Zone] unexpected call: ~p", [Req]),
?LOG(error, "[Zone] Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({set_env, Zone, Key, Val}, State) ->
@ -100,7 +100,7 @@ handle_cast({set_env, Zone, Key, Val}, State) ->
{noreply, State};
handle_cast(Msg, State) ->
?ERROR("[Zone] unexpected cast: ~p", [Msg]),
?LOG(error, "[Zone] Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(reload, State) ->
@ -108,7 +108,7 @@ handle_info(reload, State) ->
{noreply, ensure_reload_timer(State#{timer := undefined}), hibernate};
handle_info(Info, State) ->
?ERROR("[Zone] unexpected info: ~p", [Info]),
?LOG(error, "[Zone] Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->