Merge remote-tracking branch 'origin/develop'

This commit is contained in:
zhanghongtong 2019-07-20 01:01:52 +08:00
commit 6ec7cb5090
7 changed files with 73 additions and 25 deletions

View File

@ -145,7 +145,7 @@ cluster.autoclean = 5m
## The address type is used to extract host from k8s service.
##
## Value: ip | dns
## Value: ip | dns | hostname
## cluster.k8s.address_type = ip
## The app name helps build 'node.name'.
@ -153,6 +153,11 @@ cluster.autoclean = 5m
## Value: String
## cluster.k8s.app_name = emqx
## The suffix added to dns and hostname get from k8s service
##
## Value: String
## cluster.k8s.suffix = pod.cluster.local
## Kubernates Namespace
##
## Value: String
@ -1878,6 +1883,14 @@ plugins.expand_plugins_dir = {{ platform_plugins_dir }}/
## Default: 1m, 1 minute
broker.sys_interval = 1m
## System heartbeat interval of publishing following heart beat message:
## - "$SYS/brokers/<node>/uptime"
## - "$SYS/brokers/<node>/datetime"
##
## Value: Duration
## Default: 30s
broker.sys_heartbeat = 30s
## Enable global session registry.
##
## Value: on | off

View File

@ -129,7 +129,7 @@
]}.
{mapping, "cluster.k8s.address_type", "ekka.cluster_discovery", [
{datatype, {enum, [ip, dns]}}
{datatype, {enum, [ip, dns, hostname]}}
]}.
{mapping, "cluster.k8s.app_name", "ekka.cluster_discovery", [
@ -140,6 +140,11 @@
{datatype, string}
]}.
{mapping, "cluster.k8s.suffix", "ekka.cluster_discovery", [
{datatype, string},
{default, ""}
]}.
{translation, "ekka.cluster_discovery", fun(Conf) ->
Strategy = cuttlefish:conf_get("cluster.discovery", Conf),
Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end,
@ -176,7 +181,8 @@
{service_name, cuttlefish:conf_get("cluster.k8s.service_name", Conf)},
{address_type, cuttlefish:conf_get("cluster.k8s.address_type", Conf, ip)},
{app_name, cuttlefish:conf_get("cluster.k8s.app_name", Conf)},
{namespace, cuttlefish:conf_get("cluster.k8s.namespace", Conf)}];
{namespace, cuttlefish:conf_get("cluster.k8s.namespace", Conf)},
{suffix, cuttlefish:conf_get("cluster.k8s.suffix", Conf, "")}];
(manual) ->
[ ]
end,
@ -1807,6 +1813,11 @@ end}.
{default, "1m"}
]}.
{mapping, "broker.sys_heartbeat", "emqx.broker_sys_heartbeat", [
{datatype, {duration, ms}},
{default, "30s"}
]}.
{mapping, "broker.enable_session_registry", "emqx.enable_session_registry", [
{default, on},
{datatype, flag}

View File

@ -4,7 +4,7 @@
{gproc, "0.8.0"}, % hex
{replayq, "0.1.1"}, %hex
{esockd, "5.5.0"}, %hex
{ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.7"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.8"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}},
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
]}.

View File

@ -23,18 +23,28 @@
-define(RPC, gen_rpc).
call(Node, Mod, Fun, Args) ->
filter_result(?RPC:call(Node, Mod, Fun, Args)).
filter_result(?RPC:call(rpc_node(Node), Mod, Fun, Args)).
multicall(Nodes, Mod, Fun, Args) ->
filter_result(?RPC:multicall(Nodes, Mod, Fun, Args)).
filter_result(?RPC:multicall(rpc_nodes(Nodes), Mod, Fun, Args)).
cast(Node, Mod, Fun, Args) ->
filter_result(?RPC:cast(Node, Mod, Fun, Args)).
filter_result(?RPC:cast(rpc_node(Node), Mod, Fun, Args)).
rpc_node(Node) ->
{Node, erlang:system_info(scheduler_id)}.
rpc_nodes(Nodes) ->
rpc_nodes(Nodes, []).
rpc_nodes([], Acc) ->
Acc;
rpc_nodes([Node | Nodes], Acc) ->
rpc_nodes(Nodes, [rpc_node(Node) | Acc]).
filter_result({Error, Reason})
when Error =:= badrpc; Error =:= badtcp ->
{badrpc, Reason};
filter_result(Delivery) ->
case Delivery of
{badrpc, Reason} -> {badrpc, Reason};
{badtcp, Reason} -> {badrpc, Reason};
_ -> Delivery
end.
Delivery.

View File

@ -626,7 +626,7 @@ handle_info({'EXIT', ConnPid, Reason}, State = #state{will_msg = WillMsg, expiry
_ ->
send_willmsg(WillMsg)
end,
{stop, Reason, State#state{will_msg = undefined, conn_pid = undefined}};
shutdown(Reason, State#state{will_msg = undefined, conn_pid = undefined});
handle_info({'EXIT', ConnPid, Reason}, State = #state{conn_pid = ConnPid}) ->
State1 = case Reason of

View File

@ -28,6 +28,7 @@
, datetime/0
, sysdescr/0
, sys_interval/0
, sys_heatbeat_interval/0
]).
-export([info/0]).
@ -92,6 +93,11 @@ datetime() ->
sys_interval() ->
application:get_env(?APP, broker_sys_interval, 60000).
%% @doc Get sys heatbeat interval
-spec(sys_heatbeat_interval() -> pos_integer()).
sys_heatbeat_interval() ->
application:get_env(?APP, sys_heartbeat, 30000).
%% @doc Get sys info
-spec(info() -> list(tuple())).
info() ->
@ -111,7 +117,7 @@ init([]) ->
{ok, heartbeat(tick(State))}.
heartbeat(State) ->
State#state{heartbeat = start_timer(timer:seconds(1), heartbeat)}.
State#state{heartbeat = start_timer(sys_heatbeat_interval(), heartbeat)}.
tick(State) ->
State#state{ticker = start_timer(sys_interval(), tick)}.

View File

@ -303,26 +303,34 @@ websocket_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
{ok, State}.
terminate(SockError, _Req, #state{keepalive = Keepalive,
proto_state = ProtoState,
shutdown = Shutdown}) ->
?LOG(debug, "Terminated for ~p, sockerror: ~p",
[Shutdown, SockError]),
terminate(WsReason, _Req, #state{keepalive = Keepalive,
proto_state = ProtoState,
shutdown = Shutdown}) ->
?LOG(debug, "Terminated for ~p, websocket reason: ~p",
[Shutdown, WsReason]),
emqx_keepalive:cancel(Keepalive),
case {ProtoState, Shutdown} of
{undefined, _} -> ok;
{_, {shutdown, Reason}} ->
emqx_protocol:terminate(Reason, ProtoState),
exit(Reason);
{_, Error} ->
emqx_protocol:terminate(Error, ProtoState),
exit({error, SockError})
terminate_session(Reason, ProtoState);
{_, _Error} ->
?LOG(info, "Terminate for unexpected error: ~p", [WsReason]),
terminate_session(unknown, ProtoState)
end.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
terminate_session(Reason, ProtoState) ->
emqx_protocol:terminate(Reason, ProtoState),
case emqx_protocol:session(ProtoState) of
undefined ->
ok;
SessionPid ->
SessionPid ! {'EXIT', self(), Reason}
end.
handle_incoming(Packet, SuccFun, State = #state{proto_state = ProtoState}) ->
case emqx_protocol:received(Packet, ProtoState) of
{ok, NProtoState} ->