Merge pull request #5673 from JimMoen/stu-dev
Code refactor during studying.
This commit is contained in:
commit
b5d1ffa814
|
@ -64,7 +64,7 @@ listeners.tcp.default {
|
||||||
proxy_protocol = false
|
proxy_protocol = false
|
||||||
|
|
||||||
## Sets the timeout for proxy protocol. EMQ X will close the TCP connection
|
## Sets the timeout for proxy protocol. EMQ X will close the TCP connection
|
||||||
## if no proxy protocol packet recevied within the timeout.
|
## if no proxy protocol packet received within the timeout.
|
||||||
##
|
##
|
||||||
## @doc listeners.tcp.<name>.proxy_protocol_timeout
|
## @doc listeners.tcp.<name>.proxy_protocol_timeout
|
||||||
## ValueType: Duration
|
## ValueType: Duration
|
||||||
|
@ -163,7 +163,7 @@ listeners.ssl.default {
|
||||||
proxy_protocol = false
|
proxy_protocol = false
|
||||||
|
|
||||||
## Sets the timeout for proxy protocol. EMQ X will close the TCP connection
|
## Sets the timeout for proxy protocol. EMQ X will close the TCP connection
|
||||||
## if no proxy protocol packet recevied within the timeout.
|
## if no proxy protocol packet received within the timeout.
|
||||||
##
|
##
|
||||||
## @doc listeners.ssl.<name>.proxy_protocol_timeout
|
## @doc listeners.ssl.<name>.proxy_protocol_timeout
|
||||||
## ValueType: Duration
|
## ValueType: Duration
|
||||||
|
@ -345,7 +345,7 @@ listeners.ws.default {
|
||||||
proxy_protocol = false
|
proxy_protocol = false
|
||||||
|
|
||||||
## Sets the timeout for proxy protocol. EMQ X will close the TCP connection
|
## Sets the timeout for proxy protocol. EMQ X will close the TCP connection
|
||||||
## if no proxy protocol packet recevied within the timeout.
|
## if no proxy protocol packet received within the timeout.
|
||||||
##
|
##
|
||||||
## @doc listeners.ws.<name>.proxy_protocol_timeout
|
## @doc listeners.ws.<name>.proxy_protocol_timeout
|
||||||
## ValueType: Duration
|
## ValueType: Duration
|
||||||
|
@ -448,7 +448,7 @@ listeners.wss.default {
|
||||||
proxy_protocol = false
|
proxy_protocol = false
|
||||||
|
|
||||||
## Sets the timeout for proxy protocol. EMQ X will close the TCP connection
|
## Sets the timeout for proxy protocol. EMQ X will close the TCP connection
|
||||||
## if no proxy protocol packet recevied within the timeout.
|
## if no proxy protocol packet received within the timeout.
|
||||||
##
|
##
|
||||||
## @doc listeners.wss.<name>.proxy_protocol_timeout
|
## @doc listeners.wss.<name>.proxy_protocol_timeout
|
||||||
## ValueType: Duration
|
## ValueType: Duration
|
||||||
|
|
|
@ -22,49 +22,38 @@
|
||||||
|
|
||||||
-export([init/1]).
|
-export([init/1]).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% API
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
start_link() ->
|
start_link() ->
|
||||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Supervisor callbacks
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
Banned = #{id => banned,
|
|
||||||
start => {emqx_banned, start_link, []},
|
|
||||||
restart => permanent,
|
|
||||||
shutdown => 1000,
|
|
||||||
type => worker,
|
|
||||||
modules => [emqx_banned]},
|
|
||||||
Flapping = #{id => flapping,
|
|
||||||
start => {emqx_flapping, start_link, []},
|
|
||||||
restart => permanent,
|
|
||||||
shutdown => 1000,
|
|
||||||
type => worker,
|
|
||||||
modules => [emqx_flapping]},
|
|
||||||
%% Channel locker
|
|
||||||
Locker = #{id => locker,
|
|
||||||
start => {emqx_cm_locker, start_link, []},
|
|
||||||
restart => permanent,
|
|
||||||
shutdown => 5000,
|
|
||||||
type => worker,
|
|
||||||
modules => [emqx_cm_locker]
|
|
||||||
},
|
|
||||||
%% Channel registry
|
|
||||||
Registry = #{id => registry,
|
|
||||||
start => {emqx_cm_registry, start_link, []},
|
|
||||||
restart => permanent,
|
|
||||||
shutdown => 5000,
|
|
||||||
type => worker,
|
|
||||||
modules => [emqx_cm_registry]
|
|
||||||
},
|
|
||||||
%% Channel Manager
|
|
||||||
Manager = #{id => manager,
|
|
||||||
start => {emqx_cm, start_link, []},
|
|
||||||
restart => permanent,
|
|
||||||
shutdown => 5000,
|
|
||||||
type => worker,
|
|
||||||
modules => [emqx_cm]
|
|
||||||
},
|
|
||||||
SupFlags = #{strategy => one_for_one,
|
SupFlags = #{strategy => one_for_one,
|
||||||
intensity => 100,
|
intensity => 100,
|
||||||
period => 10
|
period => 10
|
||||||
},
|
},
|
||||||
|
Banned = child_spec(emqx_banned, 1000, worker),
|
||||||
|
Flapping = child_spec(emqx_flapping, 1000, worker),
|
||||||
|
Locker = child_spec(emqx_cm_locker, 5000, worker),
|
||||||
|
Registry = child_spec(emqx_cm_registry, 5000, worker),
|
||||||
|
Manager = child_spec(emqx_cm, 5000, worker),
|
||||||
{ok, {SupFlags, [Banned, Flapping, Locker, Registry, Manager]}}.
|
{ok, {SupFlags, [Banned, Flapping, Locker, Registry, Manager]}}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Internal functions
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
child_spec(Mod, Shutdown, Type) ->
|
||||||
|
#{id => Mod,
|
||||||
|
start => {Mod, start_link, []},
|
||||||
|
restart => permanent,
|
||||||
|
shutdown => Shutdown,
|
||||||
|
type => Type,
|
||||||
|
modules => [Mod]
|
||||||
|
}.
|
||||||
|
|
|
@ -100,14 +100,10 @@ parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>,
|
||||||
StrictMode andalso validate_header(Type, Dup, QoS, Retain),
|
StrictMode andalso validate_header(Type, Dup, QoS, Retain),
|
||||||
Header = #mqtt_packet_header{type = Type,
|
Header = #mqtt_packet_header{type = Type,
|
||||||
dup = bool(Dup),
|
dup = bool(Dup),
|
||||||
qos = QoS,
|
qos = fixqos(Type, QoS),
|
||||||
retain = bool(Retain)
|
retain = bool(Retain)
|
||||||
},
|
},
|
||||||
Header1 = case fixqos(Type, QoS) of
|
parse_remaining_len(Rest, Header, Options);
|
||||||
QoS -> Header;
|
|
||||||
FixedQoS -> Header#mqtt_packet_header{qos = FixedQoS}
|
|
||||||
end,
|
|
||||||
parse_remaining_len(Rest, Header1, Options);
|
|
||||||
|
|
||||||
parse(Bin, {{len, #{hdr := Header,
|
parse(Bin, {{len, #{hdr := Header,
|
||||||
len := {Multiplier, Length}}
|
len := {Multiplier, Length}}
|
||||||
|
|
|
@ -114,8 +114,8 @@ t_cm(_) ->
|
||||||
emqx_config:put_zone_conf(default, [mqtt, idle_timeout], 15000).
|
emqx_config:put_zone_conf(default, [mqtt, idle_timeout], 15000).
|
||||||
|
|
||||||
t_cm_registry(_) ->
|
t_cm_registry(_) ->
|
||||||
Info = supervisor:which_children(emqx_cm_sup),
|
Children = supervisor:which_children(emqx_cm_sup),
|
||||||
{_, Pid, _, _} = lists:keyfind(registry, 1, Info),
|
{_, Pid, _, _} = lists:keyfind(emqx_cm_registry, 1, Children),
|
||||||
ignored = gen_server:call(Pid, <<"Unexpected call">>),
|
ignored = gen_server:call(Pid, <<"Unexpected call">>),
|
||||||
gen_server:cast(Pid, <<"Unexpected cast">>),
|
gen_server:cast(Pid, <<"Unexpected cast">>),
|
||||||
Pid ! <<"Unexpected info">>.
|
Pid ! <<"Unexpected info">>.
|
||||||
|
|
|
@ -55,8 +55,8 @@ t_detect_check(_) ->
|
||||||
true = emqx_banned:check(ClientInfo),
|
true = emqx_banned:check(ClientInfo),
|
||||||
timer:sleep(3000),
|
timer:sleep(3000),
|
||||||
false = emqx_banned:check(ClientInfo),
|
false = emqx_banned:check(ClientInfo),
|
||||||
Childrens = supervisor:which_children(emqx_cm_sup),
|
Children = supervisor:which_children(emqx_cm_sup),
|
||||||
{flapping, Pid, _, _} = lists:keyfind(flapping, 1, Childrens),
|
{emqx_flapping, Pid, _, _} = lists:keyfind(emqx_flapping, 1, Children),
|
||||||
gen_server:call(Pid, unexpected_msg),
|
gen_server:call(Pid, unexpected_msg),
|
||||||
gen_server:cast(Pid, unexpected_msg),
|
gen_server:cast(Pid, unexpected_msg),
|
||||||
Pid ! test,
|
Pid ! test,
|
||||||
|
|
|
@ -835,7 +835,7 @@ inc_incoming_stats(Ctx, FrameMod, Packet) ->
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
Name = list_to_atom(
|
Name = list_to_atom(
|
||||||
lists:concat(["packets.", FrameMod:type(Packet), ".recevied"])),
|
lists:concat(["packets.", FrameMod:type(Packet), ".received"])),
|
||||||
emqx_gateway_ctx:metrics_inc(Ctx, Name).
|
emqx_gateway_ctx:metrics_inc(Ctx, Name).
|
||||||
|
|
||||||
inc_outgoing_stats(Ctx, FrameMod, Packet) ->
|
inc_outgoing_stats(Ctx, FrameMod, Packet) ->
|
||||||
|
|
|
@ -994,7 +994,7 @@ t_will_case06(_) ->
|
||||||
|
|
||||||
receive
|
receive
|
||||||
{deliver, WillTopic, #message{payload = WillMsg}} -> ok;
|
{deliver, WillTopic, #message{payload = WillMsg}} -> ok;
|
||||||
Msg -> ct:print("recevived --- unex: ~p", [Msg])
|
Msg -> ct:print("received --- unex: ~p", [Msg])
|
||||||
after
|
after
|
||||||
1000 -> ct:fail(wait_willmsg_timeout)
|
1000 -> ct:fail(wait_willmsg_timeout)
|
||||||
end,
|
end,
|
||||||
|
|
Loading…
Reference in New Issue