From ec12acc4ef4d5571e2bb4d39838956b0a5b57fea Mon Sep 17 00:00:00 2001 From: zhouzb Date: Mon, 17 Aug 2020 17:59:35 +0800 Subject: [PATCH] feature(mqtt piggyback): transporting mutli MQTT packets at once or single --- etc/emqx.conf | 10 +++++++++ priv/emqx.schema | 15 +++++++++++-- src/emqx_ws_connection.erl | 43 +++++++++++++++++++++++--------------- 3 files changed, 49 insertions(+), 19 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 0a41b3dbc..b5cc62e2a 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1651,6 +1651,11 @@ listener.ws.external.nodelay = true ## Value: Number ## listener.ws.external.max_frame_size = 0 +## Whether a WebSocket message is allowed to contain multiple MQTT packets +## +## Value: single | multiple +listener.ws.external.mqtt_piggyback = multiple + ##-------------------------------------------------------------------- ## External WebSocket/SSL listener for MQTT Protocol @@ -1911,6 +1916,11 @@ listener.wss.external.send_timeout_close = on ## Value: Number ## listener.wss.external.max_frame_size = 0 +## Whether a WebSocket message is allowed to contain multiple MQTT packets +## +## Value: single | multiple +listener.wss.external.mqtt_piggyback = multiple + ##-------------------------------------------------------------------- ## Modules ##-------------------------------------------------------------------- diff --git a/priv/emqx.schema b/priv/emqx.schema index 693c7fbef..f341ff16c 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1540,6 +1540,12 @@ end}. hidden ]}. +{mapping, "listener.ws.$name.mqtt_piggyback", "emqx.listeners", [ + {datatype, {enum, [single, multiple]}}, + {default, multiple}, + hidden +]}. + %%-------------------------------------------------------------------- %% MQTT/WebSocket/SSL Listeners @@ -1743,7 +1749,11 @@ end}. hidden ]}. - +{mapping, "listener.wss.$name.mqtt_piggyback", "emqx.listeners", [ + {datatype, {enum, [single, multiple]}}, + {default, multiple}, + hidden +]}. {translation, "emqx.listeners", fun(Conf) -> @@ -1793,7 +1803,8 @@ end}. {peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)}, {compress, cuttlefish:conf_get(Prefix ++ ".compress", Conf, undefined)}, {idle_timeout, cuttlefish:conf_get(Prefix ++ ".idle_timeout", Conf, undefined)}, - {max_frame_size, cuttlefish:conf_get(Prefix ++ ".max_frame_size", Conf, undefined)} | AccOpts(Prefix)]) + {max_frame_size, cuttlefish:conf_get(Prefix ++ ".max_frame_size", Conf, undefined)}, + {mqtt_piggyback, cuttlefish:conf_get(Prefix ++ ".mqtt_piggyback", Conf, undefined)} | AccOpts(Prefix)]) end, DeflateOpts = fun(Prefix) -> Filter([{level, cuttlefish:conf_get(Prefix ++ ".deflate_opts.level", Conf, undefined)}, diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 7c47c38c8..c54cfefb9 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -62,6 +62,8 @@ sockstate :: emqx_types:sockstate(), %% Simulate the active_n opt active_n :: pos_integer(), + %% MQTT Piggyback + mqtt_piggyback :: single | multiple, %% Limiter limiter :: maybe(emqx_limiter:limiter()), %% Limit Timer @@ -226,6 +228,7 @@ websocket_init([Req, Opts]) -> RateLimit = emqx_zone:ratelimit(Zone), Limiter = emqx_limiter:init(Zone, PubLimit, BytesIn, RateLimit), ActiveN = proplists:get_value(active_n, Opts, ?ACTIVE_N), + MQTTPiggyback = proplists:get_value(mqtt_piggyback, Opts, multiple), FrameOpts = emqx_zone:mqtt_frame_options(Zone), ParseState = emqx_frame:initial_parse_state(FrameOpts), Serialize = emqx_frame:serialize_fun(), @@ -237,19 +240,20 @@ websocket_init([Req, Opts]) -> IdleTimer = start_timer(IdleTimeout, idle_timeout), emqx_misc:tune_heap_size(emqx_zone:oom_policy(Zone)), emqx_logger:set_metadata_peername(esockd:format(Peername)), - {ok, #state{peername = Peername, - sockname = Sockname, - sockstate = running, - active_n = ActiveN, - limiter = Limiter, - parse_state = ParseState, - serialize = Serialize, - channel = Channel, - gc_state = GcState, - postponed = [], - stats_timer = StatsTimer, - idle_timeout = IdleTimeout, - idle_timer = IdleTimer + {ok, #state{peername = Peername, + sockname = Sockname, + sockstate = running, + active_n = ActiveN, + mqtt_piggyback = MQTTPiggyback, + limiter = Limiter, + parse_state = ParseState, + serialize = Serialize, + channel = Channel, + gc_state = GcState, + postponed = [], + stats_timer = StatsTimer, + idle_timeout = IdleTimeout, + idle_timer = IdleTimer }, hibernate}. websocket_handle({binary, Data}, State) when is_list(Data) -> @@ -514,7 +518,7 @@ with_channel(Fun, Args, State = #state{channel = Channel}) -> %% Handle outgoing packets %%-------------------------------------------------------------------- -handle_outgoing(Packets, State = #state{active_n = ActiveN}) -> +handle_outgoing(Packets, State = #state{active_n = ActiveN, mqtt_piggyback = MQTTPiggyback}) -> IoData = lists:map(serialize_and_inc_stats_fun(State), Packets), Oct = iolist_size(IoData), ok = inc_sent_stats(length(Packets), Oct), @@ -526,7 +530,12 @@ handle_outgoing(Packets, State = #state{active_n = ActiveN}) -> postpone({check_gc, Stats}, State); false -> State end, - {{binary, IoData}, ensure_stats_timer(NState)}. + + {case MQTTPiggyback of + single -> {binary, IoData}; + multiple -> lists:map(fun(Bin) -> {binary, Bin} end, IoData) + end, + ensure_stats_timer(NState)}. serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> fun(Packet) -> @@ -637,8 +646,8 @@ return(State = #state{postponed = Postponed}) -> {[], []} -> {ok, State1}; {[], Cmds} -> {Cmds, State1}; {Packets, Cmds} -> - {Frame, State2} = handle_outgoing(Packets, State1), - {[Frame|Cmds], State2} + {Frames, State2} = handle_outgoing(Packets, State1), + {Frames ++ Cmds, State2} end. classify([], Packets, Cmds, Events) ->