diff --git a/apps/emqtt/test/emqtt_topic_tests.erl b/apps/emqtt/test/emqtt_topic_tests.erl index 029015216..c4fb75377 100644 --- a/apps/emqtt/test/emqtt_topic_tests.erl +++ b/apps/emqtt/test/emqtt_topic_tests.erl @@ -115,5 +115,12 @@ words_test() -> ?debugFmt("Time for binary:split: ~p(micro)", [Time2/?N]), ok. +feed_var_test() -> + ?assertEqual(<<"$Q/client/clientId">>, emqtt_topic:feed_var(<<"$c">>, <<"clientId">>, <<"$Q/client/$c">>)). + +join_test() -> + ?assertEqual(<<"/ab/cd/ef/">>, emqtt_topic:join(words(<<"/ab/cd/ef/">>))), + ?assertEqual(<<"ab/+/#">>, emqtt_topic:join(words(<<"ab/+/#">>))). + -endif. diff --git a/apps/emqttd/src/emqttd.erl b/apps/emqttd/src/emqttd.erl index cbec8e384..4de58fbd7 100644 --- a/apps/emqttd/src/emqttd.erl +++ b/apps/emqttd/src/emqttd.erl @@ -28,7 +28,7 @@ -author("Feng Lee "). --export([start/0, env/1, +-export([start/0, env/1, env/2, open_listeners/1, close_listeners/1, load_all_plugins/0, unload_all_plugins/0, load_plugin/1, unload_plugin/1, @@ -54,12 +54,16 @@ start() -> application:start(emqttd). %%------------------------------------------------------------------------------ -%% @doc Get mqtt environment +%% @doc Get environment %% @end %%------------------------------------------------------------------------------ --spec env(atom()) -> undefined | any(). -env(Name) -> - proplists:get_value(Name, application:get_env(emqttd, mqtt, [])). +-spec env(atom()) -> list(). +env(Group) -> + application:get_env(emqttd, Group, []). + +-spec env(atom(), atom()) -> undefined | any(). +env(Group, Name) -> + proplists:get_value(Name, env(Group)). %%------------------------------------------------------------------------------ %% @doc Open Listeners @@ -83,7 +87,7 @@ open_listener({http, Port, Options}) -> mochiweb:start_http(Port, Options, MFArgs). open_listener(Protocol, Port, Options) -> - MFArgs = {emqttd_client, start_link, [env(packet)]}, + MFArgs = {emqttd_client, start_link, [env(mqtt, packet)]}, esockd:open(Protocol, Port, merge_sockopts(Options) , MFArgs). merge_sockopts(Options) -> diff --git a/apps/emqttd/src/emqttd_client.erl b/apps/emqttd/src/emqttd_client.erl index 3eb7ac889..6c5469090 100644 --- a/apps/emqttd/src/emqttd_client.erl +++ b/apps/emqttd/src/emqttd_client.erl @@ -121,6 +121,10 @@ handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} = {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), {noreply, State#state{proto_state = ProtoState1}}; +handle_info({force_subscribe, Topic, Qos}, #state{proto_state = ProtoState} = State) -> + {ok, ProtoState1} = emqttd_protocol:handle({subscribe, Topic, Qos}, ProtoState), + {noreply, State#state{proto_state = ProtoState1}}; + handle_info({inet_reply, _Ref, ok}, State) -> {noreply, State, hibernate}; diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index ed621bffa..eb08b62fd 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -38,6 +38,8 @@ -export([received/2, send/2, redeliver/2, shutdown/2]). +-export([handle/2]). + -export([info/1]). %% Protocol State @@ -126,13 +128,22 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername = Client = #mqtt_client{clientid = ClientId, username = Username, ipaddr = Addr}, case emqttd_access_control:auth(Client, Password) of ok -> + %% Generate one if null ClientId1 = clientid(ClientId, State), - start_keepalive(KeepAlive), + %% Register clientId emqttd_cm:register(ClientId1), + %%Starting session + {ok, Session} = emqttd_session:start({CleanSess, ClientId1, self()}), + %% Force subscriptions + force_subscribe(ClientId1), + %% Start keepalive + start_keepalive(KeepAlive), {?CONNACK_ACCEPT, State1#proto_state{clientid = ClientId1, + session = Session, will_msg = willmsg(Var)}}; {error, Reason}-> - lager:error("~s@~s: username '~s' login failed - ~s", [ClientId, emqttd_net:format(Peername), Username, Reason]), + lager:error("~s@~s: username '~s' login failed - ~s", + [ClientId, emqttd_net:format(Peername), Username, Reason]), {?CONNACK_CREDENTIALS, State1} end; @@ -141,10 +152,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername = end, %%TODO: this is not right... notify(connected, ReturnCode1, State2), - send(?CONNACK_PACKET(ReturnCode1), State2), - %%Starting session - {ok, Session} = emqttd_session:start({CleanSess, ClientId, self()}), - {ok, State2#proto_state{session = Session}}; + send(?CONNACK_PACKET(ReturnCode1), State2); handle(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), State = #proto_state{clientid = ClientId, session = Session}) -> @@ -209,6 +217,10 @@ handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{clientid = send(?SUBACK_PACKET(PacketId, GrantedQos), State#proto_state{session = NewSession}) end; +handle({subscribe, Topic, Qos}, State = #proto_state{clientid = ClientId, session = Session}) -> + {ok, NewSession, _GrantedQos} = emqttd_session:subscribe(Session, [{Topic, Qos}]), + {ok, State#proto_state{session = NewSession}}; + %% protect from empty topic list handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) -> send(?UNSUBACK_PACKET(PacketId), State); @@ -271,9 +283,13 @@ shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg willmsg(Packet) when is_record(Packet, mqtt_packet_connect) -> emqtt_message:from_packet(Packet). +%% generate a clientId +clientid(undefined, State) -> + clientid(<<>>, State); +%%TODO: <<>> is not right. clientid(<<>>, #proto_state{peername = Peername}) -> - <<"eMQTT_", (base64:encode(emqttd_net:format(Peername)))/binary>>; - + {_, _, MicroSecs} = os:timestamp(), + iolist_to_binary(["emqttd_", base64:encode(emqttd_net:format(Peername)), integer_to_list(MicroSecs)]); clientid(ClientId, _State) -> ClientId. send_willmsg(_ClientId, undefined) -> @@ -282,6 +298,22 @@ send_willmsg(_ClientId, undefined) -> send_willmsg(ClientId, WillMsg) -> emqttd_pubsub:publish(ClientId, WillMsg). +%%TODO: will be fixed in 0.8 +force_subscribe(ClientId) -> + case emqttd_broker:env(forced_subscriptions) of + undefined -> + ingore; + Topics -> + [force_subscribe(ClientId, {Topic, Qos}) || {Topic, Qos} <- Topics] + end. + +force_subscribe(ClientId, {Topic, Qos}) when is_list(Topic) -> + force_subscribe(ClientId, {list_to_binary(Topic), Qos}); + +force_subscribe(ClientId, {Topic, Qos}) when is_binary(Topic) -> + Topic1 = emqtt_topic:feed_var(<<"$c">>, ClientId, Topic), + self() ! {force_subscribe, Topic1, Qos}. + start_keepalive(0) -> ignore; start_keepalive(Sec) when Sec > 0 -> self() ! {keepalive, start, round(Sec * 1.5)}. diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index 48abd56f8..51a6a217a 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -438,7 +438,7 @@ setstats(queues) -> mnesia:table_info(queue, size)); setstats(topics) -> - emqttd_stats:setstat('topics/count', 'topics/max', + emqttd_stats:setstats('topics/count', 'topics/max', mnesia:table_info(topic, size)); setstats(subscribers) -> emqttd_stats:setstats('subscribers/count', 'subscribers/max', diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index efc880c5f..454363f11 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -258,7 +258,7 @@ init([ClientId, ClientPid]) -> process_flag(trap_exit, true), %%TODO: Is this OK? or should monitor... true = link(ClientPid), - SessOpts = emqttd:env(session), + SessOpts = emqttd:env(mqtt, session), State = initial_state(ClientId, ClientPid), Expires = proplists:get_value(expires, SessOpts, 1) * 3600, MsgQueue = emqttd_queue:new(proplists:get_value(max_queue, SessOpts, 1000), diff --git a/apps/emqttd/src/emqttd_ws_client.erl b/apps/emqttd/src/emqttd_ws_client.erl index 5a38eae08..23ac6aff0 100644 --- a/apps/emqttd/src/emqttd_ws_client.erl +++ b/apps/emqttd/src/emqttd_ws_client.erl @@ -56,7 +56,7 @@ %% @end %%------------------------------------------------------------------------------ start_link(Req) -> - PktOpts = emqttd:env(packet), + PktOpts = emqttd:env(mqtt, packet), {ReentryWs, ReplyChannel} = upgrade(Req), {ok, ClientPid} = gen_server:start_link(?MODULE, [self(), Req, ReplyChannel, PktOpts], []), ReentryWs(#wsocket_state{request = Req, diff --git a/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql.erl b/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql.erl index d9f8bb710..0ba398249 100644 --- a/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql.erl +++ b/plugins/emqttd_auth_mysql/src/emqttd_auth_mysql.erl @@ -46,7 +46,7 @@ check(_Client, undefined, _State) -> {error, "Password undefined"}; check(#mqtt_client{username = Username}, Password, #state{user_tab = UserTab}) -> %%TODO: hash password... - case emysql:select(UserTab, {{username, Username}, {password, Password}}) of + case emysql:select(UserTab, {'and', {username, Username}, {password, Password}}) of {ok, []} -> {error, "Username or Password not match"}; {ok, _Record} -> ok end. diff --git a/rel/files/emqttd.config b/rel/files/emqttd.config index d0e4dc5de..6681abbae 100644 --- a/rel/files/emqttd.config +++ b/rel/files/emqttd.config @@ -86,6 +86,9 @@ %% System interval of publishing broker $SYS messages {sys_interval, 60}, + %% Subscribe these topics automatically when client connected + {forced_subscriptions, [{"$Q/client/$c", 0}]}, + %% Retained messages {retained, [ %% Max number of retained messages diff --git a/rel/files/plugins.config b/rel/files/plugins.config index e28dfd1c6..c8e850e57 100644 --- a/rel/files/plugins.config +++ b/rel/files/plugins.config @@ -1,22 +1,22 @@ [ - {emysql, [ - {pool_size, 4}, - {host, "localhost"}, - {port, 3306}, - {username, ""}, - {password, ""}, - {database, "mqtt"}, - {encoding, utf8} - ]}, - {emqttd_auth_mysql, [ - {user_table, mqtt_users} - ]} -%% +% {emysql, [ +% {pool_size, 4}, +% {host, "localhost"}, +% {port, 3306}, +% {username, "root"}, +% {password, "public"}, +% {database, "mqtt"}, +% {encoding, utf8} +% ]}, +% {emqttd_auth_mysql, [ +% {user_table, mqtt_users} +% ]} +% % {emqttd_dashboard, [ % {listener, % {http, 8080, [ % {acceptors, 4}, % {max_clients, 512}]}} % ]} -%% +% ].