diff --git a/etc/emqx.conf b/etc/emqx.conf index 15ad24dd1..05cc35413 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -2122,7 +2122,7 @@ broker.session_locking_strategy = quorum ## ## Value: Enum ## - random -## - round_robbin +## - round_robin ## - sticky ## - hash broker.shared_subscription_strategy = random diff --git a/priv/emqx.schema b/priv/emqx.schema index e6944ba79..2224c9935 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1996,11 +1996,11 @@ end}. %% @doc Shared Subscription Dispatch Strategy. {mapping, "broker.shared_subscription_strategy", "emqx.shared_subscription_strategy", [ - {default, round_robbin}, + {default, round_robin}, {datatype, {enum, [random, %% randomly pick a subscriber - round_robbin, %% round robin alive subscribers one message after another + round_robin, %% round robin alive subscribers one message after another sticky, %% pick a random subscriber and stick to it hash %% hash client ID to a group member ]}} @@ -2125,4 +2125,4 @@ end}. [{check_interval, cuttlefish:conf_get("vm_mon.check_interval", Conf)}, {process_high_watermark, cuttlefish:conf_get("vm_mon.process_high_watermark", Conf)}, {process_low_watermark, cuttlefish:conf_get("vm_mon.process_low_watermark", Conf)}] -end}. \ No newline at end of file +end}. diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 1c9b26a0f..96dec6716 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -503,7 +503,7 @@ init([{username, Username} | Opts], State) -> init([{password, Password} | Opts], State) -> init(Opts, State#state{password = iolist_to_binary(Password)}); init([{keepalive, Secs} | Opts], State) -> - init(Opts, State#state{keepalive = timer:seconds(Secs)}); + init(Opts, State#state{keepalive = Secs}); init([{proto_ver, v3} | Opts], State) -> init(Opts, State#state{proto_ver = ?MQTT_PROTO_V3, proto_name = <<"MQIsdp">>}); @@ -1026,11 +1026,11 @@ publish_process(?QOS_2, Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), end. ensure_keepalive_timer(State = ?PROPERTY('Server-Keep-Alive', Secs)) -> - ensure_keepalive_timer(timer:seconds(Secs), State); + ensure_keepalive_timer(timer:seconds(Secs), State#state{keepalive = Secs}); ensure_keepalive_timer(State = #state{keepalive = 0}) -> State; ensure_keepalive_timer(State = #state{keepalive = I}) -> - ensure_keepalive_timer(I, State). + ensure_keepalive_timer(timer:seconds(I), State). ensure_keepalive_timer(I, State) when is_integer(I) -> State#state{keepalive_timer = erlang:start_timer(I, self(), keepalive)}.