diff --git a/etc/emqx.conf b/etc/emqx.conf index a29ff071a..b4c60ee48 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -242,7 +242,7 @@ mqtt.session.max_awaiting_rel = 100 mqtt.session.await_rel_timeout = 20s ## Enable Statistics: on | off -mqtt.session.enable_stats = off +mqtt.session.enable_stats = on ## Expired after 1 day: ## w - week @@ -543,17 +543,6 @@ listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem ## listener.wss.external.fail_if_no_peer_cert = true -##-------------------------------------------------------------------- -## HTTP Management API Listener - -## listener.api.mgmt = 127.0.0.1:8080 - -## listener.api.mgmt.acceptors = 4 - -## listener.api.mgmt.max_clients = 64 - -## listener.api.mgmt.access.1 = allow all - ##------------------------------------------------------------------- ## System Monitor ##------------------------------------------------------------------- diff --git a/include/emqx_mqtt.hrl b/include/emqx_mqtt.hrl index 9d9cec714..a6d6c06e6 100644 --- a/include/emqx_mqtt.hrl +++ b/include/emqx_mqtt.hrl @@ -174,7 +174,8 @@ will_topic = undefined :: undefined | binary(), will_msg = undefined :: undefined | binary(), username = undefined :: undefined | binary(), - password = undefined :: undefined | binary() + password = undefined :: undefined | binary(), + is_bridge = false :: boolean() }). -record(mqtt_packet_connack, diff --git a/src/emqttd.app.src b/src/emqttd.app.src new file mode 100644 index 000000000..67af8854e --- /dev/null +++ b/src/emqttd.app.src @@ -0,0 +1,12 @@ +{application,emqttd, + [{description,"Erlang MQTT Broker"}, + {vsn,"2.3.1"}, + {modules,[]}, + {registered,[emqttd_sup]}, + {applications,[kernel,stdlib,gproc,lager,esockd,mochiweb, + lager_syslog,pbkdf2,bcrypt]}, + {env,[]}, + {mod,{emqttd_app,[]}}, + {maintainers,["Feng Lee "]}, + {licenses,["Apache-2.0"]}, + {links,[{"Github","https://github.com/emqtt/emqttd"}]}]}. diff --git a/src/emqx_parser.erl b/src/emqx_parser.erl index 3af9f984b..9688f8c22 100644 --- a/src/emqx_parser.erl +++ b/src/emqx_parser.erl @@ -79,7 +79,7 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) {?CONNECT, <>} -> {ProtoName, Rest1} = parse_utf(FrameBin), %% Fix mosquitto bridge: 0x83, 0x84 - <<_Bridge:4, ProtoVersion:4, Rest2/binary>> = Rest1, + <> = Rest1, < {error, protocol_header_corrupt} end; diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 532fd49e6..011ab3aec 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -42,9 +42,9 @@ %% ws_initial_headers: Headers from first HTTP request for WebSocket Client. -record(proto_state, {peername, sendfun, connected = false, client_id, client_pid, clean_sess, proto_ver, proto_name, username, is_superuser, - will_msg, keepalive, max_clientid_len, session, stats_data, - keepalive_backoff, peercert_username, ws_initial_headers, - mountpoint, connected_at}). + will_msg, keepalive, keepalive_backoff, max_clientid_len, + session, stats_data, mountpoint, ws_initial_headers, + is_bridge, connected_at}). -type(proto_state() :: #proto_state{}). @@ -201,17 +201,18 @@ process(?CONNECT_PACKET(Var), State0) -> password = Password, clean_sess = CleanSess, keep_alive = KeepAlive, - client_id = ClientId} = Var, + client_id = ClientId, + is_bridge = IsBridge} = Var, - State1 = repl_username_with_peercert( - State0#proto_state{proto_ver = ProtoVer, - proto_name = ProtoName, - username = Username, - client_id = ClientId, - clean_sess = CleanSess, - keepalive = KeepAlive, - will_msg = willmsg(Var, State0), - connected_at = os:timestamp()}), + State1 = State0#proto_state{proto_ver = ProtoVer, + proto_name = ProtoName, + username = Username, + client_id = ClientId, + clean_sess = CleanSess, + keepalive = KeepAlive, + will_msg = willmsg(Var, State0), + is_bridge = IsBridge, + connected_at = os:timestamp()}, {ReturnCode1, SessPresent, State3} = case validate_connect(Var, State1) of @@ -233,7 +234,7 @@ process(?CONNECT_PACKET(Var), State0) -> %% ACCEPT {?CONNACK_ACCEPT, SP, State2#proto_state{session = Session, is_superuser = IsSuperuser}}; {error, Error} -> - exit({shutdown, Error}) + {stop, {shutdown, Error}, State2} end; {error, Reason}-> ?LOG(error, "Username '~s' login failed for ~p", [Username, Reason], State1), @@ -355,10 +356,11 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId), -spec(send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}). send(Msg, State = #proto_state{client_id = ClientId, username = Username, - mountpoint = MountPoint}) + mountpoint = MountPoint, + is_bridge = IsBridge}) when is_record(Msg, mqtt_message) -> emqx_hooks:run('message.delivered', [ClientId, Username], Msg), - send(emqx_message:to_packet(unmount(MountPoint, Msg)), State); + send(emqx_message:to_packet(unmount(MountPoint, clean_retain(IsBridge, Msg))), State); send(Packet = ?PACKET(Type), State = #proto_state{sendfun = SendFun, stats_data = Stats}) -> @@ -400,12 +402,13 @@ stop_if_auth_failure(_RC, State) -> shutdown(_Error, #proto_state{client_id = undefined}) -> ignore; - -shutdown(conflict, #proto_state{client_id = _ClientId}) -> +shutdown(conflict, _State) -> + %% let it down + ignore; +shutdown(mnesia_conflict, _State) -> %% let it down %% emqx_cm:unreg(ClientId); ignore; - shutdown(Error, State = #proto_state{will_msg = WillMsg}) -> ?LOG(info, "Shutdown for ~p", [Error], State), Client = client(State), @@ -565,6 +568,15 @@ check_acl(subscribe, Topic, Client) -> sp(true) -> 1; sp(false) -> 0. +%%-------------------------------------------------------------------- +%% The retained flag should be propagated for bridge. +%%-------------------------------------------------------------------- + +clean_retain(false, Msg = #mqtt_message{retain = true}) -> + Msg#mqtt_message{retain = false}; +clean_retain(_IsBridge, Msg) -> + Msg. + %%-------------------------------------------------------------------- %% Mount Point %%-------------------------------------------------------------------- diff --git a/src/emqx_router.erl b/src/emqx_router.erl index d50654033..c376325aa 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -284,5 +284,7 @@ clean_routes_(Node) -> mnesia:transaction(Clean). update_stats_() -> - emqx_stats:setstats('routes/count', 'routes/max', mnesia:table_info(mqtt_route, size)). + Size = mnesia:table_info(mqtt_route, size), + emqx_stats:setstats('routes/count', 'routes/max', Size), + emqx_stats:setstats('topics/count', 'topics/max', Size). diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index 3322aed1f..319da070c 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -158,7 +158,7 @@ handle_call({start_session, false, {ClientId, Username}, ClientPid}, _From, Stat {reply, {ok, SessPid, true}, State}; {error, Erorr} -> {reply, {error, Erorr}, State} - end + end end; %% Transient Session @@ -185,15 +185,14 @@ handle_cast(Msg, State) -> handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> case dict:find(MRef, State#state.monitors) of {ok, ClientId} -> - mnesia:transaction( - fun() -> - case mnesia:wread({mqtt_session, ClientId}) of - [Sess = #mqtt_session{sess_pid = DownPid}] -> - mnesia:delete_object(mqtt_session, Sess, write); - [_Sess] -> ok; - [] -> ok - end - end), + case mnesia:dirty_read({mqtt_session, ClientId}) of + [] -> + ok; + [Sess = #mqtt_session{sess_pid = DownPid}] -> + mnesia:dirty_delete_object(Sess); + [_Sess] -> + ok + end, {noreply, erase_monitor(MRef, State), hibernate}; error -> lager:error("MRef of session ~p not found", [DownPid]), @@ -217,8 +216,7 @@ code_change(_OldVsn, State, _Extra) -> create_session({CleanSess, {ClientId, Username}, ClientPid}, State) -> case create_session(CleanSess, {ClientId, Username}, ClientPid) of {ok, SessPid} -> - {reply, {ok, SessPid, false}, - monitor_session(ClientId, SessPid, State)}; + {reply, {ok, SessPid, false}, monitor_session(ClientId, SessPid, State)}; {error, Error} -> {reply, {error, Error}, State} end. @@ -285,15 +283,14 @@ destroy_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid remove_session(Session); %% Remote node -destroy_session(Session = #mqtt_session{client_id = ClientId, - sess_pid = SessPid}) -> +destroy_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}) -> Node = node(SessPid), case rpc:call(Node, emqx_session, destroy, [SessPid, ClientId]) of ok -> remove_session(Session); {badrpc, nodedown} -> ?LOG(error, "Node '~s' down", [Node], Session), - remove_session(Session); + remove_session(Session); {badrpc, Reason} -> ?LOG(error, "Failed to destory ~p on remote node ~p for ~s", [SessPid, Node, Reason], Session), @@ -301,10 +298,7 @@ destroy_session(Session = #mqtt_session{client_id = ClientId, end. remove_session(Session) -> - case mnesia:transaction(fun mnesia:delete_object/1, [Session]) of - {atomic, ok} -> ok; - {aborted, Error} -> {error, Error} - end. + mnesia:dirty_delete_object(Session). monitor_session(ClientId, SessPid, State = #state{monitors = Monitors}) -> MRef = erlang:monitor(process, SessPid), diff --git a/src/emqx_sm_sup.erl b/src/emqx_sm_sup.erl index ad5c9732b..da2f30b07 100644 --- a/src/emqx_sm_sup.erl +++ b/src/emqx_sm_sup.erl @@ -44,11 +44,10 @@ init([]) -> %% Helper StatsFun = emqx_stats:statsfun('sessions/count', 'sessions/max'), Helper = {?HELPER, {?HELPER, start_link, [StatsFun]}, - permanent, 5000, worker, [?HELPER]}, + permanent, 5000, worker, [?HELPER]}, %% SM Pool Sup - MFA = {?SM, start_link, []}, - PoolSup = emqx_pool_sup:spec([?SM, hash, erlang:system_info(schedulers), MFA]), - + MFA = {emqttd_sm, start_link, []}, + PoolSup = emqx_pool_sup:spec([emqttd_sm, hash, erlang:system_info(schedulers), MFA]), {ok, {{one_for_all, 10, 3600}, [Helper, PoolSup]}}. diff --git a/src/emqx_stats.erl b/src/emqx_stats.erl index b9ce291e5..5f0d35038 100644 --- a/src/emqx_stats.erl +++ b/src/emqx_stats.erl @@ -57,14 +57,14 @@ %% $SYS Topics for Subscribers -define(SYSTOP_PUBSUB, [ - 'routes/count', % ... - 'routes/max', % ... 'topics/count', % ... 'topics/max', % ... 'subscribers/count', % ... 'subscribers/max', % ... 'subscriptions/count', % ... - 'subscriptions/max' % ... + 'subscriptions/max', % ... + 'routes/count', % ... + 'routes/max' % ... ]). %% $SYS Topic for retained