Merge branch 'master' into dev
This commit is contained in:
commit
63933d891a
12
CHANGELOG.md
12
CHANGELOG.md
|
@ -2,6 +2,18 @@
|
||||||
emqttd ChangeLog
|
emqttd ChangeLog
|
||||||
==================
|
==================
|
||||||
|
|
||||||
|
0.8.5-beta (2015-06-10)
|
||||||
|
-------------------------
|
||||||
|
|
||||||
|
Bugfix: issue #53 - client will receive duplicate messages when overlapping subscription
|
||||||
|
|
||||||
|
|
||||||
|
0.8.4-beta (2015-06-08)
|
||||||
|
-------------------------
|
||||||
|
|
||||||
|
Bugfix: issue #165 - duplicated message when publish 'retained' message to persistent client
|
||||||
|
|
||||||
|
|
||||||
0.8.3-beta (2015-06-05)
|
0.8.3-beta (2015-06-05)
|
||||||
-------------------------
|
-------------------------
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,10 @@
|
||||||
|
|
||||||
|
# CONTRIBUTORS
|
||||||
|
|
||||||
|
* [@callbay](https://github.com/callbay)
|
||||||
|
* [@hejin1026](https://github.com/hejin1026)
|
||||||
|
* [@desoulter](https://github.com/desoulter)
|
||||||
|
* [@turtleDeng](https://github.com/turtleDeng)
|
||||||
|
* [@Hades32](https://github.com/Hades32)
|
||||||
|
* [@huangdan](https://github.com/huangdan)
|
||||||
|
|
12
README.md
12
README.md
|
@ -102,12 +102,12 @@ The MIT License (MIT)
|
||||||
|
|
||||||
## Contributors
|
## Contributors
|
||||||
|
|
||||||
[@hejin1026](https://github.com/hejin1026)
|
* [@hejin1026](https://github.com/hejin1026)
|
||||||
[@desoulter](https://github.com/desoulter)
|
* [@desoulter](https://github.com/desoulter)
|
||||||
[@turtleDeng](https://github.com/turtleDeng)
|
* [@turtleDeng](https://github.com/turtleDeng)
|
||||||
[@Hades32](https://github.com/Hades32)
|
* [@Hades32](https://github.com/Hades32)
|
||||||
[@huangdan](https://github.com/huangdan)
|
* [@huangdan](https://github.com/huangdan)
|
||||||
[@callbay](https://github.com/callbay)
|
* [@callbay](https://github.com/callbay)
|
||||||
|
|
||||||
|
|
||||||
## Author
|
## Author
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
{application, emqtt,
|
{application, emqtt,
|
||||||
[
|
[
|
||||||
{description, "Erlang MQTT Common Library"},
|
{description, "Erlang MQTT Common Library"},
|
||||||
{vsn, "0.8.3"},
|
{vsn, git},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [
|
{applications, [
|
||||||
|
|
|
@ -24,6 +24,7 @@
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
|
|
||||||
-module(emqttd_pubsub).
|
-module(emqttd_pubsub).
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
@ -217,6 +218,7 @@ match(Topic) when is_binary(Topic) ->
|
||||||
init([Id, _Opts]) ->
|
init([Id, _Opts]) ->
|
||||||
process_flag(min_heap_size, 1024*1024),
|
process_flag(min_heap_size, 1024*1024),
|
||||||
gproc_pool:connect_worker(pubsub, {?MODULE, Id}),
|
gproc_pool:connect_worker(pubsub, {?MODULE, Id}),
|
||||||
|
%%TODO: gb_trees to replace maps?
|
||||||
{ok, #state{id = Id, submap = maps:new()}}.
|
{ok, #state{id = Id, submap = maps:new()}}.
|
||||||
|
|
||||||
handle_call({subscribe, SubPid, Topics}, _From, State) ->
|
handle_call({subscribe, SubPid, Topics}, _From, State) ->
|
||||||
|
@ -384,9 +386,24 @@ add_topic(TopicR = #mqtt_topic{topic = Topic}) ->
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
add_subscriber({TopicR, Subscriber}) when is_record(TopicR, mqtt_topic) ->
|
%% Fix issue #53 - Remove Overlapping Subscriptions
|
||||||
|
add_subscriber({TopicR, Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, pid = SubPid}})
|
||||||
|
when is_record(TopicR, mqtt_topic) ->
|
||||||
case add_topic(TopicR) of
|
case add_topic(TopicR) of
|
||||||
ok ->
|
ok ->
|
||||||
|
OverlapSubs = [Sub || Sub = #mqtt_subscriber{topic = SubTopic, qos = SubQos}
|
||||||
|
<- mnesia:index_read(subscriber, SubPid, #mqtt_subscriber.pid),
|
||||||
|
SubTopic =:= Topic, SubQos =/= Qos],
|
||||||
|
|
||||||
|
%% remove overlapping subscribers
|
||||||
|
if
|
||||||
|
length(OverlapSubs) =:= 0 -> ok;
|
||||||
|
true ->
|
||||||
|
lager:warning("Remove overlapping subscribers: ~p", [OverlapSubs]),
|
||||||
|
[mnesia:delete_object(subscriber, OverlapSub, write) || OverlapSub <- OverlapSubs]
|
||||||
|
end,
|
||||||
|
|
||||||
|
%% insert subscriber
|
||||||
mnesia:write(subscriber, Subscriber, write);
|
mnesia:write(subscriber, Subscriber, write);
|
||||||
Error ->
|
Error ->
|
||||||
Error
|
Error
|
||||||
|
|
|
@ -175,17 +175,31 @@ puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec subscribe(session(), [{binary(), mqtt_qos()}]) -> {ok, session(), [mqtt_qos()]}.
|
-spec subscribe(session(), [{binary(), mqtt_qos()}]) -> {ok, session(), [mqtt_qos()]}.
|
||||||
subscribe(SessState = #session_state{clientid = ClientId, submap = SubMap}, Topics) ->
|
subscribe(SessState = #session_state{clientid = ClientId, submap = SubMap}, Topics) ->
|
||||||
Resubs = [Topic || {Name, _Qos} = Topic <- Topics, maps:is_key(Name, SubMap)],
|
|
||||||
case Resubs of
|
%% subscribe first and don't care if the subscriptions have been existed
|
||||||
[] -> ok;
|
|
||||||
_ -> lager:warning("~s resubscribe ~p", [ClientId, Resubs])
|
|
||||||
end,
|
|
||||||
SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> maps:put(Name, Qos, Acc) end, SubMap, Topics),
|
|
||||||
{ok, GrantedQos} = emqttd_pubsub:subscribe(Topics),
|
{ok, GrantedQos} = emqttd_pubsub:subscribe(Topics),
|
||||||
|
|
||||||
lager:info([{client, ClientId}], "Client ~s subscribe ~p. Granted QoS: ~p",
|
lager:info([{client, ClientId}], "Client ~s subscribe ~p. Granted QoS: ~p",
|
||||||
[ClientId, Topics, GrantedQos]),
|
[ClientId, Topics, GrantedQos]),
|
||||||
%%TODO: should be gen_event and notification...
|
|
||||||
[emqttd_msg_store:redeliver(Name, self()) || {Name, _} <- Topics],
|
|
||||||
|
%% <MQTT V3.1.1>: 3.8.4
|
||||||
|
%% Where the Topic Filter is not identical to any existing Subscription’s filter,
|
||||||
|
%% a new Subscription is created and all matching retained messages are sent.
|
||||||
|
lists:foreach(fun({Name, _Qos}) ->
|
||||||
|
case maps:is_key(Name, SubMap) of
|
||||||
|
true ->
|
||||||
|
lager:warning("~s resubscribe ~p", [ClientId, Name]);
|
||||||
|
false ->
|
||||||
|
%%TODO: this is not right, rewrite later...
|
||||||
|
emqttd_msg_store:redeliver(Name, self())
|
||||||
|
end
|
||||||
|
end, Topics),
|
||||||
|
|
||||||
|
SubMap1 = lists:foldl(fun({Name, Qos}, Acc) ->
|
||||||
|
maps:put(Name, Qos, Acc)
|
||||||
|
end, SubMap, Topics),
|
||||||
|
|
||||||
{ok, SessState#session_state{submap = SubMap1}, GrantedQos};
|
{ok, SessState#session_state{submap = SubMap1}, GrantedQos};
|
||||||
|
|
||||||
subscribe(SessPid, Topics) when is_pid(SessPid) ->
|
subscribe(SessPid, Topics) when is_pid(SessPid) ->
|
||||||
|
|
Binary file not shown.
|
@ -4,7 +4,7 @@
|
||||||
{lib_dirs, ["../apps", "../deps", "../plugins"]},
|
{lib_dirs, ["../apps", "../deps", "../plugins"]},
|
||||||
{erts, [{mod_cond, derived}, {app_file, strip}]},
|
{erts, [{mod_cond, derived}, {app_file, strip}]},
|
||||||
{app_file, strip},
|
{app_file, strip},
|
||||||
{rel, "emqttd", "0.8.3",
|
{rel, "emqttd", git,
|
||||||
[
|
[
|
||||||
kernel,
|
kernel,
|
||||||
stdlib,
|
stdlib,
|
||||||
|
|
Loading…
Reference in New Issue