From 6e5134a8b2983f2184c7281a7ecc7535c6a428b2 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 7 Dec 2017 16:24:48 +0800 Subject: [PATCH] Merge with the enterprise edition --- Makefile | 2 +- src/emqx.erl | 10 +- src/emqx_app.erl | 1 - src/emqx_cli.erl | 613 ------------------ src/emqx_cli_config.erl | 363 ----------- src/emqx_ctl.erl | 9 - src/emqx_http.erl | 236 ------- src/emqx_mgmt.erl.bk | 544 ---------------- src/emqx_protocol.erl | 20 +- src/emqx_router.erl | 14 +- test/emqttd_cli_SUITE.erl | 52 -- test/emqx_SUITE.erl | 152 ++--- test/emqx_config_SUITE.erl | 149 ----- test/emqx_mod_SUITE.erl | 4 +- ...router_SUITE.erl => emqx_router_SUITE.erl} | 18 +- 15 files changed, 88 insertions(+), 2099 deletions(-) delete mode 100644 src/emqx_cli.erl delete mode 100644 src/emqx_cli_config.erl delete mode 100644 src/emqx_http.erl delete mode 100644 src/emqx_mgmt.erl.bk delete mode 100644 test/emqttd_cli_SUITE.erl delete mode 100644 test/emqx_config_SUITE.erl rename test/{emqttd_router_SUITE.erl => emqx_router_SUITE.erl} (93%) diff --git a/Makefile b/Makefile index e0a06a673..674fa4924 100644 --- a/Makefile +++ b/Makefile @@ -35,7 +35,7 @@ TEST_ERLC_OPTS += +'{parse_transform, lager_transform}' EUNIT_OPTS = verbose CT_SUITES = emqx emqx_mod emqx_lib emqx_topic emqx_trie emqx_mqueue emqx_inflight \ - emqx_vm emqx_net emqx_protocol emqx_access emqx_config emqx_router + emqx_vm emqx_net emqx_protocol emqx_access emqx_router CT_OPTS = -cover test/ct.cover.spec -erl_args -name emqxct@127.0.0.1 diff --git a/src/emqx.erl b/src/emqx.erl index bdb88ed9a..07e133cf7 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -112,10 +112,10 @@ start_listener({Proto, ListenOn, Opts}) when Proto == http; Proto == ws -> %% Start https listener start_listener({Proto, ListenOn, Opts}) when Proto == https; Proto == wss -> - {ok, _} = mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqx_ws, handle_request, []}); + {ok, _} = mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqx_ws, handle_request, []}). -start_listener({Proto, ListenOn, Opts}) when Proto == api -> - {ok, _} = mochiweb:start_http('mqtt:api', ListenOn, Opts, emqx_http:http_handler()). +% start_listener({Proto, ListenOn, Opts}) when Proto == api -> +% {ok, _} = mochiweb:start_http('mqtt:api', ListenOn, Opts, emqx_http:http_handler()). start_listener(Proto, ListenOn, Opts) -> Env = lists:append(emqx:env(client, []), emqx:env(protocol, [])), @@ -144,8 +144,8 @@ stop_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws -> mochiweb:stop_http('mqtt:ws', ListenOn); stop_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss -> mochiweb:stop_http('mqtt:wss', ListenOn); -stop_listener({Proto, ListenOn, _Opts}) when Proto == api -> - mochiweb:stop_http('mqtt:api', ListenOn); +% stop_listener({Proto, ListenOn, _Opts}) when Proto == api -> +% mochiweb:stop_http('mqtt:api', ListenOn); stop_listener({Proto, ListenOn, _Opts}) -> esockd:close(Proto, ListenOn). diff --git a/src/emqx_app.erl b/src/emqx_app.erl index a3066f7de..8b31f669f 100644 --- a/src/emqx_app.erl +++ b/src/emqx_app.erl @@ -37,7 +37,6 @@ start(_Type, _Args) -> print_banner(), ekka:start(), {ok, Sup} = emqx_sup:start_link(), - ok = emqx_cli:load(), ok = register_acl_mod(), start_autocluster(), register(emqx, self()), diff --git a/src/emqx_cli.erl b/src/emqx_cli.erl deleted file mode 100644 index 14c1498b4..000000000 --- a/src/emqx_cli.erl +++ /dev/null @@ -1,613 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_cli). - --author("Feng Lee "). - --include("emqx.hrl"). - --include("emqx_mqtt.hrl"). - --include("emqx_cli.hrl"). - --import(lists, [foreach/2]). - --import(proplists, [get_value/2]). - --export([load/0]). - --export([status/1, broker/1, cluster/1, users/1, clients/1, sessions/1, - routes/1, topics/1, subscriptions/1, plugins/1, bridges/1, - listeners/1, vm/1, mnesia/1, trace/1, acl/1]). - --define(PROC_INFOKEYS, [status, - memory, - message_queue_len, - total_heap_size, - heap_size, - stack_size, - reductions]). - --define(MAX_LIMIT, 10000). - --define(APP, emqx). - --spec(load() -> ok). -load() -> - Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)], - lists:foreach(fun(Cmd) -> emqx_ctl:register_cmd(Cmd, {?MODULE, Cmd}, []) end, Cmds), - emqx_cli_config:register_config(). - -is_cmd(Fun) -> - not lists:member(Fun, [init, load, module_info]). - -%%-------------------------------------------------------------------- -%% Commands -%%-------------------------------------------------------------------- - -%%-------------------------------------------------------------------- -%% @doc Node status - -status([]) -> - {InternalStatus, _ProvidedStatus} = init:get_status(), - ?PRINT("Node ~p is ~p~n", [node(), InternalStatus]), - case lists:keysearch(?APP, 1, application:which_applications()) of - false -> - ?PRINT("~s is not running~n", [?APP]); - {value, {?APP, _Desc, Vsn}} -> - ?PRINT("~s ~s is running~n", [?APP, Vsn]) - end; -status(_) -> - ?PRINT_CMD("status", "Show broker status"). - -%%-------------------------------------------------------------------- -%% @doc Query broker - -broker([]) -> - Funs = [sysdescr, version, uptime, datetime], - foreach(fun(Fun) -> - ?PRINT("~-10s: ~s~n", [Fun, emqx_broker:Fun()]) - end, Funs); - -broker(["stats"]) -> - foreach(fun({Stat, Val}) -> - ?PRINT("~-20s: ~w~n", [Stat, Val]) - end, emqx_stats:getstats()); - -broker(["metrics"]) -> - foreach(fun({Metric, Val}) -> - ?PRINT("~-24s: ~w~n", [Metric, Val]) - end, lists:sort(emqx_metrics:all())); - -broker(["pubsub"]) -> - Pubsubs = supervisor:which_children(emqx_pubsub_sup:pubsub_pool()), - foreach(fun({{_, Id}, Pid, _, _}) -> - ProcInfo = erlang:process_info(Pid, ?PROC_INFOKEYS), - ?PRINT("pubsub: ~w~n", [Id]), - foreach(fun({Key, Val}) -> - ?PRINT(" ~-18s: ~w~n", [Key, Val]) - end, ProcInfo) - end, lists:reverse(Pubsubs)); - -broker(_) -> - ?USAGE([{"broker", "Show broker version, uptime and description"}, - {"broker pubsub", "Show process_info of pubsub"}, - {"broker stats", "Show broker statistics of clients, topics, subscribers"}, - {"broker metrics", "Show broker metrics"}]). - -%%-------------------------------------------------------------------- -%% @doc Cluster with other nodes - -cluster(["join", SNode]) -> - case ekka:join(ekka_node:parse_name(SNode)) of - ok -> - ?PRINT_MSG("Join the cluster successfully.~n"), - cluster(["status"]); - ignore -> - ?PRINT_MSG("Ignore.~n"); - {error, Error} -> - ?PRINT("Failed to join the cluster: ~p~n", [Error]) - end; - -cluster(["leave"]) -> - case ekka:leave() of - ok -> - ?PRINT_MSG("Leave the cluster successfully.~n"), - cluster(["status"]); - {error, Error} -> - ?PRINT("Failed to leave the cluster: ~p~n", [Error]) - end; - -cluster(["force-leave", SNode]) -> - case ekka:force_leave(ekka_node:parse_name(SNode)) of - ok -> - ?PRINT_MSG("Remove the node from cluster successfully.~n"), - cluster(["status"]); - ignore -> - ?PRINT_MSG("Ignore.~n"); - {error, Error} -> - ?PRINT("Failed to remove the node from cluster: ~p~n", [Error]) - end; - -cluster(["status"]) -> - ?PRINT("Cluster status: ~p~n", [ekka_cluster:status()]); - -cluster(_) -> - ?USAGE([{"cluster join ", "Join the cluster"}, - {"cluster leave", "Leave the cluster"}, - {"cluster force-leave ","Force the node leave from cluster"}, - {"cluster status", "Cluster status"}]). - -%%-------------------------------------------------------------------- -%% @doc Users usage - -users(Args) -> emqx_auth_username:cli(Args). - -acl(["reload"]) -> emqx_access_control:reload_acl(); -acl(_) -> ?USAGE([{"acl reload", "reload etc/acl.conf"}]). - -%%-------------------------------------------------------------------- -%% @doc Query clients - -clients(["list"]) -> - dump(mqtt_client); - -clients(["show", ClientId]) -> - if_client(ClientId, fun print/1); - -clients(["kick", ClientId]) -> - if_client(ClientId, fun(#mqtt_client{client_pid = Pid}) -> emqx_client:kick(Pid) end); - -clients(_) -> - ?USAGE([{"clients list", "List all clients"}, - {"clients show ", "Show a client"}, - {"clients kick ", "Kick out a client"}]). - -if_client(ClientId, Fun) -> - case emqx_cm:lookup(bin(ClientId)) of - undefined -> ?PRINT_MSG("Not Found.~n"); - Client -> Fun(Client) - end. - -%%-------------------------------------------------------------------- -%% @doc Sessions Command - -sessions(["list"]) -> - dump(mqtt_local_session); - -%% performance issue? - -sessions(["list", "persistent"]) -> - lists:foreach(fun print/1, ets:match_object(mqtt_local_session, {'_', '_', false, '_'})); - -%% performance issue? - -sessions(["list", "transient"]) -> - lists:foreach(fun print/1, ets:match_object(mqtt_local_session, {'_', '_', true, '_'})); - -sessions(["show", ClientId]) -> - case ets:lookup(mqtt_local_session, bin(ClientId)) of - [] -> ?PRINT_MSG("Not Found.~n"); - [SessInfo] -> print(SessInfo) - end; - -sessions(_) -> - ?USAGE([{"sessions list", "List all sessions"}, - {"sessions list persistent", "List all persistent sessions"}, - {"sessions list transient", "List all transient sessions"}, - {"sessions show ", "Show a session"}]). - -%%-------------------------------------------------------------------- -%% @doc Routes Command - -routes(["list"]) -> - Routes = emqx_router:dump(), - foreach(fun print/1, Routes); - -routes(["show", Topic]) -> - Routes = lists:append(ets:lookup(mqtt_route, bin(Topic)), - ets:lookup(mqtt_local_route, bin(Topic))), - foreach(fun print/1, Routes); - -routes(_) -> - ?USAGE([{"routes list", "List all routes"}, - {"routes show ", "Show a route"}]). - -%%-------------------------------------------------------------------- -%% @doc Topics Command - -topics(["list"]) -> - lists:foreach(fun(Topic) -> ?PRINT("~s~n", [Topic]) end, emqx:topics()); - -topics(["show", Topic]) -> - print(mnesia:dirty_read(mqtt_route, bin(Topic))); - -topics(_) -> - ?USAGE([{"topics list", "List all topics"}, - {"topics show ", "Show a topic"}]). - -subscriptions(["list"]) -> - lists:foreach(fun(Subscription) -> - print(subscription, Subscription) - end, ets:tab2list(mqtt_subscription)); - -subscriptions(["show", ClientId]) -> - case emqx:subscriptions(bin(ClientId)) of - [] -> ?PRINT_MSG("Not Found.~n"); - Subscriptions -> - [print(subscription, Sub) || Sub <- Subscriptions] - end; - -subscriptions(["add", ClientId, Topic, QoS]) -> - if_valid_qos(QoS, fun(IntQos) -> - case emqx_sm:lookup_session(bin(ClientId)) of - undefined -> - ?PRINT_MSG("Error: Session not found!"); - #mqtt_session{sess_pid = SessPid} -> - {Topic1, Options} = emqx_topic:parse(bin(Topic)), - emqx_session:subscribe(SessPid, [{Topic1, [{qos, IntQos}|Options]}]), - ?PRINT_MSG("ok~n") - end - end); - -subscriptions(["del", ClientId, Topic]) -> - case emqx_sm:lookup_session(bin(ClientId)) of - undefined -> - ?PRINT_MSG("Error: Session not found!"); - #mqtt_session{sess_pid = SessPid} -> - emqx_session:unsubscribe(SessPid, [emqx_topic:parse(bin(Topic))]), - ?PRINT_MSG("ok~n") - end; - -subscriptions(_) -> - ?USAGE([{"subscriptions list", "List all subscriptions"}, - {"subscriptions show ", "Show subscriptions of a client"}, - {"subscriptions add ", "Add a static subscription manually"}, - {"subscriptions del ", "Delete a static subscription manually"}]). - -%if_could_print(Tab, Fun) -> -% case mnesia:table_info(Tab, size) of -% Size when Size >= ?MAX_LIMIT -> -% ?PRINT("Could not list, too many ~ss: ~p~n", [Tab, Size]); -% _Size -> -% Keys = mnesia:dirty_all_keys(Tab), -% foreach(fun(Key) -> Fun(ets:lookup(Tab, Key)) end, Keys) -% end. - -if_valid_qos(QoS, Fun) -> - try list_to_integer(QoS) of - Int when ?IS_QOS(Int) -> Fun(Int); - _ -> ?PRINT_MSG("QoS should be 0, 1, 2~n") - catch _:_ -> - ?PRINT_MSG("QoS should be 0, 1, 2~n") - end. - -plugins(["list"]) -> - foreach(fun print/1, emqx_plugins:list()); - -plugins(["load", Name]) -> - case emqx_plugins:load(list_to_atom(Name)) of - {ok, StartedApps} -> - ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name]); - {error, Reason} -> - ?PRINT("load plugin error: ~p~n", [Reason]) - end; - -plugins(["unload", Name]) -> - case emqx_plugins:unload(list_to_atom(Name)) of - ok -> - ?PRINT("Plugin ~s unloaded successfully.~n", [Name]); - {error, Reason} -> - ?PRINT("unload plugin error: ~p~n", [Reason]) - end; - -plugins(_) -> - ?USAGE([{"plugins list", "Show loaded plugins"}, - {"plugins load ", "Load plugin"}, - {"plugins unload ", "Unload plugin"}]). - -%%-------------------------------------------------------------------- -%% @doc Bridges command - -bridges(["list"]) -> - foreach(fun({Node, Topic, _Pid}) -> - ?PRINT("bridge: ~s--~s-->~s~n", [node(), Topic, Node]) - end, emqx_bridge_sup_sup:bridges()); - -bridges(["options"]) -> - ?PRINT_MSG("Options:~n"), - ?PRINT_MSG(" qos = 0 | 1 | 2~n"), - ?PRINT_MSG(" prefix = string~n"), - ?PRINT_MSG(" suffix = string~n"), - ?PRINT_MSG(" queue = integer~n"), - ?PRINT_MSG("Example:~n"), - ?PRINT_MSG(" qos=2,prefix=abc/,suffix=/yxz,queue=1000~n"); - -bridges(["start", SNode, Topic]) -> - case emqx_bridge_sup_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic)) of - {ok, _} -> ?PRINT_MSG("bridge is started.~n"); - {error, Error} -> ?PRINT("error: ~p~n", [Error]) - end; - -bridges(["start", SNode, Topic, OptStr]) -> - Opts = parse_opts(bridge, OptStr), - case emqx_bridge_sup_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic), Opts) of - {ok, _} -> ?PRINT_MSG("bridge is started.~n"); - {error, Error} -> ?PRINT("error: ~p~n", [Error]) - end; - -bridges(["stop", SNode, Topic]) -> - case emqx_bridge_sup_sup:stop_bridge(list_to_atom(SNode), list_to_binary(Topic)) of - ok -> ?PRINT_MSG("bridge is stopped.~n"); - {error, Error} -> ?PRINT("error: ~p~n", [Error]) - end; - -bridges(_) -> - ?USAGE([{"bridges list", "List bridges"}, - {"bridges options", "Bridge options"}, - {"bridges start ", "Start a bridge"}, - {"bridges start ", "Start a bridge with options"}, - {"bridges stop ", "Stop a bridge"}]). - -parse_opts(Cmd, OptStr) -> - Tokens = string:tokens(OptStr, ","), - [parse_opt(Cmd, list_to_atom(Opt), Val) - || [Opt, Val] <- [string:tokens(S, "=") || S <- Tokens]]. -parse_opt(bridge, qos, Qos) -> - {qos, list_to_integer(Qos)}; -parse_opt(bridge, suffix, Suffix) -> - {topic_suffix, bin(Suffix)}; -parse_opt(bridge, prefix, Prefix) -> - {topic_prefix, bin(Prefix)}; -parse_opt(bridge, queue, Len) -> - {max_queue_len, list_to_integer(Len)}; -parse_opt(_Cmd, Opt, _Val) -> - ?PRINT("Bad Option: ~s~n", [Opt]). - -%%-------------------------------------------------------------------- -%% @doc vm command - -vm([]) -> - vm(["all"]); - -vm(["all"]) -> - [vm([Name]) || Name <- ["load", "memory", "process", "io", "ports"]]; - -vm(["load"]) -> - [?PRINT("cpu/~-20s: ~s~n", [L, V]) || {L, V} <- emqx_vm:loads()]; - -vm(["memory"]) -> - [?PRINT("memory/~-17s: ~w~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()]; - -vm(["process"]) -> - foreach(fun({Name, Key}) -> - ?PRINT("process/~-16s: ~w~n", [Name, erlang:system_info(Key)]) - end, [{limit, process_limit}, {count, process_count}]); - -vm(["io"]) -> - IoInfo = erlang:system_info(check_io), - foreach(fun(Key) -> - ?PRINT("io/~-21s: ~w~n", [Key, get_value(Key, IoInfo)]) - end, [max_fds, active_fds]); - -vm(["ports"]) -> - foreach(fun({Name, Key}) -> - ?PRINT("ports/~-16s: ~w~n", [Name, erlang:system_info(Key)]) - end, [{count, port_count}, {limit, port_limit}]); - -vm(_) -> - ?USAGE([{"vm all", "Show info of Erlang VM"}, - {"vm load", "Show load of Erlang VM"}, - {"vm memory", "Show memory of Erlang VM"}, - {"vm process", "Show process of Erlang VM"}, - {"vm io", "Show IO of Erlang VM"}, - {"vm ports", "Show Ports of Erlang VM"}]). - -%%-------------------------------------------------------------------- -%% @doc mnesia Command - -mnesia([]) -> - mnesia:system_info(); - -mnesia(_) -> - ?PRINT_CMD("mnesia", "Mnesia system info"). - -%%-------------------------------------------------------------------- -%% @doc Trace Command - -trace(["list"]) -> - foreach(fun({{Who, Name}, LogFile}) -> - ?PRINT("trace ~s ~s -> ~s~n", [Who, Name, LogFile]) - end, emqx_trace:all_traces()); - -trace(["client", ClientId, "off"]) -> - trace_off(client, ClientId); - -trace(["client", ClientId, LogFile]) -> - trace_on(client, ClientId, LogFile); - -trace(["topic", Topic, "off"]) -> - trace_off(topic, Topic); - -trace(["topic", Topic, LogFile]) -> - trace_on(topic, Topic, LogFile); - -trace(_) -> - ?USAGE([{"trace list", "List all traces"}, - {"trace client ","Trace a client"}, - {"trace client off", "Stop tracing a client"}, - {"trace topic ", "Trace a topic"}, - {"trace topic off", "Stop tracing a Topic"}]). - -trace_on(Who, Name, LogFile) -> - case emqx_trace:start_trace({Who, iolist_to_binary(Name)}, LogFile) of - ok -> - ?PRINT("trace ~s ~s successfully.~n", [Who, Name]); - {error, Error} -> - ?PRINT("trace ~s ~s error: ~p~n", [Who, Name, Error]) - end. - -trace_off(Who, Name) -> - case emqx_trace:stop_trace({Who, iolist_to_binary(Name)}) of - ok -> - ?PRINT("stop tracing ~s ~s successfully.~n", [Who, Name]); - {error, Error} -> - ?PRINT("stop tracing ~s ~s error: ~p.~n", [Who, Name, Error]) - end. - -%%-------------------------------------------------------------------- -%% @doc Listeners Command - -listeners([]) -> - foreach(fun({{Protocol, ListenOn}, Pid}) -> - Info = [{acceptors, esockd:get_acceptors(Pid)}, - {max_clients, esockd:get_max_clients(Pid)}, - {current_clients,esockd:get_current_clients(Pid)}, - {shutdown_count, esockd:get_shutdown_count(Pid)}], - ?PRINT("listener on ~s:~s~n", [Protocol, esockd:to_string(ListenOn)]), - foreach(fun({Key, Val}) -> - ?PRINT(" ~-16s: ~w~n", [Key, Val]) - end, Info) - end, esockd:listeners()); - -listeners(["restart", Proto, ListenOn]) -> - ListenOn1 = case string:tokens(ListenOn, ":") of - [Port] -> list_to_integer(Port); - [IP, Port] -> {IP, list_to_integer(Port)} - end, - case emqx:restart_listener({list_to_atom(Proto), ListenOn1, []}) of - {ok, _Pid} -> - io:format("Restart ~s listener on ~s successfully.~n", [Proto, ListenOn]); - {error, Error} -> - io:format("Failed to restart ~s listener on ~s, error:~p~n", [Proto, ListenOn, Error]) - end; - -listeners(["stop", Proto, ListenOn]) -> - ListenOn1 = case string:tokens(ListenOn, ":") of - [Port] -> list_to_integer(Port); - [IP, Port] -> {IP, list_to_integer(Port)} - end, - case emqx:stop_listener({list_to_atom(Proto), ListenOn1, []}) of - ok -> - io:format("Stop ~s listener on ~s successfully.~n", [Proto, ListenOn]); - {error, Error} -> - io:format("Failed to stop ~s listener on ~s, error:~p~n", [Proto, ListenOn, Error]) - end; - -listeners(_) -> - ?USAGE([{"listeners", "List listeners"}, - {"listeners restart ", "Restart a listener"}, - {"listeners stop ", "Stop a listener"}]). - -%%-------------------------------------------------------------------- -%% Dump ETS -%%-------------------------------------------------------------------- - -dump(Table) -> - dump(Table, ets:first(Table)). - -dump(_Table, '$end_of_table') -> - ok; - -dump(Table, Key) -> - case ets:lookup(Table, Key) of - [Record] -> print(Record); - [] -> ok - end, - dump(Table, ets:next(Table, Key)). - -print([]) -> - ok; - -print(Routes = [#mqtt_route{topic = Topic} | _]) -> - Nodes = [atom_to_list(Node) || #mqtt_route{node = Node} <- Routes], - ?PRINT("~s -> ~s~n", [Topic, string:join(Nodes, ",")]); - -%% print(Subscriptions = [#mqtt_subscription{subid = ClientId} | _]) -> -%% TopicTable = [io_lib:format("~s:~w", [Topic, Qos]) -%% || #mqtt_subscription{topic = Topic, qos = Qos} <- Subscriptions], -%% ?PRINT("~s -> ~s~n", [ClientId, string:join(TopicTable, ",")]); - -%% print(Topics = [#mqtt_topic{}|_]) -> -%% foreach(fun print/1, Topics); - -print(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) -> - ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n", - [Name, Ver, Descr, Active]); - -print(#mqtt_client{client_id = ClientId, clean_sess = CleanSess, username = Username, - peername = Peername, connected_at = ConnectedAt}) -> - ?PRINT("Client(~s, clean_sess=~s, username=~s, peername=~s, connected_at=~p)~n", - [ClientId, CleanSess, Username, emqx_net:format(Peername), - emqx_time:now_secs(ConnectedAt)]); - -%% print(#mqtt_topic{topic = Topic, flags = Flags}) -> -%% ?PRINT("~s: ~s~n", [Topic, string:join([atom_to_list(F) || F <- Flags], ",")]); -print({route, Routes}) -> - foreach(fun print/1, Routes); -print({local_route, Routes}) -> - foreach(fun print/1, Routes); -print(#mqtt_route{topic = Topic, node = Node}) -> - ?PRINT("~s -> ~s~n", [Topic, Node]); -print({Topic, Node}) -> - ?PRINT("~s -> ~s~n", [Topic, Node]); - -print({ClientId, _ClientPid, _Persistent, SessInfo}) -> - Data = lists:append(SessInfo, emqx_stats:get_session_stats(ClientId)), - InfoKeys = [clean_sess, - subscriptions, - max_inflight, - inflight_len, - mqueue_len, - mqueue_dropped, - awaiting_rel_len, - deliver_msg, - enqueue_msg, - created_at], - ?PRINT("Session(~s, clean_sess=~s, subscriptions=~w, max_inflight=~w, inflight=~w, " - "mqueue_len=~w, mqueue_dropped=~w, awaiting_rel=~w, " - "deliver_msg=~w, enqueue_msg=~w, created_at=~w)~n", - [ClientId | [format(Key, get_value(Key, Data)) || Key <- InfoKeys]]). - -print(subscription, {Sub, {share, _Share, Topic}}) when is_pid(Sub) -> - ?PRINT("~p -> ~s~n", [Sub, Topic]); -print(subscription, {Sub, Topic}) when is_pid(Sub) -> - ?PRINT("~p -> ~s~n", [Sub, Topic]); -print(subscription, {{SubId, SubPid}, {share, _Share, Topic}}) - when is_binary(SubId), is_pid(SubPid) -> - ?PRINT("~s~p -> ~s~n", [SubId, SubPid, Topic]); -print(subscription, {{SubId, SubPid}, Topic}) - when is_binary(SubId), is_pid(SubPid) -> - ?PRINT("~s~p -> ~s~n", [SubId, SubPid, Topic]); -print(subscription, {Sub, Topic, Props}) -> - print(subscription, {Sub, Topic}), - lists:foreach(fun({K, V}) when is_binary(V) -> - ?PRINT(" ~-8s: ~s~n", [K, V]); - ({K, V}) -> - ?PRINT(" ~-8s: ~w~n", [K, V]); - (K) -> - ?PRINT(" ~-8s: true~n", [K]) - end, Props). - -format(created_at, Val) -> - emqx_time:now_secs(Val); - -format(_, Val) -> - Val. - -bin(S) -> iolist_to_binary(S). - diff --git a/src/emqx_cli_config.erl b/src/emqx_cli_config.erl deleted file mode 100644 index a2167d2c6..000000000 --- a/src/emqx_cli_config.erl +++ /dev/null @@ -1,363 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module (emqx_cli_config). - --export ([register_config_cli/0, - register_config/0, - run/1, - set_usage/0, - all_cfgs/0, - get_cfg/2, - get_cfg/3, - read_config/1, - write_config/2]). - --define(APP, emqx). - --define(TAB, emqx_config). - -register_config() -> - application:start(clique), - F = fun() -> ekka_mnesia:running_nodes() end, - clique:register_node_finder(F), - register_config_cli(), - create_config_tab(). - -create_config_tab() -> - case ets:info(?TAB, name) of - undefined -> - ets:new(?TAB, [named_table, public]), - {ok, PluginsEtcDir} = emqx:env(plugins_etc_dir), - Files = filelib:wildcard("*.conf", PluginsEtcDir), - lists:foreach(fun(File) -> - [FileName | _] = string:tokens(File, "."), - Configs = cuttlefish_conf:file(lists:concat([PluginsEtcDir, File])), - ets:insert(?TAB, {list_to_atom(FileName), Configs}) - end, Files); - _ -> - ok - end. - -read_config(App) -> - case ets:lookup(?TAB, App) of - [] -> []; - [{_, Value}] -> Value - end. - -write_config(App, Terms) -> - ets:insert(?TAB, {App, Terms}). - -run(Cmd) -> - clique:run(Cmd). - -register_config_cli() -> - ok = clique_config:load_schema([code:priv_dir(?APP)], ?APP), - register_protocol_formatter(), - register_client_formatter(), - register_session_formatter(), - register_queue_formatter(), - register_lager_formatter(), - - register_auth_config(), - register_protocol_config(), - register_connection_config(), - register_client_config(), - register_session_config(), - register_queue_config(), - register_broker_config(), - register_lager_config(). - -set_usage() -> - io:format("~-40s# ~-20s# ~-20s ~p~n", ["key", "value", "datatype", "app"]), - io:format("------------------------------------------------------------------------------------------------~n"), - lists:foreach(fun({Key, Val, Datatype, App}) -> - io:format("~-40s# ~-20s# ~-20s ~p~n", [Key, Val, Datatype, App]) - end, all_cfgs()), - io:format("------------------------------------------------------------------------------------------------~n"), - io:format("Usage: set key=value --app=appname~n"). - -all_cfgs() -> - {Mappings, Mappings1} = lists:foldl( - fun({Key, {_, Map, _}}, {Acc, Acc1}) -> - Map1 = lists:map(fun(M) -> {cuttlefish_mapping:variable(M), Key} end, Map), - {Acc ++ Map, Acc1 ++ Map1} - end, {[], []}, ets:tab2list(clique_schema)), - lists:foldl(fun({Key, _}, Acc) -> - case lists:keyfind(cuttlefish_variable:tokenize(Key), 2, Mappings) of - false -> Acc; - Map -> - Datatype = format_datatype(cuttlefish_mapping:datatype(Map)), - App = proplists:get_value(cuttlefish_variable:tokenize(Key), Mappings1), - [{_, [Val0]}] = clique_config:show([Key], [{app, App}]), - Val = any_to_string(proplists:get_value(Key, Val0)), - [{Key, Val, Datatype, App} | Acc] - end - end, [],lists:sort(ets:tab2list(clique_config))). - -get_cfg(App, Key) -> - get_cfg(App, Key, undefined). - -get_cfg(App, Key, Def) -> - [{_, [Val0]}] = clique_config:show([Key], [{app, App}]), - proplists:get_value(Key, Val0, Def). - -format_datatype(Value) -> - format_datatype(Value, ""). - -format_datatype([Head], Acc) when is_tuple(Head) -> - [Head1 | _] = erlang:tuple_to_list(Head), - lists:concat([Acc, Head1]); -format_datatype([Head], Acc) -> - lists:concat([Acc, Head]); -format_datatype([Head | Tail], Acc) when is_tuple(Head)-> - [Head1 | _] = erlang:tuple_to_list(Head), - format_datatype(Tail, Acc ++ lists:concat([Head1, ", "])); -format_datatype([Head | Tail], Acc) -> - format_datatype(Tail, Acc ++ lists:concat([Head, ", "])). - -%%-------------------------------------------------------------------- -%% Auth/Acl -%%-------------------------------------------------------------------- - -register_auth_config() -> - ConfigKeys = ["mqtt.allow_anonymous", - "mqtt.acl_nomatch", - "mqtt.acl_file", - "mqtt.cache_acl"], - [clique:register_config(Key , fun auth_config_callback/2) || Key <- ConfigKeys], - ok = register_config_whitelist(ConfigKeys). - -auth_config_callback([_, KeyStr], Value) -> - application:set_env(?APP, l2a(KeyStr), Value), " successfully\n". - -%%-------------------------------------------------------------------- -%% MQTT Protocol -%%-------------------------------------------------------------------- - -register_protocol_formatter() -> - ConfigKeys = ["max_clientid_len", - "max_packet_size", - "websocket_protocol_header", - "keepalive_backoff"], - [clique:register_formatter(["mqtt", Key], fun protocol_formatter_callback/2) || Key <- ConfigKeys]. - -protocol_formatter_callback([_, "websocket_protocol_header"], Params) -> - Params; -protocol_formatter_callback([_, Key], Params) -> - proplists:get_value(l2a(Key), Params). - -register_protocol_config() -> - ConfigKeys = ["mqtt.max_clientid_len", - "mqtt.max_packet_size", - "mqtt.websocket_protocol_header", - "mqtt.keepalive_backoff"], - [clique:register_config(Key , fun protocol_config_callback/2) || Key <- ConfigKeys], - ok = register_config_whitelist(ConfigKeys). - -protocol_config_callback([_AppStr, KeyStr], Value) -> - protocol_config_callback(protocol, l2a(KeyStr), Value). -protocol_config_callback(_App, websocket_protocol_header, Value) -> - application:set_env(?APP, websocket_protocol_header, Value), - " successfully\n"; -protocol_config_callback(App, Key, Value) -> - {ok, Env} = emqx:env(App), - application:set_env(?APP, App, lists:keyreplace(Key, 1, Env, {Key, Value})), - " successfully\n". - -%%-------------------------------------------------------------------- -%% MQTT Connection -%%-------------------------------------------------------------------- - -register_connection_config() -> - ConfigKeys = ["mqtt.conn.force_gc_count"], - [clique:register_config(Key , fun connection_config_callback/2) || Key <- ConfigKeys], - ok = register_config_whitelist(ConfigKeys). - -connection_config_callback([_, KeyStr0, KeyStr1], Value) -> - KeyStr = lists:concat([KeyStr0, "_", KeyStr1]), - application:set_env(?APP, l2a(KeyStr), Value), - " successfully\n". - -%%-------------------------------------------------------------------- -%% MQTT Client -%%-------------------------------------------------------------------- - -register_client_formatter() -> - ConfigKeys = ["max_publish_rate", - "idle_timeout", - "enable_stats"], - [clique:register_formatter(["mqtt", "client", Key], fun client_formatter_callback/2) || Key <- ConfigKeys]. - -client_formatter_callback([_, _, Key], Params) -> - proplists:get_value(list_to_atom(Key), Params). - -register_client_config() -> - ConfigKeys = ["mqtt.client.max_publish_rate", - "mqtt.client.idle_timeout", - "mqtt.client.enable_stats"], - [clique:register_config(Key , fun client_config_callback/2) || Key <- ConfigKeys], - ok = register_config_whitelist(ConfigKeys). - -client_config_callback([_, AppStr, KeyStr], Value) -> - client_config_callback(l2a(AppStr), l2a(KeyStr), Value). - -client_config_callback(App, idle_timeout, Value) -> - {ok, Env} = emqx:env(App), - application:set_env(?APP, App, lists:keyreplace(client_idle_timeout, 1, Env, {client_idle_timeout, Value})), - " successfully\n"; -client_config_callback(App, enable_stats, Value) -> - {ok, Env} = emqx:env(App), - application:set_env(?APP, App, lists:keyreplace(client_enable_stats, 1, Env, {client_enable_stats, Value})), - " successfully\n"; -client_config_callback(App, Key, Value) -> - {ok, Env} = emqx:env(App), - application:set_env(?APP, App, lists:keyreplace(Key, 1, Env, {Key, Value})), - " successfully\n". - -%%-------------------------------------------------------------------- -%% session -%%-------------------------------------------------------------------- - -register_session_formatter() -> - ConfigKeys = ["max_subscriptions", - "upgrade_qos", - "max_inflight", - "retry_interval", - "max_awaiting_rel", - "await_rel_timeout", - "enable_stats", - "expiry_interval", - "ignore_loop_deliver"], - [clique:register_formatter(["mqtt", "session", Key], fun session_formatter_callback/2) || Key <- ConfigKeys]. - -session_formatter_callback([_, _, Key], Params) -> - proplists:get_value(list_to_atom(Key), Params). - -register_session_config() -> - ConfigKeys = ["mqtt.session.max_subscriptions", - "mqtt.session.upgrade_qos", - "mqtt.session.max_inflight", - "mqtt.session.retry_interval", - "mqtt.session.max_awaiting_rel", - "mqtt.session.await_rel_timeout", - "mqtt.session.enable_stats", - "mqtt.session.expiry_interval", - "mqtt.session.ignore_loop_deliver"], - [clique:register_config(Key , fun session_config_callback/2) || Key <- ConfigKeys], - ok = register_config_whitelist(ConfigKeys). - -session_config_callback([_, AppStr, KeyStr], Value) -> - session_config_callback(l2a(AppStr), l2a(KeyStr), Value). -session_config_callback(App, Key, Value) -> - {ok, Env} = emqx:env(App), - application:set_env(?APP, App, lists:keyreplace(Key, 1, Env, {Key, Value})), - " successfully\n". - -l2a(List) -> list_to_atom(List). - -%%-------------------------------------------------------------------- -%% MQTT MQueue -%%-------------------------------------------------------------------- - -register_queue_formatter() -> - ConfigKeys = ["type", - "priority", - "max_length", - "low_watermark", - "high_watermark", - "store_qos0"], - [clique:register_formatter(["mqtt", "mqueue", Key], fun queue_formatter_callback/2) || Key <- ConfigKeys]. - -queue_formatter_callback([_, _, Key], Params) -> - proplists:get_value(list_to_atom(Key), Params). - -register_queue_config() -> - ConfigKeys = ["mqtt.mqueue.type", - "mqtt.mqueue.priority", - "mqtt.mqueue.max_length", - "mqtt.mqueue.low_watermark", - "mqtt.mqueue.high_watermark", - "mqtt.mqueue.store_qos0"], - [clique:register_config(Key , fun queue_config_callback/2) || Key <- ConfigKeys], - ok = register_config_whitelist(ConfigKeys). - -queue_config_callback([_, AppStr, KeyStr], Value) -> - queue_config_callback(l2a(AppStr), l2a(KeyStr), Value). - -queue_config_callback(App, low_watermark, Value) -> - {ok, Env} = emqx:env(App), - application:set_env(?APP, App, lists:keyreplace(low_watermark, 1, Env, {low_watermark, Value})), - " successfully\n"; -queue_config_callback(App, high_watermark, Value) -> - {ok, Env} = emqx:env(App), - application:set_env(?APP, App, lists:keyreplace(high_watermark, 1, Env, {high_watermark, Value})), - " successfully\n"; -queue_config_callback(App, Key, Value) -> - {ok, Env} = emqx:env(App), - application:set_env(?APP, App, lists:keyreplace(Key, 1, Env, {Key, Value})), - " successfully\n". - -%%-------------------------------------------------------------------- -%% MQTT Broker -%%-------------------------------------------------------------------- - -register_broker_config() -> - ConfigKeys = ["mqtt.broker.sys_interval"], - [clique:register_config(Key , fun broker_config_callback/2) || Key <- ConfigKeys], - ok = register_config_whitelist(ConfigKeys). - -broker_config_callback([_, KeyStr0, KeyStr1], Value) -> - KeyStr = lists:concat([KeyStr0, "_", KeyStr1]), - application:set_env(?APP, l2a(KeyStr), Value), - " successfully\n". - -%%-------------------------------------------------------------------- -%% MQTT Lager -%%-------------------------------------------------------------------- - -register_lager_formatter() -> - ConfigKeys = ["level"], - [clique:register_formatter(["log", "console", Key], fun lager_formatter_callback/2) || Key <- ConfigKeys]. - -lager_formatter_callback(_, Params) -> - proplists:get_value(lager_console_backend, Params). - -register_lager_config() -> - ConfigKeys = ["log.console.level"], - [clique:register_config(Key , fun lager_config_callback/2) || Key <- ConfigKeys], - ok = register_config_whitelist(ConfigKeys). - -lager_config_callback(_, Value) -> - lager:set_loglevel(lager_console_backend, Value), - " successfully\n". - -register_config_whitelist(ConfigKeys) -> - clique:register_config_whitelist(ConfigKeys, ?APP). - -%%-------------------------------------------------------------------- -%% Inner Function -%%-------------------------------------------------------------------- -any_to_string(I) when is_integer(I) -> - integer_to_list(I); -any_to_string(F) when is_float(F)-> - float_to_list(F,[{decimals, 4}]); -any_to_string(A) when is_atom(A) -> - atom_to_list(A); -any_to_string(B) when is_binary(B) -> - binary_to_list(B); -any_to_string(L) when is_list(L) -> - L. diff --git a/src/emqx_ctl.erl b/src/emqx_ctl.erl index 565c08ff1..8f3ba57d3 100644 --- a/src/emqx_ctl.erl +++ b/src/emqx_ctl.erl @@ -68,15 +68,6 @@ run([]) -> usage(), ok; run(["help"]) -> usage(), ok; -run(["set"] = CmdS) when length(CmdS) =:= 1 -> - emqx_cli_config:set_usage(), ok; - -run(["set" | _] = CmdS) -> - emqx_cli_config:run(["config" | CmdS]), ok; - -run(["show" | _] = CmdS) -> - emqx_cli_config:run(["config" | CmdS]), ok; - run([CmdS|Args]) -> case lookup(list_to_atom(CmdS)) of [{Mod, Fun}] -> diff --git a/src/emqx_http.erl b/src/emqx_http.erl deleted file mode 100644 index efc24e8fc..000000000 --- a/src/emqx_http.erl +++ /dev/null @@ -1,236 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%% @doc HTTP publish API and websocket client. - --module(emqx_http). - --author("Feng Lee "). - --include("emqx.hrl"). - --include("emqx_mqtt.hrl"). - --import(proplists, [get_value/2, get_value/3]). - --export([http_handler/0, handle_request/2, http_api/0, inner_handle_request/2]). - --include("emqx_rest.hrl"). - --include("emqx_internal.hrl"). - --record(state, {dispatch}). - -http_handler() -> - APIs = http_api(), - State = #state{dispatch = dispatcher(APIs)}, - {?MODULE, handle_request, [State]}. - -http_api() -> - Attr = emqx_rest_api:module_info(attributes), - [{Regexp, Method, Function, Args} || {http_api, [{Regexp, Method, Function, Args}]} <- Attr]. - -%%-------------------------------------------------------------------- -%% Handle HTTP Request -%%-------------------------------------------------------------------- -handle_request(Req, State) -> - Path = Req:get(path), - case Path of - "/status" -> - handle_request("/status", Req, Req:get(method)); - "/" -> - handle_request("/", Req, Req:get(method)); - "/api/v2/auth" -> %%TODO: Security Issue! - handle_request(Path, Req, State); - _ -> - if_authorized(Req, fun() -> handle_request(Path, Req, State) end) - end. - -inner_handle_request(Req, State) -> - Path = Req:get(path), - case Path of - "/api/v2/auth" -> handle_request(Path, Req, State); - _ -> if_authorized(Req, fun() -> handle_request(Path, Req, State) end) - end. - -handle_request("/api/v2/" ++ Url, Req, #state{dispatch = Dispatch}) -> - Dispatch(Req, Url); - -handle_request("/status", Req, Method) when Method =:= 'HEAD'; Method =:= 'GET' -> - {InternalStatus, _ProvidedStatus} = init:get_status(), - AppStatus = case lists:keysearch(emqx, 1, application:which_applications()) of - false -> not_running; - {value, _Val} -> running - end, - Status = io_lib:format("Node ~s is ~s~nemqx is ~s", - [node(), InternalStatus, AppStatus]), - Req:ok({"text/plain", iolist_to_binary(Status)}); - -handle_request("/", Req, Method) when Method =:= 'HEAD'; Method =:= 'GET' -> - respond(Req, 200, api_list()); - -handle_request(_, Req, #state{}) -> - respond(Req, 404, []). - -dispatcher(APIs) -> - fun(Req, Url) -> - Method = Req:get(method), - case filter(APIs, Url, Method) of - [{Regexp, _Method, Function, FilterArgs}] -> - case params(Req) of - {error, Error1} -> - respond(Req, 200, Error1); - Params -> - case {check_params(Params, FilterArgs), - check_params_type(Params, FilterArgs)} of - {true, true} -> - {match, [MatchList]} = re:run(Url, Regexp, [global, {capture, all_but_first, list}]), - Args = lists:append([[Method, Params], MatchList]), - lager:debug("Mod:~p, Fun:~p, Args:~p", [emqx_rest_api, Function, Args]), - case catch apply(emqx_rest_api, Function, Args) of - {ok, Data} -> - respond(Req, 200, [{code, ?SUCCESS}, {result, Data}]); - {error, Error} -> - respond(Req, 200, Error); - {'EXIT', Reason} -> - lager:error("Execute API '~s' Error: ~p", [Url, Reason]), - respond(Req, 404, []) - end; - {false, _} -> - respond(Req, 200, [{code, ?ERROR7}, {message, <<"params error">>}]); - {_, false} -> - respond(Req, 200, [{code, ?ERROR8}, {message, <<"params type error">>}]) - end - end; - _ -> - lager:error("No match Url:~p", [Url]), - respond(Req, 404, []) - end - end. - -% %%-------------------------------------------------------------------- -% %% Basic Authorization -% %%-------------------------------------------------------------------- -if_authorized(Req, Fun) -> - case authorized(Req) of - true -> Fun(); - false -> respond(Req, 401, []) - end. - -authorized(Req) -> - case Req:get_header_value("Authorization") of - undefined -> - false; - "Basic " ++ BasicAuth -> - {Username, Password} = user_passwd(BasicAuth), - case emq_mgmt:check_user(Username, Password) of - ok -> - true; - {error, Reason} -> - lager:error("HTTP Auth failure: username=~s, reason=~p", [Username, Reason]), - false - end - end. - -user_passwd(BasicAuth) -> - list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)). - -respond(Req, 401, Data) -> - Req:respond({401, [{"WWW-Authenticate", "Basic Realm=\"emqx control center\""}], Data}); -respond(Req, 404, Data) -> - Req:respond({404, [{"Content-Type", "text/plain"}], Data}); -respond(Req, 200, Data) -> - Req:respond({200, [{"Content-Type", "application/json"}], to_json(Data)}); -respond(Req, Code, Data) -> - Req:respond({Code, [{"Content-Type", "text/plain"}], Data}). - -filter(APIs, Url, Method) -> - lists:filter(fun({Regexp, Method1, _Function, _Args}) -> - case re:run(Url, Regexp, [global, {capture, all_but_first, list}]) of - {match, _} -> Method =:= Method1; - _ -> false - end - end, APIs). - -params(Req) -> - Method = Req:get(method), - case Method of - 'GET' -> - mochiweb_request:parse_qs(Req); - _ -> - case Req:recv_body() of - <<>> -> []; - undefined -> []; - Body -> - case jsx:is_json(Body) of - true -> jsx:decode(Body); - false -> - lager:error("Body:~p", [Body]), - {error, [{code, ?ERROR9}, {message, <<"Body not json">>}]} - end - end - end. - -check_params(_Params, Args) when Args =:= [] -> - true; -check_params(Params, Args)-> - not lists:any(fun({Item, _Type}) -> undefined =:= proplists:get_value(Item, Params) end, Args). - -check_params_type(_Params, Args) when Args =:= [] -> - true; -check_params_type(Params, Args) -> - not lists:any(fun({Item, Type}) -> - Val = proplists:get_value(Item, Params), - case Type of - int -> not is_integer(Val); - binary -> not is_binary(Val); - bool -> not is_boolean(Val) - end - end, Args). - -to_json([]) -> <<"[]">>; -to_json(Data) -> iolist_to_binary(mochijson2:encode(Data)). - -api_list() -> - [{paths, [<<"api/v2/management/nodes">>, - <<"api/v2/management/nodes/{node_name}">>, - <<"api/v2/monitoring/nodes">>, - <<"api/v2/monitoring/nodes/{node_name}">>, - <<"api/v2/monitoring/listeners">>, - <<"api/v2/monitoring/listeners/{node_name}">>, - <<"api/v2/monitoring/metrics/">>, - <<"api/v2/monitoring/metrics/{node_name}">>, - <<"api/v2/monitoring/stats">>, - <<"api/v2/monitoring/stats/{node_name}">>, - <<"api/v2/nodes/{node_name}/clients">>, - <<"api/v2/nodes/{node_name}/clients/{clientid}">>, - <<"api/v2/clients/{clientid}">>, - <<"api/v2/clients/{clientid}/clean_acl_cache">>, - <<"api/v2/nodes/{node_name}/sessions">>, - <<"api/v2/nodes/{node_name}/sessions/{clientid}">>, - <<"api/v2/sessions/{clientid}">>, - <<"api/v2/nodes/{node_name}/subscriptions">>, - <<"api/v2/nodes/{node_name}/subscriptions/{clientid}">>, - <<"api/v2/subscriptions/{clientid}">>, - <<"api/v2/routes">>, - <<"api/v2/routes/{topic}">>, - <<"api/v2/mqtt/publish">>, - <<"api/v2/mqtt/subscribe">>, - <<"api/v2/mqtt/unsubscribe">>, - <<"api/v2/nodes/{node_name}/plugins">>, - <<"api/v2/nodes/{node_name}/plugins/{plugin_name}">>, - <<"api/v2/configs/{app}">>, - <<"api/v2/nodes/{node_name}/configs/{app}">>]}]. diff --git a/src/emqx_mgmt.erl.bk b/src/emqx_mgmt.erl.bk deleted file mode 100644 index 14345c019..000000000 --- a/src/emqx_mgmt.erl.bk +++ /dev/null @@ -1,544 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_mgmt). - --author("Feng Lee "). - --include("emqx.hrl"). - --include("emqx_mqtt.hrl"). - --include("emqx_internal.hrl"). - --include("emqx_rest.hrl"). - --include_lib("stdlib/include/qlc.hrl"). - --record(mqtt_admin, {username, password, tags}). - --define(EMPTY_KEY(Key), ((Key == undefined) orelse (Key == <<>>))). - --import(proplists, [get_value/2]). - --export([brokers/0, broker/1, metrics/0, metrics/1, stats/1, stats/0, - plugins/0, plugins/1, listeners/0, listener/1, nodes_info/0, node_info/1]). - --export([plugin_list/1, plugin_unload/2, plugin_load/2]). - --export([client_list/4, session_list/4, route_list/3, subscription_list/4, alarm_list/0]). - --export([client/1, session/1, route/1, subscription/1]). - --export([query_table/4, lookup_table/3]). - --export([publish/1, subscribe/1, unsubscribe/1]). - --export([kick_client/1, clean_acl_cache/2]). - --export([modify_config/2, modify_config/3, modify_config/4, get_configs/0, get_config/1, - get_plugin_config/1, get_plugin_config/2, modify_plugin_config/2, modify_plugin_config/3]). - --export([add_user/3, check_user/2, user_list/0, lookup_user/1, - update_user/2, change_password/3, remove_user/1]). - --define(KB, 1024). --define(MB, (1024*1024)). --define(GB, (1024*1024*1024)). - -brokers() -> - [{Node, broker(Node)} || Node <- ekka_mnesia:running_nodes()]. - -broker(Node) when Node =:= node() -> - emqx_broker:info(); -broker(Node) -> - rpc_call(Node, broker, [Node]). - -metrics() -> - [{Node, metrics(Node)} || Node <- ekka_mnesia:running_nodes()]. - -metrics(Node) when Node =:= node() -> - emqx_metrics:all(); -metrics(Node) -> - rpc_call(Node, metrics, [Node]). - -stats() -> - [{Node, stats(Node)} || Node <- ekka_mnesia:running_nodes()]. - -stats(Node) when Node =:= node() -> - emqx_stats:getstats(); -stats(Node) -> - rpc_call(Node, stats, [Node]). - -plugins() -> - [{Node, plugins(Node)} || Node <- ekka_mnesia:running_nodes()]. - -plugins(Node) when Node =:= node() -> - emqx_plugins:list(Node); -plugins(Node) -> - rpc_call(Node, plugins, [Node]). - -listeners() -> - [{Node, listener(Node)} || Node <- ekka_mnesia:running_nodes()]. - -listener(Node) when Node =:= node() -> - lists:map(fun({{Protocol, ListenOn}, Pid}) -> - Info = [{acceptors, esockd:get_acceptors(Pid)}, - {max_clients, esockd:get_max_clients(Pid)}, - {current_clients,esockd:get_current_clients(Pid)}, - {shutdown_count, esockd:get_shutdown_count(Pid)}], - {Protocol, ListenOn, Info} - end, esockd:listeners()); - -listener(Node) -> - rpc_call(Node, listener, [Node]). - -nodes_info() -> - Running = mnesia:system_info(running_db_nodes), - Stopped = mnesia:system_info(db_nodes) -- Running, - DownNodes = lists:map(fun stop_node/1, Stopped), - [node_info(Node) || Node <- Running] ++ DownNodes. - -node_info(Node) when Node =:= node() -> - CpuInfo = [{K, list_to_binary(V)} || {K, V} <- emqx_vm:loads()], - Memory = emqx_vm:get_memory(), - OtpRel = "R" ++ erlang:system_info(otp_release) ++ "/" ++ erlang:system_info(version), - [{name, node()}, - {otp_release, list_to_binary(OtpRel)}, - {memory_total, kmg(get_value(allocated, Memory))}, - {memory_used, kmg(get_value(used, Memory))}, - {process_available, erlang:system_info(process_limit)}, - {process_used, erlang:system_info(process_count)}, - {max_fds, get_value(max_fds, erlang:system_info(check_io))}, - {clients, ets:info(mqtt_client, size)}, - {node_status, 'Running'} | CpuInfo]; - -node_info(Node) -> - rpc_call(Node, node_info, [Node]). - -stop_node(Node) -> - [{name, Node}, {node_status, 'Stopped'}]. -%%-------------------------------------------------------- -%% plugins -%%-------------------------------------------------------- -plugin_list(Node) when Node =:= node() -> - emqx_plugins:list(); -plugin_list(Node) -> - rpc_call(Node, plugin_list, [Node]). - -plugin_load(Node, PluginName) when Node =:= node() -> - emqx_plugins:load(PluginName); -plugin_load(Node, PluginName) -> - rpc_call(Node, plugin_load, [Node, PluginName]). - -plugin_unload(Node, PluginName) when Node =:= node() -> - emqx_plugins:unload(PluginName); -plugin_unload(Node, PluginName) -> - rpc_call(Node, plugin_unload, [Node, PluginName]). - -%%-------------------------------------------------------- -%% client -%%-------------------------------------------------------- -client_list(Node, Key, PageNo, PageSize) when Node =:= node() -> - client_list(Key, PageNo, PageSize); -client_list(Node, Key, PageNo, PageSize) -> - rpc_call(Node, client_list, [Node, Key, PageNo, PageSize]). - -client(ClientId) -> - lists:flatten([client_list(Node, ClientId, 1, 20) || Node <- ekka_mnesia:running_nodes()]). - -%%-------------------------------------------------------- -%% session -%%-------------------------------------------------------- -session_list(Node, Key, PageNo, PageSize) when Node =:= node() -> - session_list(Key, PageNo, PageSize); -session_list(Node, Key, PageNo, PageSize) -> - rpc_call(Node, session_list, [Node, Key, PageNo, PageSize]). - -session(ClientId) -> - lists:flatten([session_list(Node, ClientId, 1, 20) || Node <- ekka_mnesia:running_nodes()]). - -%%-------------------------------------------------------- -%% subscription -%%-------------------------------------------------------- -subscription_list(Node, Key, PageNo, PageSize) when Node =:= node() -> - subscription_list(Key, PageNo, PageSize); -subscription_list(Node, Key, PageNo, PageSize) -> - rpc_call(Node, subscription_list, [Node, Key, PageNo, PageSize]). - -subscription(Key) -> - lists:flatten([subscription_list(Node, Key, 1, 20) || Node <- ekka_mnesia:running_nodes()]). - -%%-------------------------------------------------------- -%% Routes -%%-------------------------------------------------------- -route(Key) -> route_list(Key, 1, 20). - -%%-------------------------------------------------------- -%% alarm -%%-------------------------------------------------------- -alarm_list() -> - emqx_alarm:get_alarms(). - -query_table(Qh, PageNo, PageSize, TotalNum) -> - Cursor = qlc:cursor(Qh), - case PageNo > 1 of - true -> qlc:next_answers(Cursor, (PageNo - 1) * PageSize); - false -> ok - end, - Rows = qlc:next_answers(Cursor, PageSize), - qlc:delete_cursor(Cursor), - [{totalNum, TotalNum}, - {totalPage, total_page(TotalNum, PageSize)}, - {result, Rows}]. - -total_page(TotalNum, PageSize) -> - case TotalNum rem PageSize of - 0 -> TotalNum div PageSize; - _ -> (TotalNum div PageSize) + 1 - end. - -%%TODO: refactor later... -lookup_table(LookupFun, _PageNo, _PageSize) -> - Rows = LookupFun(), - Rows. - -%%-------------------------------------------------------------------- -%% mqtt -%%-------------------------------------------------------------------- -publish({ClientId, Topic, Payload, Qos, Retain}) -> - case validate(topic, Topic) of - true -> - Msg = emqx_message:make(ClientId, Qos, Topic, Payload), - emqx:publish(Msg#mqtt_message{retain = Retain}), - ok; - false -> - {error, format_error(Topic, "validate topic: ${0} fail")} - end. - -subscribe({ClientId, Topic, Qos}) -> - case validate(topic, Topic) of - true -> - case emqx_sm:lookup_session(ClientId) of - undefined -> - {error, format_error(ClientId, "Clientid: ${0} not found")}; - #mqtt_session{sess_pid = SessPid} -> - emqx_session:subscribe(SessPid, [{Topic, [{qos, Qos}]}]), - ok - end; - false -> - {error, format_error(Topic, "validate topic: ${0} fail")} - end. - -unsubscribe({ClientId, Topic}) -> - case validate(topic, Topic) of - true -> - case emqx_sm:lookup_session(ClientId) of - undefined -> - {error, format_error(ClientId, "Clientid: ${0} not found")}; - #mqtt_session{sess_pid = SessPid} -> - emqx_session:unsubscribe(SessPid, [{Topic, []}]), - ok - end; - false -> - {error, format_error(Topic, "validate topic: ${0} fail")} - end. - -% publish(Messages) -> -% lists:foldl( -% fun({ClientId, Topic, Payload, Qos, Retain}, {Success, Failed}) -> -% case validate(topic, Topic) of -% true -> -% Msg = emqx_message:make(ClientId, Qos, Topic, Payload), -% emqx:publish(Msg#mqtt_message{retain = Retain}), -% {[[{topic, Topic}]| Success], Failed}; -% false -> -% {Success, [[{topic, Topic}]| Failed]} -% end -% end, {[], []}, Messages). - -% subscribers(Subscribers) -> -% lists:foldl( -% fun({ClientId, Topic, Qos}, {Success, Failed}) -> -% case emqx_sm:lookup_session(ClientId) of -% undefined -> -% {Success, [[{client_id, ClientId}]|Failed]}; -% #mqtt_session{sess_pid = SessPid} -> -% emqx_session:subscribe(SessPid, [{Topic, [{qos, Qos}]}]), -% {[[{client_id, ClientId}]| Success], Failed} -% end -% end,{[], []}, Subscribers). - -% unsubscribers(UnSubscribers)-> -% lists:foldl( -% fun({ClientId, Topic}, {Success, Failed}) -> -% case emqx_sm:lookup_session(ClientId) of -% undefined -> -% {Success, [[{client_id, ClientId}]|Failed]}; -% #mqtt_session{sess_pid = SessPid} -> -% emqx_session:unsubscriber(SessPid, [{Topic, []}]), -% {[[{client_id, ClientId}]| Success], Failed} -% end -% end, {[], []}, UnSubscribers). - -%%-------------------------------------------------------------------- -%% manager API -%%-------------------------------------------------------------------- -kick_client(ClientId) -> - Result = [kick_client(Node, ClientId) || Node <- ekka_mnesia:running_nodes()], - lists:any(fun(Item) -> Item =:= ok end, Result). - -kick_client(Node, ClientId) when Node =:= node() -> - case emqx_cm:lookup(ClientId) of - undefined -> error; - #mqtt_client{client_pid = Pid}-> emqx_client:kick(Pid) - end; -kick_client(Node, ClientId) -> - rpc_call(Node, kick_client, [Node, ClientId]). - - -clean_acl_cache(ClientId, Topic) -> - Result = [clean_acl_cache(Node, ClientId, Topic) || Node <- ekka_mnesia:running_nodes()], - lists:any(fun(Item) -> Item =:= ok end, Result). - -clean_acl_cache(Node, ClientId, Topic) when Node =:= node() -> - case emqx_cm:lookup(ClientId) of - undefined -> error; - #mqtt_client{client_pid = Pid}-> emqx_client:clean_acl_cache(Pid, Topic) - end; -clean_acl_cache(Node, ClientId, Topic) -> - rpc_call(Node, clean_acl_cache, [Node, ClientId, Topic]). - -%%-------------------------------------------------------------------- -%% Config ENV -%%-------------------------------------------------------------------- -modify_config(App, Terms) -> - emqx_config:write(App, Terms). - -modify_config(App, Key, Value) -> - Result = [modify_config(Node, App, Key, Value) || Node <- ekka_mnesia:running_nodes()], - lists:any(fun(Item) -> Item =:= ok end, Result). - -modify_config(Node, App, Key, Value) when Node =:= node() -> - emqx_config:set(App, Key, Value); -modify_config(Node, App, Key, Value) -> - rpc_call(Node, modify_config, [Node, App, Key, Value]). - -get_configs() -> - [{Node, get_config(Node)} || Node <- ekka_mnesia:running_nodes()]. - -get_config(Node) when Node =:= node()-> - emqx_cli_config:all_cfgs(); -get_config(Node) -> - rpc_call(Node, get_config, [Node]). - -get_plugin_config(PluginName) -> - emqx_config:read(PluginName). -get_plugin_config(Node, PluginName) -> - rpc_call(Node, get_plugin_config, [PluginName]). - -modify_plugin_config(PluginName, Terms) -> - emqx_config:write(PluginName, Terms). -modify_plugin_config(Node, PluginName, Terms) -> - rpc_call(Node, modify_plugin_config, [PluginName, Terms]). - -%%-------------------------------------------------------------------- -%% manager user API -%%-------------------------------------------------------------------- -check_user(undefined, _) -> - {error, "Username undefined"}; -check_user(_, undefined) -> - {error, "Password undefined"}; -check_user(Username, Password) -> - case mnesia:dirty_read(mqtt_admin, Username) of - [#mqtt_admin{password = <>}] -> - case Hash =:= md5_hash(Salt, Password) of - true -> ok; - false -> {error, "Password error"} - end; - [] -> - {error, "User not found"} - end. - -add_user(Username, Password, Tag) -> - Admin = #mqtt_admin{username = Username, - password = hash(Password), - tags = Tag}, - return(mnesia:transaction(fun add_user_/1, [Admin])). - -add_user_(Admin = #mqtt_admin{username = Username}) -> - case mnesia:wread({mqtt_admin, Username}) of - [] -> mnesia:write(Admin); - [_] -> {error, [{code, ?ERROR13}, {message, <<"User already exist">>}]} - end. - -user_list() -> - [row(Admin) || Admin <- ets:tab2list(mqtt_admin)]. - -lookup_user(Username) -> - Admin = mnesia:dirty_read(mqtt_admin, Username), - row(Admin). - -update_user(Username, Params) -> - case mnesia:dirty_read({mqtt_admin, Username}) of - [] -> - {error, [{code, ?ERROR5}, {message, <<"User not found">>}]}; - [User] -> - Admin = case proplists:get_value(<<"tags">>, Params) of - undefined -> User; - Tag -> User#mqtt_admin{tags = Tag} - end, - return(mnesia:transaction(fun() -> mnesia:write(Admin) end)) - end. - -remove_user(Username) -> - Trans = fun() -> - case lookup_user(Username) of - [] -> {error, [{code, ?ERROR5}, {message, <<"User not found">>}]}; - _ -> mnesia:delete({mqtt_admin, Username}) - end - end, - return(mnesia:transaction(Trans)). - -change_password(Username, OldPwd, NewPwd) -> - Trans = fun() -> - case mnesia:wread({mqtt_admin, Username}) of - [Admin = #mqtt_admin{password = <>}] -> - case Hash =:= md5_hash(Salt, OldPwd) of - true -> - mnesia:write(Admin#mqtt_admin{password = hash(NewPwd)}); - false -> - {error, [{code, ?ERROR14}, {message, <<"OldPassword error">>}]} - end; - [] -> - {error, [{code, ?ERROR5}, {message, <<"User not found">>}]} - end - end, - return(mnesia:transaction(Trans)). - -return({atomic, ok}) -> - ok; -return({atomic, Error}) -> - Error; -return({aborted, Reason}) -> - lager:error("Mnesia Transaction error:~p~n", [Reason]), - error. - -row(#mqtt_admin{username = Username, tags = Tags}) -> - [{username, Username}, {tags, Tags}]; -row([#mqtt_admin{username = Username, tags = Tags}]) -> - [{username, Username}, {tags, Tags}]; -row([]) ->[]. -%%-------------------------------------------------------------------- -%% Internel Functions. -%%-------------------------------------------------------------------- - -rpc_call(Node, Fun, Args) -> - case rpc:call(Node, ?MODULE, Fun, Args) of - {badrpc, Reason} -> {error, Reason}; - Res -> Res - end. - -kmg(Byte) when Byte > ?GB -> - float(Byte / ?GB, "G"); -kmg(Byte) when Byte > ?MB -> - float(Byte / ?MB, "M"); -kmg(Byte) when Byte > ?KB -> - float(Byte / ?MB, "K"); -kmg(Byte) -> - Byte. -float(F, S) -> - iolist_to_binary(io_lib:format("~.2f~s", [F, S])). - -validate(qos, Qos) -> - (Qos >= ?QOS_0) and (Qos =< ?QOS_2); - -validate(topic, Topic) -> - emqx_topic:validate({name, Topic}). - -client_list(ClientId, PageNo, PageSize) when ?EMPTY_KEY(ClientId) -> - TotalNum = ets:info(mqtt_client, size), - Qh = qlc:q([R || R <- ets:table(mqtt_client)]), - query_table(Qh, PageNo, PageSize, TotalNum); - -client_list(ClientId, PageNo, PageSize) -> - Fun = fun() -> ets:lookup(mqtt_client, ClientId) end, - lookup_table(Fun, PageNo, PageSize). - -session_list(ClientId, PageNo, PageSize) when ?EMPTY_KEY(ClientId) -> - TotalNum = lists:sum([ets:info(Tab, size) || Tab <- [mqtt_local_session]]), - Qh = qlc:append([qlc:q([E || E <- ets:table(Tab)]) || Tab <- [mqtt_local_session]]), - query_table(Qh, PageNo, PageSize, TotalNum); - -session_list(ClientId, PageNo, PageSize) -> - MP = {ClientId, '_', '_', '_'}, - Fun = fun() -> lists:append([ets:match_object(Tab, MP) || Tab <- [mqtt_local_session]]) end, - lookup_table(Fun, PageNo, PageSize). - -subscription_list(Key, PageNo, PageSize) when ?EMPTY_KEY(Key) -> - TotalNum = ets:info(mqtt_subproperty, size), - Qh = qlc:q([E || E <- ets:table(mqtt_subproperty)]), - query_table(Qh, PageNo, PageSize, TotalNum); - -subscription_list(Key, PageNo, PageSize) -> - Fun = fun() -> ets:match_object(mqtt_subproperty, {{'_', {Key, '_'}}, '_'}) end, - lookup_table(Fun, PageNo, PageSize). - -route_list(Topic, PageNo, PageSize) when ?EMPTY_KEY(Topic) -> - Tables = [mqtt_route], - TotalNum = lists:sum([ets:info(Tab, size) || Tab <- [mqtt_route, mqtt_local_route]]), - Qh = qlc:append([qlc:q([E || E <- ets:table(Tab)]) || Tab <- Tables]), - Data = query_table(Qh, PageNo, PageSize, TotalNum), - Route = get_value(result, Data), - LocalRoute = local_route_list(Topic, PageNo, PageSize), - lists:keyreplace(result, 1, Data, {result, lists:append(Route, LocalRoute)}); - -route_list(Topic, PageNo, PageSize) -> - Tables = [mqtt_route], - Fun = fun() -> lists:append([ets:lookup(Tab, Topic) || Tab <- Tables]) end, - Route = lookup_table(Fun, PageNo, PageSize), - LocalRoute = local_route_list(Topic, PageNo, PageSize), - lists:append(Route, LocalRoute). - -local_route_list(Topic, PageNo, PageSize) when ?EMPTY_KEY(Topic) -> - TotalNum = lists:sum([ets:info(Tab, size) || Tab <- [mqtt_local_route]]), - Qh = qlc:append([qlc:q([E || E <- ets:table(Tab)]) || Tab <- [mqtt_local_route]]), - Data = query_table(Qh, PageNo, PageSize, TotalNum), - lists:map(fun({Topic1, Node}) -> {<<"$local/", Topic1/binary>>, Node} end, get_value(result, Data)); - -local_route_list(Topic, PageNo, PageSize) -> - Fun = fun() -> lists:append([ets:lookup(Tab, Topic) || Tab <- [mqtt_local_route]]) end, - Data = lookup_table(Fun, PageNo, PageSize), - lists:map(fun({Topic1, Node}) -> {<<"$local/", Topic1/binary>>, Node} end, Data). - - -format_error(Val, Msg) -> - re:replace(Msg, <<"\\$\\{[^}]+\\}">>, Val, [global, {return, binary}]). - -hash(Password) -> - SaltBin = salt(), - <>. - -md5_hash(SaltBin, Password) -> - erlang:md5(<>). - -salt() -> - seed(), - Salt = rand:uniform(16#ffffffff), - <>. - -seed() -> - rand:seed(exsplus, erlang:timestamp()). diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index e50d7355a..093fc02f7 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -204,15 +204,16 @@ process(?CONNECT_PACKET(Var), State0) -> client_id = ClientId, is_bridge = IsBridge} = Var, - State1 = State0#proto_state{proto_ver = ProtoVer, - proto_name = ProtoName, - username = Username, - client_id = ClientId, - clean_sess = CleanSess, - keepalive = KeepAlive, - will_msg = willmsg(Var, State0), - is_bridge = IsBridge, - connected_at = os:timestamp()}, + State1 = repl_username_with_peercert( + State0#proto_state{proto_ver = ProtoVer, + proto_name = ProtoName, + username = Username, + client_id = ClientId, + clean_sess = CleanSess, + keepalive = KeepAlive, + will_msg = willmsg(Var, State0), + is_bridge = IsBridge, + connected_at = os:timestamp()}), {ReturnCode1, SessPresent, State3} = case validate_connect(Var, State1) of @@ -407,6 +408,7 @@ shutdown(mnesia_conflict, _State) -> %% let it down %% emqx_cm:unreg(ClientId); ignore; + shutdown(Error, State = #proto_state{will_msg = WillMsg}) -> ?LOG(info, "Shutdown for ~p", [Error], State), Client = client(State), diff --git a/src/emqx_router.erl b/src/emqx_router.erl index c376325aa..4ab3b49b2 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -34,7 +34,10 @@ -export([start/0, stop/0]). %% Route APIs --export([add_route/1, del_route/1, match/1, print/1, has_route/1]). +-export([add_route/1, get_routes/1, del_route/1, has_route/1]). + +%% Match and print +-export([match/1, print/1]). %% Local Route API -export([get_local_routes/0, add_local_route/1, match_local/1, @@ -130,6 +133,11 @@ add_trie_route(Route = #mqtt_route{topic = Topic}) -> end, mnesia:write(Route). +%% @doc Lookup Routes +-spec(get_routes(binary()) -> [mqtt_route()]). +get_routes(Topic) -> + ets:lookup(mqtt_route, Topic). + %% @doc Delete Route -spec(del_route(binary() | mqtt_route()) -> ok | {error, Reason :: term()}). del_route(Topic) when is_binary(Topic) -> @@ -284,7 +292,5 @@ clean_routes_(Node) -> mnesia:transaction(Clean). update_stats_() -> - Size = mnesia:table_info(mqtt_route, size), - emqx_stats:setstats('routes/count', 'routes/max', Size), - emqx_stats:setstats('topics/count', 'topics/max', Size). + emqx_stats:setstats('routes/count', 'routes/max', mnesia:table_info(mqtt_route, size)). diff --git a/test/emqttd_cli_SUITE.erl b/test/emqttd_cli_SUITE.erl deleted file mode 100644 index 273518b7f..000000000 --- a/test/emqttd_cli_SUITE.erl +++ /dev/null @@ -1,52 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqttd_cli_SUITE). - --compile(export_all). - --include("emqttd.hrl"). - --include_lib("eunit/include/eunit.hrl"). - -all() -> - [{group, subscriptions}]. - -groups() -> - [{subscriptions, [sequence], - [t_subsciptions_list, - t_subsciptions_show, - t_subsciptions_add, - t_subsciptions_del]}]. - -init_per_suite(Config) -> - Config. - -end_per_suite(_Config) -> - todo. - -t_subsciptions_list(_) -> - todo. - -t_subsciptions_show(_) -> - todo. - -t_subsciptions_add(_) -> - todo. - -t_subsciptions_del(_) -> - todo. - diff --git a/test/emqx_SUITE.erl b/test/emqx_SUITE.erl index 2bd961f30..472069464 100644 --- a/test/emqx_SUITE.erl +++ b/test/emqx_SUITE.erl @@ -68,7 +68,6 @@ all() -> {group, http}, {group, alarms}, {group, cli}, - {group, rest_api}, {group, cleanSession}]. groups() -> @@ -121,9 +120,7 @@ groups() -> {cleanSession, [sequence], [cleanSession_validate, cleanSession_validate1] - }, - {rest_api, [sequence], - [get_api_lists]} + } ]. init_per_suite(Config) -> @@ -413,14 +410,59 @@ request_publish(_) -> {client_id, <<"random">>}, {clean_sess, false}]), SubParams = "{\"qos\":1, \"topic\" : \"a\/b\/c\", \"client_id\" :\"random\"}", - ?assert(connect_emqttd_pubsub_(post, "api/v2/mqtt/subscribe", SubParams, auth_header_("admin", "public"))), - ok = emqttd:subscribe(<<"a/b/c">>, self(), [{qos, 1}]), + ?assert(connect_emqx_pubsub_(post, "api/v2/mqtt/subscribe", SubParams, auth_header_("admin", "public"))), + ok = emqx:subscribe(<<"a/b/c">>, self(), [{qos, 1}]), Params = "{\"qos\":1, \"retain\":false, \"topic\" : \"a\/b\/c\", \"messages\" :\"hello\"}", - ?assert(connect_emqttd_pubsub_(post, "api/v2/mqtt/publish", Params, auth_header_("admin", "public"))), + ?assert(connect_emqx_pubsub_(post, "api/v2/mqtt/publish", Params, auth_header_("admin", "public"))), ?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end), UnSubParams = "{\"topic\" : \"a\/b\/c\", \"client_id\" :\"random\"}", - ?assert(connect_emqttd_pubsub_(post, "api/v2/mqtt/unsubscribe", UnSubParams, auth_header_("admin", "public"))). + ?assert(connect_emqx_pubsub_(post, "api/v2/mqtt/unsubscribe", UnSubParams, auth_header_("admin", "public"))). + +connect_emqx_pubsub_(Method, Api, Params, Auth) -> + Url = "http://127.0.0.1:8080/" ++ Api, + case httpc:request(Method, {Url, [Auth], ?CONTENT_TYPE, Params}, [], []) of + {error, socket_closed_remotely} -> + false; + {ok, {{"HTTP/1.1", 200, "OK"}, _, _Return} } -> + true; + {ok, {{"HTTP/1.1", 400, _}, _, []}} -> + false; + {ok, {{"HTTP/1.1", 404, _}, _, []}} -> + false + end. + +request(Path) -> + http_get(get, Path). + +http_get(Method, Path) -> + req(Method, Path, []). + +http_put(Method, Path, Params) -> + req(Method, Path, format_for_upload(Params)). + +http_post(Method, Path, Params) -> + req(Method, Path, format_for_upload(Params)). + +req(Method, Path, Body) -> + Url = ?URL ++ Path, + Headers = auth_header_("", ""), + case httpc:request(Method, {Url, [Headers]}, [], []) of + {error, R} -> + ct:log("R:~p~n", [R]), + false; + {ok, {{"HTTP/1.1", 200, "OK"}, _, _Return} } -> + true; + {ok, {{"HTTP/1.1", 400, _}, _, []}} -> + false; + {ok, {{"HTTP/1.1", 404, _}, _, []}} -> + false + end. + +format_for_upload(none) -> + <<"">>; +format_for_upload(List) -> + iolist_to_binary(mochijson2:encode(List)). connect_emqx_publish_(Method, Api, Params, Auth) -> Url = "http://127.0.0.1:8080/" ++ Api, @@ -615,69 +657,6 @@ cleanSession_validate1(_) -> emqttc:disconnect(Pub), emqttc:disconnect(C11). -get_api_lists(_Config) -> - lists:foreach(fun request/1, ?GET_API). - -request_publish(_) -> - emqttc:start_link([{host, "localhost"}, - {port, 1883}, - {client_id, <<"random">>}, - {clean_sess, false}]), - SubParams = "{\"qos\":1, \"topic\" : \"a\/b\/c\", \"client_id\" :\"random\"}", - ?assert(connect_emqx_pubsub_(post, "api/v2/mqtt/subscribe", SubParams, auth_header_("", ""))), - ok = emqx:subscribe(<<"a/b/c">>, self(), [{qos, 1}]), - Params = "{\"qos\":1, \"retain\":false, \"topic\" : \"a\/b\/c\", \"messages\" :\"hello\"}", - ?assert(connect_emqx_pubsub_(post, "api/v2/mqtt/publish", Params, auth_header_("", ""))), - ?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end), - - UnSubParams = "{\"topic\" : \"a\/b\/c\", \"client_id\" :\"random\"}", - ?assert(connect_emqx_pubsub_(post, "api/v2/mqtt/unsubscribe", UnSubParams, auth_header_("", ""))). - -connect_emqx_pubsub_(Method, Api, Params, Auth) -> - Url = "http://127.0.0.1:8080/" ++ Api, - case httpc:request(Method, {Url, [Auth], ?CONTENT_TYPE, Params}, [], []) of - {error, socket_closed_remotely} -> - false; - {ok, {{"HTTP/1.1", 200, "OK"}, _, _Return} } -> - true; - {ok, {{"HTTP/1.1", 400, _}, _, []}} -> - false; - {ok, {{"HTTP/1.1", 404, _}, _, []}} -> - false - end. - -request(Path) -> - http_get(get, Path). - -http_get(Method, Path) -> - req(Method, Path, []). - -http_put(Method, Path, Params) -> - req(Method, Path, format_for_upload(Params)). - -http_post(Method, Path, Params) -> - req(Method, Path, format_for_upload(Params)). - -req(Method, Path, Body) -> - Url = ?URL ++ Path, - Headers = auth_header_("", ""), - case httpc:request(Method, {Url, [Headers]}, [], []) of - {error, R} -> - ct:log("R:~p~n", [R]), - false; - {ok, {{"HTTP/1.1", 200, "OK"}, _, _Return} } -> - true; - {ok, {{"HTTP/1.1", 400, _}, _, []}} -> - false; - {ok, {{"HTTP/1.1", 404, _}, _, []}} -> - false - end. - -format_for_upload(none) -> - <<"">>; -format_for_upload(List) -> - iolist_to_binary(mochijson2:encode(List)). - ensure_ok(ok) -> ok; ensure_ok({error, {already_started, _}}) -> ok. @@ -766,34 +745,3 @@ set_app_env({App, Lists}) -> application:set_env(App, Par, Var) end, Lists). -request(Path) -> - http_get(get, Path). - -http_get(Method, Path) -> - req(Method, Path, []). - -http_put(Method, Path, Params) -> - req(Method, Path, format_for_upload(Params)). - -http_post(Method, Path, Params) -> - req(Method, Path, format_for_upload(Params)). - -req(Method, Path, Body) -> - Url = ?URL ++ Path, - Headers = auth_header_("admin", "public"), - case httpc:request(Method, {Url, [Headers]}, [], []) of - {error, socket_closed_remotely} -> - false; - {ok, {{"HTTP/1.1", 200, "OK"}, _, _Return} } -> - true; - {ok, {{"HTTP/1.1", 400, _}, _, []}} -> - false; - {ok, {{"HTTP/1.1", 404, _}, _, []}} -> - false - end. - -format_for_upload(none) -> - <<"">>; -format_for_upload(List) -> - iolist_to_binary(mochijson2:encode(List)). - diff --git a/test/emqx_config_SUITE.erl b/test/emqx_config_SUITE.erl deleted file mode 100644 index 04c957b75..000000000 --- a/test/emqx_config_SUITE.erl +++ /dev/null @@ -1,149 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqttd_config_SUITE). - --compile(export_all). - --include("emqttd.hrl"). - --include_lib("eunit/include/eunit.hrl"). - --include_lib("common_test/include/ct.hrl"). - -all() -> - [{group, emq_config}]. - -groups() -> - [{emq_config, [sequence], - [run_protocol_cmd, - run_client_cmd, - run_session_cmd, - run_queue_cmd, - run_auth_cmd, - run_lager_cmd, - run_connection_cmd, - run_broker_config] - }]. - -init_per_suite(Config) -> - Config. - -end_per_suite(Config) -> - Config. - -run_protocol_cmd(_Config) -> - SetConfigKeys = [{"max_clientid_len=2048", int}, - {"max_packet_size=1024", int}, - % {"websocket_protocol_header=off", atom}, - {"keepalive_backoff=0.5", float}], - lists:foreach(fun set_cmd/1, SetConfigKeys), - R = lists:sort(lists:map(fun env_value/1, SetConfigKeys)), - {ok, E} = application:get_env(emqttd, protocol), - ?assertEqual(R, lists:sort(E)), - emqttd_cli_config:run(["config", "set", "mqtt.websocket_protocol_header=off", "--app=emqttd"]), - {ok, E1} = application:get_env(emqttd, websocket_protocol_header), - ?assertEqual(false, E1). - -run_client_cmd(_Config) -> - SetConfigKeys = [{"max_publish_rate=100", int}, - {"idle_timeout=60s", date}, - {"enable_stats=on", atom}], - lists:foreach(fun(Key) -> set_cmd("client", Key) end, SetConfigKeys), - R = lists:sort(lists:map(fun(Key) -> env_value("client", Key) end, SetConfigKeys)), - {ok, E} = application:get_env(emqttd, client), - ?assertEqual(R, lists:sort(E)). - -run_session_cmd(_Config) -> - SetConfigKeys = [{"max_subscriptions=5", int}, - {"upgrade_qos=on", atom}, - {"max_inflight=64", int}, - {"retry_interval=60s", date}, - {"max_awaiting_rel=200", int}, - {"await_rel_timeout=60s",date}, - {"enable_stats=on", atom}, - {"expiry_interval=60s", date}, - {"ignore_loop_deliver=true", atom}], - lists:foreach(fun(Key) -> set_cmd("session", Key) end, SetConfigKeys), - R = lists:sort(lists:map(fun env_value/1, SetConfigKeys)), - {ok, E} = application:get_env(emqttd, session), - ?assertEqual(R, lists:sort(E)). - -run_queue_cmd(_Config) -> - SetConfigKeys = [{"type=priority", atom}, - {"priority=hah", string}, - {"max_length=2000", int}, - {"low_watermark=40%",percent}, - {"high_watermark=80%", percent}, - {"store_qos0=false", atom}], - lists:foreach(fun(Key) -> set_cmd("mqueue", Key) end, SetConfigKeys), - R = lists:sort(lists:map(fun env_value/1, SetConfigKeys)), - {ok, E} = application:get_env(emqttd, mqueue), - ?assertEqual(R, lists:sort(E)). - -run_auth_cmd(_Config) -> - SetConfigKeys = [{"allow_anonymous=true", atom}, - {"acl_nomatch=deny", atom}, - {"acl_file=etc/test.acl", string}, - {"cache_acl=false", atom}], - lists:foreach(fun set_cmd/1, SetConfigKeys), - {ok, true} = application:get_env(emqttd, allow_anonymous), - {ok, deny} = application:get_env(emqttd, acl_nomatch), - {ok, "etc/test.acl"} = application:get_env(emqttd, acl_file), - {ok, false} = application:get_env(emqttd, cache_acl). - -run_lager_cmd(_Config) -> - emqttd_cli_config:run(["config", "set", "log.console.level=info", "--app=emqttd"]), - ok. - -run_connection_cmd(_Config) -> - emqttd_cli_config:run(["config", "set", "mqtt.conn.force_gc_count=1000", "--app=emqttd"]), - {ok, E} = application:get_env(emqttd, conn_force_gc_count), - ?assertEqual(1000, E). - -run_broker_config(_Config) -> - emqttd_cli_config:run(["config", "set", "mqtt.broker.sys_interval=10", "--app=emqttd"]), - {ok, E} = application:get_env(emqttd, broker_sys_interval), - ?assertEqual(10, E). - -env_value("client", {Key, Type}) -> - case string:split(Key, "=") of - ["max_publish_rate", V] -> - {list_to_atom("max_publish_rate"), format(Type, V)}; - [K, V] -> - {list_to_atom(string:join(["client", K], "_")), format(Type, V)} - end. - -env_value({Key, Type}) -> - [K, V] = string:split(Key, "="), - {list_to_atom(K), format(Type, V)}. - -format(string, S) -> S; -format(atom, "on") -> true; -format(atom, "off") -> false; -format(atom, A) -> list_to_atom(A); -format(float, F) -> list_to_float(F); -format(percent, P) -> - {match, [N]} = re:run(P, "^([0-9]+)%$", [{capture, all_but_first, list}]), - list_to_integer(N) / 100; -format(int, I) -> list_to_integer(I); -format(date, _I) -> 60000. - -set_cmd({Key, _Type}) -> - emqttd_cli_config:run(["config", "set", string:join(["mqtt", Key], "."), "--app=emqttd"]). - -set_cmd(Pre, {Key, _Type}) -> - emqttd_cli_config:run(["config", "set", string:join(["mqtt", Pre, Key], "."), "--app=emqttd"]). diff --git a/test/emqx_mod_SUITE.erl b/test/emqx_mod_SUITE.erl index 1fcf455d0..846b14467 100644 --- a/test/emqx_mod_SUITE.erl +++ b/test/emqx_mod_SUITE.erl @@ -14,11 +14,11 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqttd_mod_SUITE). +-module(emqx_mod_SUITE). -compile(export_all). --include("emqttd.hrl"). +-include("emqx.hrl"). all() -> [mod_subscription_rep]. diff --git a/test/emqttd_router_SUITE.erl b/test/emqx_router_SUITE.erl similarity index 93% rename from test/emqttd_router_SUITE.erl rename to test/emqx_router_SUITE.erl index 415550ec3..51db6c506 100644 --- a/test/emqttd_router_SUITE.erl +++ b/test/emqx_router_SUITE.erl @@ -14,15 +14,15 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqttd_router_SUITE). +-module(emqx_router_SUITE). -compile(export_all). --include("emqttd.hrl"). +-include("emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). --define(R, emqttd_router). +-define(R, emqx_router). all() -> [{group, route}, @@ -44,11 +44,11 @@ groups() -> init_per_suite(Config) -> ekka:start(), ekka_mnesia:ensure_started(), - {ok, _R} = emqttd_router:start(), + {ok, _R} = emqx_router:start(), Config. end_per_suite(_Config) -> - emqttd_router:stop(), + emqx_router:stop(), ekka:stop(), ekka_mnesia:ensure_stopped(), ekka_mnesia:delete_schema(). @@ -148,7 +148,7 @@ router_add_del(_) -> %% Del ?R:del_route(<<"a/b/c">>), [R1, R2] = lists:sort(?R:match(<<"a/b/c">>)), - {atomic, []} = mnesia:transaction(fun emqttd_trie:lookup/1, [<<"a/b/c">>]), + {atomic, []} = mnesia:transaction(fun emqx_trie:lookup/1, [<<"a/b/c">>]), %% Batch Del R3 = #mqtt_route{topic = <<"#">>, node = 'a@127.0.0.1'}, @@ -169,6 +169,6 @@ t_print(_) -> ?R:del_route(<<"#">>). router_unused(_) -> - gen_server:call(emqttd_router, bad_call), - gen_server:cast(emqttd_router, bad_msg), - emqttd_router ! bad_info. + gen_server:call(emqx_router, bad_call), + gen_server:cast(emqx_router, bad_msg), + emqx_router ! bad_info.