From 26fb809dbed287854cee2d97fb717cb72751649c Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 29 Nov 2017 14:09:46 +0800 Subject: [PATCH] Fix issue#1293 - the retained flags should be propagated for bridge. --- include/emqttd_protocol.hrl | 3 ++- src/emqttd_parser.erl | 5 +++-- src/emqttd_protocol.erl | 20 ++++++++++++++++---- src/emqttd_session.erl | 27 +++++++++++---------------- 4 files changed, 32 insertions(+), 23 deletions(-) diff --git a/include/emqttd_protocol.hrl b/include/emqttd_protocol.hrl index 9d9cec714..a6d6c06e6 100644 --- a/include/emqttd_protocol.hrl +++ b/include/emqttd_protocol.hrl @@ -174,7 +174,8 @@ will_topic = undefined :: undefined | binary(), will_msg = undefined :: undefined | binary(), username = undefined :: undefined | binary(), - password = undefined :: undefined | binary() + password = undefined :: undefined | binary(), + is_bridge = false :: boolean() }). -record(mqtt_packet_connack, diff --git a/src/emqttd_parser.erl b/src/emqttd_parser.erl index 4699b3f77..91df07d77 100644 --- a/src/emqttd_parser.erl +++ b/src/emqttd_parser.erl @@ -79,7 +79,7 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) {?CONNECT, <>} -> {ProtoName, Rest1} = parse_utf(FrameBin), %% Fix mosquitto bridge: 0x83, 0x84 - <<_Bridge:4, ProtoVersion:4, Rest2/binary>> = Rest1, + <> = Rest1, < {error, protocol_header_corrupt} end; diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 35c914b14..b30978ac6 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -44,7 +44,7 @@ clean_sess, proto_ver, proto_name, username, is_superuser, will_msg, keepalive, keepalive_backoff, max_clientid_len, session, stats_data, mountpoint, ws_initial_headers, - connected_at}). + is_bridge, connected_at}). -type(proto_state() :: #proto_state{}). @@ -180,7 +180,8 @@ process(?CONNECT_PACKET(Var), State0) -> password = Password, clean_sess = CleanSess, keep_alive = KeepAlive, - client_id = ClientId} = Var, + client_id = ClientId, + is_bridge = IsBridge} = Var, State1 = State0#proto_state{proto_ver = ProtoVer, proto_name = ProtoName, @@ -189,6 +190,7 @@ process(?CONNECT_PACKET(Var), State0) -> clean_sess = CleanSess, keepalive = KeepAlive, will_msg = willmsg(Var, State0), + is_bridge = IsBridge, connected_at = os:timestamp()}, {ReturnCode1, SessPresent, State3} = @@ -333,10 +335,11 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId), -spec(send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}). send(Msg, State = #proto_state{client_id = ClientId, username = Username, - mountpoint = MountPoint}) + mountpoint = MountPoint, + is_bridge = IsBridge}) when is_record(Msg, mqtt_message) -> emqttd_hooks:run('message.delivered', [ClientId, Username], Msg), - send(emqttd_message:to_packet(unmount(MountPoint, Msg)), State); + send(emqttd_message:to_packet(unmount(MountPoint, clean_retain(IsBridge, Msg))), State); send(Packet = ?PACKET(Type), State = #proto_state{sendfun = SendFun, stats_data = Stats}) -> @@ -543,6 +546,15 @@ check_acl(subscribe, Topic, Client) -> sp(true) -> 1; sp(false) -> 0. +%%-------------------------------------------------------------------- +%% The retained flag should be propagated for bridge. +%%-------------------------------------------------------------------- + +clean_retain(false, Msg = #mqtt_message{retain = true}) -> + Msg#mqtt_message{retain = false}; +clean_retain(true, Msg) -> + Msg. + %%-------------------------------------------------------------------- %% Mount Point %%-------------------------------------------------------------------- diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index aa08d746c..854dee0a5 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -152,9 +152,10 @@ %% Force GC Count force_gc_count :: undefined | integer(), - created_at :: erlang:timestamp(), + %% Ignore loop deliver? + ignore_loop_deliver = false :: boolean(), - ignore_loop_deliver = false :: boolean() + created_at :: erlang:timestamp() }). -define(TIMEOUT, 60000). @@ -529,17 +530,14 @@ handle_cast({destroy, ClientId}, handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). -%% Dispatch message from self publish -handle_info({dispatch, Topic, Msg = #mqtt_message{from = {ClientId, _}}}, - State = #state{client_id = ClientId, - ignore_loop_deliver = IgnoreLoopDeliver}) when is_record(Msg, mqtt_message) -> - case IgnoreLoopDeliver of - true -> {noreply, State, hibernate}; - false -> {noreply, handle_dispatch(Topic, Msg, State), hibernate} - end; +%% Ignore Messages delivered by self +handle_info({dispatch, _Topic, #mqtt_message{from = {ClientId, _}}}, + State = #state{client_id = ClientId, ignore_loop_deliver = true}) -> + hibernate(State); + %% Dispatch Message handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, mqtt_message) -> - {noreply, handle_dispatch(Topic, Msg, State), hibernate}; + hibernate(gc(dispatch(tune_qos(Topic, reset_dup(Msg), State), State))); %% Do nothing if the client has been disconnected. handle_info({timeout, _Timer, retry_delivery}, State = #state{client_pid = undefined}) -> @@ -552,7 +550,7 @@ handle_info({timeout, _Timer, check_awaiting_rel}, State) -> hibernate(expire_awaiting_rel(emit_stats(State#state{await_rel_timer = undefined}))); handle_info({timeout, _Timer, expired}, State) -> - ?LOG(debug, "Expired, shutdown now.", [], State), + ?LOG(info, "Expired, shutdown now.", [], State), shutdown(expired, State); handle_info({'EXIT', ClientPid, _Reason}, @@ -563,7 +561,7 @@ handle_info({'EXIT', ClientPid, Reason}, State = #state{clean_sess = false, client_pid = ClientPid, expiry_interval = Interval}) -> - ?LOG(debug, "Client ~p EXIT for ~p", [ClientPid, Reason], State), + ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], State), ExpireTimer = start_timer(Interval, expired), State1 = State#state{client_pid = undefined, expiry_timer = ExpireTimer}, hibernate(emit_stats(State1)); @@ -687,9 +685,6 @@ is_awaiting_full(#state{awaiting_rel = AwaitingRel, max_awaiting_rel = MaxLen}) %% Dispatch Messages %%-------------------------------------------------------------------- -handle_dispatch(Topic, Msg, State) -> - gc(dispatch(tune_qos(Topic, reset_dup(Msg), State), State)). - %% Enqueue message if the client has been disconnected dispatch(Msg, State = #state{client_pid = undefined}) -> enqueue_msg(Msg, State);