Merge pull request #2649 from emqx/master

Auto-pull-request-by-2019-06-22
This commit is contained in:
turtleDeng 2019-06-22 09:51:46 +08:00 committed by GitHub
commit 872c0af3fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
53 changed files with 458 additions and 293 deletions

1
.gitignore vendored
View File

@ -40,3 +40,4 @@ xrefr
erlang.mk
*.coverdata
etc/emqx.conf.rendered
Mnesia.*/

View File

@ -3,20 +3,44 @@
REBAR_GIT_CLONE_OPTIONS += --depth 1
export REBAR_GIT_CLONE_OPTIONS
# CT_SUITES = emqx_trie emqx_router emqx_frame emqx_mqtt_compat
SUITES_FILES := $(shell find test -name '*_SUITE.erl')
CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \
emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \
emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \
emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \
emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \
emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_channel \
emqx_packet emqx_channel emqx_tracer emqx_sys_mon emqx_message emqx_os_mon \
emqx_vm_mon emqx_alarm_handler emqx_rpc emqx_flapping
CT_SUITES := $(foreach value,$(SUITES_FILES),$(shell val=$$(basename $(value) .erl); echo $${val%_*}))
CT_NODE_NAME = emqxct@127.0.0.1
RUN_NODE_NAME = emqxdebug@127.0.0.1
.PHONY: run
run: run_setup
@rebar3 as test get-deps
@rebar3 as test auto --name $(RUN_NODE_NAME) --script test/run_emqx.escript
.PHONY: run_setup
run_setup:
@erl -noshell -eval \
"{ok, [[HOME]]} = init:get_argument(home), \
FilePath = HOME ++ \"/.config/rebar3/rebar.config\", \
case file:consult(FilePath) of \
{ok, Term} -> \
NewTerm = case lists:keyfind(plugins, 1, Term) of \
false -> [{plugins, [rebar3_auto]} | Term]; \
{plugins, OldPlugins} -> \
NewPlugins0 = OldPlugins -- [rebar3_auto], \
NewPlugins = [rebar3_auto | NewPlugins0], \
lists:keyreplace(plugins, 1, Term, {plugins, NewPlugins}) \
end, \
ok = file:write_file(FilePath, [io_lib:format(\"~p.\n\", [I]) || I <- NewTerm]); \
_ -> \
NewTerm=[{plugins, [rebar3_auto]}], \
ok = file:write_file(FilePath, [io_lib:format(\"~p.\n\", [I]) || I <- NewTerm]) \
end, \
halt(0)."
.PHONY: shell
shell:
@rebar3 as test auto
compile:
@rebar3 compile
@ -92,5 +116,6 @@ gen-clean:
.PHONY: distclean
distclean: gen-clean
@rm -rf Mnesia.*
@rm -rf _build cover deps logs log data
@rm -f rebar.lock compile_commands.json cuttlefish erl_crash.dump

View File

@ -132,7 +132,8 @@
descr :: string(),
vendor :: string(),
active = false :: boolean(),
info :: map()
info :: map(),
type :: atom()
}).
%%--------------------------------------------------------------------

View File

@ -14,6 +14,8 @@
%% debug | info | notice | warning | error | critical | alert | emergency
-compile({parse_transform, emqx_logger}).
-define(DEBUG(Format), ?LOG(debug, Format, [])).
-define(DEBUG(Format, Args), ?LOG(debug, Format, Args)).
@ -39,5 +41,5 @@
-define(LOG(Level, Format, Args),
begin
(logger:log(Level,#{},#{report_cb => fun(_) -> {(Format), (Args)} end}))
(logger:log(Level,#{},#{report_cb => fun(_) -> {'$logger_header'()++(Format), (Args)} end}))
end).

View File

@ -1,12 +1,12 @@
{deps,
[ {jsx, "2.9.0"} % hex
, {cowboy, "2.6.1"} % hex
, {gproc, "0.8.0"} % hex
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.5"}}}
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "v0.1.1"}}}
, {esockd, "5.5.0"}
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
[{jsx, "2.9.0"}, % hex
{cowboy, "2.6.1"}, % hex
{gproc, "0.8.0"}, % hex
{ekka, "0.5.6"}, % hex
{replayq, "0.1.1"}, %hex
{esockd, "5.5.0"}, %hex
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}},
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
]}.
{edoc_opts, [{preprocess, true}]}.
@ -24,12 +24,14 @@
{plugins, [coveralls]}.
{erl_first_files, ["src/emqx_logger.erl"]}.
{profiles,
[{test,
[{deps,
[ {meck, "0.8.13"} % hex
, {bbmustache, "1.7.0"} % hex
, {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "v1.1.1"}}}
[{meck, "0.8.13"}, % hex
{bbmustache, "1.7.0"}, % hex
{emqx_ct_helpers, "1.1.3"} % hex
]}
]}
]}.

View File

@ -18,6 +18,8 @@
-include("logger.hrl").
-include("types.hrl").
-logger_header("[EMQ X]").
%% Start/Stop the application
-export([ start/0
, restart/1
@ -183,7 +185,7 @@ shutdown() ->
shutdown(normal).
shutdown(Reason) ->
?LOG(critical, "[EMQ X] emqx shutdown for ~s", [Reason]),
?LOG(critical, "emqx shutdown for ~s", [Reason]),
emqx_alarm_handler:unload(),
emqx_plugins:unload(),
lists:foreach(fun application:stop/1, [emqx, ekka, cowboy, ranch, esockd, gproc]).

View File

@ -29,7 +29,10 @@
-> {ok, emqx_types:credentials()} | {error, term()}).
authenticate(Credentials) ->
case emqx_hooks:run_fold('client.authenticate', [], init_auth_result(Credentials)) of
#{auth_result := success} = NewCredentials ->
#{auth_result := success, anonymous := true} = NewCredentials ->
emqx_metrics:inc('auth.mqtt.anonymous'),
{ok, NewCredentials};
#{auth_result := success} = NewCredentials ->
{ok, NewCredentials};
NewCredentials ->
{error, maps:get(auth_result, NewCredentials, unknown_error)}

View File

@ -19,6 +19,8 @@
-include("emqx.hrl").
-include("logger.hrl").
-logger_header("[Alarm Handler]").
%% Mnesia bootstrap
-export([mnesia/1]).
@ -93,18 +95,23 @@ init(_) ->
handle_event({set_alarm, {AlarmId, AlarmDesc = #alarm{timestamp = undefined}}}, State) ->
handle_event({set_alarm, {AlarmId, AlarmDesc#alarm{timestamp = os:timestamp()}}}, State);
handle_event({set_alarm, Alarm = {AlarmId, AlarmDesc}}, State) ->
?LOG(warning, "[Alarm Handler] ~p set", [Alarm]),
?LOG(warning, "~p set", [Alarm]),
case encode_alarm(Alarm) of
{ok, Json} ->
emqx_broker:safe_publish(alarm_msg(topic(alert, maybe_to_binary(AlarmId)), Json));
emqx_broker:safe_publish(alarm_msg(topic(alert), Json));
{error, Reason} ->
?LOG(error, "[Alarm Handler] Failed to encode alarm: ~p", [Reason])
?LOG(error, "Failed to encode alarm: ~p", [Reason])
end,
set_alarm_(AlarmId, AlarmDesc),
{ok, State};
handle_event({clear_alarm, AlarmId}, State) ->
?LOG(notice, "[Alarm Handler] ~p clear", [AlarmId]),
emqx_broker:safe_publish(alarm_msg(topic(clear, maybe_to_binary(AlarmId)), <<"">>)),
?LOG(notice, "~p clear", [AlarmId]),
case encode_alarm({AlarmId, undefined}) of
{ok, Json} ->
emqx_broker:safe_publish(alarm_msg(topic(clear), Json));
{error, Reason} ->
?LOG(error, "Failed to encode alarm: ~p", [Reason])
end,
clear_alarm_(AlarmId),
{ok, State};
handle_event(_, State) ->
@ -140,19 +147,21 @@ encode_alarm({AlarmId, #alarm{severity = Severity,
{title, iolist_to_binary(Title)},
{summary, iolist_to_binary(Summary)},
{ts, emqx_time:now_secs(Ts)}]}]);
encode_alarm({AlarmId, undefined}) ->
emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)}]);
encode_alarm({AlarmId, AlarmDesc}) ->
emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)},
{desc, maybe_to_binary(AlarmDesc)}]).
emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)},
{description, maybe_to_binary(AlarmDesc)}]).
alarm_msg(Topic, Payload) ->
Msg = emqx_message:make(?MODULE, Topic, Payload),
emqx_message:set_headers(#{'Content-Type' => <<"application/json">>},
emqx_message:set_flag(sys, Msg)).
topic(alert, AlarmId) ->
emqx_topic:systop(<<"alarms/", AlarmId/binary, "/alert">>);
topic(clear, AlarmId) ->
emqx_topic:systop(<<"alarms/", AlarmId/binary, "/clear">>).
topic(alert) ->
emqx_topic:systop(<<"alarms/alert">>);
topic(clear) ->
emqx_topic:systop(<<"alarms/clear">>).
maybe_to_binary(Data) when is_binary(Data) ->
Data;

View File

