diff --git a/etc/emqx.conf b/etc/emqx.conf index 9ce950aae..df3be2a9a 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -145,7 +145,7 @@ cluster.autoclean = 5m ## The address type is used to extract host from k8s service. ## -## Value: ip | dns +## Value: ip | dns | hostname ## cluster.k8s.address_type = ip ## The app name helps build 'node.name'. @@ -153,6 +153,11 @@ cluster.autoclean = 5m ## Value: String ## cluster.k8s.app_name = emqx +## The suffix added to dns and hostname get from k8s service +## +## Value: String +## cluster.k8s.suffix = pod.cluster.local + ## Kubernates Namespace ## ## Value: String @@ -1878,6 +1883,14 @@ plugins.expand_plugins_dir = {{ platform_plugins_dir }}/ ## Default: 1m, 1 minute broker.sys_interval = 1m +## System heartbeat interval of publishing following heart beat message: +## - "$SYS/brokers//uptime" +## - "$SYS/brokers//datetime" +## +## Value: Duration +## Default: 30s +broker.sys_heartbeat = 30s + ## Enable global session registry. ## ## Value: on | off diff --git a/priv/emqx.schema b/priv/emqx.schema index 1366665ca..1d308476d 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -129,7 +129,7 @@ ]}. {mapping, "cluster.k8s.address_type", "ekka.cluster_discovery", [ - {datatype, {enum, [ip, dns]}} + {datatype, {enum, [ip, dns, hostname]}} ]}. {mapping, "cluster.k8s.app_name", "ekka.cluster_discovery", [ @@ -140,6 +140,11 @@ {datatype, string} ]}. +{mapping, "cluster.k8s.suffix", "ekka.cluster_discovery", [ + {datatype, string}, + {default, ""} + ]}. + {translation, "ekka.cluster_discovery", fun(Conf) -> Strategy = cuttlefish:conf_get("cluster.discovery", Conf), Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, @@ -176,7 +181,8 @@ {service_name, cuttlefish:conf_get("cluster.k8s.service_name", Conf)}, {address_type, cuttlefish:conf_get("cluster.k8s.address_type", Conf, ip)}, {app_name, cuttlefish:conf_get("cluster.k8s.app_name", Conf)}, - {namespace, cuttlefish:conf_get("cluster.k8s.namespace", Conf)}]; + {namespace, cuttlefish:conf_get("cluster.k8s.namespace", Conf)}, + {suffix, cuttlefish:conf_get("cluster.k8s.suffix", Conf, "")}]; (manual) -> [ ] end, @@ -1807,6 +1813,11 @@ end}. {default, "1m"} ]}. +{mapping, "broker.sys_heartbeat", "emqx.broker_sys_heartbeat", [ + {datatype, {duration, ms}}, + {default, "30s"} +]}. + {mapping, "broker.enable_session_registry", "emqx.enable_session_registry", [ {default, on}, {datatype, flag} diff --git a/rebar.config b/rebar.config index 89b9436af..db7590567 100644 --- a/rebar.config +++ b/rebar.config @@ -4,7 +4,7 @@ {gproc, "0.8.0"}, % hex {replayq, "0.1.1"}, %hex {esockd, "5.5.0"}, %hex - {ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.7"}}}, + {ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.8"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}}, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} ]}. diff --git a/src/emqx_rpc.erl b/src/emqx_rpc.erl index 4d01b6229..96adf6605 100644 --- a/src/emqx_rpc.erl +++ b/src/emqx_rpc.erl @@ -23,18 +23,28 @@ -define(RPC, gen_rpc). call(Node, Mod, Fun, Args) -> - filter_result(?RPC:call(Node, Mod, Fun, Args)). + filter_result(?RPC:call(rpc_node(Node), Mod, Fun, Args)). multicall(Nodes, Mod, Fun, Args) -> - filter_result(?RPC:multicall(Nodes, Mod, Fun, Args)). + filter_result(?RPC:multicall(rpc_nodes(Nodes), Mod, Fun, Args)). cast(Node, Mod, Fun, Args) -> - filter_result(?RPC:cast(Node, Mod, Fun, Args)). + filter_result(?RPC:cast(rpc_node(Node), Mod, Fun, Args)). +rpc_node(Node) -> + {Node, erlang:system_info(scheduler_id)}. + +rpc_nodes(Nodes) -> + rpc_nodes(Nodes, []). + +rpc_nodes([], Acc) -> + Acc; +rpc_nodes([Node | Nodes], Acc) -> + rpc_nodes(Nodes, [rpc_node(Node) | Acc]). + + +filter_result({Error, Reason}) + when Error =:= badrpc; Error =:= badtcp -> + {badrpc, Reason}; filter_result(Delivery) -> - case Delivery of - {badrpc, Reason} -> {badrpc, Reason}; - {badtcp, Reason} -> {badrpc, Reason}; - _ -> Delivery - end. - + Delivery. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 392ce77f5..20028c9ef 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -626,7 +626,7 @@ handle_info({'EXIT', ConnPid, Reason}, State = #state{will_msg = WillMsg, expiry _ -> send_willmsg(WillMsg) end, - {stop, Reason, State#state{will_msg = undefined, conn_pid = undefined}}; + shutdown(Reason, State#state{will_msg = undefined, conn_pid = undefined}); handle_info({'EXIT', ConnPid, Reason}, State = #state{conn_pid = ConnPid}) -> State1 = case Reason of diff --git a/src/emqx_sys.erl b/src/emqx_sys.erl index 37e22856f..51d0d8cb9 100644 --- a/src/emqx_sys.erl +++ b/src/emqx_sys.erl @@ -28,6 +28,7 @@ , datetime/0 , sysdescr/0 , sys_interval/0 + , sys_heatbeat_interval/0 ]). -export([info/0]). @@ -92,6 +93,11 @@ datetime() -> sys_interval() -> application:get_env(?APP, broker_sys_interval, 60000). +%% @doc Get sys heatbeat interval +-spec(sys_heatbeat_interval() -> pos_integer()). +sys_heatbeat_interval() -> + application:get_env(?APP, sys_heartbeat, 30000). + %% @doc Get sys info -spec(info() -> list(tuple())). info() -> @@ -111,7 +117,7 @@ init([]) -> {ok, heartbeat(tick(State))}. heartbeat(State) -> - State#state{heartbeat = start_timer(timer:seconds(1), heartbeat)}. + State#state{heartbeat = start_timer(sys_heatbeat_interval(), heartbeat)}. tick(State) -> State#state{ticker = start_timer(sys_interval(), tick)}. diff --git a/src/emqx_ws_channel.erl b/src/emqx_ws_channel.erl index 9571baab8..096e3261f 100644 --- a/src/emqx_ws_channel.erl +++ b/src/emqx_ws_channel.erl @@ -303,26 +303,34 @@ websocket_info(Info, State) -> ?LOG(error, "Unexpected info: ~p", [Info]), {ok, State}. -terminate(SockError, _Req, #state{keepalive = Keepalive, - proto_state = ProtoState, - shutdown = Shutdown}) -> - ?LOG(debug, "Terminated for ~p, sockerror: ~p", - [Shutdown, SockError]), +terminate(WsReason, _Req, #state{keepalive = Keepalive, + proto_state = ProtoState, + shutdown = Shutdown}) -> + ?LOG(debug, "Terminated for ~p, websocket reason: ~p", + [Shutdown, WsReason]), emqx_keepalive:cancel(Keepalive), case {ProtoState, Shutdown} of {undefined, _} -> ok; {_, {shutdown, Reason}} -> - emqx_protocol:terminate(Reason, ProtoState), - exit(Reason); - {_, Error} -> - emqx_protocol:terminate(Error, ProtoState), - exit({error, SockError}) + terminate_session(Reason, ProtoState); + {_, _Error} -> + ?LOG(info, "Terminate for unexpected error: ~p", [WsReason]), + terminate_session(unknown, ProtoState) end. %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- +terminate_session(Reason, ProtoState) -> + emqx_protocol:terminate(Reason, ProtoState), + case emqx_protocol:session(ProtoState) of + undefined -> + ok; + SessionPid -> + SessionPid ! {'EXIT', self(), Reason} + end. + handle_incoming(Packet, SuccFun, State = #state{proto_state = ProtoState}) -> case emqx_protocol:received(Packet, ProtoState) of {ok, NProtoState} ->