fixe issue #111 - forced subscriptions

This commit is contained in:
Feng Lee 2015-05-06 01:58:17 +08:00
parent 0dd046840d
commit 955d9c4ef8
10 changed files with 82 additions and 32 deletions

View File

@ -115,5 +115,12 @@ words_test() ->
?debugFmt("Time for binary:split: ~p(micro)", [Time2/?N]),
ok.
feed_var_test() ->
?assertEqual(<<"$Q/client/clientId">>, emqtt_topic:feed_var(<<"$c">>, <<"clientId">>, <<"$Q/client/$c">>)).
join_test() ->
?assertEqual(<<"/ab/cd/ef/">>, emqtt_topic:join(words(<<"/ab/cd/ef/">>))),
?assertEqual(<<"ab/+/#">>, emqtt_topic:join(words(<<"ab/+/#">>))).
-endif.

View File

@ -28,7 +28,7 @@
-author("Feng Lee <feng@emqtt.io>").
-export([start/0, env/1,
-export([start/0, env/1, env/2,
open_listeners/1, close_listeners/1,
load_all_plugins/0, unload_all_plugins/0,
load_plugin/1, unload_plugin/1,
@ -54,12 +54,16 @@ start() ->
application:start(emqttd).
%%------------------------------------------------------------------------------
%% @doc Get mqtt environment
%% @doc Get environment
%% @end
%%------------------------------------------------------------------------------
-spec env(atom()) -> undefined | any().
env(Name) ->
proplists:get_value(Name, application:get_env(emqttd, mqtt, [])).
-spec env(atom()) -> list().
env(Group) ->
application:get_env(emqttd, Group, []).
-spec env(atom(), atom()) -> undefined | any().
env(Group, Name) ->
proplists:get_value(Name, env(Group)).
%%------------------------------------------------------------------------------
%% @doc Open Listeners
@ -83,7 +87,7 @@ open_listener({http, Port, Options}) ->
mochiweb:start_http(Port, Options, MFArgs).
open_listener(Protocol, Port, Options) ->
MFArgs = {emqttd_client, start_link, [env(packet)]},
MFArgs = {emqttd_client, start_link, [env(mqtt, packet)]},
esockd:open(Protocol, Port, merge_sockopts(Options) , MFArgs).
merge_sockopts(Options) ->

View File

@ -121,6 +121,10 @@ handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} =
{ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
{noreply, State#state{proto_state = ProtoState1}};
handle_info({force_subscribe, Topic, Qos}, #state{proto_state = ProtoState} = State) ->
{ok, ProtoState1} = emqttd_protocol:handle({subscribe, Topic, Qos}, ProtoState),
{noreply, State#state{proto_state = ProtoState1}};
handle_info({inet_reply, _Ref, ok}, State) ->
{noreply, State, hibernate};

View File

@ -38,6 +38,8 @@
-export([received/2, send/2, redeliver/2, shutdown/2]).
-export([handle/2]).
-export([info/1]).
%% Protocol State
@ -126,13 +128,22 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername =
Client = #mqtt_client{clientid = ClientId, username = Username, ipaddr = Addr},
case emqttd_access_control:auth(Client, Password) of
ok ->
%% Generate one if null
ClientId1 = clientid(ClientId, State),
start_keepalive(KeepAlive),
%% Register clientId
emqttd_cm:register(ClientId1),
%%Starting session
{ok, Session} = emqttd_session:start({CleanSess, ClientId1, self()}),
%% Force subscriptions
force_subscribe(ClientId1),
%% Start keepalive
start_keepalive(KeepAlive),
{?CONNACK_ACCEPT, State1#proto_state{clientid = ClientId1,
session = Session,
will_msg = willmsg(Var)}};
{error, Reason}->
lager:error("~s@~s: username '~s' login failed - ~s", [ClientId, emqttd_net:format(Peername), Username, Reason]),
lager:error("~s@~s: username '~s' login failed - ~s",
[ClientId, emqttd_net:format(Peername), Username, Reason]),
{?CONNACK_CREDENTIALS, State1}
end;
@ -141,10 +152,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername =
end,
%%TODO: this is not right...
notify(connected, ReturnCode1, State2),
send(?CONNACK_PACKET(ReturnCode1), State2),
%%Starting session
{ok, Session} = emqttd_session:start({CleanSess, ClientId, self()}),
{ok, State2#proto_state{session = Session}};
send(?CONNACK_PACKET(ReturnCode1), State2);
handle(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload),
State = #proto_state{clientid = ClientId, session = Session}) ->
@ -209,6 +217,10 @@ handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{clientid =
send(?SUBACK_PACKET(PacketId, GrantedQos), State#proto_state{session = NewSession})
end;
handle({subscribe, Topic, Qos}, State = #proto_state{clientid = ClientId, session = Session}) ->
{ok, NewSession, _GrantedQos} = emqttd_session:subscribe(Session, [{Topic, Qos}]),
{ok, State#proto_state{session = NewSession}};
%% protect from empty topic list
handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) ->
send(?UNSUBACK_PACKET(PacketId), State);
@ -271,9 +283,13 @@ shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg
willmsg(Packet) when is_record(Packet, mqtt_packet_connect) ->
emqtt_message:from_packet(Packet).
%% generate a clientId
clientid(undefined, State) ->
clientid(<<>>, State);
%%TODO: <<>> is not right.
clientid(<<>>, #proto_state{peername = Peername}) ->
<<"eMQTT_", (base64:encode(emqttd_net:format(Peername)))/binary>>;
{_, _, MicroSecs} = os:timestamp(),
iolist_to_binary(["emqttd_", base64:encode(emqttd_net:format(Peername)), integer_to_list(MicroSecs)]);
clientid(ClientId, _State) -> ClientId.
send_willmsg(_ClientId, undefined) ->
@ -282,6 +298,22 @@ send_willmsg(_ClientId, undefined) ->
send_willmsg(ClientId, WillMsg) ->
emqttd_pubsub:publish(ClientId, WillMsg).
%%TODO: will be fixed in 0.8
force_subscribe(ClientId) ->
case emqttd_broker:env(forced_subscriptions) of
undefined ->
ingore;
Topics ->
[force_subscribe(ClientId, {Topic, Qos}) || {Topic, Qos} <- Topics]
end.
force_subscribe(ClientId, {Topic, Qos}) when is_list(Topic) ->
force_subscribe(ClientId, {list_to_binary(Topic), Qos});
force_subscribe(ClientId, {Topic, Qos}) when is_binary(Topic) ->
Topic1 = emqtt_topic:feed_var(<<"$c">>, ClientId, Topic),
self() ! {force_subscribe, Topic1, Qos}.
start_keepalive(0) -> ignore;
start_keepalive(Sec) when Sec > 0 ->
self() ! {keepalive, start, round(Sec * 1.5)}.

View File

@ -438,7 +438,7 @@ setstats(queues) ->
mnesia:table_info(queue, size));
setstats(topics) ->
emqttd_stats:setstat('topics/count', 'topics/max',
emqttd_stats:setstats('topics/count', 'topics/max',
mnesia:table_info(topic, size));
setstats(subscribers) ->
emqttd_stats:setstats('subscribers/count', 'subscribers/max',

View File

@ -258,7 +258,7 @@ init([ClientId, ClientPid]) ->
process_flag(trap_exit, true),
%%TODO: Is this OK? or should monitor...
true = link(ClientPid),
SessOpts = emqttd:env(session),
SessOpts = emqttd:env(mqtt, session),
State = initial_state(ClientId, ClientPid),
Expires = proplists:get_value(expires, SessOpts, 1) * 3600,
MsgQueue = emqttd_queue:new(proplists:get_value(max_queue, SessOpts, 1000),

View File

@ -56,7 +56,7 @@
%% @end
%%------------------------------------------------------------------------------
start_link(Req) ->
PktOpts = emqttd:env(packet),
PktOpts = emqttd:env(mqtt, packet),
{ReentryWs, ReplyChannel} = upgrade(Req),
{ok, ClientPid} = gen_server:start_link(?MODULE, [self(), Req, ReplyChannel, PktOpts], []),
ReentryWs(#wsocket_state{request = Req,

View File

@ -46,7 +46,7 @@ check(_Client, undefined, _State) ->
{error, "Password undefined"};
check(#mqtt_client{username = Username}, Password, #state{user_tab = UserTab}) ->
%%TODO: hash password...
case emysql:select(UserTab, {{username, Username}, {password, Password}}) of
case emysql:select(UserTab, {'and', {username, Username}, {password, Password}}) of
{ok, []} -> {error, "Username or Password not match"};
{ok, _Record} -> ok
end.

View File

@ -86,6 +86,9 @@
%% System interval of publishing broker $SYS messages
{sys_interval, 60},
%% Subscribe these topics automatically when client connected
{forced_subscriptions, [{"$Q/client/$c", 0}]},
%% Retained messages
{retained, [
%% Max number of retained messages

View File

@ -1,22 +1,22 @@
[
{emysql, [
{pool_size, 4},
{host, "localhost"},
{port, 3306},
{username, ""},
{password, ""},
{database, "mqtt"},
{encoding, utf8}
]},
{emqttd_auth_mysql, [
{user_table, mqtt_users}
]}
%%
% {emysql, [
% {pool_size, 4},
% {host, "localhost"},
% {port, 3306},
% {username, "root"},
% {password, "public"},
% {database, "mqtt"},
% {encoding, utf8}
% ]},
% {emqttd_auth_mysql, [
% {user_table, mqtt_users}
% ]}
%
% {emqttd_dashboard, [
% {listener,
% {http, 8080, [
% {acceptors, 4},
% {max_clients, 512}]}}
% ]}
%%
%
].