fix sub/pub

This commit is contained in:
erylee 2012-12-22 17:35:28 +08:00
parent 425bc2157e
commit c1b9449c53
9 changed files with 37 additions and 62 deletions

View File

@ -22,7 +22,7 @@
]}
]},
{emqtt, [
{auth, {internal, []}}, %internal, anonymous
{auth, {anonymous, []}}, %internal, anonymous
{listeners, [
{1883, [
binary,

View File

@ -21,11 +21,14 @@
-define(LICENSE_MESSAGE, "Licensed under the MPL.").
-define(PROTOCOL_VERSION, "MQTT/3.1").
-define(ERTS_MINIMUM, "5.6.3").
-record(internal_user, {username, passwdhash}).
-record(topic, {words, path}).
-record(subscriber, {topic, pid}).
-record(subscriber, {topic, qos, client, monref}).
%% ---------------------------------

View File

@ -21,7 +21,7 @@
start_link() ->
gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []).
check(Username, Password) when is_binary(Username) ->
check(Username, Password) ->
gen_server2:call(?MODULE, {check, Username, Password}).
add(Username, Password) when is_binary(Username) ->

View File

@ -5,14 +5,10 @@
check/2,
delete/1]).
init(_Opts) ->
ok.
init(_Opts) -> ok.
check(Username, _) when is_binary(Username) ->
true.
check(_, _) -> true.
add(Username, _Password) when is_binary(Username) ->
ok.
add(_, _) -> ok.
delete(Username) when is_binary(Username) ->
ok.
delete(_Username) -> ok.

View File

@ -14,6 +14,10 @@ init(_Opts) ->
mnesia:add_table_copy(internal_user, node(), ram_copies),
ok.
check(undefined, _) -> false;
check(_, undefined) -> false;
check(Username, Password) when is_binary(Username) ->
PasswdHash = crypto:md5(Password),
case mnesia:dirty_read(internal_user, Username) of
@ -21,9 +25,9 @@ check(Username, Password) when is_binary(Username) ->
_ -> false
end.
add(Username, Password) ->
add(Username, Password) when is_binary(Username) and is_binary(Password) ->
mnesia:dirty_write(#internal_user{username=Username, passwdhash=crypto:md5(Password)}).
delete(Username) ->
delete(Username) when is_binary(Username) ->
mnesia:dirty_delete(internal_user, Username).

View File

@ -163,6 +163,7 @@ process_received_bytes(Bytes,
process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
State ) ->
?INFO("~p", [Frame]),
process_request(Type, Frame, State).
process_request(?CONNECT,
@ -172,20 +173,19 @@ process_request(?CONNECT,
proto_ver = ProtoVersion,
clean_sess = CleanSess,
client_id = ClientId } = Var}, #state{socket = Sock} = State) ->
?INFO("connect frame: ~p~n", [Var]),
{ReturnCode, State1} =
case {ProtoVersion =:= ?MQTT_PROTO_MAJOR,
emqtt_util:valid_client_id(ClientId)} of
valid_client_id(ClientId)} of
{false, _} ->
{?CONNACK_PROTO_VER, State};
{_, false} ->
{?CONNACK_INVALID_ID, State};
_ ->
case creds(Username, Password) of
nocreds ->
case emqtt_auth:check(Username, Password) of
false ->
error_logger:error_msg("MQTT login failed - no credentials~n"),
{?CONNACK_CREDENTIALS, State};
{UserBin, PassBin} ->
true ->
{?CONNACK_ACCEPT,
State #state{ will_msg = make_will_msg(Var),
client_id = ClientId }}
@ -221,7 +221,7 @@ process_request(?PUBLISH,
dup = Dup,
message_id = MessageId,
payload = Payload },
emqtt_router:route(Msg),
emqtt_router:publish(Topic, Msg),
send_frame(Sock,
#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK },
@ -336,4 +336,7 @@ control_throttle(State = #state{ connection_state = Flow,
stop(Reason, State ) ->
{stop, Reason, State}.
valid_client_id(ClientId) ->
ClientIdLen = size(ClientId),
1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN.

View File

@ -23,7 +23,7 @@
-export([serialise/1]).
-define(RESERVED, 0).
-define(PROTOCOL_MAGIC, "MQIsdp").
-define(PROTOCOL_MAGIC, <<"MQIsdp">>).
-define(MAX_LEN, 16#fffffff).
-define(HIGHBIT, 2#10000000).
-define(LOWBITS, 2#01111111).
@ -145,7 +145,7 @@ parse_utf(Bin, _) ->
parse_utf(Bin).
parse_utf(<<Len:16/big, Str:Len/binary, Rest/binary>>) ->
{binary_to_list(Str), Rest}.
{Str, Rest}.
parse_msg(Bin, 0) ->
{undefined, Bin};

View File

@ -71,14 +71,15 @@ handle_call({subscribe, Topic, Client}, _From, State) ->
wildcard ->
ok = mnesia:dirty_write(wildcard_topic, #topic{words=Words, path=Topic})
end,
ets:insert(subscriber, #subscriber{topic=Topic, client=Client}),
Ref = erlang:monitor(process, Client),
ets:insert(subscriber, #subscriber{topic=Topic, client=Client, monref=Ref}),
{reply, ok, State};
handle_call(Req, _From, State) ->
{stop, {badreq, Req}, State}.
handle_cast({unsubscribe, Topic, Client}, State) ->
ets:delete_object(subscriber, #subscriber{topic=Topic, client=Client}),
ets:match_delete(subscriber, #subscriber{topic=Topic, client=Client}),
%TODO: how to remove topic
%
%Words = topic_split(Topic),
@ -93,6 +94,10 @@ handle_cast({unsubscribe, Topic, Client}, State) ->
handle_cast(Msg, State) ->
{stop, {badmsg, Msg}, State}.
handle_info({'DOWN', MonitorRef, _Type, _Object, _Info}, State) ->
ets:match_delete(subscriber, #subscriber{monref=MonitorRef}),
{noreply, State};
handle_info(Info, State) ->
{stop, {badinfo, Info}, State}.
@ -127,6 +132,9 @@ topic_match([_H|T1], [<<"+">>|T2]) ->
topic_match(_, [<<"#">>]) ->
true;
topic_match([_H1|_], [_H2|_]) ->
false;
topic_match([], [_H|_T2]) ->
false.

View File

@ -1,39 +0,0 @@
-module(emqtt_util).
-include("emqtt.hrl").
-define(CLIENT_ID_MAXLEN, 23).
-compile(export_all).
binary(L) when is_list(L) -> list_to_binary(L);
binary(B) when is_binary(B) -> B.
subcription_queue_name(ClientId) ->
Base = "mqtt-subscription-" ++ ClientId ++ "qos",
{list_to_binary(Base ++ "0"), list_to_binary(Base ++ "1")}.
%% amqp mqtt descr
%% * + match one topic level
%% # # match multiple topic levels
%% . / topic level separator
mqtt2amqp(Topic) ->
erlang:iolist_to_binary(
re:replace(re:replace(Topic, "/", ".", [global]),
"[\+]", "*", [global])).
amqp2mqtt(Topic) ->
erlang:iolist_to_binary(
re:replace(re:replace(Topic, "[\*]", "+", [global]),
"[\.]", "/", [global])).
valid_client_id(ClientId) ->
ClientIdLen = length(ClientId),
1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN.
env(Key) ->
case application:get_env(emqtt, Key) of
{ok, Val} -> Val;
undefined -> undefined
end.