@ -20,6 +20,8 @@
-include("logger.hrl").
-include("types.hrl").
-logger_header("[Banned]").
%% Mnesia bootstrap
-export([mnesia/1]).
@ -88,11 +90,11 @@ init([]) ->
{ok, ensure_expiry_timer(#{expiry_timer => undefined})}.
handle_call(Req, _From, State) ->
?LOG(error, "[Banned] unexpected call: ~p", [Req]),
?LOG(error, "unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?LOG(error, "[Banned] unexpected msg: ~p", [Msg]),
?LOG(error, "unexpected msg: ~p", [Msg]),
{noreply, State}.
handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
@ -100,7 +102,7 @@ handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
{noreply, ensure_expiry_timer(State), hibernate};
handle_info(Info, State) ->
?LOG(error, "[Banned] unexpected info: ~p", [Info]),
?LOG(error, "unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #{expiry_timer := TRef}) ->

View File

@ -113,6 +113,8 @@
-include("logger.hrl").
-include("emqx_mqtt.hrl").
-logger_header("[Bridge]").
%% same as default in-flight limit for emqx_client
-define(DEFAULT_BATCH_COUNT, 32).
-define(DEFAULT_BATCH_BYTES, 1 bsl 20).
@ -304,7 +306,7 @@ standing_by({call, From}, ensure_started, State) ->
standing_by(state_timeout, do_connect, State) ->
{next_state, connecting, State};
standing_by(info, Info, State) ->
?LOG(info, "[Bridge] Bridge ~p discarded info event at state standing_by:\n~p", [name(), Info]),
?LOG(info, "Bridge ~p discarded info event at state standing_by:\n~p", [name(), Info]),
{keep_state_and_data, State};
standing_by(Type, Content, State) ->
common(standing_by, Type, Content, State).
@ -360,7 +362,7 @@ connected(info, {disconnected, ConnRef, Reason},
#{conn_ref := ConnRefCurrent} = State) ->
case ConnRefCurrent =:= ConnRef of
true ->
?LOG(info, "[Bridge] Bridge ~p diconnected~nreason=~p", [name(), Reason]),
?LOG(info, "Bridge ~p diconnected~nreason=~p", [name(), Reason]),
{next_state, connecting,
State#{conn_ref => undefined, connection => undefined}};
false ->
@ -372,7 +374,7 @@ connected(info, {batch_ack, Ref}, State) ->
keep_state_and_data;
bad_order ->
%% try re-connect then re-send
?LOG(error, "[Bridge] Bad order ack received by bridge ~p", [name()]),
?LOG(error, "Bad order ack received by bridge ~p", [name()]),
{next_state, connecting, disconnect(State)};
{ok, NewState} ->
{keep_state, NewState, ?maybe_send}
@ -403,7 +405,7 @@ common(_StateName, info, {dispatch, _, Msg},
NewQ = replayq:append(Q, collect([Msg])),
{keep_state, State#{replayq => NewQ}, ?maybe_send};
common(StateName, Type, Content, State) ->
?LOG(notice, "[Bridge] Bridge ~p discarded ~p type event at state ~p:\n~p",
?LOG(notice, "Bridge ~p discarded ~p type event at state ~p:\n~p",
[name(), Type, StateName, Content]),
{keep_state, State}.
@ -459,7 +461,7 @@ do_connect(Type, StateName, #{ forwards := Forwards
end,
case ConnectFun(Subs) of
{ok, ConnRef, Conn} ->
?LOG(info, "[Bridge] Bridge ~p connected", [name()]),
?LOG(info, "Bridge ~p connected", [name()]),
State0 = State#{conn_ref => ConnRef, connection => Conn},
State1 = eval_bridge_handler(State0, connected),
StandingbyAction = {next_state, connected, State1, [{reply, From, ok}]},
@ -515,7 +517,7 @@ retry_inflight(#{inflight := Inflight} = State,
{ok, NewState} ->
retry_inflight(NewState, T);
{error, Reason} ->
?LOG(error, "[Bridge] Inflight retry failed\n~p", [Reason]),
?LOG(error, "Inflight retry failed\n~p", [Reason]),
{error, State#{inflight := Inflight ++ Remain}}
end.
@ -546,7 +548,7 @@ do_send(State = #{inflight := Inflight}, QAckRef, [_ | _] = Batch) ->
batch => Batch}],
{ok, State#{inflight := NewInflight}};
{error, Reason} ->
?LOG(info, "[Bridge] Batch produce failed\n~p", [Reason]),
?LOG(info, "Batch produce failed\n~p", [Reason]),
{error, State}
end.

View File

@ -31,6 +31,8 @@
-include("logger.hrl").
-logger_header("[Bridge Connect]").
%% establish the connection to remote node/cluster
%% protal worker (the caller process) should be expecting
%% a message {disconnected, conn_ref()} when disconnected.
@ -54,7 +56,7 @@ start(Module, Config) ->
{ok, Ref, Conn};
{error, Reason} ->
Config1 = obfuscate(Config),
?LOG(error, "[Bridge connect] Failed to connect with module=~p\n"
?LOG(error, "Failed to connect with module=~p\n"
"config=~p\nreason:~p", [Module, Config1, Reason]),
{error, Reason}
end.

View File

@ -17,6 +17,8 @@
-include("logger.hrl").
-logger_header("[Bridge]").
%% APIs
-export([ start_link/0
, start_link/1
@ -74,6 +76,6 @@ drop_bridge(Id) ->
ok ->
supervisor:delete_child(?SUP, Id);
Error ->
?LOG(error, "[Bridge] Delete bridge failed, error : ~p", [Error]),
?LOG(error, "Delete bridge failed, error : ~p", [Error]),
Error
end.

View File

@ -20,6 +20,8 @@
-include("logger.hrl").
-include("types.hrl").
-logger_header("[Broker]").
-export([start_link/2]).
%% PubSub
@ -195,7 +197,7 @@ publish(Msg) when is_record(Msg, message) ->
Headers = Msg#message.headers,
case emqx_hooks:run_fold('message.publish', [], Msg#message{headers = Headers#{allow_publish => true}}) of
#message{headers = #{allow_publish := false}} ->
?LOG(notice, "[Broker] Publishing interrupted: ~s", [emqx_message:format(Msg)]),
?LOG(notice, "Publishing interrupted: ~s", [emqx_message:format(Msg)]),
[];
#message{topic = Topic} = Msg1 ->
Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)),
@ -209,7 +211,7 @@ safe_publish(Msg) when is_record(Msg, message) ->
publish(Msg)
catch
_:Error:Stacktrace ->
?LOG(error, "[Broker] Publish error: ~p~n~p~n~p", [Error, Msg, Stacktrace])
?LOG(error, "Publish error: ~p~n~p~n~p", [Error, Msg, Stacktrace])
after
ok
end.
@ -256,7 +258,7 @@ forward(Node, To, Delivery) ->
%% rpc:call to ensure the delivery, but the latency:(
case emqx_rpc:call(Node, ?BROKER, dispatch, [To, Delivery]) of
{badrpc, Reason} ->
?LOG(error, "[Broker] Failed to forward msg to ~s: ~p", [Node, Reason]),
?LOG(error, "Failed to forward msg to ~s: ~p", [Node, Reason]),
Delivery;
Delivery1 -> Delivery1
end.
@ -424,14 +426,14 @@ handle_call({subscribe, Topic, I}, _From, State) ->
{reply, Ok, State};
handle_call(Req, _From, State) ->
?LOG(error, "[Broker] Unexpected call: ~p", [Req]),
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({subscribe, Topic}, State) ->
case emqx_router:do_add_route(Topic) of
ok -> ok;
{error, Reason} ->
?LOG(error, "[Broker] Failed to add route: ~p", [Reason])
?LOG(error, "Failed to add route: ~p", [Reason])
end,
{noreply, State};
@ -454,11 +456,11 @@ handle_cast({unsubscribed, Topic, I}, State) ->
{noreply, State};
handle_cast(Msg, State) ->
?LOG(error, "[Broker] Unexpected cast: ~p", [Msg]),
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
?LOG(error, "[Broker] Unexpected info: ~p", [Info]),
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #{pool := Pool, id := Id}) ->

View File

@ -19,6 +19,8 @@
-include("logger.hrl").
-include("types.hrl").
-logger_header("[Broker Helper]").
-export([start_link/0]).
%% APIs
@ -110,7 +112,7 @@ init([]) ->
{ok, #{pmon => emqx_pmon:new()}}.
handle_call(Req, _From, State) ->
?LOG(error, "[Broker Helper] Unexpected call: ~p", [Req]),
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) ->
@ -119,7 +121,7 @@ handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) ->
{noreply, State#{pmon := emqx_pmon:monitor(SubPid, PMon)}};
handle_cast(Msg, State) ->
?LOG(error, "[Broker Helper] Unexpected cast: ~p", [Msg]),
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon}) ->
@ -130,7 +132,7 @@ handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon})
{noreply, State#{pmon := PMon1}};
handle_info(Info, State) ->
?LOG(error, "[Broker Helper] Unexpected info: ~p", [Info]),
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -22,6 +22,8 @@
-include("emqx_mqtt.hrl").
-include("logger.hrl").
-logger_header("[Channel]").
-export([start_link/3]).
%% APIs
@ -288,18 +290,18 @@ handle({call, From}, session, State = #state{proto_state = ProtoState}) ->
reply(From, emqx_protocol:session(ProtoState), State);
handle({call, From}, Req, State) ->
?LOG(error, "[Channel] Unexpected call: ~p", [Req]),
?LOG(error, "Unexpected call: ~p", [Req]),
reply(From, ignored, State);
%% Handle cast
handle(cast, Msg, State) ->
?LOG(error, "[Channel] Unexpected cast: ~p", [Msg]),
?LOG(error, "Unexpected cast: ~p", [Msg]),
{keep_state, State};
%% Handle Incoming
handle(info, {Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
Oct = iolist_size(Data),
?LOG(debug, "[Channel] RECV ~p", [Data]),
?LOG(debug, "RECV ~p", [Data]),
emqx_pd:update_counter(incoming_bytes, Oct),
ok = emqx_metrics:inc('bytes.received', Oct),
NState = ensure_stats_timer(maybe_gc({1, Oct}, State)),
@ -348,23 +350,23 @@ handle(info, {timeout, Timer, emit_stats},
GcState1 = emqx_gc:reset(GcState),
{keep_state, NState#state{gc_state = GcState1}, hibernate};
{shutdown, Reason} ->
?LOG(error, "[Channel] Shutdown exceptionally due to ~p", [Reason]),
?LOG(error, "Shutdown exceptionally due to ~p", [Reason]),
shutdown(Reason, NState)
end;
handle(info, {shutdown, discard, {ClientId, ByPid}}, State) ->
?LOG(error, "[Channel] Discarded by ~s:~p", [ClientId, ByPid]),
?LOG(error, "Discarded by ~s:~p", [ClientId, ByPid]),
shutdown(discard, State);
handle(info, {shutdown, conflict, {ClientId, NewPid}}, State) ->
?LOG(warning, "[Channel] Clientid '~s' conflict with ~p", [ClientId, NewPid]),
?LOG(warning, "Clientid '~s' conflict with ~p", [ClientId, NewPid]),
shutdown(conflict, State);
handle(info, {shutdown, Reason}, State) ->
shutdown(Reason, State);
handle(info, Info, State) ->
?LOG(error, "[Channel] Unexpected info: ~p", [Info]),
?LOG(error, "Unexpected info: ~p", [Info]),
{keep_state, State}.
code_change(_Vsn, State, Data, _Extra) ->
@ -374,7 +376,7 @@ terminate(Reason, _StateName, #state{transport = Transport,
socket = Socket,
keepalive = KeepAlive,
proto_state = ProtoState}) ->
?LOG(debug, "[Channel] Terminated for ~p", [Reason]),
?LOG(debug, "Terminated for ~p", [Reason]),
Transport:fast_close(Socket),
emqx_keepalive:cancel(KeepAlive),
case {ProtoState, Reason} of
@ -403,7 +405,7 @@ process_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
shutdown(Reason, State)
catch
error:Reason:Stk ->
?LOG(error, "[Channel] Parse failed for ~p~n\
?LOG(error, "Parse failed for ~p~n\
Stacktrace:~p~nError data:~p", [Reason, Stk, Data]),
shutdown(parse_error, State)
end.
@ -445,7 +447,7 @@ ensure_rate_limit([{Rl, Pos, Cnt}|Limiters], State) ->
{0, Rl1} ->
ensure_rate_limit(Limiters, setelement(Pos, State, Rl1));
{Pause, Rl1} ->
?LOG(debug, "[Channel] Rate limit pause connection ~pms", [Pause]),
?LOG(debug, "Rate limit pause connection ~pms", [Pause]),
TRef = erlang:send_after(Pause, self(), activate_socket),
setelement(Pos, State#state{conn_state = blocked, limit_timer = TRef}, Rl1)
end.

View File

@ -22,6 +22,8 @@
-include("types.hrl").
-include("emqx_client.hrl").
-logger_header("[Client]").
-export([ start_link/0
, start_link/1
]).
@ -794,10 +796,10 @@ connected(cast, ?PUBREC_PACKET(PacketId), State = #state{inflight = Inflight}) -
Inflight1 = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight),
State#state{inflight = Inflight1};
{value, {pubrel, _Ref, _Ts}} ->
?LOG(notice, "[Client] Duplicated PUBREC Packet: ~p", [PacketId]),
?LOG(notice, "Duplicated PUBREC Packet: ~p", [PacketId]),
State;
none ->
?LOG(warning, "[Client] Unexpected PUBREC Packet: ~p", [PacketId]),
?LOG(warning, "Unexpected PUBREC Packet: ~p", [PacketId]),
State
end);
@ -812,7 +814,7 @@ connected(cast, ?PUBREL_PACKET(PacketId),
false -> {keep_state, NewState}
end;
error ->
?LOG(warning, "[Client] Unexpected PUBREL: ~p", [PacketId]),
?LOG(warning, "Unexpected PUBREL: ~p", [PacketId]),
keep_state_and_data
end;
@ -911,37 +913,37 @@ handle_event({call, From}, stop, _StateName, _State) ->
{stop_and_reply, normal, [{reply, From, ok}]};
handle_event(info, {TcpOrSsL, _Sock, Data}, _StateName, State)
when TcpOrSsL =:= tcp; TcpOrSsL =:= ssl ->
?LOG(debug, "[Client] RECV Data: ~p", [Data]),
?LOG(debug, "RECV Data: ~p", [Data]),
process_incoming(Data, [], run_sock(State));
handle_event(info, {Error, _Sock, Reason}, _StateName, State)
when Error =:= tcp_error; Error =:= ssl_error ->
?LOG(error, "[Client] The connection error occured ~p, reason:~p", [Error, Reason]),
?LOG(error, "The connection error occured ~p, reason:~p", [Error, Reason]),
{stop, {shutdown, Reason}, State};
handle_event(info, {Closed, _Sock}, _StateName, State)
when Closed =:= tcp_closed; Closed =:= ssl_closed ->
?LOG(debug, "[Client] ~p", [Closed]),
?LOG(debug, "~p", [Closed]),
{stop, {shutdown, Closed}, State};
handle_event(info, {'EXIT', Owner, Reason}, _, State = #state{owner = Owner}) ->
?LOG(debug, "[Client] Got EXIT from owner, Reason: ~p", [Reason]),
?LOG(debug, "Got EXIT from owner, Reason: ~p", [Reason]),
{stop, {shutdown, Reason}, State};
handle_event(info, {inet_reply, _Sock, ok}, _, _State) ->
keep_state_and_data;
handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) ->
?LOG(error, "[Client] Got tcp error: ~p", [Reason]),
?LOG(error, "Got tcp error: ~p", [Reason]),
{stop, {shutdown, Reason}, State};
handle_event(info, EventContent = {'EXIT', _Pid, normal}, StateName, _State) ->
?LOG(info, "[Client] State: ~s, Unexpected Event: (info, ~p)",
?LOG(info, "State: ~s, Unexpected Event: (info, ~p)",
[StateName, EventContent]),
keep_state_and_data;
handle_event(EventType, EventContent, StateName, _StateData) ->
?LOG(error, "[Client] State: ~s, Unexpected Event: (~p, ~p)",
?LOG(error, "State: ~s, Unexpected Event: (~p, ~p)",
[StateName, EventType, EventContent]),
keep_state_and_data.
@ -984,7 +986,7 @@ delete_inflight(?PUBACK_PACKET(PacketId, ReasonCode, Properties),
properties => Properties}),
State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
none ->
?LOG(warning, "[Client] Unexpected PUBACK: ~p", [PacketId]),
?LOG(warning, "Unexpected PUBACK: ~p", [PacketId]),
State
end;
delete_inflight(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties),
@ -996,7 +998,7 @@ delete_inflight(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties),
properties => Properties}),
State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
none ->
?LOG(warning, "[Client] Unexpected PUBCOMP Packet: ~p", [PacketId]),
?LOG(warning, "Unexpected PUBCOMP Packet: ~p", [PacketId]),
State
end.
@ -1200,7 +1202,7 @@ send(Msg, State) when is_record(Msg, mqtt_msg) ->
send(Packet, State = #state{socket = Sock, proto_ver = Ver})
when is_record(Packet, mqtt_packet) ->
Data = emqx_frame:serialize(Packet, #{version => Ver}),
?LOG(debug, "[Client] SEND Data: ~1000p", [Packet]),
?LOG(debug, "SEND Data: ~1000p", [Packet]),
case emqx_client_sock:send(Sock, Data) of
ok -> {ok, bump_last_packet_id(State)};
Error -> Error

View File

@ -20,6 +20,8 @@
-include("logger.hrl").
-include("types.hrl").
-logger_header("[CM]").
-export([start_link/0]).
-export([ register_connection/1
@ -159,7 +161,7 @@ init([]) ->
{ok, #{conn_pmon => emqx_pmon:new()}}.
handle_call(Req, _From, State) ->
?LOG(error, "[CM] Unexpected call: ~p", [Req]),
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({notify, {registered, ClientId, ConnPid}}, State = #{conn_pmon := PMon}) ->
@ -169,7 +171,7 @@ handle_cast({notify, {unregistered, ConnPid}}, State = #{conn_pmon := PMon}) ->
{noreply, State#{conn_pmon := emqx_pmon:demonitor(ConnPid, PMon)}};
handle_cast(Msg, State) ->
?LOG(error, "[CM] Unexpected cast: ~p", [Msg]),
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{conn_pmon := PMon}) ->
@ -180,7 +182,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{conn_pmon := PMon}
{noreply, State#{conn_pmon := PMon1}};
handle_info(Info, State) ->
?LOG(error, "[CM] Unexpected info: ~p", [Info]),
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -30,9 +30,8 @@ init([]) ->
shutdown => 1000,
type => worker,
modules => [emqx_banned]},
FlappingOption = emqx_config:get_env(flapping_clean_interval, 3600000),
Flapping = #{id => flapping,
start => {emqx_flapping, start_link, [FlappingOption]},
start => {emqx_flapping, start_link, []},
restart => permanent,
shutdown => 1000,
type => worker,

View File

@ -18,6 +18,8 @@
-include("logger.hrl").
-logger_header("[Ctl]").
-export([start_link/0]).
-export([ register_command/2
@ -79,7 +81,7 @@ run_command(Cmd, Args) when is_atom(Cmd) ->
_ -> ok
catch
_:Reason:Stacktrace ->
?ERROR("[Ctl] CMD Error:~p, Stacktrace:~p", [Reason, Stacktrace]),
?ERROR("CMD Error:~p, Stacktrace:~p", [Reason, Stacktrace]),
{error, Reason}
end;
[] ->
@ -107,14 +109,14 @@ init([]) ->
{ok, #state{seq = 0}}.
handle_call(Req, _From, State) ->
?LOG(error, "[Ctl] Unexpected call: ~p", [Req]),
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({register_command, Cmd, MF, Opts}, State = #state{seq = Seq}) ->
case ets:match(?TAB, {{'$1', Cmd}, '_', '_'}) of
[] -> ets:insert(?TAB, {{Seq, Cmd}, MF, Opts});
[[OriginSeq] | _] ->
?LOG(warning, "[Ctl] CMD ~s is overidden by ~p", [Cmd, MF]),
?LOG(warning, "CMD ~s is overidden by ~p", [Cmd, MF]),
ets:insert(?TAB, {{OriginSeq, Cmd}, MF, Opts})
end,
noreply(next_seq(State));
@ -124,11 +126,11 @@ handle_cast({unregister_command, Cmd}, State) ->
noreply(State);
handle_cast(Msg, State) ->
?LOG(error, "[Ctl] Unexpected cast: ~p", [Msg]),
?LOG(error, "Unexpected cast: ~p", [Msg]),
noreply(State).
handle_info(Info, State) ->
?LOG(error, "[Ctl] Unexpected info: ~p", [Info]),
?LOG(error, "Unexpected info: ~p", [Info]),
noreply(State).
terminate(_Reason, _State) ->

View File

@ -19,7 +19,7 @@
-behaviour(gen_statem).
-export([start_link/1]).
-export([start_link/0]).
%% This module is used to garbage clean the flapping records
@ -33,6 +33,8 @@
-define(FLAPPING_TAB, ?MODULE).
-define(default_flapping_clean_interval, 3600000).
-export([check/3]).
-record(flapping,
@ -96,11 +98,12 @@ check_flapping(Action, CheckCount, _Threshold = {TimesThreshold, TimeInterval},
%%--------------------------------------------------------------------
%% gen_statem callbacks
%%--------------------------------------------------------------------
-spec(start_link(TimerInterval :: [integer()]) -> startlink_ret()).
start_link(TimerInterval) ->
gen_statem:start_link({local, ?MODULE}, ?MODULE, [TimerInterval], []).
-spec(start_link() -> startlink_ret()).
start_link() ->
gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).
init([TimerInterval]) ->
init([]) ->
TimerInterval = emqx_config:get_env(flapping_clean_interval, ?default_flapping_clean_interval),
TabOpts = [ public
, set
, {keypos, 2}

View File

@ -19,6 +19,8 @@
-include("logger.hrl").
-include("types.hrl").
-logger_header("[Hooks]").
-export([start_link/0, stop/0]).
%% Hooks API
@ -181,7 +183,7 @@ handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, Stat
{reply, Reply, State};
handle_call(Req, _From, State) ->
?LOG(error, "[Hooks] Unexpected call: ~p", [Req]),
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({del, HookPoint, Action}, State) ->
@ -194,11 +196,11 @@ handle_cast({del, HookPoint, Action}, State) ->
{noreply, State};
handle_cast(Msg, State) ->
?LOG(error, "[Hooks] Unexpected msg: ~p", [Msg]),
?LOG(error, "Unexpected msg: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
?LOG(error, "[Hooks] Unexpected info: ~p", [Info]),
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -48,6 +48,8 @@
, get_log_handler/1
]).
-export([parse_transform/2]).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
@ -120,6 +122,9 @@ set_log_level(Level) ->
{error, Error} -> {error, {primary_logger_level, Error}}
end.
parse_transform(AST, _Opts) ->
trans(AST, "", []).
%%------------------------------------------------------------------------------
%% Internal Functions
%%------------------------------------------------------------------------------
@ -160,3 +165,27 @@ rollback([{ID, Level} | List]) ->
rollback(List);
rollback([]) -> ok.
%% Generate a function '$logger_header'/0 into the source code
trans([], LogHeader, ResAST) ->
lists:reverse([header_fun(LogHeader) | ResAST]);
trans([{eof, L} | AST], LogHeader, ResAST) ->
lists:reverse([{eof, L}, header_fun(LogHeader) | ResAST]) ++ AST;
trans([{attribute, _, module, _Mod} = M | AST], Header, ResAST) ->
trans(AST, Header, [export_header_fun(), M | ResAST]);
trans([{attribute, _, logger_header, Header} | AST], _, ResAST) ->
io_lib:printable_list(Header) orelse error({invalid_string, Header}),
trans(AST, Header, ResAST);
trans([F | AST], LogHeader, ResAST) ->
trans(AST, LogHeader, [F | ResAST]).
export_header_fun() ->
{attribute,erl_anno:new(0),export,[{'$logger_header',0}]}.
header_fun(LogHeader) ->
L = erl_anno:new(0),
{function,L,'$logger_header',0,
[{clause,L,
[], [], [{string,L,pad(LogHeader)}]}]}.
pad("") -> "";
pad(Str) -> Str ++ " ".

View File

@ -20,6 +20,8 @@
-include("types.hrl").
-include("emqx_mqtt.hrl").
-logger_header("[Metrics]").
-export([ start_link/0
, stop/0
]).
@ -130,6 +132,10 @@
{counter, 'messages.forward'} % Messages forward
]).
-define(MQTT_METRICS, [
{counter, 'auth.mqtt.anonymous'}
]).
-record(state, {next_idx = 1}).
-record(metric, {name, type, idx}).
@ -353,17 +359,17 @@ init([]) ->
Metric = #metric{name = Name, type = Type, idx = reserved_idx(Name)},
true = ets:insert(?TAB, Metric),
ok = counters:put(CRef, Idx, 0)
end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS),
end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS ++ ?MQTT_METRICS),
{ok, #state{next_idx = ?RESERVED_IDX + 1}, hibernate}.
handle_call({create, Type, Name}, _From, State = #state{next_idx = ?MAX_SIZE}) ->
?LOG(error, "[Metrics] Failed to create ~s:~s for index exceeded.", [Type, Name]),
?LOG(error, "Failed to create ~s:~s for index exceeded.", [Type, Name]),
{reply, {error, metric_index_exceeded}, State};
handle_call({create, Type, Name}, _From, State = #state{next_idx = NextIdx}) ->
case ets:lookup(?TAB, Name) of
[#metric{idx = Idx}] ->
?LOG(warning, "[Metrics] ~s already exists.", [Name]),
?LOG(warning, "~s already exists.", [Name]),
{reply, {ok, Idx}, State};
[] ->
Metric = #metric{name = Name, type = Type, idx = NextIdx},
@ -372,15 +378,15 @@ handle_call({create, Type, Name}, _From, State = #state{next_idx = NextIdx}) ->
end;
handle_call(Req, _From, State) ->
?LOG(error, "[Metrics] Unexpected call: ~p", [Req]),
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?LOG(error, "[Metrics] Unexpected cast: ~p", [Msg]),
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
?LOG(error, "[Metrics] Unexpected info: ~p", [Info]),
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
@ -444,4 +450,5 @@ reserved_idx('messages.retained') -> 48;
reserved_idx('messages.dropped') -> 49;
reserved_idx('messages.expired') -> 50;
reserved_idx('messages.forward') -> 51;
reserved_idx('auth.mqtt.anonymous') -> 52;
reserved_idx(_) -> undefined.

View File

@ -19,6 +19,8 @@
-include("emqx.hrl").
-include("logger.hrl").
-logger_header("[ACL_INTERNAL]").
%% APIs
-export([ all_rules/0
, check_acl/5
@ -99,7 +101,7 @@ rules_from_file(AclFile) ->
#{publish => [Rule || Rule <- Rules, filter(publish, Rule)],
subscribe => [Rule || Rule <- Rules, filter(subscribe, Rule)]};
{error, Reason} ->
?LOG(alert, "[ACL_INTERNAL] Failed to read ~s: ~p", [AclFile, Reason]),
?LOG(alert, "Failed to read ~s: ~p", [AclFile, Reason]),
#{}
end.

View File

@ -19,6 +19,8 @@
-include("emqx.hrl").
-include("logger.hrl").
-logger_header("[Presence]").
%% APIs
-export([ on_client_connected/4
, on_client_disconnected/3
@ -49,23 +51,23 @@ on_client_connected(#{client_id := ClientId,
username => Username,
ipaddress => iolist_to_binary(esockd_net:ntoa(IpAddr)),
connack => ConnAck,
ts => os:system_time(second)
ts => erlang:system_time(millisecond)
}) of
{ok, Payload} ->
emqx:publish(message(qos(Env), topic(connected, ClientId), Payload));
{error, Reason} ->
?LOG(error, "[Presence] Encoding connected event error: ~p", [Reason])
?LOG(error, "Encoding connected event error: ~p", [Reason])
end.
on_client_disconnected(#{client_id := ClientId, username := Username}, Reason, Env) ->
case emqx_json:safe_encode([{clientid, ClientId},
{username, Username},
{reason, reason(Reason)},
{ts, os:system_time(second)}]) of
{ts, erlang:system_time(millisecond)}]) of
{ok, Payload} ->
emqx_broker:publish(message(qos(Env), topic(disconnected, ClientId), Payload));
{error, Reason} ->
?LOG(error, "[Presence] Encoding disconnected event error: ~p", [Reason])
?LOG(error, "Encoding disconnected event error: ~p", [Reason])
end.
unload(_Env) ->

View File

@ -16,6 +16,8 @@
-include("logger.hrl").
-logger_header("[Modules]").
-export([ load/0
, unload/0
]).
@ -26,7 +28,7 @@ load() ->
lists:foreach(
fun({Mod, Env}) ->
ok = Mod:load(Env),
?LOG(info, "[Modules] Load ~s module successfully.", [Mod])
?LOG(info, "Load ~s module successfully.", [Mod])
end, emqx_config:get_env(modules, [])).
-spec(unload() -> ok).

View File

@ -17,6 +17,8 @@
-include("emqx.hrl").
-include("logger.hrl").
-logger_header("[Mountpoint]").
-export([ mount/2
, unmount/2
]).
@ -46,7 +48,7 @@ unmount(MountPoint, Msg = #message{topic = Topic}) ->
{MountPoint, Topic1} -> Msg#message{topic = Topic1}
catch
_Error:Reason ->
?LOG(error, "[Mountpoint] Unmount error : ~p", [Reason]),
?LOG(error, "Unmount error : ~p", [Reason]),
Msg
end.

View File

@ -24,6 +24,8 @@
, get_caps/2
]).
-export([default_caps/0]).
-type(caps() :: #{max_packet_size => integer(),
max_clientid_len => integer(),
max_topic_alias => integer(),
@ -36,6 +38,7 @@
-export_type([caps/0]).
-define(UNLIMITED, 0).
-define(DEFAULT_CAPS, [{max_packet_size, ?MAX_PACKET_SIZE},
{max_clientid_len, ?MAX_CLIENTID_LEN},
{max_topic_alias, ?UNLIMITED},
@ -119,6 +122,9 @@ check_sub(Topic, Opts, [{max_topic_levels, Limit}|Caps]) ->
_ -> check_sub(Topic, Opts, Caps)
end.
default_caps() ->
?DEFAULT_CAPS.
get_caps(Zone, publish) ->
with_env(Zone, '$mqtt_pub_caps',
fun() ->

View File

@ -18,6 +18,8 @@
-include("logger.hrl").
-logger_header("[OS Monitor]").
-export([start_link/1]).
%% gen_server callbacks
@ -45,6 +47,11 @@
-define(OS_MON, ?MODULE).
-define(compat_windows(Expression), case os:type() of
{win32, nt} -> windows;
_Unix -> Expression
end).
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
@ -93,7 +100,7 @@ set_procmem_high_watermark(Float) ->
%%------------------------------------------------------------------------------
init([Opts]) ->
_ = cpu_sup:util(),
_ = ?compat_windows(cpu_sup:util()),
set_mem_check_interval(proplists:get_value(mem_check_interval, Opts, 60)),
set_sysmem_high_watermark(proplists:get_value(sysmem_high_watermark, Opts, 0.70)),
set_procmem_high_watermark(proplists:get_value(procmem_high_watermark, Opts, 0.05)),
@ -124,16 +131,18 @@ handle_call(_Request, _From, State) ->
handle_cast(_Request, State) ->
{noreply, State}.
handle_info({timeout, Timer, check}, State = #{timer := Timer,
handle_info({timeout, Timer, check}, State = #{timer := Timer,
cpu_high_watermark := CPUHighWatermark,
cpu_low_watermark := CPULowWatermark,
is_cpu_alarm_set := IsCPUAlarmSet}) ->
case cpu_sup:util() of
case ?compat_windows(cpu_sup:util()) of
0 ->
{noreply, State#{timer := undefined}};
{error, Reason} ->
?LOG(error, "[OS Monitor] Failed to get cpu utilization: ~p", [Reason]),
?LOG(error, "Failed to get cpu utilization: ~p", [Reason]),
{noreply, ensure_check_timer(State)};
windows ->
{noreply, State};
Busy when Busy / 100 >= CPUHighWatermark ->
alarm_handler:set_alarm({cpu_high_watermark, Busy}),
{noreply, ensure_check_timer(State#{is_cpu_alarm_set := true})};
@ -161,4 +170,3 @@ call(Req) ->
ensure_check_timer(State = #{cpu_check_interval := Interval}) ->
State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}.

View File

@ -17,6 +17,8 @@
-include("emqx.hrl").
-include("logger.hrl").
-logger_header("[Plugins]").
-export([init/0]).
-export([ load/0
@ -86,7 +88,7 @@ load_expand_plugin(PluginDir) ->
end, Modules),
case filelib:wildcard(Ebin ++ "/*.app") of
[App|_] -> application:load(list_to_atom(filename:basename(App, ".app")));
_ -> ?LOG(alert, "[Plugins] Plugin not found."),
_ -> ?LOG(alert, "Plugin not found."),
{error, load_app_fail}
end.
@ -112,7 +114,7 @@ with_loaded_file(File, SuccFun) ->
Names = filter_plugins(Names0),
SuccFun(Names);
{error, Error} ->
?LOG(alert, "[Plugins] Failed to read: ~p, error: ~p", [File, Error]),
?LOG(alert, "Failed to read: ~p, error: ~p", [File, Error]),
{error, Error}
end.
@ -126,7 +128,7 @@ load_plugins(Names, Persistent) ->
Plugins = list(), NotFound = Names -- names(Plugins),
case NotFound of
[] -> ok;
NotFound -> ?LOG(alert, "[Plugins] Cannot find plugins: ~p", [NotFound])
NotFound -> ?LOG(alert, "Cannot find plugins: ~p", [NotFound])
end,
NeedToLoad = Names -- NotFound -- names(started_app),
[load_plugin(find_plugin(Name, Plugins), Persistent) || Name <- NeedToLoad].
@ -149,20 +151,20 @@ stop_plugins(Names) ->
-spec(list() -> [emqx_types:plugin()]).
list() ->
StartedApps = names(started_app),
lists:map(fun({Name, _, _}) ->
Plugin = plugin(Name),
lists:map(fun({Name, _, [Type| _]}) ->
Plugin = plugin(Name, Type),
case lists:member(Name, StartedApps) of
true -> Plugin#plugin{active = true};
false -> Plugin
end
end, lists:sort(ekka_boot:all_module_attributes(emqx_plugin))).
plugin(AppName) ->
plugin(AppName, Type) ->
case application:get_all_key(AppName) of
{ok, Attrs} ->
Ver = proplists:get_value(vsn, Attrs, "0"),
Descr = proplists:get_value(description, Attrs, ""),
#plugin{name = AppName, version = Ver, descr = Descr};
#plugin{name = AppName, version = Ver, descr = Descr, type = plugin_type(Type)};
undefined -> error({plugin_not_found, AppName})
end.
@ -171,12 +173,12 @@ plugin(AppName) ->
load(PluginName) when is_atom(PluginName) ->
case lists:member(PluginName, names(started_app)) of
true ->
?LOG(notice, "[Plugins] Plugin ~s is already started", [PluginName]),
?LOG(notice, "Plugin ~s is already started", [PluginName]),
{error, already_started};
false ->
case find_plugin(PluginName) of
false ->
?LOG(alert, "[Plugins] Plugin ~s not found", [PluginName]),
?LOG(alert, "Plugin ~s not found", [PluginName]),
{error, not_found};
Plugin ->
load_plugin(Plugin, true)
@ -204,12 +206,12 @@ load_app(App) ->
start_app(App, SuccFun) ->
case application:ensure_all_started(App) of
{ok, Started} ->
?LOG(info, "[Plugins] Started plugins: ~p", [Started]),
?LOG(info, "[Plugins] Load plugin ~s successfully", [App]),
?LOG(info, "Started plugins: ~p", [Started]),
?LOG(info, "Load plugin ~s successfully", [App]),
SuccFun(App),
{ok, Started};
{error, {ErrApp, Reason}} ->
?LOG(error, "[Plugins] Load plugin ~s failed, cannot start plugin ~s for ~p", [App, ErrApp, Reason]),
?LOG(error, "Load plugin ~s failed, cannot start plugin ~s for ~p", [App, ErrApp, Reason]),
{error, {ErrApp, Reason}}
end.
@ -226,10 +228,10 @@ unload(PluginName) when is_atom(PluginName) ->
{true, true} ->
unload_plugin(PluginName, true);
{false, _} ->
?LOG(error, "[Plugins] Plugin ~s is not started", [PluginName]),
?LOG(error, "Plugin ~s is not started", [PluginName]),
{error, not_started};
{true, false} ->
?LOG(error, "[Plugins] ~s is not a plugin, cannot unload it", [PluginName]),
?LOG(error, "~s is not a plugin, cannot unload it", [PluginName]),
{error, not_found}
end.
@ -244,11 +246,11 @@ unload_plugin(App, Persistent) ->
stop_app(App) ->
case application:stop(App) of
ok ->
?LOG(info, "[Plugins] Stop plugin ~s successfully", [App]), ok;
?LOG(info, "Stop plugin ~s successfully", [App]), ok;
{error, {not_started, App}} ->
?LOG(error, "[Plugins] Plugin ~s is not started", [App]), ok;
?LOG(error, "Plugin ~s is not started", [App]), ok;
{error, Reason} ->
?LOG(error, "[Plugins] Stop plugin ~s error: ~p", [App]), {error, Reason}
?LOG(error, "Stop plugin ~s error: ~p", [App]), {error, Reason}
end.
%%--------------------------------------------------------------------
@ -276,7 +278,7 @@ plugin_loaded(Name, true) ->
ignore
end;
{error, Error} ->
?LOG(error, "[Plugins] Cannot read loaded plugins: ~p", [Error])
?LOG(error, "Cannot read loaded plugins: ~p", [Error])
end.
plugin_unloaded(_Name, false) ->
@ -289,10 +291,10 @@ plugin_unloaded(Name, true) ->
true ->
write_loaded(lists:delete(Name, Names));
false ->
?LOG(error, "[Plugins] Cannot find ~s in loaded_file", [Name])
?LOG(error, "Cannot find ~s in loaded_file", [Name])
end;
{error, Error} ->
?LOG(error, "[Plugins] Cannot read loaded_plugins: ~p", [Error])
?LOG(error, "Cannot read loaded_plugins: ~p", [Error])
end.
read_loaded() ->
@ -311,6 +313,13 @@ write_loaded(AppNames) ->
file:write(Fd, iolist_to_binary(io_lib:format("~p.~n", [Name])))
end, AppNames);
{error, Error} ->
?LOG(error, "[Plugins] Open File ~p Error: ~p", [File, Error]),
?LOG(error, "Open File ~p Error: ~p", [File, Error]),
{error, Error}
end.
plugin_type(auth) -> auth;
plugin_type(protocol) -> protocol;
plugin_type(backend) -> backend;
plugin_type(bridge) -> bridge;
plugin_type(_) -> feature.

View File

@ -19,6 +19,8 @@
-include("logger.hrl").
-include("types.hrl").
-logger_header("[Pool]").
%% APIs
-export([start_link/2]).
@ -97,22 +99,22 @@ handle_call({submit, Task}, _From, State) ->
{reply, catch run(Task), State};
handle_call(Req, _From, State) ->
?LOG(error, "[Pool] Unexpected call: ~p", [Req]),
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({async_submit, Task}, State) ->
try run(Task)
catch _:Error:Stacktrace ->
?LOG(error, "[Pool] Error: ~p, ~p", [Error, Stacktrace])
?LOG(error, "Error: ~p, ~p", [Error, Stacktrace])
end,
{noreply, State};
handle_cast(Msg, State) ->
?LOG(error, "[Pool] Unexpected cast: ~p", [Msg]),
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
?LOG(error, "[Pool] Unexpected info: ~p", [Info]),
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #{pool := Pool, id := Id}) ->

View File

@ -20,6 +20,8 @@
-include("emqx_mqtt.hrl").
-include("logger.hrl").
-logger_header("[Protocol]").
-export([ info/1
, attrs/1
, attr/2
@ -413,11 +415,11 @@ process(?CONNECT_PACKET(
%% Success
{?RC_SUCCESS, SP, PState4};
{error, Error} ->
?LOG(error, "[Protocol] Failed to open session: ~p", [Error]),
?LOG(error, "Failed to open session: ~p", [Error]),
{?RC_UNSPECIFIED_ERROR, PState1#pstate{credentials = Credentials0}}
end;
{error, Reason} ->
?LOG(warning, "[Protocol] Client ~s (Username: '~s') login failed for ~p", [NewClientId, Username, Reason]),
?LOG(warning, "Client ~s (Username: '~s') login failed for ~p", [NewClientId, Username, Reason]),
{emqx_reason_codes:connack_error(Reason), PState1#pstate{credentials = Credentials}}
end;
{error, ReasonCode} ->
@ -429,7 +431,7 @@ process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState = #
ok ->
do_publish(Packet, PState);
{error, ReasonCode} ->
?LOG(warning, "[Protocol] Cannot publish qos0 message to ~s for ~s",
?LOG(warning, "Cannot publish qos0 message to ~s for ~s",
[Topic, emqx_reason_codes:text(ReasonCode)]),
AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore),
do_acl_deny_action(AclDenyAction, Packet, ReasonCode, PState)
@ -440,7 +442,7 @@ process(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PState = #p
ok ->
do_publish(Packet, PState);
{error, ReasonCode} ->
?LOG(warning, "[Protocol] Cannot publish qos1 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]),
?LOG(warning, "Cannot publish qos1 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]),
case deliver({puback, PacketId, ReasonCode}, PState) of
{ok, PState1} ->
AclDenyAction = emqx_zone:get_env(Zone, acl_deny_action, ignore),
@ -454,7 +456,7 @@ process(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PState = #p
ok ->
do_publish(Packet, PState);
{error, ReasonCode} ->
?LOG(warning, "[Protocol] Cannot publish qos2 message to ~s for ~s",
?LOG(warning, "Cannot publish qos2 message to ~s for ~s",
[Topic, emqx_reason_codes:text(ReasonCode)]),
case deliver({pubrec, PacketId, ReasonCode}, PState) of
{ok, PState1} ->
@ -501,7 +503,7 @@ process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
({Topic, #{rc := Code}}, {Topics, Codes}) ->
{[Topic|Topics], [Code|Codes]}
end, {[], []}, TopicFilters),
?LOG(warning, "[Protocol] Cannot subscribe ~p for ~p",
?LOG(warning, "Cannot subscribe ~p for ~p",
[SubTopics, [emqx_reason_codes:text(R) || R <- ReasonCodes]]),
case deliver({suback, PacketId, ReasonCodes}, PState) of
{ok, PState1} ->
@ -844,7 +846,7 @@ check_will_acl(#mqtt_packet_connect{will_topic = WillTopic},
case do_acl_check(EnableAcl, publish, Credentials, WillTopic) of
ok -> ok;
Other ->
?LOG(warning, "[Protocol] Cannot publish will message to ~p for acl denied", [WillTopic]),
?LOG(warning, "Cannot publish will message to ~p for acl denied", [WillTopic]),
Other
end.
@ -898,9 +900,9 @@ check_sub_acl(TopicFilters, #pstate{zone = Zone, credentials = Credentials}) ->
end, {ok, []}, TopicFilters).
trace(recv, Packet) ->
?LOG(debug, "[Protocol] RECV ~s", [emqx_packet:format(Packet)]);
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]);
trace(send, Packet) ->
?LOG(debug, "[Protocol] SEND ~s", [emqx_packet:format(Packet)]).
?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]).
inc_stats(recv, Type, PState = #pstate{recv_stats = Stats}) ->
PState#pstate{recv_stats = inc_stats(Type, Stats)};
@ -926,7 +928,7 @@ terminate(Reason, PState) when Reason =:= conflict;
terminate(Reason, PState = #pstate{credentials = Credentials}) ->
do_flapping_detect(disconnect, PState),
?LOG(info, "[Protocol] Shutdown for ~p", [Reason]),
?LOG(info, "Shutdown for ~p", [Reason]),
ok = emqx_hooks:run('client.disconnected', [Credentials, Reason]).
start_keepalive(0, _PState) ->
@ -961,7 +963,7 @@ do_flapping_detect(Action, #pstate{zone = Zone,
Threshold = emqx_zone:get_env(Zone, flapping_threshold, {10, 60}),
case emqx_flapping:check(Action, ClientId, Threshold) of
flapping ->
BanExpiryInterval = emqx_zone:get_env(Zone, flapping_ban_expiry_interval, 3600000),
BanExpiryInterval = emqx_zone:get_env(Zone, flapping_banned_expiry_interval, 3600000),
Until = erlang:system_time(second) + BanExpiryInterval,
emqx_banned:add(#banned{who = {client_id, ClientId},
reason = <<"flapping">>,

View File

@ -16,6 +16,8 @@
-include("logger.hrl").
-logger_header("[PSK]").
%% SSL PSK Callbacks
-export([lookup/3]).
@ -27,10 +29,10 @@ lookup(psk, ClientPSKID, _UserState) ->
try emqx_hooks:run_fold('tls_handshake.psk_lookup', [ClientPSKID], not_found) of
SharedSecret when is_binary(SharedSecret) -> {ok, SharedSecret};
Error ->
?LOG(error, "[PSK] Look PSK for PSKID ~p error: ~p", [ClientPSKID, Error]),
?LOG(error, "Look PSK for PSKID ~p error: ~p", [ClientPSKID, Error]),
error
catch
Except:Error:Stacktrace ->
?LOG(error, "[PSK] Lookup PSK failed, ~p: ~p", [{Except,Error}, Stacktrace]),
?LOG(error, "Lookup PSK failed, ~p: ~p", [{Except,Error}, Stacktrace]),
error
end.

View File

@ -21,6 +21,8 @@
-include("types.hrl").
-include_lib("ekka/include/ekka.hrl").
-logger_header("[Router]").
%% Mnesia bootstrap
-export([mnesia/1]).
@ -198,15 +200,15 @@ handle_call({delete_route, Topic, Dest}, _From, State) ->
{reply, Ok, State};
handle_call(Req, _From, State) ->
?LOG(error, "[Router] Unexpected call: ~p", [Req]),
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?LOG(error, "[Router] Unexpected cast: ~p", [Msg]),
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
?LOG(error, "[Router] Unexpected info: ~p", [Info]),
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #{pool := Pool, id := Id}) ->

View File

@ -20,6 +20,8 @@
-include("logger.hrl").
-include("types.hrl").
-logger_header("[Router Helper]").
%% Mnesia bootstrap
-export([mnesia/1]).
@ -103,11 +105,11 @@ init([]) ->
{ok, #{nodes => Nodes}, hibernate}.
handle_call(Req, _From, State) ->
?LOG(error, "[Router Helper] Unexpected call: ~p", [Req]),
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?LOG(error, "[Router Helper] Unexpected cast: ~p", [Msg]),
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({mnesia_table_event, {write, {?ROUTING_NODE, Node, _}, _}}, State = #{nodes := Nodes}) ->
@ -123,7 +125,7 @@ handle_info({mnesia_table_event, {delete, {?ROUTING_NODE, _Node}, _}}, State) ->
{noreply, State};
handle_info({mnesia_table_event, Event}, State) ->
?LOG(error, "[Router Helper] Unexpected mnesia_table_event: ~p", [Event]),
?LOG(error, "Unexpected mnesia_table_event: ~p", [Event]),
{noreply, State};
handle_info({nodedown, Node}, State = #{nodes := Nodes}) ->
@ -141,7 +143,7 @@ handle_info({membership, _Event}, State) ->
{noreply, State};
handle_info(Info, State) ->
?LOG(error, "[Route Helper] Unexpected info: ~p", [Info]),
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -47,6 +47,8 @@
-include("logger.hrl").
-include("types.hrl").
-logger_header("[Session]").
-export([start_link/1]).
-export([ info/1
@ -399,11 +401,11 @@ handle_call(stats, _From, State) ->
reply(stats(State), State);
handle_call({discard, ByPid}, _From, State = #state{conn_pid = undefined}) ->
?LOG(warning, "[Session] Discarded by ~p", [ByPid]),
?LOG(warning, "Discarded by ~p", [ByPid]),
{stop, {shutdown, discarded}, ok, State};
handle_call({discard, ByPid}, _From, State = #state{client_id = ClientId, conn_pid = ConnPid}) ->
?LOG(warning, "[Session] Conn ~p is discarded by ~p", [ConnPid, ByPid]),
?LOG(warning, "Conn ~p is discarded by ~p", [ConnPid, ByPid]),
ConnPid ! {shutdown, discard, {ClientId, ByPid}},
{stop, {shutdown, discarded}, ok, State};
@ -423,7 +425,7 @@ handle_call({register_publish_packet_id, PacketId, Ts}, _From,
{ok, ensure_stats_timer(ensure_await_rel_timer(State1))}
end;
true ->
?LOG(warning, "[Session] Dropped qos2 packet ~w for too many awaiting_rel", [PacketId]),
?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId]),
ok = emqx_metrics:inc('messages.qos2.dropped'),
{{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State}
end);
@ -435,7 +437,7 @@ handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = In
true ->
{ok, ensure_stats_timer(acked(pubrec, PacketId, State))};
false ->
?LOG(warning, "[Session] The PUBREC PacketId ~w is not found.", [PacketId]),
?LOG(warning, "The PUBREC PacketId ~w is not found.", [PacketId]),
ok = emqx_metrics:inc('packets.pubrec.missed'),
{{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
end);
@ -447,7 +449,7 @@ handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel
{_Ts, AwaitingRel1} ->
{ok, ensure_stats_timer(State#state{awaiting_rel = AwaitingRel1})};
error ->
?LOG(warning, "[Session] The PUBREL PacketId ~w is not found", [PacketId]),
?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId]),
ok = emqx_metrics:inc('packets.pubrel.missed'),
{{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
end);
@ -456,7 +458,7 @@ handle_call(close, _From, State) ->
{stop, normal, ok, State};
handle_call(Req, _From, State) ->
?LOG(error, "[Session] Unexpected call: ~p", [Req]),
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
%% SUBSCRIBE:
@ -497,7 +499,7 @@ handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}
true ->
ensure_stats_timer(dequeue(acked(puback, PacketId, State)));
false ->
?LOG(warning, "[Session] The PUBACK PacketId ~w is not found", [PacketId]),
?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]),
ok = emqx_metrics:inc('packets.puback.missed'),
State
end);
@ -509,7 +511,7 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight
true ->
ensure_stats_timer(dequeue(acked(pubcomp, PacketId, State)));
false ->
?LOG(warning, "[Session] The PUBCOMP PacketId ~w is not found", [PacketId]),
?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId]),
ok = emqx_metrics:inc('packets.pubcomp.missed'),
State
end);
@ -527,14 +529,14 @@ handle_cast({resume, #{conn_pid := ConnPid,
expiry_timer = ExpireTimer,
will_delay_timer = WillDelayTimer}) ->
?LOG(info, "[Session] Resumed by connection ~p ", [ConnPid]),
?LOG(info, "Resumed by connection ~p ", [ConnPid]),
%% Cancel Timers
lists:foreach(fun emqx_misc:cancel_timer/1,
[RetryTimer, AwaitTimer, ExpireTimer, WillDelayTimer]),
case kick(ClientId, OldConnPid, ConnPid) of
ok -> ?LOG(warning, "[Session] Connection ~p kickout ~p", [ConnPid, OldConnPid]);
ok -> ?LOG(warning, "Connection ~p kickout ~p", [ConnPid, OldConnPid]);
ignore -> ok
end,
@ -565,7 +567,7 @@ handle_cast({update_expiry_interval, Interval}, State) ->
{noreply, State#state{expiry_interval = Interval}};
handle_cast(Msg, State) ->
?LOG(error, "[Session] Unexpected cast: ~p", [Msg]),
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, message) ->
@ -600,12 +602,12 @@ handle_info({timeout, Timer, emit_stats},
GcState1 = emqx_gc:reset(GcState),
{noreply, NewState#state{gc_state = GcState1}, hibernate};
{shutdown, Reason} ->
?LOG(warning, "[Session] Shutdown exceptionally due to ~p", [Reason]),
?LOG(warning, "Shutdown exceptionally due to ~p", [Reason]),
shutdown(Reason, NewState)
end;
handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) ->
?LOG(info, "[Session] Expired, shutdown now.", []),
?LOG(info, "Expired, shutdown now.", []),
shutdown(expired, State);
handle_info({timeout, Timer, will_delay}, State = #state{will_msg = WillMsg, will_delay_timer = Timer}) ->
@ -640,12 +642,12 @@ handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) ->
{noreply, State#state{old_conn_pid = undefined}};
handle_info({'EXIT', Pid, Reason}, State = #state{conn_pid = ConnPid}) ->
?LOG(error, "[Session] Unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p",
?LOG(error, "Unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p",
[ConnPid, Pid, Reason]),
{noreply, State};
handle_info(Info, State) ->
?LOG(error, "[Session] Unexpected info: ~p", [Info]),
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(Reason, #state{will_msg = WillMsg,
@ -771,7 +773,7 @@ expire_awaiting_rel([{PacketId, Ts} | More], Now,
case (timer:now_diff(Now, Ts) div 1000) of
Age when Age >= Timeout ->
ok = emqx_metrics:inc('messages.qos2.expired'),
?LOG(warning, "[Session] Dropped qos2 packet ~s for await_rel_timeout", [PacketId]),
?LOG(warning, "Dropped qos2 packet ~s for await_rel_timeout", [PacketId]),
expire_awaiting_rel(More, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)});
Age ->
ensure_await_rel_timer(Timeout - max(0, Age), State)
@ -981,7 +983,7 @@ acked(puback, PacketId, State = #state{client_id = ClientId, username = Username
ok = emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}, Msg]),
State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
none ->
?LOG(warning, "[Session] Duplicated PUBACK PacketId ~w", [PacketId]),
?LOG(warning, "Duplicated PUBACK PacketId ~w", [PacketId]),
State
end;
@ -991,10 +993,10 @@ acked(pubrec, PacketId, State = #state{client_id = ClientId, username = Username
ok = emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}, Msg]),
State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)};
{value, {pubrel, PacketId, _Ts}} ->
?LOG(warning, "[Session] Duplicated PUBREC PacketId ~w", [PacketId]),
?LOG(warning, "Duplicated PUBREC PacketId ~w", [PacketId]),
State;
none ->
?LOG(warning, "[Session] Unexpected PUBREC PacketId ~w", [PacketId]),
?LOG(warning, "Unexpected PUBREC PacketId ~w", [PacketId]),
State
end;

View File

@ -19,6 +19,8 @@
-include("logger.hrl").
-include("types.hrl").
-logger_header("[Session Supervisor]").
-export([start_link/1]).
-export([ start_session/1
@ -92,7 +94,7 @@ handle_call({start_session, SessAttrs = #{client_id := ClientId}}, _From,
reply({error, Reason}, State)
catch
_:Error:Stk ->
?LOG(error, "[Session Supervisor] Failed to start session ~p: ~p, stacktrace:~n~p",
?LOG(error, "Failed to start session ~p: ~p, stacktrace:~n~p",
[ClientId, Error, Stk]),
reply({error, Error}, State)
end;
@ -101,11 +103,11 @@ handle_call(count_sessions, _From, State = #state{sessions = SessMap}) ->
{reply, maps:size(SessMap), State};
handle_call(Req, _From, State) ->
?LOG(error, "[Session Supervisor] Unexpected call: ~p", [Req]),
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?LOG(error, "[Session Supervisor] Unexpected cast: ~p", [Msg]),
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({'EXIT', Pid, _Reason}, State = #state{sessions = SessMap, clean_down = CleanDown}) ->
@ -117,7 +119,7 @@ handle_info({'EXIT', Pid, _Reason}, State = #state{sessions = SessMap, clean_dow
{noreply, State#state{sessions = SessMap1}};
handle_info(Info, State) ->
?LOG(notice, "[Session Supervisor] Unexpected info: ~p", [Info]),
?LOG(notice, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, State) ->

View File

@ -21,6 +21,8 @@
-include("logger.hrl").
-include("types.hrl").
-logger_header("[Shared Sub]").
%% Mnesia bootstrap
-export([mnesia/1]).
@ -310,11 +312,11 @@ handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) ->
{reply, ok, State};
handle_call(Req, _From, State) ->
?LOG(error, "[Shared Sub] Unexpected call: ~p", [Req]),
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?LOG(error, "[Shared Sub] Unexpected cast: ~p", [Msg]),
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) ->
@ -329,12 +331,12 @@ handle_info({mnesia_table_event, _Event}, State) ->
{noreply, State};
handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) ->
?LOG(info, "[Shared Sub] Shared subscriber down: ~p", [SubPid]),
?LOG(info, "Shared subscriber down: ~p", [SubPid]),
cleanup_down(SubPid),
{noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})};
handle_info(Info, State) ->
?LOG(error, "[Shared Sub] Unexpected info: ~p", [Info]),
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -20,6 +20,8 @@
-include("logger.hrl").
-include("types.hrl").
-logger_header("[SM]").
%% APIs
-export([start_link/0]).
@ -114,7 +116,7 @@ discard_session(ClientId, ConnPid) when is_binary(ClientId) ->
try emqx_session:discard(SessPid, ConnPid)
catch
_:Error:_Stk ->
?LOG(warning, "[SM] Failed to discard ~p: ~p", [SessPid, Error])
?LOG(warning, "Failed to discard ~p: ~p", [SessPid, Error])
end
end, lookup_session_pids(ClientId)).
@ -128,7 +130,7 @@ resume_session(ClientId, SessAttrs = #{conn_pid := ConnPid}) ->
{ok, SessPid};
SessPids ->
[SessPid|StalePids] = lists:reverse(SessPids),
?LOG(error, "[SM] More than one session found: ~p", [SessPids]),
?LOG(error, "More than one session found: ~p", [SessPids]),
lists:foreach(fun(StalePid) ->
catch emqx_session:discard(StalePid, ConnPid)
end, StalePids),
@ -254,15 +256,15 @@ init([]) ->
{ok, #{}}.
handle_call(Req, _From, State) ->
?LOG(error, "[SM] Unexpected call: ~p", [Req]),
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?LOG(error, "[SM] Unexpected cast: ~p", [Msg]),
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
?LOG(error, "[SM] Unexpected info: ~p", [Info]),
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -20,6 +20,8 @@
-include("logger.hrl").
-include("types.hrl").
-logger_header("[Registry]").
-export([start_link/0]).
-export([ is_enabled/0
@ -96,11 +98,11 @@ init([]) ->
{ok, #{}}.
handle_call(Req, _From, State) ->
?LOG(error, "[Registry] Unexpected call: ~p", [Req]),
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?LOG(error, "[Registry] Unexpected cast: ~p", [Msg]),
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({membership, {mnesia, down, Node}}, State) ->
@ -114,7 +116,7 @@ handle_info({membership, _Event}, State) ->
{noreply, State};
handle_info(Info, State) ->
?LOG(error, "[Registry] Unexpected info: ~p", [Info]),
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -20,6 +20,8 @@
-include("logger.hrl").
-include("types.hrl").
-logger_header("[Stats]").
%% APIs
-export([ start_link/0
, start_link/1
@ -184,7 +186,7 @@ handle_call(stop, _From, State) ->
{stop, normal, ok, State};
handle_call(Req, _From, State) ->
?LOG(error, "[Stats] Unexpected call: ~p", [Req]),
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({setstat, Stat, MaxStat, Val}, State) ->
@ -202,7 +204,7 @@ handle_cast({setstat, Stat, MaxStat, Val}, State) ->
handle_cast({update_interval, Update = #update{name = Name}}, State = #state{updates = Updates}) ->
case lists:keyfind(Name, #update.name, Updates) of
#update{} ->
?LOG(warning, "[Stats] Duplicated update: ~s", [Name]),
?LOG(warning, "Duplicated update: ~s", [Name]),
{noreply, State};
false ->
{noreply, State#state{updates = [Update | Updates]}}
@ -212,7 +214,7 @@ handle_cast({cancel_update, Name}, State = #state{updates = Updates}) ->
{noreply, State#state{updates = lists:keydelete(Name, #update.name, Updates)}};
handle_cast(Msg, State) ->
?LOG(error, "[Stats] Unexpected cast: ~p", [Msg]),
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Updates}) ->
@ -222,7 +224,7 @@ handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Update
try UpFun()
catch
_:Error ->
?LOG(error, "[Stats] update ~s failed: ~p", [Name, Error])
?LOG(error, "update ~s failed: ~p", [Name, Error])
end,
[Update#update{countdown = I} | Acc];
(Update = #update{countdown = C}, Acc) ->
@ -231,7 +233,7 @@ handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Update
{noreply, start_timer(State#state{updates = Updates1}), hibernate};
handle_info(Info, State) ->
?LOG(error, "[Stats] Unexpected info: ~p", [Info]),
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #state{timer = TRef}) ->
@ -252,5 +254,5 @@ safe_update_element(Key, Val) ->
true
catch
error:badarg ->
?LOG(warning, "[Stats] Update ~p to ~p failed", [Key, Val])
?LOG(warning, "Update ~p to ~p failed", [Key, Val])
end.

View File

@ -19,6 +19,8 @@
-include("emqx.hrl").
-include("logger.hrl").
-logger_header("[SYS]").
-export([start_link/0]).
-export([ version/0
@ -117,11 +119,11 @@ handle_call(uptime, _From, State) ->
{reply, uptime(State), State};
handle_call(Req, _From, State) ->
?LOG(error, "[SYS] Unexpected call: ~p", [Req]),
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?LOG(error, "[SYS] Unexpected cast: ~p", [Msg]),
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({timeout, TRef, heartbeat}, State = #state{heartbeat = TRef}) ->
@ -138,7 +140,7 @@ handle_info({timeout, TRef, tick}, State = #state{ticker = TRef, version = Versi
{noreply, tick(State), hibernate};
handle_info(Info, State) ->
?LOG(error, "[SYS] Unexpected info: ~p", [Info]),
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) ->

View File

@ -19,6 +19,8 @@
-include("logger.hrl").
-include("types.hrl").
-logger_header("[SYSMON]").
-export([start_link/1]).
%% compress unused warning
@ -88,18 +90,18 @@ parse_opt([_Opt|Opts], Acc) ->
parse_opt(Opts, Acc).
handle_call(Req, _From, State) ->
?LOG(error, "[SYSMON] Unexpected call: ~p", [Req]),
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?LOG(error, "[SYSMON] Unexpected cast: ~p", [Msg]),
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({monitor, Pid, long_gc, Info}, State) ->
suppress({long_gc, Pid},
fun() ->
WarnMsg = io_lib:format("long_gc warning: pid = ~p, info: ~p", [Pid, Info]),
?LOG(warning, "[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]),
?LOG(warning, "~s~n~p", [WarnMsg, procinfo(Pid)]),
safe_publish(long_gc, WarnMsg)
end, State);
@ -107,7 +109,7 @@ handle_info({monitor, Pid, long_schedule, Info}, State) when is_pid(Pid) ->
suppress({long_schedule, Pid},
fun() ->
WarnMsg = io_lib:format("long_schedule warning: pid = ~p, info: ~p", [Pid, Info]),
?LOG(warning, "[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]),
?LOG(warning, "~s~n~p", [WarnMsg, procinfo(Pid)]),
safe_publish(long_schedule, WarnMsg)
end, State);
@ -115,7 +117,7 @@ handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) ->
suppress({long_schedule, Port},
fun() ->
WarnMsg = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]),
?LOG(warning, "[SYSMON] ~s~n~p", [WarnMsg, erlang:port_info(Port)]),
?LOG(warning, "~s~n~p", [WarnMsg, erlang:port_info(Port)]),
safe_publish(long_schedule, WarnMsg)
end, State);
@ -123,7 +125,7 @@ handle_info({monitor, Pid, large_heap, Info}, State) ->
suppress({large_heap, Pid},
fun() ->
WarnMsg = io_lib:format("large_heap warning: pid = ~p, info: ~p", [Pid, Info]),
?LOG(warning, "[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]),
?LOG(warning, "~s~n~p", [WarnMsg, procinfo(Pid)]),
safe_publish(large_heap, WarnMsg)
end, State);
@ -131,7 +133,7 @@ handle_info({monitor, SusPid, busy_port, Port}, State) ->
suppress({busy_port, Port},
fun() ->
WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
?LOG(warning, "[SYSMON] ~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]),
?LOG(warning, "~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]),
safe_publish(busy_port, WarnMsg)
end, State);
@ -139,7 +141,7 @@ handle_info({monitor, SusPid, busy_dist_port, Port}, State) ->
suppress({busy_dist_port, Port},
fun() ->
WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
?LOG(warning, "[SYSMON] ~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]),
?LOG(warning, "~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]),
safe_publish(busy_dist_port, WarnMsg)
end, State);
@ -147,7 +149,7 @@ handle_info({timeout, _Ref, reset}, State) ->
{noreply, State#{events := []}, hibernate};
handle_info(Info, State) ->
?LOG(error, "[SYSMON] Unexpected Info: ~p", [Info]),
?LOG(error, "Unexpected Info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #{timer := TRef}) ->

View File

@ -19,6 +19,8 @@
-include("emqx.hrl").
-include("logger.hrl").
-logger_header("[Tracer]").
%% APIs
-export([start_link/0]).
@ -121,10 +123,10 @@ handle_call({start_trace, Who, Level, LogFile}, _From, State = #state{traces = T
filters => [{meta_key_filter,
{fun filter_by_meta_key/2, Who} }]}) of
ok ->
?LOG(info, "[Tracer] Start trace for ~p", [Who]),
?LOG(info, "Start trace for ~p", [Who]),
{reply, ok, State#state{traces = maps:put(Who, {Level, LogFile}, Traces)}};
{error, Reason} ->
?LOG(error, "[Tracer] Start trace for ~p failed, error: ~p", [Who, Reason]),
?LOG(error, "Start trace for ~p failed, error: ~p", [Who, Reason]),
{reply, {error, Reason}, State}
end;
@ -133,9 +135,9 @@ handle_call({stop_trace, Who}, _From, State = #state{traces = Traces}) ->
{ok, _LogFile} ->
case logger:remove_handler(handler_id(Who)) of
ok ->
?LOG(info, "[Tracer] Stop trace for ~p", [Who]);
?LOG(info, "Stop trace for ~p", [Who]);
{error, Reason} ->
?LOG(error, "[Tracer] Stop trace for ~p failed, error: ~p", [Who, Reason])
?LOG(error, "Stop trace for ~p failed, error: ~p", [Who, Reason])
end,
{reply, ok, State#state{traces = maps:remove(Who, Traces)}};
error ->
@ -146,15 +148,15 @@ handle_call(lookup_traces, _From, State = #state{traces = Traces}) ->
{reply, [{Who, LogFile} || {Who, LogFile} <- maps:to_list(Traces)], State};
handle_call(Req, _From, State) ->
?LOG(error, "[Tracer] Unexpected call: ~p", [Req]),
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?LOG(error, "[Tracer] Unexpected cast: ~p", [Msg]),
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
?LOG(error, "[Tracer] Unexpected info: ~p", [Info]),
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -20,6 +20,8 @@
-include("emqx_mqtt.hrl").
-include("logger.hrl").
-logger_header("[WS Channel]").
-export([ info/1
, attrs/1
, stats/1
@ -143,11 +145,11 @@ websocket_init(#state{request = Req, options = Options}) ->
WsCookie = try cowboy_req:parse_cookies(Req)
catch
error:badarg ->
?LOG(error, "[WS Channel] Illegal cookie"),
?LOG(error, "Illegal cookie"),
undefined;
Error:Reason ->
?LOG(error,
"[WS Channel] Cookie is parsed failed, Error: ~p, Reason ~p",
"Cookie is parsed failed, Error: ~p, Reason ~p",
[Error, Reason]),
undefined
end,
@ -189,7 +191,7 @@ websocket_handle({binary, <<>>}, State) ->
websocket_handle({binary, [<<>>]}, State) ->
{ok, ensure_stats_timer(State)};
websocket_handle({binary, Data}, State = #state{parse_state = ParseState}) ->
?LOG(debug, "[WS Channel] RECV ~p", [Data]),
?LOG(debug, "RECV ~p", [Data]),
BinSize = iolist_size(Data),
emqx_pd:update_counter(recv_oct, BinSize),
ok = emqx_metrics:inc('bytes.received', BinSize),
@ -204,11 +206,11 @@ websocket_handle({binary, Data}, State = #state{parse_state = ParseState}) ->
end,
State#state{parse_state = NParseState});
{error, Reason} ->
?LOG(error, "[WS Channel] Frame error: ~p", [Reason]),
?LOG(error, "Frame error: ~p", [Reason]),
shutdown(Reason, State)
catch
error:Reason:Stk ->
?LOG(error, "[WS Channel] Parse failed for ~p~n\
?LOG(error, "Parse failed for ~p~n\
Stacktrace:~p~nFrame data: ~p", [Reason, Stk, Data]),
shutdown(parse_error, State)
end;
@ -219,7 +221,11 @@ websocket_handle(Frame, State)
{ok, ensure_stats_timer(State)};
websocket_handle({FrameType, _}, State)
when FrameType =:= ping; FrameType =:= pong ->
{ok, ensure_stats_timer(State)}.
{ok, ensure_stats_timer(State)};
%% According to mqtt spec[https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901285]
websocket_handle({_OtherFrameType, _}, State) ->
?LOG(error, "Frame error: Other type of data frame"),
shutdown(other_frame_type, State).
websocket_info({call, From, info}, State) ->
gen_server:reply(From, info(State)),
@ -255,12 +261,12 @@ websocket_info({timeout, Timer, emit_stats},
{ok, State#state{stats_timer = undefined}, hibernate};
websocket_info({keepalive, start, Interval}, State) ->
?LOG(debug, "[WS Channel] Keepalive at the interval of ~p", [Interval]),
?LOG(debug, "Keepalive at the interval of ~p", [Interval]),
case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of
{ok, KeepAlive} ->
{ok, State#state{keepalive = KeepAlive}};
{error, Error} ->
?LOG(warning, "[WS Channel] Keepalive error: ~p", [Error]),
?LOG(warning, "Keepalive error: ~p", [Error]),
shutdown(Error, State)
end;
@ -269,19 +275,19 @@ websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) ->
{ok, KeepAlive1} ->
{ok, State#state{keepalive = KeepAlive1}};
{error, timeout} ->
?LOG(debug, "[WS Channel] Keepalive Timeout!"),
?LOG(debug, "Keepalive Timeout!"),
shutdown(keepalive_timeout, State);
{error, Error} ->
?LOG(error, "[WS Channel] Keepalive error: ~p", [Error]),
?LOG(error, "Keepalive error: ~p", [Error]),
shutdown(keepalive_error, State)
end;
websocket_info({shutdown, discard, {ClientId, ByPid}}, State) ->
?LOG(warning, "[WS Channel] Discarded by ~s:~p", [ClientId, ByPid]),
?LOG(warning, "Discarded by ~s:~p", [ClientId, ByPid]),
shutdown(discard, State);
websocket_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
?LOG(warning, "[WS Channel] Clientid '~s' conflict with ~p", [ClientId, NewPid]),
?LOG(warning, "Clientid '~s' conflict with ~p", [ClientId, NewPid]),
shutdown(conflict, State);
websocket_info({binary, Data}, State) ->
@ -294,13 +300,13 @@ websocket_info({stop, Reason}, State) ->
{stop, State#state{shutdown = Reason}};
websocket_info(Info, State) ->
?LOG(error, "[WS Channel] Unexpected info: ~p", [Info]),
?LOG(error, "Unexpected info: ~p", [Info]),
{ok, State}.
terminate(SockError, _Req, #state{keepalive = Keepalive,
proto_state = ProtoState,
shutdown = Shutdown}) ->
?LOG(debug, "[WS Channel] Terminated for ~p, sockerror: ~p",
?LOG(debug, "Terminated for ~p, sockerror: ~p",
[Shutdown, SockError]),
emqx_keepalive:cancel(Keepalive),
case {ProtoState, Shutdown} of
@ -320,7 +326,7 @@ handle_incoming(Packet, SuccFun, State = #state{proto_state = ProtoState}) ->
{ok, NProtoState} ->
SuccFun(State#state{proto_state = NProtoState});
{error, Reason} ->
?LOG(error, "[WS Channel] Protocol error: ~p", [Reason]),
?LOG(error, "Protocol error: ~p", [Reason]),
shutdown(Reason, State);
{error, Reason, NProtoState} ->
shutdown(Reason, State#state{proto_state = NProtoState});

View File

@ -20,6 +20,8 @@
-include("logger.hrl").
-include("types.hrl").
-logger_header("[Zone]").
%% APIs
-export([start_link/0]).
@ -96,7 +98,7 @@ handle_call(force_reload, _From, State) ->
{reply, ok, State};
handle_call(Req, _From, State) ->
?LOG(error, "[Zone] Unexpected call: ~p", [Req]),
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({set_env, Zone, Key, Val}, State) ->
@ -104,11 +106,11 @@ handle_cast({set_env, Zone, Key, Val}, State) ->
{noreply, State};
handle_cast(Msg, State) ->
?LOG(error, "[Zone] Unexpected cast: ~p", [Msg]),
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
?LOG(error, "[Zone] Unexpected info: ~p", [Info]),
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -36,8 +36,6 @@ all() ->
groups() ->
[{access_control, [sequence],
[reload_acl,
register_mod,
unregister_mod,
check_acl_1,
check_acl_2]},
{access_control_cache_mode, [],
@ -98,58 +96,26 @@ write_config(Filename, Terms) ->
end_per_group(_Group, Config) ->
Config.
init_per_testcase(_TestCase, Config) ->
?AC:start_link(),
Config.
end_per_testcase(_TestCase, _Config) ->
ok.
per_testcase_config(acl_cache_full, Config) ->
Config;
per_testcase_config(_TestCase, Config) ->
Config.
%%--------------------------------------------------------------------
%% emqx_access_control
%%--------------------------------------------------------------------
reload_acl(_) ->
[ok] = ?AC:reload_acl().
register_mod(_) ->
ok = ?AC:register_mod(acl, emqx_acl_test_mod, []),
{emqx_acl_test_mod, _, 0} = hd(?AC:lookup_mods(acl)),
ok = ?AC:register_mod(auth, emqx_auth_anonymous_test_mod,[]),
ok = ?AC:register_mod(auth, emqx_auth_dashboard, [], 99),
[{emqx_auth_dashboard, _, 99},
{emqx_auth_anonymous_test_mod, _, 0}] = ?AC:lookup_mods(auth).
unregister_mod(_) ->
ok = ?AC:register_mod(acl, emqx_acl_test_mod, []),
{emqx_acl_test_mod, _, 0} = hd(?AC:lookup_mods(acl)),
ok = ?AC:unregister_mod(acl, emqx_acl_test_mod),
timer:sleep(5),
{emqx_acl_internal, _, 0}= hd(?AC:lookup_mods(acl)),
ok = ?AC:register_mod(auth, emqx_auth_anonymous_test_mod,[]),
[{emqx_auth_anonymous_test_mod, _, 0}] = ?AC:lookup_mods(auth),
ok = ?AC:unregister_mod(auth, emqx_auth_anonymous_test_mod),
timer:sleep(5),
[] = ?AC:lookup_mods(auth).
ok = ?AC:reload_acl().
check_acl_1(_) ->
SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>},
SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>, zone => external},
allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>),
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
deny = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1/x/y">>),
deny = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1/x/y">>),
allow = ?AC:check_acl(SelfUser, publish, <<"users/testuser/1">>),
allow = ?AC:check_acl(SelfUser, subscribe, <<"a/b/c">>).
check_acl_2(_) ->
SelfUser = #{client_id => <<"client2">>, username => <<"xyz">>},
SelfUser = #{client_id => <<"client2">>, username => <<"xyz">>, zone => external},
deny = ?AC:check_acl(SelfUser, subscribe, <<"a/b/c">>).
acl_cache_basic(_) ->
SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>},
SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>, zone => external},
not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>),
not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
@ -162,7 +128,7 @@ acl_cache_basic(_) ->
acl_cache_expiry(_) ->
application:set_env(emqx, acl_cache_ttl, 100),
SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>},
SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>, zone => external},
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
ct:sleep(150),
@ -172,7 +138,7 @@ acl_cache_expiry(_) ->
acl_cache_full(_) ->
application:set_env(emqx, acl_cache_max_size, 1),
SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>},
SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>, zone => external},
allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>),
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
@ -187,7 +153,7 @@ acl_cache_cleanup(_) ->
application:set_env(emqx, acl_cache_ttl, 100),
application:set_env(emqx, acl_cache_max_size, 2),
SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>},
SelfUser = #{client_id => <<"client1">>, username => <<"testuser">>, zone => external},
allow = ?AC:check_acl(SelfUser, subscribe, <<"users/testuser/1">>),
allow = ?AC:check_acl(SelfUser, subscribe, <<"clients/client1">>),
@ -357,8 +323,8 @@ compile_rule(_) ->
{deny, all} = compile({deny, all}).
match_rule(_) ->
User = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{127,0,0,1}, 2948}},
User2 = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{192,168,0,10}, 3028}},
User = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{127,0,0,1}, 2948}, zone => external},
User2 = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{192,168,0,10}, 3028}, zone => external},
{matched, allow} = match(User, <<"Test/Topic">>, {allow, all}),
{matched, deny} = match(User, <<"Test/Topic">>, {deny, all}),

View File

@ -62,8 +62,8 @@ t_alarm_handler(_) ->
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?RC_SUCCESS), <<>>, _} = raw_recv_parse(Data, ?MQTT_PROTO_V5),
Topic1 = emqx_topic:systop(<<"alarms/alarm_for_test/alert">>),
Topic2 = emqx_topic:systop(<<"alarms/alarm_for_test/clear">>),
Topic1 = emqx_topic:systop(<<"alarms/alert">>),
Topic2 = emqx_topic:systop(<<"alarms/clear">>),
SubOpts = #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0},
emqx_client_sock:send(Sock,
raw_send_serialize(

View File

@ -68,11 +68,11 @@ groups() -> [{connect, [sequence],
]}].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
emqx_ct_helpers:stop_apps([]).
init_per_group(_Group, Config) ->
Config.
@ -85,7 +85,7 @@ case1_protocol_name(_) ->
MqttPacket = serialize(?CASE1_PROTOCOL_NAME),
emqx_client_sock:send(Sock, MqttPacket),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), _} = raw_recv_pase(Data),
{ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), <<>>, _} = raw_recv_pase(Data),
Disconnect = gen_tcp:recv(Sock, 0),
?assertEqual({error, closed}, Disconnect).
@ -95,7 +95,7 @@ case2_protocol_ver(_) ->
emqx_client_sock:send(Sock, Packet),
{ok, Data} = gen_tcp:recv(Sock, 0),
%% case1 Unacceptable protocol version
{ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), _} = raw_recv_pase(Data),
{ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), <<>>, _} = raw_recv_pase(Data),
Disconnect = gen_tcp:recv(Sock, 0),
?assertEqual({error, closed}, Disconnect).

View File

@ -55,7 +55,7 @@ groups() ->
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([], fun set_special_configs/1),
MqttCaps = emqx_zone:get_env(external, '$mqtt_caps'),
MqttCaps = maps:from_list(emqx_mqtt_caps:default_caps()),
emqx_zone:set_env(external, '$mqtt_caps', MqttCaps#{max_topic_alias => 20}),
Config.

View File

@ -22,10 +22,11 @@
-include_lib("common_test/include/ct.hrl").
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps([{log_level, error} | Config]).
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
emqx_ct_helpers:stop_apps([]).
all() ->
[request_response].

View File

@ -31,6 +31,7 @@
all() ->
[ t_ws_connect_api
, t_ws_auth_failure
, t_ws_other_type_frame
].
init_per_suite(Config) ->
@ -71,6 +72,18 @@ t_ws_connect_api(_Config) ->
{close, _} = rfc6455_client:close(WS),
ok.
t_ws_other_type_frame(_Config) ->
WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()),
{ok, _} = rfc6455_client:open(WS),
ok = rfc6455_client:send_binary(WS, raw_send_serialize(?CLIENT)),
{binary, Bin} = rfc6455_client:recv(WS),
Connack = ?CONNACK_PACKET(?CONNACK_ACCEPT),
{ok, Connack, <<>>, _} = raw_recv_pase(Bin),
rfc6455_client:send(WS, <<"testdata">>),
timer:sleep(1000),
?assertEqual(undefined, erlang:process_info(WS)),
ok.
raw_send_serialize(Packet) ->
emqx_frame:serialize(Packet).

13
test/run_emqx.escript Normal file
View File

@ -0,0 +1,13 @@
#!/usr/bin/env escript
main(_) ->
start().
start() ->
SpecEmqxConfig = fun(_) -> ok end,
start(SpecEmqxConfig).
start(SpecEmqxConfig) ->
SchemaPath = filename:join(["priv", "emqx.schema"]),
ConfPath = filename:join(["etc", "emqx.conf"]),
emqx_ct_helpers:start_app(emqx, SchemaPath, ConfPath, SpecEmqxConfig).