From ef8cff4cac4f92127f4599f181103fa5c8e4259c Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 8 Mar 2016 13:19:03 +0800 Subject: [PATCH] emqttd:publish/1 --- src/emqttd_alarm.erl | 4 +- src/emqttd_metrics.erl | 4 +- src/emqttd_session.erl | 91 +++++++++++++++++++----------------------- src/emqttd_stats.erl | 19 +++++---- src/emqttd_sysmon.erl | 2 +- 5 files changed, 56 insertions(+), 64 deletions(-) diff --git a/src/emqttd_alarm.erl b/src/emqttd_alarm.erl index 8ef08dec0..1b4924463 100644 --- a/src/emqttd_alarm.erl +++ b/src/emqttd_alarm.erl @@ -91,12 +91,12 @@ handle_event({set_alarm, Alarm = #mqtt_alarm{id = AlarmId, {title, iolist_to_binary(Title)}, {summary, iolist_to_binary(Summary)}, {ts, emqttd_time:now_to_secs(Timestamp)}]), - emqttd_pubsub:publish(alarm_msg(alert, AlarmId, Json)), + emqttd:publish(alarm_msg(alert, AlarmId, Json)), {ok, [Alarm#mqtt_alarm{timestamp = Timestamp} | Alarms]}; handle_event({clear_alarm, AlarmId}, Alarms) -> Json = mochijson2:encode([{id, AlarmId}, {ts, emqttd_time:now_to_secs()}]), - emqttd_pubsub:publish(alarm_msg(clear, AlarmId, Json)), + emqttd:publish(alarm_msg(clear, AlarmId, Json)), {ok, lists:keydelete(AlarmId, 2, Alarms), hibernate}; handle_event(_, Alarms)-> diff --git a/src/emqttd_metrics.erl b/src/emqttd_metrics.erl index 6b6c6b7a8..c2e57ee8b 100644 --- a/src/emqttd_metrics.erl +++ b/src/emqttd_metrics.erl @@ -243,7 +243,7 @@ init([]) -> % Init metrics [create_metric(Metric) || Metric <- Metrics], % $SYS Topics for metrics - [ok = emqttd_pubsub:create(topic, metric_topic(Topic)) || {_, Topic} <- Metrics], + [ok = emqttd:create(topic, metric_topic(Topic)) || {_, Topic} <- Metrics], % Tick to publish metrics {ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}. @@ -273,7 +273,7 @@ code_change(_OldVsn, State, _Extra) -> publish(Metric, Val) -> Msg = emqttd_message:make(metrics, metric_topic(Metric), bin(Val)), - emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)). + emqttd:publish(emqttd_message:set_flag(sys, Msg)). create_metric({gauge, Name}) -> ets:insert(?METRIC_TAB, {{Name, 0}, 0}); diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index f9779509f..12b3e680d 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -154,6 +154,10 @@ info(SessPid) -> destroy(SessPid, ClientId) -> gen_server2:cast(SessPid, {destroy, ClientId}). +%%-------------------------------------------------------------------- +%% PubSub +%%-------------------------------------------------------------------- + %% @doc Subscribe Topics -spec subscribe(pid(), [{binary(), mqtt_qos()}]) -> ok. subscribe(SessPid, TopicTable) -> @@ -171,11 +175,11 @@ subscribe(SessPid, PacketId, TopicTable) -> -spec publish(pid(), mqtt_message()) -> ok | {error, any()}. publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_0}) -> %% publish qos0 directly - emqttd_pubsub:publish(Msg); + emqttd:publish(Msg); publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_1}) -> %% publish qos1 directly, and client will puback automatically - emqttd_pubsub:publish(Msg); + emqttd:publish(Msg); publish(SessPid, Msg = #mqtt_message{qos = ?QOS_2}) -> %% publish qos2 by session @@ -281,62 +285,51 @@ handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). -handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = ClientId, +handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = ClientId, subscriptions = Subscriptions}) -> TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0), + ?LOG(info, "Subscribe ~p", [TopicTable], Session), + Subscriptions1 = lists:foldl( + fun({Topic, Qos}, SubDict) -> + case dict:find(Topic, SubDict) of + {ok, Qos} -> + ?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, Qos], Session), + SubDict; + {ok, OldQos} -> + emqttd_server:update_subscription(ClientId, Topic, OldQos, Qos), + ?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, Qos], Session), + dict:store(Topic, Qos, SubDict); + error -> + emqttd:subscribe(ClientId, Topic, Qos), + %%TODO: the design is ugly... + %% : 3.8.4 + %% Where the Topic Filter is not identical to any existing Subscription’s filter, + %% a new Subscription is created and all matching retained messages are sent. + emqttd_retainer:dispatch(Topic, self()), - case TopicTable -- dict:to_list(Subscriptions) of - [] -> - AckFun([Qos || {_, Qos} <- TopicTable]), - hibernate(Session); - _ -> - %% subscribe first and don't care if the subscriptions have been existed - {ok, GrantedQos} = emqttd_pubsub:subscribe(ClientId, TopicTable), - - AckFun(GrantedQos), - - emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]), - - ?LOG(info, "Subscribe ~p, Granted QoS: ~p", [TopicTable, GrantedQos], Session), - - Subscriptions1 = - lists:foldl(fun({Topic, Qos}, Dict) -> - case dict:find(Topic, Dict) of - {ok, Qos} -> - ?LOG(warning, "resubscribe ~s, qos = ~w", [Topic, Qos], Session), - Dict; - {ok, OldQos} -> - ?LOG(warning, "resubscribe ~s, old qos=~w, new qos=~w", [Topic, OldQos, Qos], Session), - dict:store(Topic, Qos, Dict); - error -> - %%TODO: the design is ugly, rewrite later...:( - %% : 3.8.4 - %% Where the Topic Filter is not identical to any existing Subscription’s filter, - %% a new Subscription is created and all matching retained messages are sent. - emqttd_retainer:dispatch(Topic, self()), - - dict:store(Topic, Qos, Dict) - end - end, Subscriptions, TopicTable), - hibernate(Session#session{subscriptions = Subscriptions1}) - end; + dict:store(Topic, Qos, SubDict) + end + end, Subscriptions, TopicTable), + AckFun([Qos || {_, Qos} <- TopicTable]), + emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]), + hibernate(Session#session{subscriptions = Subscriptions1}); handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId, subscriptions = Subscriptions}) -> Topics = emqttd_broker:foldl_hooks('client.unsubscribe', [ClientId], Topics0), - - %% unsubscribe from topic tree - ok = emqttd_pubsub:unsubscribe(Topics), - ?LOG(info, "unsubscribe ~p", [Topics], Session), - - Subscriptions1 = - lists:foldl(fun(Topic, Dict) -> - dict:erase(Topic, Dict) - end, Subscriptions, Topics), - + Subscriptions1 = lists:foldl( + fun(Topic, SubDict) -> + case dict:find(Topic, SubDict) of + {ok, Qos} -> + emqttd:unsubscribe(ClientId, Topic, Qos), + dict:erase(Topic, SubDict); + error -> + SubDict + end + end, Subscriptions, Topics), hibernate(Session#session{subscriptions = Subscriptions1}); handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) -> @@ -430,7 +423,7 @@ handle_cast({pubrel, PktId}, Session = #session{awaiting_rel = AwaitingRel}) -> case maps:find(PktId, AwaitingRel) of {ok, {Msg, TRef}} -> cancel_timer(TRef), - emqttd_pubsub:publish(Msg), + emqttd:publish(Msg), hibernate(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}); error -> ?LOG(error, "Cannot find PUBREL: ~p", [PktId], Session), diff --git a/src/emqttd_stats.erl b/src/emqttd_stats.erl index c6a156d81..755cdb515 100644 --- a/src/emqttd_stats.erl +++ b/src/emqttd_stats.erl @@ -52,14 +52,14 @@ %% $SYS Topics for Subscribers -define(SYSTOP_PUBSUB, [ - 'routes/count', % ... - 'routes/reverse', % ... - 'topics/count', % ... - 'topics/max', % ... + 'routes/count', % ... + 'routes/max', % ... + 'topics/count', % ... + 'topics/max', % ... + 'subscribers/count', % ... + 'subscribers/max', % ... 'subscriptions/count', % ... - 'subscriptions/max', % ... - 'queues/count', % ... - 'queues/max' % ... + 'subscriptions/max' % ... ]). %% $SYS Topic for retained @@ -122,7 +122,7 @@ init([]) -> Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB ++ ?SYSTOP_RETAINED, ets:insert(?STATS_TAB, [{Topic, 0} || Topic <- Topics]), % Create $SYS Topics - [ok = emqttd_pubsub:create(topic, stats_topic(Topic)) || Topic <- Topics], + [ok = emqttd:create(topic, stats_topic(Topic)) || Topic <- Topics], % Tick to publish stats {ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}. @@ -165,8 +165,7 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- publish(Stat, Val) -> - Msg = emqttd_message:make(stats, stats_topic(Stat), bin(Val)), - emqttd_pubsub:publish(Msg). + emqttd:publish(emqttd_message:make(stats, stats_topic(Stat), bin(Val))). stats_topic(Stat) -> emqttd_topic:systop(list_to_binary(lists:concat(['stats/', Stat]))). diff --git a/src/emqttd_sysmon.erl b/src/emqttd_sysmon.erl index 5fa68bcec..7d6799592 100644 --- a/src/emqttd_sysmon.erl +++ b/src/emqttd_sysmon.erl @@ -157,7 +157,7 @@ procinfo(Pid) -> publish(Sysmon, WarnMsg) -> Msg = emqttd_message:make(sysmon, topic(Sysmon), iolist_to_binary(WarnMsg)), - emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)). + emqttd:publish(emqttd_message:set_flag(sys, Msg)). topic(Sysmon) -> emqttd_topic:systop(list_to_binary(lists:concat(['sysmon/', Sysmon]))).