From f1b3bbd7bc81b936dbdd635e6a1e447e8e0c73d0 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 2 Dec 2020 17:52:36 +0800 Subject: [PATCH] chore: supply the code_change logic --- rebar.config | 2 +- src/emqx_connection.erl | 56 ++++++++++++++++ src/emqx_frame.erl | 133 ++++++-------------------------------- src/emqx_limiter.erl | 8 +-- test/emqx_frame_SUITE.erl | 15 +---- test/emqx_vm_SUITE.erl | 2 +- 6 files changed, 78 insertions(+), 138 deletions(-) diff --git a/rebar.config b/rebar.config index df71f9762..fc3a95ce2 100644 --- a/rebar.config +++ b/rebar.config @@ -40,7 +40,7 @@ [{plugins, [{coveralls, {git, "https://github.com/emqx/coveralls-erl", {branch, "github"}}}]}, {deps, [{bbmustache, "1.7.0"}, - {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.0"}}}, + {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3"}}}, {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.3.0"}}} ]}, {erl_opts, [debug_info]} diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 8e8f7117d..ad3694812 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -441,6 +441,62 @@ system_continue(Parent, _Debug, State) -> system_terminate(Reason, _Parent, _Debug, State) -> terminate(Reason, State). +system_code_change(State, _Mod, {down, Vsn}, _Extra) + when Vsn == "4.2.0"; + Vsn == "4.2.1" -> + Channel = State#state.channel, + NSerialize = emqx_frame:serialize_fun(State#state.serialize), + case {State#state.parse_state, element(10, Channel)} of + {{none, _}, undefined} -> + {ok, State#state{serialize = NSerialize}}; + {Ps, Quota} -> + %% BACKW: e4.2.0-e4.2.1 + %% We can't recover/reconstruct anonymous function state for + %% Parser or Quota consumer. So just close it. + ?LOG(error, "Unsupport downgrade connection ~0p, peername: ~0p." + " Due to it have an incomplete frame or unsafed quota counter," + " parser_state: ~0p, quota: ~0p." + " Force close it now!!!", [self(), State#state.peername, Ps, Quota]), + self() ! {close, unsupported_downgrade_connection_state}, + {ok, State#state{serialize = NSerialize}} + end; + +system_code_change(State, _Mod, Vsn, _Extra) + when Vsn == "4.2.0"; + Vsn == "4.2.1" -> + Channel = State#state.channel, + NChannel = + case element(10, Channel) of + undefined -> Channel; + Quoter -> + Zone = element(2, Quoter), + Cks = element(3, Quoter), + NCks = [case Name == overall_messages_routing of + true -> Ck#{consumer => Zone}; + _ -> Ck + end || Ck = #{name := Name} <- Cks], + setelement(10, Channel, setelement(3, Quoter, NCks)) + end, + + NParseState = + case State#state.parse_state of + Ps = {none, _} -> Ps; + Ps when is_function(Ps) -> + case erlang:fun_info(Ps, env) of + {_, [Hdr, Opts]} -> + {{len, #{hdr => Hdr, len => {1,0}}}, Opts}; + {_, [Bin, Hdr, Len, Opts]} when is_binary(Bin) -> + {{body, #{hdr => Hdr, len => Len, rest => Bin}}, Opts}; + {_, [Hdr, Multip, Len, Opts]} -> + {{len, #{hdr => Hdr, len => {Multip, Len}}}, Opts} + end + end, + + {_, [Ver, MaxSize]} = erlang:fun_info(State#state.serialize, env), + NSerialize = #{version => Ver, max_size => MaxSize}, + + {ok, State#state{channel = NChannel, parse_state = NParseState, serialize = NSerialize}}; + system_code_change(State, _Mod, _OldVsn, _Extra) -> {ok, State}. diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index cd41a935c..d09b8416e 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -27,27 +27,16 @@ , parse/2 , serialize_fun/0 , serialize_fun/1 - , serialize/1 - , serialize/2 - ]). - -%% The new version APIs to avoid saving -%% anonymous func --export([ parse2/1 - , parse2/2 , serialize_opts/0 , serialize_opts/1 , serialize_pkt/2 + , serialize/1 + , serialize/2 ]). -export_type([ options/0 , parse_state/0 , parse_result/0 - , serialize_fun/0 - ]). - --export_type([ parse_state2/0 - , parse_result2/0 , serialize_opts/0 ]). @@ -56,17 +45,11 @@ version => emqx_types:version() }). --type(parse_state() :: {none, options()} | cont_fun()). --type(parse_state2() :: {none, options()} | {cont_state(), options()}). +-type(parse_state() :: {none, options()} | {cont_state(), options()}). --type(parse_result() :: {more, cont_fun()} +-type(parse_result() :: {more, parse_state()} | {ok, emqx_types:packet(), binary(), parse_state()}). --type(parse_result2() :: {more, parse_state()} - | {ok, emqx_types:packet(), binary(), parse_state()}). - --type(cont_fun() :: fun((binary()) -> parse_result())). - -type(cont_state() :: {Stage :: len | body, State :: #{hdr := #mqtt_packet_header{}, len := {pos_integer(), non_neg_integer()} | non_neg_integer(), @@ -74,8 +57,6 @@ } }). --type(serialize_fun() :: fun((emqx_types:packet()) -> iodata())). - -type(serialize_opts() :: options()). -define(none(Options), {none, Options}). @@ -108,96 +89,13 @@ merge_opts(Options) -> %% Parse MQTT Frame %%-------------------------------------------------------------------- --spec(parse2(binary()) -> parse_result2()). -parse2(Bin) -> - parse2(Bin, initial_parse_state()). - --spec(parse2(binary(), parse_state()) -> parse_result2()). -parse2(<<>>, {none, Options}) -> - {more, {none, Options}}; -parse2(<>, - {none, Options = #{strict_mode := StrictMode}}) -> - %% Validate header if strict mode. - StrictMode andalso validate_header(Type, Dup, QoS, Retain), - Header = #mqtt_packet_header{type = Type, - dup = bool(Dup), - qos = QoS, - retain = bool(Retain) - }, - Header1 = case fixqos(Type, QoS) of - QoS -> Header; - FixedQoS -> Header#mqtt_packet_header{qos = FixedQoS} - end, - parse_remaining_len2(Rest, Header1, Options); - -parse2(Bin, {{len, #{hdr := Header, - len := {Multiplier, Length}} - }, Options}) when is_binary(Bin) -> - parse_remaining_len2(Bin, Header, Multiplier, Length, Options); -parse2(Bin, {{body, #{hdr := Header, - len := Length, - rest := Rest} - }, Options}) when is_binary(Bin) -> - parse_frame2(<>, Header, Length, Options). - -parse_remaining_len2(<<>>, Header, Options) -> - {more, {{len, #{hdr => Header, len => {1, 0}}}, Options}}; -parse_remaining_len2(Rest, Header, Options) -> - parse_remaining_len2(Rest, Header, 1, 0, Options). - -parse_remaining_len2(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize}) - when Length > MaxSize -> - error(frame_too_large); -parse_remaining_len2(<<>>, Header, Multiplier, Length, Options) -> - {more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}}; -%% Match DISCONNECT without payload -parse_remaining_len2(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) -> - Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}), - {ok, Packet, Rest, ?none(Options)}; -%% Match PINGREQ. -parse_remaining_len2(<<0:8, Rest/binary>>, Header, 1, 0, Options) -> - parse_frame2(Rest, Header, 0, Options); -%% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK... -parse_remaining_len2(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) -> - parse_frame2(Rest, Header, 2, Options); -parse_remaining_len2(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) -> - parse_remaining_len2(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options); -parse_remaining_len2(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, - Options = #{max_size := MaxSize}) -> - FrameLen = Value + Len * Multiplier, - if - FrameLen > MaxSize -> error(frame_too_large); - true -> parse_frame2(Rest, Header, FrameLen, Options) - end. - -parse_frame2(Bin, Header, 0, Options) -> - {ok, packet(Header), Bin, ?none(Options)}; - -parse_frame2(Bin, Header, Length, Options) -> - case Bin of - <> -> - case parse_packet(Header, FrameBin, Options) of - {Variable, Payload} -> - {ok, packet(Header, Variable, Payload), Rest, ?none(Options)}; - Variable = #mqtt_packet_connect{proto_ver = Ver} -> - {ok, packet(Header, Variable), Rest, ?none(Options#{version := Ver})}; - Variable -> - {ok, packet(Header, Variable), Rest, ?none(Options)} - end; - TooShortBin -> - {more, {{body, #{hdr => Header, len => Length, rest => TooShortBin}}, Options}} - end. - -%% Deprecated parse funcs -%% It should be removed after 4.2.x - -spec(parse(binary()) -> parse_result()). parse(Bin) -> parse(Bin, initial_parse_state()). -spec(parse(binary(), parse_state()) -> parse_result()). parse(<<>>, {none, Options}) -> - {more, fun(Bin) -> parse(Bin, {none, Options}) end}; + {more, {none, Options}}; parse(<>, {none, Options = #{strict_mode := StrictMode}}) -> %% Validate header if strict mode. @@ -212,11 +110,19 @@ parse(<>, FixedQoS -> Header#mqtt_packet_header{qos = FixedQoS} end, parse_remaining_len(Rest, Header1, Options); -parse(Bin, Cont) when is_binary(Bin), is_function(Cont) -> - Cont(Bin). + +parse(Bin, {{len, #{hdr := Header, + len := {Multiplier, Length}} + }, Options}) when is_binary(Bin) -> + parse_remaining_len(Bin, Header, Multiplier, Length, Options); +parse(Bin, {{body, #{hdr := Header, + len := Length, + rest := Rest} + }, Options}) when is_binary(Bin) -> + parse_frame(<>, Header, Length, Options). parse_remaining_len(<<>>, Header, Options) -> - {more, fun(Bin) -> parse_remaining_len(Bin, Header, Options) end}; + {more, {{len, #{hdr => Header, len => {1, 0}}}, Options}}; parse_remaining_len(Rest, Header, Options) -> parse_remaining_len(Rest, Header, 1, 0, Options). @@ -224,7 +130,7 @@ parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize}) when Length > MaxSize -> error(frame_too_large); parse_remaining_len(<<>>, Header, Multiplier, Length, Options) -> - {more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Options) end}; + {more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}}; %% Match DISCONNECT without payload parse_remaining_len(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) -> Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}), @@ -260,9 +166,7 @@ parse_frame(Bin, Header, Length, Options) -> {ok, packet(Header, Variable), Rest, ?none(Options)} end; TooShortBin -> - {more, fun(BinMore) -> - parse_frame(<>, Header, Length, Options) - end} + {more, {{body, #{hdr => Header, len => Length, rest => TooShortBin}}, Options}} end. -compile({inline, [packet/1, packet/2, packet/3]}). @@ -870,4 +774,3 @@ fixqos(?PUBREL, 0) -> 1; fixqos(?SUBSCRIBE, 0) -> 1; fixqos(?UNSUBSCRIBE, 0) -> 1; fixqos(_Type, QoS) -> QoS. - diff --git a/src/emqx_limiter.erl b/src/emqx_limiter.erl index fad897d92..447e04fea 100644 --- a/src/emqx_limiter.erl +++ b/src/emqx_limiter.erl @@ -128,13 +128,7 @@ consume(Pubs, Bytes, #{name := Name, consumer := Cons}) -> _ -> case is_overall_limiter(Name) of true -> - {_, Intv} = case erlang:is_function(Cons) of - true -> %% Compatible with hot-upgrade from e4.2.0, e4.2.1. - %% It should be removed after 4.3.0 - {env, [Zone|_]} = erlang:fun_info(Cons, env), - esockd_limiter:consume({Zone, Name}, Tokens); - _ -> esockd_limiter:consume({Cons, Name}, Tokens) - end, + {_, Intv} = esockd_limiter:consume({Cons, Name}, Tokens), {Intv, Cons}; _ -> esockd_rate_limit:check(Tokens, Cons) diff --git a/test/emqx_frame_SUITE.erl b/test/emqx_frame_SUITE.erl index 7ef60bc69..5d4e146db 100644 --- a/test/emqx_frame_SUITE.erl +++ b/test/emqx_frame_SUITE.erl @@ -26,7 +26,6 @@ all() -> [{group, parse}, - {group, parse2}, {group, connect}, {group, connack}, {group, publish}, @@ -45,8 +44,6 @@ groups() -> [t_parse_cont, t_parse_frame_too_large ]}, - {parse2, [parallel], - [t_parse_cont2]}, {connect, [parallel], [t_serialize_parse_v3_connect, t_serialize_parse_v4_connect, @@ -132,16 +129,6 @@ t_parse_frame_too_large(_) -> ?catch_error(frame_too_large, parse_serialize(Packet, #{max_size => 512})), ?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})). -t_parse_cont2(_) -> - Packet = ?CONNECT_PACKET(#mqtt_packet_connect{}), - ParseState = emqx_frame:initial_parse_state(), - <> = serialize_to_binary(Packet), - {more, ContParse} = emqx_frame:parse2(<<>>, ParseState), - {more, ContParse1} = emqx_frame:parse2(HdrBin, ContParse), - {more, ContParse2} = emqx_frame:parse2(LenBin, ContParse1), - {more, ContParse3} = emqx_frame:parse2(<<>>, ContParse2), - {ok, Packet, <<>>, _} = emqx_frame:parse2(RestBin, ContParse3). - t_serialize_parse_v3_connect(_) -> Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115, 113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108, @@ -522,7 +509,7 @@ parse_serialize(Packet, Opts) when is_map(Opts) -> Ver = maps:get(version, Opts, ?MQTT_PROTO_V4), Bin = iolist_to_binary(emqx_frame:serialize(Packet, Ver)), ParseState = emqx_frame:initial_parse_state(Opts), - {ok, NPacket, <<>>, _} = emqx_frame:parse2(Bin, ParseState), + {ok, NPacket, <<>>, _} = emqx_frame:parse(Bin, ParseState), NPacket. serialize_to_binary(Packet) -> diff --git a/test/emqx_vm_SUITE.erl b/test/emqx_vm_SUITE.erl index 19545f0ea..4bc231ad0 100644 --- a/test/emqx_vm_SUITE.erl +++ b/test/emqx_vm_SUITE.erl @@ -80,7 +80,7 @@ t_get_port_info(_Config) -> {ok, Sock} = gen_tcp:connect("localhost", 5678, [binary, {packet, 0}]), emqx_vm:get_port_info(), ok = gen_tcp:close(Sock), - [Port | _] = erlang:ports(). + [_Port | _] = erlang:ports(). t_transform_port(_Config) -> [Port | _] = erlang:ports(),