Replace lager with emqx_log

This commit is contained in:
Feng Lee 2018-04-09 12:34:53 +08:00
parent 39ff658e58
commit 71acf91ace
80 changed files with 297 additions and 243 deletions

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (C) 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -19,7 +19,7 @@
-include("emqx.hrl"). -include("emqx.hrl").
%% Start/Stop Application %% Start/Stop Application
-export([start/0, env/1, env/2, is_running/1, stop/0]). -export([start/0, is_running/1, stop/0]).
%% PubSub API %% PubSub API
-export([subscribe/1, subscribe/2, subscribe/3, publish/1, -export([subscribe/1, subscribe/2, subscribe/3, publish/1,
@ -43,7 +43,7 @@
-define(APP, ?MODULE). -define(APP, ?MODULE).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Bootstrap, environment, configuration, is_running... %% Bootstrap, is_running...
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Start emqx application %% @doc Start emqx application
@ -54,14 +54,6 @@ start() -> application:start(?APP).
-spec(stop() -> ok | {error, term()}). -spec(stop() -> ok | {error, term()}).
stop() -> application:stop(?APP). stop() -> application:stop(?APP).
%% @doc Get environment
-spec(env(Key :: atom()) -> {ok, any()} | undefined).
env(Key) -> application:get_env(?APP, Key).
%% @doc Get environment with default
-spec(env(Key :: atom(), Default :: any()) -> undefined | any()).
env(Key, Default) -> application:get_env(?APP, Key, Default).
%% @doc Is emqx running? %% @doc Is emqx running?
-spec(is_running(node()) -> boolean()). -spec(is_running(node()) -> boolean()).
is_running(Node) -> is_running(Node) ->
@ -71,7 +63,6 @@ is_running(Node) ->
Pid when is_pid(Pid) -> true Pid when is_pid(Pid) -> true
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% PubSub API %% PubSub API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -51,7 +51,7 @@ start_link() ->
auth(Client, Password) when is_record(Client, client) -> auth(Client, Password) when is_record(Client, client) ->
auth(Client, Password, lookup_mods(auth)). auth(Client, Password, lookup_mods(auth)).
auth(_Client, _Password, []) -> auth(_Client, _Password, []) ->
case emqx:env(allow_anonymous, false) of case emqx_conf:get_env(allow_anonymous, false) of
true -> ok; true -> ok;
false -> {error, "No auth module to check!"} false -> {error, "No auth module to check!"}
end; end;
@ -68,12 +68,12 @@ auth(Client, Password, [{Mod, State, _Seq} | Mods]) ->
-spec(check_acl(Client, PubSub, Topic) -> allow | deny when -spec(check_acl(Client, PubSub, Topic) -> allow | deny when
Client :: client(), Client :: client(),
PubSub :: pubsub(), PubSub :: pubsub(),
Topic :: binary()). Topic :: topic()).
check_acl(Client, PubSub, Topic) when ?PS(PubSub) -> check_acl(Client, PubSub, Topic) when ?PS(PubSub) ->
check_acl(Client, PubSub, Topic, lookup_mods(acl)). check_acl(Client, PubSub, Topic, lookup_mods(acl)).
check_acl(_Client, _PubSub, _Topic, []) -> check_acl(_Client, _PubSub, _Topic, []) ->
emqx:env(acl_nomatch, allow); emqx_conf:get_env(acl_nomatch, allow);
check_acl(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) -> check_acl(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) ->
case Mod:check_acl({Client, PubSub, Topic}, State) of case Mod:check_acl({Client, PubSub, Topic}, State) of
allow -> allow; allow -> allow;
@ -88,15 +88,17 @@ reload_acl() ->
%% @doc Register Authentication or ACL module. %% @doc Register Authentication or ACL module.
-spec(register_mod(auth | acl, atom(), list()) -> ok | {error, term()}). -spec(register_mod(auth | acl, atom(), list()) -> ok | {error, term()}).
register_mod(Type, Mod, Opts) when Type =:= auth; Type =:= acl-> register_mod(Type, Mod, Opts) when Type =:= auth; Type =:= acl ->
register_mod(Type, Mod, Opts, 0). register_mod(Type, Mod, Opts, 0).
-spec(register_mod(auth | acl, atom(), list(), non_neg_integer()) -> ok | {error, term()}). -spec(register_mod(auth | acl, atom(), list(), non_neg_integer())
-> ok | {error, term()}).
register_mod(Type, Mod, Opts, Seq) when Type =:= auth; Type =:= acl-> register_mod(Type, Mod, Opts, Seq) when Type =:= auth; Type =:= acl->
gen_server:call(?SERVER, {register_mod, Type, Mod, Opts, Seq}). gen_server:call(?SERVER, {register_mod, Type, Mod, Opts, Seq}).
%% @doc Unregister authentication or ACL module %% @doc Unregister authentication or ACL module
-spec(unregister_mod(Type :: auth | acl, Mod :: atom()) -> ok | {error, not_found | term()}). -spec(unregister_mod(Type :: auth | acl, Mod :: atom())
-> ok | {error, not_found | term()}).
unregister_mod(Type, Mod) when Type =:= auth; Type =:= acl -> unregister_mod(Type, Mod) when Type =:= auth; Type =:= acl ->
gen_server:call(?SERVER, {unregister_mod, Type, Mod}). gen_server:call(?SERVER, {unregister_mod, Type, Mod}).
@ -116,11 +118,11 @@ stop() ->
gen_server:call(?MODULE, stop). gen_server:call(?MODULE, stop).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server Callbacks %% gen_server callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
_ = ets:new(?TAB, [set, named_table, protected, {read_concurrency, true}]), _ = emqx_tables:create(?TAB, [set, protected, {read_concurrency, true}]),
{ok, #state{}}. {ok, #state{}}.
handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) -> handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) ->
@ -155,13 +157,15 @@ handle_call(stop, _From, State) ->
{stop, normal, ok, State}; {stop, normal, ok, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
lager:error("Bad Request: ~p", [Req]), emqx_log:error("[AccessControl] Unexpected request: ~p", [Req]),
{reply, {error, badreq}, State}. {reply, ignore, State}.
handle_cast(_Msg, State) -> handle_cast(Msg, State) ->
emqx_log:error("[AccessControl] Unexpected msg: ~p", [Msg]),
{noreply, State}. {noreply, State}.
handle_info(_Info, State) -> handle_info(Info, State) ->
emqx_log:error("[AccessControl] Unexpected info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, _State) -> terminate(_Reason, _State) ->

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -48,19 +48,18 @@ all_rules() ->
%% @doc Init internal ACL %% @doc Init internal ACL
-spec(init([File :: string()]) -> {ok, State :: any()}). -spec(init([File :: string()]) -> {ok, State :: any()}).
init([File]) -> init([File]) ->
ets:new(?ACL_RULE_TAB, [set, public, named_table, {read_concurrency, true}]), _ = emqx_tables:create(?ACL_RULE_TAB, [set, public, {read_concurrency, true}]),
State = #state{config = File}, {ok, load_rules_from_file(#state{config = File})}.
true = load_rules_from_file(State),
{ok, State}.
load_rules_from_file(#state{config = AclFile}) -> load_rules_from_file(State = #state{config = AclFile}) ->
{ok, Terms} = file:consult(AclFile), {ok, Terms} = file:consult(AclFile),
Rules = [emqx_access_rule:compile(Term) || Term <- Terms], Rules = [emqx_access_rule:compile(Term) || Term <- Terms],
lists:foreach(fun(PubSub) -> lists:foreach(fun(PubSub) ->
ets:insert(?ACL_RULE_TAB, {PubSub, ets:insert(?ACL_RULE_TAB, {PubSub,
lists:filter(fun(Rule) -> filter(PubSub, Rule) end, Rules)}) lists:filter(fun(Rule) -> filter(PubSub, Rule) end, Rules)})
end, [publish, subscribe]), end, [publish, subscribe]),
ets:insert(?ACL_RULE_TAB, {all_rules, Terms}). ets:insert(?ACL_RULE_TAB, {all_rules, Terms}),
State.
filter(_PubSub, {allow, all}) -> filter(_PubSub, {allow, all}) ->
true; true;
@ -79,7 +78,7 @@ filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) ->
-spec(check_acl({Client, PubSub, Topic}, State) -> allow | deny | ignore when -spec(check_acl({Client, PubSub, Topic}, State) -> allow | deny | ignore when
Client :: client(), Client :: client(),
PubSub :: pubsub(), PubSub :: pubsub(),
Topic :: binary(), Topic :: topic(),
State :: #state{}). State :: #state{}).
check_acl(_Who, #state{config = undefined}) -> check_acl(_Who, #state{config = undefined}) ->
allow; allow;

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -92,7 +92,7 @@ handle_event({set_alarm, Alarm = #alarm{id = AlarmId,
{summary, iolist_to_binary(Summary)}, {summary, iolist_to_binary(Summary)},
{ts, emqx_time:now_secs(TS)}]) of {ts, emqx_time:now_secs(TS)}]) of
{'EXIT', Reason} -> {'EXIT', Reason} ->
lager:error("Failed to encode set_alarm: ~p", [Reason]); emqx_log:error("[Alarm] Failed to encode set_alarm: ~p", [Reason]);
JSON -> JSON ->
emqx_broker:publish(alarm_msg(alert, AlarmId, JSON)) emqx_broker:publish(alarm_msg(alert, AlarmId, JSON))
end, end,
@ -101,7 +101,7 @@ handle_event({set_alarm, Alarm = #alarm{id = AlarmId,
handle_event({clear_alarm, AlarmId}, Alarms) -> handle_event({clear_alarm, AlarmId}, Alarms) ->
case catch emqx_json:encode([{id, AlarmId}, {ts, emqx_time:now_secs()}]) of case catch emqx_json:encode([{id, AlarmId}, {ts, emqx_time:now_secs()}]) of
{'EXIT', Reason} -> {'EXIT', Reason} ->
lager:error("Failed to encode clear_alarm: ~p", [Reason]); emqx_log:error("[Alarm] Failed to encode clear_alarm: ~p", [Reason]);
JSON -> JSON ->
emqx_broker:publish(alarm_msg(clear, AlarmId, JSON)) emqx_broker:publish(alarm_msg(clear, AlarmId, JSON))
end, end,

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -63,7 +63,7 @@ print_vsn() ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
register_acl_mod() -> register_acl_mod() ->
case emqx:env(acl_file) of case emqx_conf:get_env(acl_file) of
{ok, File} -> emqx_access_control:register_mod(acl, emqx_acl_internal, [File]); {ok, File} -> emqx_access_control:register_mod(acl, emqx_acl_internal, [File]);
undefined -> ok undefined -> ok
end. end.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -61,12 +61,14 @@ passwd_hash(sha256, Password) ->
passwd_hash(pbkdf2, {Salt, Password, Macfun, Iterations, Dklen}) -> passwd_hash(pbkdf2, {Salt, Password, Macfun, Iterations, Dklen}) ->
case pbkdf2:pbkdf2(Macfun, Password, Salt, Iterations, Dklen) of case pbkdf2:pbkdf2(Macfun, Password, Salt, Iterations, Dklen) of
{ok, Hexstring} -> pbkdf2:to_hex(Hexstring); {ok, Hexstring} -> pbkdf2:to_hex(Hexstring);
{error, Error} -> lager:error("PasswdHash with pbkdf2 error:~p", [Error]), <<>> {error, Error} ->
emqx_log:error("[AuthMod] PasswdHash with pbkdf2 error:~p", [Error]), <<>>
end; end;
passwd_hash(bcrypt, {Salt, Password}) -> passwd_hash(bcrypt, {Salt, Password}) ->
case bcrypt:hashpw(Password, Salt) of case bcrypt:hashpw(Password, Salt) of
{ok, HashPassword} -> list_to_binary(HashPassword); {ok, HashPassword} -> list_to_binary(HashPassword);
{error, Error}-> lager:error("PasswdHash with bcrypt error:~p", [Error]), <<>> {error, Error}->
emqx_log:error("[AuthMod] PasswdHash with bcrypt error:~p", [Error]), <<>>
end. end.
hexstring(<<X:128/big-unsigned-integer>>) -> hexstring(<<X:128/big-unsigned-integer>>) ->

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -64,8 +64,8 @@ start_link(Pool, Id, Node, Topic, Options) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([Pool, Id, Node, Topic, Options]) -> init([Pool, Id, Node, Topic, Options]) ->
gproc_pool:connect_worker(Pool, {Pool, Id}),
process_flag(trap_exit, true), process_flag(trap_exit, true),
gproc_pool:connect_worker(Pool, {Pool, Id}),
case net_kernel:connect_node(Node) of case net_kernel:connect_node(Node) of
true -> true ->
true = erlang:monitor_node(Node, true), true = erlang:monitor_node(Node, true),
@ -103,11 +103,11 @@ qname(Node, Topic) ->
iolist_to_binary(["Bridge:", Node, ":", Topic]). iolist_to_binary(["Bridge:", Node, ":", Topic]).
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
lager:error("[~s] Unexpected Call: ~p", [?MODULE, Req]), emqx_log:error("[Bridge] Unexpected request: ~p", [Req]),
{reply, ignore, State}. {reply, ignore, State}.
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
lager:error("[~s] Unexpected Cast: ~p", [?MODULE, Msg]), emqx_log:error("[Bridge] Unexpected msg: ~p", [Msg]),
{noreply, State}. {noreply, State}.
handle_info({dispatch, _Topic, Msg}, State = #state{mqueue = MQ, status = down}) -> handle_info({dispatch, _Topic, Msg}, State = #state{mqueue = MQ, status = down}) ->
@ -118,7 +118,7 @@ handle_info({dispatch, _Topic, Msg}, State = #state{node = Node, status = up}) -
{noreply, State, hibernate}; {noreply, State, hibernate};
handle_info({nodedown, Node}, State = #state{node = Node, ping_down_interval = Interval}) -> handle_info({nodedown, Node}, State = #state{node = Node, ping_down_interval = Interval}) ->
lager:warning("Bridge Node Down: ~p", [Node]), emqx_log:warning("[Bridge] Node Down: ~s", [Node]),
erlang:send_after(Interval, self(), ping_down_node), erlang:send_after(Interval, self(), ping_down_node),
{noreply, State#state{status = down}, hibernate}; {noreply, State#state{status = down}, hibernate};
@ -126,7 +126,7 @@ handle_info({nodeup, Node}, State = #state{node = Node}) ->
%% TODO: Really fast?? %% TODO: Really fast??
case emqx:is_running(Node) of case emqx:is_running(Node) of
true -> true ->
lager:warning("Bridge Node Up: ~p", [Node]), emqx_log:warning("[Bridge] Node up: ~s", [Node]),
{noreply, dequeue(State#state{status = up})}; {noreply, dequeue(State#state{status = up})};
false -> false ->
self() ! {nodedown, Node}, self() ! {nodedown, Node},
@ -149,7 +149,7 @@ handle_info({'EXIT', _Pid, normal}, State) ->
{noreply, State}; {noreply, State};
handle_info(Info, State) -> handle_info(Info, State) ->
lager:error("[~s] Unexpected Info: ~p", [?MODULE, Info]), emqx_log:error("[Bridge] Unexpected info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, #state{pool = Pool, id = Id}) -> terminate(_Reason, #state{pool = Pool, id = Id}) ->

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -18,12 +18,10 @@
-export([start_link/3]). -export([start_link/3]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
%% @doc Start bridge pool supervisor %% @doc Start bridge pool supervisor
-spec(start_link(atom(), binary(), [emqx_bridge:option()]) -> {ok, pid()} | {error, term()}). -spec(start_link(atom(), binary(), [emqx_bridge:option()])
-> {ok, pid()} | {error, term()}).
start_link(Node, Topic, Options) -> start_link(Node, Topic, Options) ->
MFA = {emqx_bridge, start_link, [Node, Topic, Options]}, MFA = {emqx_bridge, start_link, [Node, Topic, Options]},
emqx_pool_sup:start_link({bridge, Node, Topic}, random, MFA). emqx_pool_sup:start_link({bridge, Node, Topic}, random, MFA).

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -46,7 +46,7 @@ start_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) ->
start_bridge(Node, _Topic, _Options) when Node =:= node() -> start_bridge(Node, _Topic, _Options) when Node =:= node() ->
{error, bridge_to_self}; {error, bridge_to_self};
start_bridge(Node, Topic, Options) when is_atom(Node) andalso is_binary(Topic) -> start_bridge(Node, Topic, Options) when is_atom(Node) andalso is_binary(Topic) ->
{ok, BridgeEnv} = emqx:env(bridge), {ok, BridgeEnv} = emqx_conf:get_env(bridge),
Options1 = emqx_misc:merge_opts(BridgeEnv, Options), Options1 = emqx_misc:merge_opts(BridgeEnv, Options),
supervisor:start_child(?MODULE, bridge_spec(Node, Topic, Options1)). supervisor:start_child(?MODULE, bridge_spec(Node, Topic, Options1)).

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -40,7 +40,7 @@ start_link(StatsFun) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([StatsFun]) -> init([StatsFun]) ->
{ok, TRef} = timer:send_interval(1000, stats), {ok, TRef} = timer:send_interval(timer:seconds(1), stats),
{ok, #state{stats_fun = StatsFun, stats_timer = TRef}}. {ok, #state{stats_fun = StatsFun, stats_timer = TRef}}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -33,7 +33,7 @@ start_link() ->
init([]) -> init([]) ->
%% Create the pubsub tables %% Create the pubsub tables
create_tabs(), lists:foreach(fun create_tab/1, [subscription, subscriber, suboption]),
%% Shared subscription %% Shared subscription
Shared = {shared_sub, {emqx_shared_sub, start_link, []}, Shared = {shared_sub, {emqx_shared_sub, start_link, []},
@ -55,9 +55,6 @@ init([]) ->
%% Create tables %% Create tables
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
create_tabs() ->
lists:foreach(fun create_tab/1, [subscription, subscriber, suboption]).
create_tab(suboption) -> create_tab(suboption) ->
%% Suboption: {Topic, Sub} -> [{qos, 1}] %% Suboption: {Topic, Sub} -> [{qos, 1}]
ensure_tab(suboption, [set | ?CONCURRENCY_OPTS]); ensure_tab(suboption, [set | ?CONCURRENCY_OPTS]);

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -62,9 +62,13 @@ unreg(ClientId) ->
%% gen_server callbacks %% gen_server callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([StatsFun]) -> init([]) ->
{ok, Ref} = timer:send_interval(timer:seconds(1), stats), _ = emqx_tables:create(client, [public, set, {keypos, 2},
{ok, #state{stats_fun = StatsFun, stats_timer = Ref, monitors = dict:new()}}. {read_concurrency, true},
{write_concurrency, true}]),
_ = emqx_tables:create(client_attrs, [public, set,
{write_concurrency, true}]),
{ok, #state{monitors = dict:new()}}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
emqx_log:error("[CM] Unexpected request: ~p", [Req]), emqx_log:error("[CM] Unexpected request: ~p", [Req]),
@ -102,7 +106,7 @@ handle_info(stats, State) ->
{noreply, setstats(State), hibernate}; {noreply, setstats(State), hibernate};
handle_info(Info, State) -> handle_info(Info, State) ->
lager:error("[CM] Unexpected Info: ~p", [Info]), emqx_log:error("[CM] Unexpected info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, _State = #state{stats_timer = TRef}) -> terminate(_Reason, _State = #state{stats_timer = TRef}) ->

72
src/emqx_cm_stats.erl Normal file
View File

@ -0,0 +1,72 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_cm_stats).
-behaviour(gen_statem).
-include("emqx.hrl").
%% API
-export([start_link/0]).
-export([set_client_stats/2, get_client_stats/1, del_client_stats/1]).
%% gen_statem callbacks
-export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]).
-define(TAB, client_stats).
-record(state, {statsfun}).
start_link() ->
gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec(set_client_stats(client_id(), emqx_stats:stats()) -> true).
set_client_stats(ClientId, Stats) ->
ets:insert(?TAB, {ClientId, [{'$ts', emqx_time:now_secs()}|Stats]}).
-spec(get_client_stats(client_id()) -> emqx_stats:stats()).
get_client_stats(ClientId) ->
case ets:lookup(?TAB, ClientId) of
[{_, Stats}] -> Stats;
[] -> []
end.
-spec(del_client_stats(client_id()) -> true).
del_client_stats(ClientId) ->
ets:delete(?TAB, ClientId).
init([]) ->
_ = emqx_tables:create(?TAB, [public, {write_concurrency, true}]),
StatsFun = emqx_stats:statsfun('clients/count', 'clients/max'),
{ok, idle, #state{statsfun = StatsFun}, timer:seconds(1)}.
callback_mode() -> handle_event_function.
handle_event(timeout, _Timeout, idle, State = #state{statsfun = StatsFun}) ->
case ets:info(client, size) of
undefined -> ok;
Size -> StatsFun(Size)
end,
{next_state, idle, State, timer:seconds(1)}.
terminate(_Reason, _StateName, _State) ->
ok.
code_change(_OldVsn, StateName, State, _Extra) ->
{ok, StateName, State}.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -26,16 +26,9 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) -> init([]) ->
%% Create table Stats = {emqx_cm_stats, {emqx_cm_stats, start_link, []},
lists:foreach(fun create_tab/1, [client, client_stats, client_attrs]), permanent, 5000, worker, [emqx_cm_stats]},
CM = {emqx_cm, {emqx_cm, start_link, []},
StatsFun = emqx_stats:statsfun('clients/count', 'clients/max'),
CM = {emqx_cm, {emqx_cm, start_link, [StatsFun]},
permanent, 5000, worker, [emqx_cm]}, permanent, 5000, worker, [emqx_cm]},
{ok, {{one_for_all, 10, 3600}, [Stats, CM]}}.
{ok, {{one_for_all, 10, 3600}, [CM]}}.
create_tab(Tab) ->
emqx_tables:create(Tab, [public, ordered_set, named_table, {write_concurrency, true}]).

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -25,12 +25,20 @@
-module(emqx_config). -module(emqx_config).
-export([get_env/1, get_env/2]).
-export([read/1, write/2, dump/2, reload/1, get/2, get/3, set/3]). -export([read/1, write/2, dump/2, reload/1, get/2, get/3, set/3]).
-type(env() :: {atom(), term()}). -type(env() :: {atom(), term()}).
-define(APP, emqx). -define(APP, emqx).
-spec(get_env(Key :: atom(), Default :: any()) -> undefined | any()).
get_env(Key, Default) ->
application:get_env(?APP, Key, Default).
%% @doc Get environment
-spec(get_env(Key :: atom()) -> {ok, any()} | undefined).
get_env(Key) -> get_env(Key) ->
application:get_env(?APP, Key). application:get_env(?APP, Key).

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -57,7 +57,7 @@
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
-define(LOG(Level, Format, Args, State), -define(LOG(Level, Format, Args, State),
lager:Level("Client(~s): " ++ Format, emqx_log:Level("Client(~s): " ++ Format,
[esockd_net:format(State#state.peername) | Args])). [esockd_net:format(State#state.peername) | Args])).
start_link(Conn, Env) -> start_link(Conn, Env) ->

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -108,7 +108,7 @@ handle_cast({register_cmd, Cmd, MF, Opts}, State = #state{seq = Seq}) ->
[] -> [] ->
ets:insert(?TAB, {{Seq, Cmd}, MF, Opts}); ets:insert(?TAB, {{Seq, Cmd}, MF, Opts});
[[OriginSeq] | _] -> [[OriginSeq] | _] ->
lager:warning("CLI: ~s is overidden by ~p", [Cmd, MF]), emqx_log:warning("[CLI] ~s is overidden by ~p", [Cmd, MF]),
ets:insert(?TAB, {{OriginSeq, Cmd}, MF, Opts}) ets:insert(?TAB, {{OriginSeq, Cmd}, MF, Opts})
end, end,
noreply(next_seq(State)); noreply(next_seq(State));

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (C) 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -35,11 +35,11 @@ load(Rules0) ->
emqx:hook('message.publish', fun ?MODULE:rewrite_publish/2, [Rules]). emqx:hook('message.publish', fun ?MODULE:rewrite_publish/2, [Rules]).
rewrite_subscribe(_ClientId, _Username, TopicTable, Rules) -> rewrite_subscribe(_ClientId, _Username, TopicTable, Rules) ->
lager:info("Rewrite subscribe: ~p", [TopicTable]), emqx_log:info("Rewrite subscribe: ~p", [TopicTable]),
{ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicTable]}. {ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicTable]}.
rewrite_unsubscribe(_ClientId, _Username, TopicTable, Rules) -> rewrite_unsubscribe(_ClientId, _Username, TopicTable, Rules) ->
lager:info("Rewrite unsubscribe: ~p", [TopicTable]), emqx_log:info("Rewrite unsubscribe: ~p", [TopicTable]),
{ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicTable]}. {ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicTable]}.
rewrite_publish(Message = #message{topic = Topic}, Rules) -> rewrite_publish(Message = #message{topic = Topic}, Rules) ->

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -23,11 +23,11 @@ load() ->
fun({Mod, Env}) -> fun({Mod, Env}) ->
ok = Mod:load(Env), ok = Mod:load(Env),
io:format("Load ~s module successfully.~n", [Mod]) io:format("Load ~s module successfully.~n", [Mod])
end, emqx:env(modules, [])). end, emqx_conf:get_env(modules, [])).
unload() -> unload() ->
lists:foreach( lists:foreach(
fun({Mod, Env}) -> fun({Mod, Env}) ->
Mod:unload(Env) end, Mod:unload(Env) end,
emqx:env(modules, [])). emqx_conf:get_env(modules, [])).

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -40,7 +40,7 @@ shutdown() ->
%% @doc Start Listeners. %% @doc Start Listeners.
-spec(start_listeners() -> ok). -spec(start_listeners() -> ok).
start_listeners() -> start_listeners() ->
lists:foreach(fun start_listener/1, emqx:env(listeners, [])). lists:foreach(fun start_listener/1, emqx_conf:get_env(listeners, [])).
%% Start mqtt listener %% Start mqtt listener
-spec(start_listener(listener()) -> {ok, pid()} | {error, any()}). -spec(start_listener(listener()) -> {ok, pid()} | {error, any()}).
@ -60,7 +60,7 @@ start_listener({Proto, ListenOn, Opts}) when Proto == https; Proto == wss ->
{ok, _} = mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqx_ws, handle_request, []}). {ok, _} = mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqx_ws, handle_request, []}).
start_listener(Proto, ListenOn, Opts) -> start_listener(Proto, ListenOn, Opts) ->
Env = lists:append(emqx:env(client, []), emqx:env(protocol, [])), Env = lists:append(emqx_conf:get_env(client, []), emqx_conf:get_env(protocol, [])),
MFArgs = {emqx_connection, start_link, [Env]}, MFArgs = {emqx_connection, start_link, [Env]},
{ok, _} = esockd:open(Proto, ListenOn, merge_sockopts(Opts), MFArgs). {ok, _} = esockd:open(Proto, ListenOn, merge_sockopts(Opts), MFArgs).
@ -75,7 +75,8 @@ is_mqtt(_Proto) -> false.
%% @doc Stop Listeners %% @doc Stop Listeners
-spec(stop_listeners() -> ok). -spec(stop_listeners() -> ok).
stop_listeners() -> lists:foreach(fun stop_listener/1, emqx:env(listeners, [])). stop_listeners() ->
lists:foreach(fun stop_listener/1, emqx_conf:get_env(listeners, [])).
-spec(stop_listener(listener()) -> ok | {error, any()}). -spec(stop_listener(listener()) -> ok | {error, any()}).
stop_listener({tcp, ListenOn, _Opts}) -> stop_listener({tcp, ListenOn, _Opts}) ->
@ -93,7 +94,9 @@ stop_listener({Proto, ListenOn, _Opts}) ->
%% @doc Restart Listeners %% @doc Restart Listeners
-spec(restart_listeners() -> ok). -spec(restart_listeners() -> ok).
restart_listeners() -> lists:foreach(fun restart_listener/1, emqx:env(listeners, [])). restart_listeners() ->
lists:foreach(fun restart_listener/1,
emqx_conf:get_env(listeners, [])).
-spec(restart_listener(listener()) -> any()). -spec(restart_listener(listener()) -> any()).
restart_listener({tcp, ListenOn, _Opts}) -> restart_listener({tcp, ListenOn, _Opts}) ->
@ -113,3 +116,4 @@ merge_sockopts(Options) ->
SockOpts = emqx_misc:merge_opts( SockOpts = emqx_misc:merge_opts(
?MQTT_SOCKOPTS, proplists:get_value(sockopts, Options, [])), ?MQTT_SOCKOPTS, proplists:get_value(sockopts, Options, [])),
emqx_misc:merge_opts(Options, [{sockopts, SockOpts}]). emqx_misc:merge_opts(Options, [{sockopts, SockOpts}]).

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -31,7 +31,7 @@
%% @doc Init plugins' config %% @doc Init plugins' config
-spec(init() -> ok). -spec(init() -> ok).
init() -> init() ->
case emqx:env(plugins_etc_dir) of case emqx_conf:get_env(plugins_etc_dir) of
{ok, PluginsEtc} -> {ok, PluginsEtc} ->
CfgFiles = [filename:join(PluginsEtc, File) || CfgFiles = [filename:join(PluginsEtc, File) ||
File <- filelib:wildcard("*.config", PluginsEtc)], File <- filelib:wildcard("*.config", PluginsEtc)],
@ -50,7 +50,7 @@ init_config(CfgFile) ->
-spec(load() -> list() | {error, term()}). -spec(load() -> list() | {error, term()}).
load() -> load() ->
load_expand_plugins(), load_expand_plugins(),
case emqx:env(plugins_loaded_file) of case emqx_conf:get_env(plugins_loaded_file) of
{ok, File} -> {ok, File} ->
ensure_file(File), ensure_file(File),
with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end); with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end);
@ -60,7 +60,7 @@ load() ->
end. end.
load_expand_plugins() -> load_expand_plugins() ->
case emqx:env(expand_plugins_dir) of case emqx_conf:get_env(expand_plugins_dir) of
{ok, Dir} -> {ok, Dir} ->
PluginsDir = filelib:wildcard("*", Dir), PluginsDir = filelib:wildcard("*", Dir),
lists:foreach(fun(PluginDir) -> lists:foreach(fun(PluginDir) ->
@ -83,7 +83,8 @@ load_expand_plugin(PluginDir) ->
end, Modules), end, Modules),
case filelib:wildcard(Ebin ++ "/*.app") of case filelib:wildcard(Ebin ++ "/*.app") of
[App|_] -> application:load(list_to_atom(filename:basename(App, ".app"))); [App|_] -> application:load(list_to_atom(filename:basename(App, ".app")));
_ -> lager:error("load application fail"), {error, load_app_fail} _ -> emqx_log:error("App file cannot be found."),
{error, load_app_fail}
end. end.
init_expand_plugin_config(PluginDir) -> init_expand_plugin_config(PluginDir) ->
@ -100,7 +101,7 @@ init_expand_plugin_config(PluginDir) ->
end, AppsEnv). end, AppsEnv).
get_expand_plugin_config() -> get_expand_plugin_config() ->
case emqx:env(expand_plugins_dir) of case emqx_conf:get_env(expand_plugins_dir) of
{ok, Dir} -> {ok, Dir} ->
PluginsDir = filelib:wildcard("*", Dir), PluginsDir = filelib:wildcard("*", Dir),
lists:foldl(fun(PluginDir, Acc) -> lists:foldl(fun(PluginDir, Acc) ->
@ -127,7 +128,7 @@ with_loaded_file(File, SuccFun) ->
{ok, Names} -> {ok, Names} ->
SuccFun(Names); SuccFun(Names);
{error, Error} -> {error, Error} ->
lager:error("Failed to read: ~p, error: ~p", [File, Error]), emqx_log:error("[Plugins] Failed to read: ~p, error: ~p", [File, Error]),
{error, Error} {error, Error}
end. end.
@ -135,7 +136,7 @@ load_plugins(Names, Persistent) ->
Plugins = list(), NotFound = Names -- names(Plugins), Plugins = list(), NotFound = Names -- names(Plugins),
case NotFound of case NotFound of
[] -> ok; [] -> ok;
NotFound -> lager:error("Cannot find plugins: ~p", [NotFound]) NotFound -> emqx_log:error("[Plugins] Cannot find plugins: ~p", [NotFound])
end, end,
NeedToLoad = Names -- NotFound -- names(started_app), NeedToLoad = Names -- NotFound -- names(started_app),
[load_plugin(find_plugin(Name, Plugins), Persistent) || Name <- NeedToLoad]. [load_plugin(find_plugin(Name, Plugins), Persistent) || Name <- NeedToLoad].
@ -143,7 +144,7 @@ load_plugins(Names, Persistent) ->
%% @doc Unload all plugins before broker stopped. %% @doc Unload all plugins before broker stopped.
-spec(unload() -> list() | {error, term()}). -spec(unload() -> list() | {error, term()}).
unload() -> unload() ->
case emqx:env(plugins_loaded_file) of case emqx_conf:get_env(plugins_loaded_file) of
{ok, File} -> {ok, File} ->
with_loaded_file(File, fun stop_plugins/1); with_loaded_file(File, fun stop_plugins/1);
undefined -> undefined ->
@ -157,7 +158,7 @@ stop_plugins(Names) ->
%% @doc List all available plugins %% @doc List all available plugins
-spec(list() -> [plugin()]). -spec(list() -> [plugin()]).
list() -> list() ->
case emqx:env(plugins_etc_dir) of case emqx_conf:get_env(plugins_etc_dir) of
{ok, PluginsEtc} -> {ok, PluginsEtc} ->
CfgFiles = filelib:wildcard("*.{conf,config}", PluginsEtc) ++ get_expand_plugin_config(), CfgFiles = filelib:wildcard("*.{conf,config}", PluginsEtc) ++ get_expand_plugin_config(),
Plugins = [plugin(CfgFile) || CfgFile <- CfgFiles], Plugins = [plugin(CfgFile) || CfgFile <- CfgFiles],
@ -184,12 +185,12 @@ plugin(CfgFile) ->
load(PluginName) when is_atom(PluginName) -> load(PluginName) when is_atom(PluginName) ->
case lists:member(PluginName, names(started_app)) of case lists:member(PluginName, names(started_app)) of
true -> true ->
lager:error("Plugin ~p is already started", [PluginName]), emqx_log:error("[Plugins] Plugin ~s is already started", [PluginName]),
{error, already_started}; {error, already_started};
false -> false ->
case find_plugin(PluginName) of case find_plugin(PluginName) of
false -> false ->
lager:error("Plugin ~s not found", [PluginName]), emqx_log:error("[Plugins] Plugin ~s not found", [PluginName]),
{error, not_found}; {error, not_found};
Plugin -> Plugin ->
load_plugin(Plugin, true) load_plugin(Plugin, true)
@ -217,12 +218,12 @@ load_app(App) ->
start_app(App, SuccFun) -> start_app(App, SuccFun) ->
case application:ensure_all_started(App) of case application:ensure_all_started(App) of
{ok, Started} -> {ok, Started} ->
lager:info("Started Apps: ~p", [Started]), emqx_log:info("Started Apps: ~p", [Started]),
lager:info("Load plugin ~p successfully", [App]), emqx_log:info("Load plugin ~s successfully", [App]),
SuccFun(App), SuccFun(App),
{ok, Started}; {ok, Started};
{error, {ErrApp, Reason}} -> {error, {ErrApp, Reason}} ->
lager:error("load plugin ~p error, cannot start app ~s for ~p", [App, ErrApp, Reason]), emqx_log:error("Load plugin ~s error, cannot start app ~s for ~p", [App, ErrApp, Reason]),
{error, {ErrApp, Reason}} {error, {ErrApp, Reason}}
end. end.
@ -239,10 +240,10 @@ unload(PluginName) when is_atom(PluginName) ->
{true, true} -> {true, true} ->
unload_plugin(PluginName, true); unload_plugin(PluginName, true);
{false, _} -> {false, _} ->
lager:error("Plugin ~p is not started", [PluginName]), emqx_log:error("Plugin ~s is not started", [PluginName]),
{error, not_started}; {error, not_started};
{true, false} -> {true, false} ->
lager:error("~s is not a plugin, cannot unload it", [PluginName]), emqx_log:error("~s is not a plugin, cannot unload it", [PluginName]),
{error, not_found} {error, not_found}
end. end.
@ -257,11 +258,11 @@ unload_plugin(App, Persistent) ->
stop_app(App) -> stop_app(App) ->
case application:stop(App) of case application:stop(App) of
ok -> ok ->
lager:info("Stop plugin ~p successfully~n", [App]), ok; emqx_log:info("Stop plugin ~s successfully", [App]), ok;
{error, {not_started, App}} -> {error, {not_started, App}} ->
lager:error("Plugin ~p is not started~n", [App]), ok; emqx_log:error("Plugin ~s is not started", [App]), ok;
{error, Reason} -> {error, Reason} ->
lager:error("Stop plugin ~p error: ~p", [App]), {error, Reason} emqx_log:error("Stop plugin ~s error: ~p", [App]), {error, Reason}
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -293,7 +294,7 @@ plugin_loaded(Name, true) ->
ignore ignore
end; end;
{error, Error} -> {error, Error} ->
lager:error("Cannot read loaded plugins: ~p", [Error]) emqx_log:error("Cannot read loaded plugins: ~p", [Error])
end. end.
plugin_unloaded(_Name, false) -> plugin_unloaded(_Name, false) ->
@ -305,14 +306,14 @@ plugin_unloaded(Name, true) ->
true -> true ->
write_loaded(lists:delete(Name, Names)); write_loaded(lists:delete(Name, Names));
false -> false ->
lager:error("Cannot find ~s in loaded_file", [Name]) emqx_log:error("Cannot find ~s in loaded_file", [Name])
end; end;
{error, Error} -> {error, Error} ->
lager:error("Cannot read loaded_plugins: ~p", [Error]) emqx_log:error("Cannot read loaded_plugins: ~p", [Error])
end. end.
read_loaded() -> read_loaded() ->
case emqx:env(plugins_loaded_file) of case emqx_conf:get_env(plugins_loaded_file) of
{ok, File} -> read_loaded(File); {ok, File} -> read_loaded(File);
undefined -> {error, not_found} undefined -> {error, not_found}
end. end.
@ -320,14 +321,14 @@ read_loaded() ->
read_loaded(File) -> file:consult(File). read_loaded(File) -> file:consult(File).
write_loaded(AppNames) -> write_loaded(AppNames) ->
{ok, File} = emqx:env(plugins_loaded_file), {ok, File} = emqx_conf:get_env(plugins_loaded_file),
case file:open(File, [binary, write]) of case file:open(File, [binary, write]) of
{ok, Fd} -> {ok, Fd} ->
lists:foreach(fun(Name) -> lists:foreach(fun(Name) ->
file:write(Fd, iolist_to_binary(io_lib:format("~s.~n", [Name]))) file:write(Fd, iolist_to_binary(io_lib:format("~s.~n", [Name])))
end, AppNames); end, AppNames);
{error, Error} -> {error, Error} ->
lager:error("Open File ~p Error: ~p", [File, Error]), emqx_log:error("Open File ~p Error: ~p", [File, Error]),
{error, Error} {error, Error}
end. end.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -69,7 +69,10 @@ handle_call(_Req, _From, State) ->
{reply, ok, State}. {reply, ok, State}.
handle_cast({async_submit, Fun}, State) -> handle_cast({async_submit, Fun}, State) ->
try run(Fun) catch _:Error -> lager:error("Pooler Error: ~p, ~p", [Error, erlang:get_stacktrace()]) end, try run(Fun)
catch _:Error ->
emqx_log:error("Pooler Error: ~p, ~p", [Error, erlang:get_stacktrace()])
end,
{noreply, State}; {noreply, State};
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -57,7 +57,7 @@
-define(STATS_KEYS, [recv_pkt, recv_msg, send_pkt, send_msg]). -define(STATS_KEYS, [recv_pkt, recv_msg, send_pkt, send_msg]).
-define(LOG(Level, Format, Args, State), -define(LOG(Level, Format, Args, State),
lager:Level([{client, State#proto_state.client_id}], "Client(~s@~s): " ++ Format, emqx_log:Level([{client, State#proto_state.client_id}], "Client(~s@~s): " ++ Format,
[State#proto_state.client_id, esockd_net:format(State#proto_state.peername) | Args])). [State#proto_state.client_id, esockd_net:format(State#proto_state.peername) | Args])).
%% @doc Init protocol %% @doc Init protocol
@ -549,7 +549,7 @@ authenticate(Client, Password) ->
%% PUBLISH ACL is cached in process dictionary. %% PUBLISH ACL is cached in process dictionary.
check_acl(publish, Topic, Client) -> check_acl(publish, Topic, Client) ->
IfCache = emqx:env(cache_acl, true), IfCache = emqx_conf:get_env(cache_acl, true),
case {IfCache, get({acl, publish, Topic})} of case {IfCache, get({acl, publish, Topic})} of
{true, undefined} -> {true, undefined} ->
AllowDeny = emqx_access_control:check_acl(Client, publish, Topic), AllowDeny = emqx_access_control:check_acl(Client, publish, Topic),

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -155,7 +155,7 @@
created_at]). created_at]).
-define(LOG(Level, Format, Args, State), -define(LOG(Level, Format, Args, State),
lager:Level([{client, State#state.client_id}], emqx_log:Level([{client, State#state.client_id}],
"Session(~s): " ++ Format, [State#state.client_id | Args])). "Session(~s): " ++ Format, [State#state.client_id | Args])).
%% @doc Start a Session %% @doc Start a Session
@ -271,8 +271,8 @@ init(#{clean_start := CleanStart,
process_flag(trap_exit, true), process_flag(trap_exit, true),
true = link(ClientPid), true = link(ClientPid),
init_stats([deliver_msg, enqueue_msg]), init_stats([deliver_msg, enqueue_msg]),
{ok, Env} = emqx:env(session), {ok, Env} = emqx_conf:get_env(session),
{ok, QEnv} = emqx:env(mqueue), {ok, QEnv} = emqx_conf:get_env(mqueue),
MaxInflight = get_value(max_inflight, Env, 0), MaxInflight = get_value(max_inflight, Env, 0),
EnableStats = get_value(enable_stats, Env, false), EnableStats = get_value(enable_stats, Env, false),
IgnoreLoopDeliver = get_value(ignore_loop_deliver, Env, false), IgnoreLoopDeliver = get_value(ignore_loop_deliver, Env, false),
@ -342,7 +342,7 @@ handle_call(state, _From, State) ->
reply(?record_to_proplist(state, State, ?STATE_KEYS), State); reply(?record_to_proplist(state, State, ?STATE_KEYS), State);
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
lager:error("[~s] Unexpected Call: ~p", [?MODULE, Req]), emqx_log:error("[Session] Unexpected request: ~p", [Req]),
{reply, ignore, State}. {reply, ignore, State}.
handle_cast({subscribe, From, TopicTable, AckFun}, handle_cast({subscribe, From, TopicTable, AckFun},
@ -501,7 +501,7 @@ handle_cast({resume, ClientPid},
{noreply, emit_stats(dequeue(retry_delivery(true, State1)))}; {noreply, emit_stats(dequeue(retry_delivery(true, State1)))};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
lager:error("[~s] Unexpected Cast: ~p", [?MODULE, Msg]), emqx_log:error("[Session] Unexpected msg: ~p", [Msg]),
{noreply, State}. {noreply, State}.
%% Ignore Messages delivered by self %% Ignore Messages delivered by self
@ -551,7 +551,7 @@ handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = ClientPid}) ->
{noreply, State, hibernate}; {noreply, State, hibernate};
handle_info(Info, State) -> handle_info(Info, State) ->
lager:error("[~s] Unexpected Info: ~p", [?MODULE, Info]), emqx_log:error("[Session] Unexpected info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(Reason, #state{client_id = ClientId, username = Username}) -> terminate(Reason, #state{client_id = ClientId, username = Username}) ->

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -25,9 +25,6 @@
%% Get all Stats %% Get all Stats
-export([all/0]). -export([all/0]).
%% Client and Session Stats
-export([set_client_stats/2, get_client_stats/1, del_client_stats/1]).
%% Statistics API. %% Statistics API.
-export([statsfun/1, statsfun/2, getstats/0, getstat/1, setstat/2, setstat/3]). -export([statsfun/1, statsfun/2, getstats/0, getstat/1, setstat/2, setstat/3]).
@ -41,9 +38,7 @@
-export_type([stats/0]). -export_type([stats/0]).
-define(STATS_TAB, mqtt_stats). -define(STATS_TAB, stats).
-define(CLIENT_STATS_TAB, mqtt_client_stats).
-define(SESSION_STATS_TAB, mqtt_session_stats).
%% $SYS Topics for Clients %% $SYS Topics for Clients
-define(SYSTOP_CLIENTS, [ -define(SYSTOP_CLIENTS, [
@ -87,22 +82,6 @@ start_link() ->
stop() -> stop() ->
gen_server:call(?MODULE, stop). gen_server:call(?MODULE, stop).
-spec(set_client_stats(binary(), stats()) -> true).
set_client_stats(ClientId, Stats) ->
ets:insert(?CLIENT_STATS_TAB, {ClientId, [{'$ts', emqx_time:now_secs()}|Stats]}).
-spec(get_client_stats(binary()) -> stats()).
get_client_stats(ClientId) ->
case ets:lookup(?CLIENT_STATS_TAB, ClientId) of
[{_, Stats}] -> Stats;
[] -> []
end.
-spec(del_client_stats(binary()) -> true).
del_client_stats(ClientId) ->
ets:delete(?CLIENT_STATS_TAB, ClientId).
all() -> ets:tab2list(?STATS_TAB). all() -> ets:tab2list(?STATS_TAB).
%% @doc Generate stats fun %% @doc Generate stats fun
@ -143,10 +122,8 @@ setstat(Stat, MaxStat, Val) ->
init([]) -> init([]) ->
emqx_time:seed(), emqx_time:seed(),
lists:foreach( _ = emqx_tables:create(?STATS_TAB, [set, public, named_table,
fun(Tab) -> {write_concurrency, true}]),
Tab = ets:new(Tab, [set, public, named_table, {write_concurrency, true}])
end, [?STATS_TAB, ?CLIENT_STATS_TAB, ?SESSION_STATS_TAB]),
Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB ++ ?SYSTOP_RETAINED, Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB ++ ?SYSTOP_RETAINED,
ets:insert(?STATS_TAB, [{Topic, 0} || Topic <- Topics]), ets:insert(?STATS_TAB, [{Topic, 0} || Topic <- Topics]),
% Tick to publish stats % Tick to publish stats

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -28,10 +28,10 @@
%%-define(LOG_FMT, [{formatter_config, [time, " ", message, "\n"]}]). %%-define(LOG_FMT, [{formatter_config, [time, " ", message, "\n"]}]).
-define(LOG(Msg, ProcInfo), -define(LOG(Msg, ProcInfo),
lager:warning([{sysmon, true}], "[SYSMON] ~s~n~p", [WarnMsg, ProcInfo])). emqx_log:warning([{sysmon, true}], "[SYSMON] ~s~n~p", [WarnMsg, ProcInfo])).
-define(LOG(Msg, ProcInfo, PortInfo), -define(LOG(Msg, ProcInfo, PortInfo),
lager:warning([{sysmon, true}], "[SYSMON] ~s~n~p~n~p", [WarnMsg, ProcInfo, PortInfo])). emqx_log:warning([{sysmon, true}], "[SYSMON] ~s~n~p~n~p", [WarnMsg, ProcInfo, PortInfo])).
%% @doc Start system monitor %% @doc Start system monitor
-spec(start_link(Opts :: list(tuple())) -> {ok, pid()} | ignore | {error, term()}). -spec(start_link(Opts :: list(tuple())) -> {ok, pid()} | ignore | {error, term()}).
@ -75,11 +75,11 @@ parse_opt([_Opt|Opts], Acc) ->
parse_opt(Opts, Acc). parse_opt(Opts, Acc).
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
lager:error("[SYSMON] Unexpected Call: ~p", [Req]), emqx_log:error("[SYSMON] Unexpected request: ~p", [Req]),
{reply, ignore, State}. {reply, ignore, State}.
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
lager:error("[SYSMON] Unexpected Cast: ~p", [Msg]), emqx_log:error("[SYSMON] Unexpected msg: ~p", [Msg]),
{noreply, State}. {noreply, State}.
handle_info({monitor, Pid, long_gc, Info}, State) -> handle_info({monitor, Pid, long_gc, Info}, State) ->

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -28,7 +28,7 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) -> init([]) ->
{ok, Env} = emqx:env(sysmon), {ok, Env} = emqx_conf:get_env(sysmon),
Sysmon = {sysmon, {emqx_sysmon, start_link, [Env]}, Sysmon = {sysmon, {emqx_sysmon, start_link, [Env]},
permanent, 5000, worker, [emqx_sysmon]}, permanent, 5000, worker, [emqx_sysmon]},
{ok, {{one_for_one, 10, 100}, [Sysmon]}}. {ok, {{one_for_one, 10, 100}, [Sysmon]}}.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -53,11 +53,11 @@ trace(publish, From, _Msg) when is_atom(From) ->
ignore; ignore;
trace(publish, #client{client_id = ClientId, username = Username}, trace(publish, #client{client_id = ClientId, username = Username},
#message{topic = Topic, payload = Payload}) -> #message{topic = Topic, payload = Payload}) ->
lager:info([{client, ClientId}, {topic, Topic}], emqx_log:info([{client, ClientId}, {topic, Topic}],
"~s/~s PUBLISH to ~s: ~p", [ClientId, Username, Topic, Payload]); "~s/~s PUBLISH to ~s: ~p", [ClientId, Username, Topic, Payload]);
trace(publish, From, #message{topic = Topic, payload = Payload}) trace(publish, From, #message{topic = Topic, payload = Payload})
when is_binary(From); is_list(From) -> when is_binary(From); is_list(From) ->
lager:info([{client, From}, {topic, Topic}], emqx_log:info([{client, From}, {topic, Topic}],
"~s PUBLISH to ~s: ~p", [From, Topic, Payload]). "~s PUBLISH to ~s: ~p", [From, Topic, Payload]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -83,14 +83,15 @@ stop_trace({topic, Topic}) ->
%% @doc Lookup all traces. %% @doc Lookup all traces.
-spec(all_traces() -> [{Who :: trace_who(), LogFile :: string()}]). -spec(all_traces() -> [{Who :: trace_who(), LogFile :: string()}]).
all_traces() -> gen_server:call(?MODULE, all_traces). all_traces() ->
gen_server:call(?MODULE, all_traces).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
{ok, #state{level = emqx:env(trace_level, debug), traces = #{}}}. {ok, #state{level = emqx_conf:get_env(trace_level, debug), traces = #{}}}.
handle_call({start_trace, Who, LogFile}, _From, State = #state{level = Level, traces = Traces}) -> handle_call({start_trace, Who, LogFile}, _From, State = #state{level = Level, traces = Traces}) ->
case lager:trace_file(LogFile, [Who], Level, ?OPTIONS) of case lager:trace_file(LogFile, [Who], Level, ?OPTIONS) of

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -26,7 +26,7 @@
-record(wsocket_state, {peername, client_pid, max_packet_size, parser}). -record(wsocket_state, {peername, client_pid, max_packet_size, parser}).
-define(WSLOG(Level, Format, Args, State), -define(WSLOG(Level, Format, Args, State),
lager:Level("WsClient(~s): " ++ Format, emqx_log:Level("WsClient(~s): " ++ Format,
[esockd_net:format(State#wsocket_state.peername) | Args])). [esockd_net:format(State#wsocket_state.peername) | Args])).
@ -38,14 +38,14 @@ handle_request(Req) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_request('GET', "/mqtt", Req) -> handle_request('GET', "/mqtt", Req) ->
lager:debug("WebSocket Connection from: ~s", [Req:get(peer)]), emqx_log:debug("WebSocket Connection from: ~s", [Req:get(peer)]),
Upgrade = Req:get_header_value("Upgrade"), Upgrade = Req:get_header_value("Upgrade"),
Proto = check_protocol_header(Req), Proto = check_protocol_header(Req),
case {is_websocket(Upgrade), Proto} of case {is_websocket(Upgrade), Proto} of
{true, "mqtt" ++ _Vsn} -> {true, "mqtt" ++ _Vsn} ->
case Req:get(peername) of case Req:get(peername) of
{ok, Peername} -> {ok, Peername} ->
{ok, ProtoEnv} = emqx:env(protocol), {ok, ProtoEnv} = emqx_conf:get_env(protocol),
PacketSize = get_value(max_packet_size, ProtoEnv, ?MAX_PACKET_SIZE), PacketSize = get_value(max_packet_size, ProtoEnv, ?MAX_PACKET_SIZE),
Parser = emqx_parser:initial_state(PacketSize), Parser = emqx_parser:initial_state(PacketSize),
%% Upgrade WebSocket. %% Upgrade WebSocket.
@ -56,27 +56,27 @@ handle_request('GET', "/mqtt", Req) ->
max_packet_size = PacketSize, max_packet_size = PacketSize,
client_pid = ClientPid}); client_pid = ClientPid});
{error, Reason} -> {error, Reason} ->
lager:error("Get peername with error ~s", [Reason]), emqx_log:error("Get peername with error ~s", [Reason]),
Req:respond({400, [], <<"Bad Request">>}) Req:respond({400, [], <<"Bad Request">>})
end; end;
{false, _} -> {false, _} ->
lager:error("Not WebSocket: Upgrade = ~s", [Upgrade]), emqx_log:error("Not WebSocket: Upgrade = ~s", [Upgrade]),
Req:respond({400, [], <<"Bad Request">>}); Req:respond({400, [], <<"Bad Request">>});
{_, Proto} -> {_, Proto} ->
lager:error("WebSocket with error Protocol: ~s", [Proto]), emqx_log:error("WebSocket with error Protocol: ~s", [Proto]),
Req:respond({400, [], <<"Bad WebSocket Protocol">>}) Req:respond({400, [], <<"Bad WebSocket Protocol">>})
end; end;
handle_request(Method, Path, Req) -> handle_request(Method, Path, Req) ->
lager:error("Unexpected WS Request: ~s ~s", [Method, Path]), emqx_log:error("Unexpected WS Request: ~s ~s", [Method, Path]),
Req:not_found(). Req:not_found().
is_websocket(Upgrade) -> is_websocket(Upgrade) ->
(not emqx:env(websocket_check_upgrade_header, true)) orelse (not emqx_conf:get_env(websocket_check_upgrade_header, true)) orelse
(Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket"). (Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket").
check_protocol_header(Req) -> check_protocol_header(Req) ->
case emqx:env(websocket_protocol_header, false) of case emqx_conf:get_env(websocket_protocol_header, false) of
true -> get_protocol_header(Req); true -> get_protocol_header(Req);
false -> "mqtt-v3.1.1" false -> "mqtt-v3.1.1"
end. end.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -50,7 +50,7 @@
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
-define(WSLOG(Level, Format, Args, State), -define(WSLOG(Level, Format, Args, State),
lager:Level("WsClient(~s): " ++ Format, emqx_log:Level("WsClient(~s): " ++ Format,
[esockd_net:format(State#wsclient_state.peername) | Args])). [esockd_net:format(State#wsclient_state.peername) | Args])).
%% @doc Start WebSocket Client. %% @doc Start WebSocket Client.

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved. %% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -37,7 +37,7 @@ start_connection(WsPid, Req, ReplyChannel) ->
init([]) -> init([]) ->
%%TODO: Cannot upgrade the environments, Use zone? %%TODO: Cannot upgrade the environments, Use zone?
Env = lists:append(emqx:env(client, []), emqx:env(protocol, [])), Env = lists:append(emqx_conf:get_env(client, []), emqx_conf:get_env(protocol, [])),
{ok, {{simple_one_for_one, 0, 1}, {ok, {{simple_one_for_one, 0, 1},
[{ws_connection, {emqx_ws_connection, start_link, [Env]}, [{ws_connection, {emqx_ws_connection, start_link, [Env]},
temporary, 5000, worker, [emqx_ws_connection]}]}}. temporary, 5000, worker, [emqx_ws_connection]}]}}.