From 45ca461fd9957bc2b26e1fce82306a85ba660243 Mon Sep 17 00:00:00 2001 From: turtled Date: Thu, 30 Mar 2017 14:41:29 +0800 Subject: [PATCH] Add ignore self publish message --- etc/emq.conf | 3 +++ priv/emq.schema | 9 ++++++++- src/emqttd_session.erl | 16 ++++++++++++++-- 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/etc/emq.conf b/etc/emq.conf index 286bd4163..dcdc289f4 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -173,6 +173,9 @@ mqtt.session.enable_stats = off ## s - second mqtt.session.expiry_interval = 2h +## Ignore message from self publish +mqtt.session.ignore_loop_deliver = false + ##-------------------------------------------------------------------- ## MQTT Message Queue ##-------------------------------------------------------------------- diff --git a/priv/emq.schema b/priv/emq.schema index 2760438f9..0f59cad69 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -436,6 +436,12 @@ end}. {datatype, {duration, ms}} ]}. +%% @doc Ignore message from self publish +{mapping, "mqtt.session.ignore_loop_deliver", "emqttd.session", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + {translation, "emqttd.session", fun(Conf) -> [{max_subscriptions, cuttlefish:conf_get("mqtt.session.max_subscriptions", Conf)}, {upgrade_qos, cuttlefish:conf_get("mqtt.session.upgrade_qos", Conf)}, @@ -444,7 +450,8 @@ end}. {max_awaiting_rel, cuttlefish:conf_get("mqtt.session.max_awaiting_rel", Conf)}, {await_rel_timeout, cuttlefish:conf_get("mqtt.session.await_rel_timeout", Conf)}, {enable_stats, cuttlefish:conf_get("mqtt.session.enable_stats", Conf)}, - {expiry_interval, cuttlefish:conf_get("mqtt.session.expiry_interval", Conf)}] + {expiry_interval, cuttlefish:conf_get("mqtt.session.expiry_interval", Conf)}, + {ignore_loop_deliver, cuttlefish:conf_get("mqtt.session.ignore_loop_deliver", Conf)}] end}. %%-------------------------------------------------------------------- diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 316a97741..ba3c42036 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -152,7 +152,9 @@ %% Force GC Count force_gc_count :: undefined | integer(), - created_at :: erlang:timestamp() + created_at :: erlang:timestamp(), + + ignore_loop_deliver = false :: boolean() }). -define(TIMEOUT, 60000). @@ -284,6 +286,7 @@ init([CleanSess, {ClientId, Username}, ClientPid]) -> {ok, QEnv} = emqttd:env(mqueue), MaxInflight = get_value(max_inflight, Env, 0), EnableStats = get_value(enable_stats, Env, false), + IgnoreLoopDeliver = get_value(ignore_loop_deliver, Env, false), ForceGcCount = emqttd_gc:conn_max_gc_count(), MQueue = ?MQueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()), State = #state{clean_sess = CleanSess, @@ -304,7 +307,8 @@ init([CleanSess, {ClientId, Username}, ClientPid]) -> expiry_interval = get_value(expiry_interval, Env), enable_stats = EnableStats, force_gc_count = ForceGcCount, - created_at = os:timestamp()}, + created_at = os:timestamp(), + ignore_loop_deliver = IgnoreLoopDeliver}, emqttd_sm:register_session(ClientId, CleanSess, info(State)), emqttd_hooks:run('session.created', [ClientId, Username]), {ok, emit_stats(State), hibernate, {backoff, 1000, 1000, 10000}}. @@ -525,6 +529,14 @@ handle_cast({destroy, ClientId}, handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). +%% Dispatch message from self publish +handle_info({dispatch, Topic, Msg = #mqtt_message{from = {ClientId, _}}}, + State = #state{client_id = ClientId, + ignore_loop_deliver = IgnoreLoopDeliver}) when is_record(Msg, mqtt_message) -> + case IgnoreLoopDeliver of + true -> {noreply, State, hibernate}; + false -> {noreply, gc(dispatch(tune_qos(Topic, Msg, State), State)), hibernate} + end; %% Dispatch Message handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, mqtt_message) -> {noreply, gc(dispatch(tune_qos(Topic, Msg, State), State)), hibernate};