diff --git a/etc/emqtt.config b/etc/emqtt.config index c995ebc38..fd1f7d52a 100644 --- a/etc/emqtt.config +++ b/etc/emqtt.config @@ -22,7 +22,7 @@ ]} ]}, {emqtt, [ - {auth, {internal, []}}, %internal, anonymous + {auth, {anonymous, []}}, %internal, anonymous {listeners, [ {1883, [ binary, diff --git a/include/emqtt.hrl b/include/emqtt.hrl index 3b3866ca3..2d3c57029 100644 --- a/include/emqtt.hrl +++ b/include/emqtt.hrl @@ -21,11 +21,14 @@ -define(LICENSE_MESSAGE, "Licensed under the MPL."). -define(PROTOCOL_VERSION, "MQTT/3.1"). + -define(ERTS_MINIMUM, "5.6.3"). +-record(internal_user, {username, passwdhash}). + -record(topic, {words, path}). --record(subscriber, {topic, pid}). +-record(subscriber, {topic, qos, client, monref}). %% --------------------------------- diff --git a/src/emqtt_auth.erl b/src/emqtt_auth.erl index e89252d90..88a11b8bf 100644 --- a/src/emqtt_auth.erl +++ b/src/emqtt_auth.erl @@ -21,7 +21,7 @@ start_link() -> gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []). -check(Username, Password) when is_binary(Username) -> +check(Username, Password) -> gen_server2:call(?MODULE, {check, Username, Password}). add(Username, Password) when is_binary(Username) -> diff --git a/src/emqtt_auth_anonymous.erl b/src/emqtt_auth_anonymous.erl index f17dec334..ca41f19b0 100644 --- a/src/emqtt_auth_anonymous.erl +++ b/src/emqtt_auth_anonymous.erl @@ -5,14 +5,10 @@ check/2, delete/1]). -init(_Opts) -> - ok. +init(_Opts) -> ok. -check(Username, _) when is_binary(Username) -> - true. +check(_, _) -> true. -add(Username, _Password) when is_binary(Username) -> - ok. +add(_, _) -> ok. -delete(Username) when is_binary(Username) -> - ok. +delete(_Username) -> ok. diff --git a/src/emqtt_auth_internal.erl b/src/emqtt_auth_internal.erl index 66b3a8e3e..fa1c63789 100644 --- a/src/emqtt_auth_internal.erl +++ b/src/emqtt_auth_internal.erl @@ -14,6 +14,10 @@ init(_Opts) -> mnesia:add_table_copy(internal_user, node(), ram_copies), ok. +check(undefined, _) -> false; + +check(_, undefined) -> false; + check(Username, Password) when is_binary(Username) -> PasswdHash = crypto:md5(Password), case mnesia:dirty_read(internal_user, Username) of @@ -21,9 +25,9 @@ check(Username, Password) when is_binary(Username) -> _ -> false end. -add(Username, Password) -> +add(Username, Password) when is_binary(Username) and is_binary(Password) -> mnesia:dirty_write(#internal_user{username=Username, passwdhash=crypto:md5(Password)}). -delete(Username) -> +delete(Username) when is_binary(Username) -> mnesia:dirty_delete(internal_user, Username). diff --git a/src/emqtt_client.erl b/src/emqtt_client.erl index 4cc9eca20..84adebbc8 100644 --- a/src/emqtt_client.erl +++ b/src/emqtt_client.erl @@ -163,6 +163,7 @@ process_received_bytes(Bytes, process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}, State ) -> + ?INFO("~p", [Frame]), process_request(Type, Frame, State). process_request(?CONNECT, @@ -172,20 +173,19 @@ process_request(?CONNECT, proto_ver = ProtoVersion, clean_sess = CleanSess, client_id = ClientId } = Var}, #state{socket = Sock} = State) -> - ?INFO("connect frame: ~p~n", [Var]), {ReturnCode, State1} = case {ProtoVersion =:= ?MQTT_PROTO_MAJOR, - emqtt_util:valid_client_id(ClientId)} of + valid_client_id(ClientId)} of {false, _} -> {?CONNACK_PROTO_VER, State}; {_, false} -> {?CONNACK_INVALID_ID, State}; _ -> - case creds(Username, Password) of - nocreds -> + case emqtt_auth:check(Username, Password) of + false -> error_logger:error_msg("MQTT login failed - no credentials~n"), {?CONNACK_CREDENTIALS, State}; - {UserBin, PassBin} -> + true -> {?CONNACK_ACCEPT, State #state{ will_msg = make_will_msg(Var), client_id = ClientId }} @@ -221,7 +221,7 @@ process_request(?PUBLISH, dup = Dup, message_id = MessageId, payload = Payload }, - emqtt_router:route(Msg), + emqtt_router:publish(Topic, Msg), send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK }, @@ -336,4 +336,7 @@ control_throttle(State = #state{ connection_state = Flow, stop(Reason, State ) -> {stop, Reason, State}. +valid_client_id(ClientId) -> + ClientIdLen = size(ClientId), + 1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN. diff --git a/src/emqtt_frame.erl b/src/emqtt_frame.erl index 5b9a76c87..aa6d126e6 100644 --- a/src/emqtt_frame.erl +++ b/src/emqtt_frame.erl @@ -23,7 +23,7 @@ -export([serialise/1]). -define(RESERVED, 0). --define(PROTOCOL_MAGIC, "MQIsdp"). +-define(PROTOCOL_MAGIC, <<"MQIsdp">>). -define(MAX_LEN, 16#fffffff). -define(HIGHBIT, 2#10000000). -define(LOWBITS, 2#01111111). @@ -145,7 +145,7 @@ parse_utf(Bin, _) -> parse_utf(Bin). parse_utf(<>) -> - {binary_to_list(Str), Rest}. + {Str, Rest}. parse_msg(Bin, 0) -> {undefined, Bin}; diff --git a/src/emqtt_router.erl b/src/emqtt_router.erl index 00c413920..34d98b83b 100644 --- a/src/emqtt_router.erl +++ b/src/emqtt_router.erl @@ -71,14 +71,15 @@ handle_call({subscribe, Topic, Client}, _From, State) -> wildcard -> ok = mnesia:dirty_write(wildcard_topic, #topic{words=Words, path=Topic}) end, - ets:insert(subscriber, #subscriber{topic=Topic, client=Client}), + Ref = erlang:monitor(process, Client), + ets:insert(subscriber, #subscriber{topic=Topic, client=Client, monref=Ref}), {reply, ok, State}; handle_call(Req, _From, State) -> {stop, {badreq, Req}, State}. handle_cast({unsubscribe, Topic, Client}, State) -> - ets:delete_object(subscriber, #subscriber{topic=Topic, client=Client}), + ets:match_delete(subscriber, #subscriber{topic=Topic, client=Client}), %TODO: how to remove topic % %Words = topic_split(Topic), @@ -93,6 +94,10 @@ handle_cast({unsubscribe, Topic, Client}, State) -> handle_cast(Msg, State) -> {stop, {badmsg, Msg}, State}. +handle_info({'DOWN', MonitorRef, _Type, _Object, _Info}, State) -> + ets:match_delete(subscriber, #subscriber{monref=MonitorRef}), + {noreply, State}; + handle_info(Info, State) -> {stop, {badinfo, Info}, State}. @@ -127,6 +132,9 @@ topic_match([_H|T1], [<<"+">>|T2]) -> topic_match(_, [<<"#">>]) -> true; +topic_match([_H1|_], [_H2|_]) -> + false; + topic_match([], [_H|_T2]) -> false. diff --git a/src/emqtt_util.erl b/src/emqtt_util.erl deleted file mode 100644 index 46c320e7b..000000000 --- a/src/emqtt_util.erl +++ /dev/null @@ -1,39 +0,0 @@ --module(emqtt_util). - --include("emqtt.hrl"). - --define(CLIENT_ID_MAXLEN, 23). - --compile(export_all). - -binary(L) when is_list(L) -> list_to_binary(L); -binary(B) when is_binary(B) -> B. - -subcription_queue_name(ClientId) -> - Base = "mqtt-subscription-" ++ ClientId ++ "qos", - {list_to_binary(Base ++ "0"), list_to_binary(Base ++ "1")}. - -%% amqp mqtt descr -%% * + match one topic level -%% # # match multiple topic levels -%% . / topic level separator -mqtt2amqp(Topic) -> - erlang:iolist_to_binary( - re:replace(re:replace(Topic, "/", ".", [global]), - "[\+]", "*", [global])). - -amqp2mqtt(Topic) -> - erlang:iolist_to_binary( - re:replace(re:replace(Topic, "[\*]", "+", [global]), - "[\.]", "/", [global])). - -valid_client_id(ClientId) -> - ClientIdLen = length(ClientId), - 1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN. - -env(Key) -> - case application:get_env(emqtt, Key) of - {ok, Val} -> Val; - undefined -> undefined - end. -