%% CouchDB
%% Copyright (C) 2006  Damien Katz
%%
%% This program is free software; you can redistribute it and/or
%% modify it under the terms of the GNU General Public License
%% as published by the Free Software Foundation; either version 2
%% of the License, or (at your option) any later version.
%%
%% This program is distributed in the hope that it will be useful,
%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
%% GNU General Public License for more details.
%%
%% You should have received a copy of the GNU General Public License
%% along with this program; if not, write to the Free Software
%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

-module(couch_stew_group).
-behaviour(gen_server).

-export([start_manager/4,stop/1,
        open/4,open/5,fold/5,fold/6,
        update_group/3,update_view_proc/5,free_groups/2,
        update_view_proc/7,get_group_async/3,
        less_json/2]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2,code_change/3]).

-include("couch_db.hrl").

% arbitrarily chosen amount of memory to use before flushing to disk
-define(FLUSH_MAX_MEM, 4000000).

% In-memory representation of one opened view group.
-record(view_group,
    {db,
    query_lang,
    named_queries,
    views,                      % list of #view records, one per named query
    current_seq,                % update sequence the group has been computed up to
    id_btree,
    update_notif_fun,
    compiled_doc_map = nil      % nil until a doc map has been started for the group
    }).

% One view inside a view group.
-record(view,
    {id_num,
    name,
    btree,
    columns,
    query_string
    }).

% State of the manager gen_server.
-record(mgr,
    {db,
    fd,
    % GroupId -> {processing_request, NotifyFuns, Pid}
    %          | {processing_and_pending_request, NotifyFuns, PendingNotifyFuns, Pid}
    update_state_by_id = dict:new(),
    % Pid of a running update process -> GroupId
    ids_by_pid = dict:new(),
    get_group_info_fun,
    update_group_info_fun,
    % GroupId -> #view_group
    cached_groups=dict:new()
    }).

%% Starts the group manager as a gen_server linked to the calling process.
%% Returns whatever gen_server:start_link/3 returns.
start_manager(Db, Fd, GetGroupInfoFun, UpdateGroupInfoFun) ->
    InitArgs = {Db, Fd, GetGroupInfoFun, UpdateGroupInfoFun},
    gen_server:start_link(?MODULE, InitArgs, []).

%% Asynchronously asks the manager to stop.
stop(MgrPid) ->
    gen_server:cast(MgrPid, stop).

%% gen_server callback: builds the initial #mgr state from the start arguments.
init({Db, Fd, GetGroupInfoFun, UpdateGroupInfoFun}) ->
    % exits of linked update processes must arrive as 'EXIT' messages
    % (see handle_info/2) instead of taking the manager down
    process_flag(trap_exit, true),
    InitialState = #mgr{
        db=Db,
        fd=Fd,
        get_group_info_fun=GetGroupInfoFun,
        update_group_info_fun=UpdateGroupInfoFun
    },
    {ok, InitialState}.

%% gen_server callback: nothing to clean up.
terminate(_Reason, _Mgr) ->
    ok.
%% Asks the manager (via cast) to look up the group GroupId. The manager
%% delivers the answer to OrigFrom with gen_server:reply/2.
get_group_async(MgrPid, GroupId, OrigFrom) ->
    Request = {get_group_async, GroupId, OrigFrom},
    gen_server:cast(MgrPid, Request).

%% Asks the manager (via cast) to bring the group GroupId up to date.
%% UpdateNotifFun is registered as a listener for that update.
update_group(MgrPid, GroupId, UpdateNotifFun) ->
    Request = {update_group, GroupId, UpdateNotifFun},
    gen_server:cast(MgrPid, Request).

% stops any processing of the listed view groups and frees any cached values
free_groups(MgrPid, GroupIds) ->
    gen_server:call(MgrPid, {free_groups, GroupIds}).

% called from the update process
handle_call({group_cache_update, GroupId, Group}, {FromPid, _FromRef}, Mgr) ->
    % the compiled doc map is never kept in the manager's cache
    CacheableGroup = Group#view_group{compiled_doc_map=nil},
    % the process may have been killed by the free groups call
    % so check to make sure it is alive.
    case is_process_alive(FromPid) of
        false ->
            {reply, ok, Mgr};
        true ->
            Cache = dict:store(GroupId, CacheableGroup, Mgr#mgr.cached_groups),
            {reply, ok, Mgr#mgr{cached_groups=Cache}}
    end;
handle_call({free_groups, GroupIds}, _From, Mgr) ->
    {reply, ok, lists:foldl(fun free_cached_group/2, Mgr, GroupIds)}.

%% Drops GroupId from the cache and, when an update process is currently
%% running for it, kills that process with reason `freed`.
%% Returns the updated manager state.
free_cached_group(GroupId, Mgr) ->
    #mgr{cached_groups=CachedGroups, update_state_by_id=ProcsDict} = Mgr,
    case dict:find(GroupId, ProcsDict) of
        {ok, {processing_request, _NotifyFuns, Pid}} ->
            exit(Pid, freed);
        {ok, {processing_and_pending_request, _NotifyFuns, _PendingUpdateNotifFuns, Pid}} ->
            exit(Pid, freed);
        _NotUpdating ->
            ok
    end,
    Mgr#mgr{cached_groups=dict:erase(GroupId, CachedGroups)}.
%% gen_server callback for asynchronous requests.
%%
%% {get_group_async, GroupId, OrigFrom}
%%     Replies to OrigFrom with {ok, Group}, taken from the cache or freshly
%%     opened (and then cached). If GetGroupInfoFun returns anything other
%%     than {ok, {NamedQueries, GroupState}}, that value is the reply.
%% {update_group, GroupId, UpdateNotifFun}
%%     Starts an update process for the group, or, if one is already running,
%%     queues UpdateNotifFun for the update that follows it.
%% stop
%%     Stops the server with reason normal.
handle_cast({get_group_async, GroupId, OrigFrom}, Mgr) ->
    #mgr{
        db=Db,
        fd=Fd,
        get_group_info_fun=GetGroupInfoFun,
        cached_groups=CachedGroups
    } = Mgr,
    case dict:find(GroupId, CachedGroups) of
        {ok, CachedGroup} ->
            gen_server:reply(OrigFrom, {ok, CachedGroup}),
            {noreply, Mgr};
        error ->
            case GetGroupInfoFun(GroupId) of
                {ok, {NamedQueries, GroupState}} ->
                    {ok, Group} = open(Db, Fd, NamedQueries, GroupState),
                    Cache = dict:store(GroupId, Group, CachedGroups),
                    gen_server:reply(OrigFrom, {ok, Group}),
                    {noreply, Mgr#mgr{cached_groups=Cache}};
                Else ->
                    % pass the failure on to the caller unchanged
                    gen_server:reply(OrigFrom, Else),
                    {noreply, Mgr}
            end
    end;
handle_cast({update_group, GroupId, UpdateNotifFun}, Mgr) ->
    #mgr{
        update_state_by_id=ProcsDict,
        ids_by_pid=GroupIdDict,
        db=Db,
        fd=Fd,
        get_group_info_fun=GetGroupInfoFun,
        update_group_info_fun=UpdateGroupInfoFun,
        cached_groups=CachedGroups
    } = Mgr,
    case dict:find(GroupId, ProcsDict) of
        {ok, {processing_request, NotifyFuns, Pid}} ->
            % an update is running: this is the first listener queued behind it
            Queued = {processing_and_pending_request, NotifyFuns, [UpdateNotifFun], Pid},
            {noreply, Mgr#mgr{update_state_by_id=dict:store(GroupId, Queued, ProcsDict)}};
        {ok, {processing_and_pending_request, NotifyFuns, PendingUpdateNotifFuns, Pid}} ->
            % an update is running and others are already queued: join the queue
            Queued = {processing_and_pending_request, NotifyFuns,
                    [UpdateNotifFun | PendingUpdateNotifFuns], Pid},
            {noreply, Mgr#mgr{update_state_by_id=dict:store(GroupId, Queued, ProcsDict)}};
        error ->
            % nothing running for this group: start a linked update process,
            % reusing the cached group when there is one
            Pid =
                case dict:find(GroupId, CachedGroups) of
                    {ok, Group} ->
                        spawn_link(couch_stew_group, update_view_proc,
                            [self(), Group, GroupId, UpdateGroupInfoFun, [UpdateNotifFun]]);
                    error ->
                        spawn_link(couch_stew_group, update_view_proc,
                            [self(), Db, Fd, GroupId, GetGroupInfoFun, UpdateGroupInfoFun,
                                [UpdateNotifFun]])
                end,
            Running = {processing_request, [UpdateNotifFun], Pid},
            {noreply, Mgr#mgr{
                update_state_by_id=dict:store(GroupId, Running, ProcsDict),
                ids_by_pid=dict:store(Pid, GroupId, GroupIdDict)
            }}
    end;
handle_cast(stop, Mgr) ->
    % returning stop makes gen_server call terminate/2
    {stop, normal, Mgr}.
% Note: the stop cast handled above causes terminate/2 to be called.

%% gen_server callback for 'EXIT' messages (the manager traps exits).
%%
%% When the exiting process is a tracked update process, its bookkeeping is
%% removed; if listeners were queued behind it, a new update process is
%% started for them. On an abnormal exit the listeners of the finished update
%% are called with the failure: the thrown term for an uncaught throw,
%% otherwise the exit reason. Failures inside a listener are ignored.
%%
%% An 'EXIT' from any other linked process makes the manager exit with the
%% same reason.
handle_info({'EXIT', FromPid, Reason}, Mgr) ->
    #mgr{
        update_state_by_id=ProcsDict,
        ids_by_pid=GroupIdDict,
        db=Db,
        fd=Fd,
        get_group_info_fun=GetGroupInfoFun,
        update_group_info_fun=UpdateGroupInfoFun,
        cached_groups=CachedGroups
    } = Mgr,
    case dict:find(FromPid, GroupIdDict) of
        error ->
            % a linked process must have died, we propagate the error
            exit(Reason);
        {ok, GroupId} ->
            RemainingPids = dict:erase(FromPid, GroupIdDict),
            {NotifyFuns, ProcsDict2, GroupIdDict2} =
                case dict:find(GroupId, ProcsDict) of
                    {ok, {processing_request, FinishedFuns, _Pid}} ->
                        {FinishedFuns, dict:erase(GroupId, ProcsDict), RemainingPids};
                    {ok, {processing_and_pending_request, FinishedFuns, NextNotifyFuns, _Pid}} ->
                        % listeners were queued while the update ran:
                        % start the next update on their behalf
                        Pid =
                            case dict:find(GroupId, CachedGroups) of
                                {ok, Group} ->
                                    spawn_link(couch_stew_group, update_view_proc,
                                        [self(), Group, GroupId, UpdateGroupInfoFun,
                                            NextNotifyFuns]);
                                error ->
                                    spawn_link(couch_stew_group, update_view_proc,
                                        [self(), Db, Fd, GroupId, GetGroupInfoFun,
                                            UpdateGroupInfoFun, NextNotifyFuns])
                            end,
                        Running = {processing_request, NextNotifyFuns, Pid},
                        {FinishedFuns,
                            dict:store(GroupId, Running, ProcsDict),
                            dict:store(Pid, GroupId, RemainingPids)}
                end,
            case Reason of
                normal ->
                    ok;
                _Abnormal ->
                    % process returned abnormally, notify any waiting listeners
                    Failure =
                        case Reason of
                            {{nocatch, Error}, _Trace} -> Error;
                            _Else -> Reason
                        end,
                    lists:foreach(
                        fun(NotifyFun) -> catch NotifyFun(Failure) end,
                        NotifyFuns)
            end,
            {noreply, Mgr#mgr{update_state_by_id=ProcsDict2, ids_by_pid=GroupIdDict2}}
    end.

%% gen_server callback: the state needs no conversion between code versions.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% Opens a view group with an update notification fun that does nothing.
%% See open/5.
open(Db, Fd, NamedQueries, GroupState) ->
    IgnoreNotification = fun(_View) -> ok end,
    open(Db, Fd, NamedQueries, GroupState, IgnoreNotification).
%% Opens a view group and returns {ok, #view_group{}}.
%%
%% The third argument is {QueryLang, NamedQueries}, where NamedQueries is a
%% list of {Name, QueryString} pairs. GroupState is either nil (a brand new
%% group) or {ViewBtreeStates, CurrentSeq, GroupIdBtreeState}, the same shape
%% get_info/1 produces. One btree is opened per named query, ordered by
%% less_json/2, plus one btree for the group itself.
open(Db, Fd, {QueryLang, NamedQueries}, GroupState, UpdateNotifFun) ->
    {ViewBtreeStates, CurrentSeq, GroupIdBtreeState} =
        case GroupState of
            nil ->
                % new view group, init GroupState to nils
                {[nil || _Query <- NamedQueries], 0, nil};
            _Existing ->
                GroupState
        end,
    % views are numbered from 0 in the order of their named queries
    OpenView =
        fun({{Name, QueryString}, BtreeState}, IdNum) ->
            {ok, Btree} = couch_btree:open(BtreeState, Fd, fun less_json/2),
            View = #view{id_num=IdNum, name=Name, btree=Btree, query_string=QueryString},
            {View, IdNum + 1}
        end,
    {Views, _NextIdNum} =
        lists:mapfoldl(OpenView, 0, lists:zip(NamedQueries, ViewBtreeStates)),
    {ok, GroupIdBtree} = couch_btree:open(GroupIdBtreeState, Fd),
    {ok, #view_group{
        db=Db,
        query_lang=QueryLang,
        named_queries=NamedQueries,
        views=Views,
        current_seq=CurrentSeq,
        id_btree=GroupIdBtree,
        update_notif_fun=UpdateNotifFun
    }}.

%% Returns {{QueryLang, NamedQueries}, {ViewBtreeStates, CurrentSeq, GroupIdBtreeState}}
%% for a view group, i.e. the definition and state open/5 accepts.
get_info(#view_group{} = ViewGroup) ->
    #view_group{
        query_lang=QueryLang,
        named_queries=NamedQueries,
        views=Views,
        current_seq=CurrentSeq,
        id_btree=GroupIdBtree
    } = ViewGroup,
    BtreeStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views],
    GroupState = {BtreeStates, CurrentSeq, couch_btree:get_state(GroupIdBtree)},
    {{QueryLang, NamedQueries}, GroupState}.

%% Folds over every row of the view named ViewName in direction Dir.
%% Fun is called as Fun(null, Key, Value, Offset, TotalRowCount, Acc).
%% Returns {ok, TotalRowCount, AccResult}, or {not_found, missing_named_view}
%% when the group has no view with that name.
fold(ViewGroup, ViewName, Dir, Fun, Acc) ->
    fold_int(ViewGroup#view_group.views, ViewName, Dir, Fun, Acc).

% walks the view list until the named view is found, then folds its btree
fold_int([], _ViewName, _Dir, _Fun, _Acc) ->
    {not_found, missing_named_view};
fold_int([View | _RestViews], ViewName, Dir, Fun, Acc) when View#view.name == ViewName ->
    Btree = View#view.btree,
    TotalRowCount = couch_btree:row_count(Btree),
    RowFun =
        fun({Key, Value}, Offset, RowAcc) ->
            Fun(null, Key, Value, Offset, TotalRowCount, RowAcc)
        end,
    {ok, AccResult} = couch_btree:fold(Btree, Dir, RowFun, Acc),
    {ok, TotalRowCount, AccResult};
fold_int([_OtherView | RestViews], ViewName, Dir, Fun, Acc) ->
    fold_int(RestViews, ViewName, Dir, Fun, Acc).

%% Same as fold/5, but StartKey is additionally passed to couch_btree:fold.
fold(ViewGroup, ViewName, StartKey, Dir, Fun, Acc) ->
    fold_int(ViewGroup#view_group.views, ViewName, StartKey, Dir, Fun, Acc).
% walks the view list until the named view is found, then folds its btree
% with StartKey passed through to couch_btree:fold
fold_int([], _ViewName, _StartKey, _Dir, _Fun, _Acc) ->
    {not_found, missing_named_view};
fold_int([View | _RestViews], ViewName, StartKey, Dir, Fun, Acc) when View#view.name == ViewName ->
    Btree = View#view.btree,
    TotalRowCount = couch_btree:row_count(Btree),
    RowFun =
        fun({Key, Value}, Offset, RowAcc) ->
            Fun(null, Key, Value, Offset, TotalRowCount, RowAcc)
        end,
    {ok, AccResult} = couch_btree:fold(Btree, StartKey, Dir, RowFun, Acc),
    {ok, TotalRowCount, AccResult};
fold_int([_OtherView | RestViews], ViewName, StartKey, Dir, Fun, Acc) ->
    fold_int(RestViews, ViewName, StartKey, Dir, Fun, Acc).

%% Ordering used for view keys: true when A sorts strictly before B.
%% Values of different kinds are ordered by type_sort/1; values of the same
%% kind are compared by less_same_type/2.
less_json(A, B) ->
    case {type_sort(A), type_sort(B)} of
        {SameRank, SameRank} -> less_same_type(A, B);
        {RankA, RankB} -> RankA < RankB
    end.

% rank of each kind of value: atoms < numbers < lists < tuples < {obj, _} < binaries
type_sort(V) when is_atom(V) -> 0;
type_sort(V) when is_number(V) -> 1;
type_sort(V) when is_list(V) -> 2;
type_sort({obj, _}) -> 4; % must come before tuple test below
type_sort(V) when is_tuple(V) -> 3;
type_sort(V) when is_binary(V) -> 5.

% rank of the atoms that may appear as values; any other atom is not handled
atom_sort(nil) -> 0;
atom_sort(null) -> 1;
atom_sort(false) -> 2;
atom_sort(true) -> 3.

% compares two values that type_sort/1 gave the same rank
less_same_type(A, B) when is_atom(A) ->
    atom_sort(A) < atom_sort(B);
less_same_type(A, B) when is_list(A) ->
    couch_util:collate(A, B) < 0;
less_same_type({obj, AProps}, {obj, BProps}) ->
    less_props(AProps, BProps);
less_same_type(A, B) when is_tuple(A) ->
    % tuples are compared element by element
    less_list(tuple_to_list(A), tuple_to_list(B));
less_same_type(A, B) ->
    A < B.

% property keys may be lists or atoms; collation is done on lists
ensure_list(V) when is_list(V) -> V;
ensure_list(V) when is_atom(V) -> atom_to_list(V).

% Compares two property lists pair by pair: keys by collation first, then
% values by less_json/2. When every compared pair is equal, the shorter list
% sorts first.
less_props([], [_|_]) ->
    true;
less_props(_, []) ->
    false;
less_props([{AKey, AValue}|RestA], [{BKey, BValue}|RestB]) ->
    case couch_util:collate(ensure_list(AKey), ensure_list(BKey)) of
        -1 ->
            true;
        1 ->
            false;
        0 ->
            % equal keys: the values decide, and only equal values
            % (neither is less than the other) move on to the next pair
            less_json(AValue, BValue) orelse
                ((not less_json(BValue, AValue)) andalso less_props(RestA, RestB))
    end.
% Compares two lists element by element with less_json/2. When every compared
% pair is equal, the shorter list sorts first.
less_list([], [_|_]) ->
    true;
less_list(_, []) ->
    false;
less_list([A|RestA], [B|RestB]) ->
    % only equal elements (neither is less than the other) move on to the next pair
    less_json(A, B) orelse
        ((not less_json(B, A)) andalso less_list(RestA, RestB)).

%% Publishes the progress of an update. In order: stores ViewGroup in the
%% manager's cache, hands the group info to UpdateGroupInfoFun, then calls
%% every status fun with UpdateStatus.
%% Returns a fun(UpdateStatus2, ViewGroup2) for the next notification; it
%% carries only the funs that the status funs themselves returned.
notify(MgrPid, UpdateStatus, ViewGroup, GroupId, UpdateGroupInfoFun, StatusNotifyFuns) ->
    GroupInfo = get_info(ViewGroup),
    ok = gen_server:call(MgrPid, {group_cache_update, GroupId, ViewGroup}),
    ok = UpdateGroupInfoFun(GroupId, UpdateStatus, GroupInfo),
    NextStatusFuns = renew_status_funs(StatusNotifyFuns, UpdateStatus, []),
    fun(UpdateStatus2, ViewGroup2) ->
        notify(MgrPid, UpdateStatus2, ViewGroup2, GroupId, UpdateGroupInfoFun, NextStatusFuns)
    end.

% Calls each status fun with UpdateStatus and keeps those results that are
% funs. A status fun that fails or returns anything else is dropped. The
% result accumulates in reverse order of the input.
renew_status_funs([], _UpdateStatus, Acc) ->
    Acc;
renew_status_funs([NotifyFun | Rest], UpdateStatus, Acc) ->
    case (catch NotifyFun(UpdateStatus)) of
        NewNotifyFun when is_function(NewNotifyFun) ->
            renew_status_funs(Rest, UpdateStatus, [NewNotifyFun | Acc]);
        _Else ->
            renew_status_funs(Rest, UpdateStatus, Acc)
    end.

%% Entry point of an update process that starts from an already opened group.
update_view_proc(MgrPid, #view_group{} = ViewGroup, GroupId, UpdateGroupInfoFun, StatusNotifyFuns) ->
    update_int(ViewGroup,
        fun(UpdateStatus, ViewGroup2) ->
            notify(MgrPid, UpdateStatus, ViewGroup2, GroupId, UpdateGroupInfoFun,
                StatusNotifyFuns)
        end).

%% Entry point of an update process that must open the group first.
%% Exits with whatever GetGroupInfoFun returned when that is not
%% {ok, {Queries, GroupState}}.
update_view_proc(MgrPid, Db, Fd, GroupId, GetGroupInfoFun, UpdateGroupInfoFun, StatusNotifyFuns) ->
    case GetGroupInfoFun(GroupId) of
        {ok, {Queries, GroupState}} ->
            {ok, OpenedGroup} = open(Db, Fd, Queries, GroupState),
            update_view_proc(MgrPid, OpenedGroup, GroupId, UpdateGroupInfoFun,
                StatusNotifyFuns);
        Error ->
            exit(Error)
    end.

%% Brings ViewGroup up to date with its database and returns {ok, ViewGroup4}.
%% NotifyFun is called with `partial` after each intermediate flush (see
%% process_doc/3) and with `complete` once all changes are written.
update_int(ViewGroup, NotifyFun) ->
    #view_group{
        db=Db,
        views=Views,
        current_seq=CurrentSeq
    } = ViewGroup,
    EmptyStewDicts = [dict:new() || _ <- Views],
    PerDocFun = fun(DocInfo, _, Acc) -> process_doc(Db, DocInfo, Acc) end,
    InitialAcc = {[], ViewGroup, EmptyStewDicts, CurrentSeq, NotifyFun},

    % compute on all docs modified since we last computed.
    {ok, {UncomputedDocs, ViewGroup2, StewDicts, NewSeq, NotifyFun2}} =
        couch_db:enum_docs_since(Db, CurrentSeq, PerDocFun, InitialAcc),

    % map whatever was collected since the last flush and write it out
    {ViewGroup3, Results} = view_compute(ViewGroup2, UncomputedDocs),
    StewDicts2 = stew_unify_query_results(Results, StewDicts),
    couch_query_servers:stop_doc_map(ViewGroup3#view_group.compiled_doc_map),
    {ok, ViewGroup4} = write_group_changes(ViewGroup3, StewDicts2, NewSeq),
    NotifyFun2(complete, ViewGroup4),
    {ok, ViewGroup4}.

%% Accumulator fun for update_int/2, called once per changed document.
%% Special docs are skipped without touching the accumulator. Every other
%% doc that can be opened is collected; once this process uses more than
%% FLUSH_MAX_MEM of memory, everything collected so far is computed and
%% written out.
process_doc(Db, DocInfo, {Docs, ViewGroup, StewDicts, LastSeq, NotifyFun}) ->
    #doc_info{id=DocId, update_seq=Seq} = DocInfo,
    case couch_doc:is_special_doc(DocId) of
        true ->
            % skip this doc
            {ok, {Docs, ViewGroup, StewDicts, LastSeq, NotifyFun}};
        false ->
            Docs2 =
                case couch_db:open_doc(Db, DocInfo, [conflicts,deleted_conflicts]) of
                    {ok, Doc} -> [Doc | Docs];
                    {not_found, deleted} -> Docs
                end,
            case process_info(self(), memory) of
                {memory, Mem} when Mem > ?FLUSH_MAX_MEM ->
                    flush_collected_docs(ViewGroup, Docs2, StewDicts, Seq, NotifyFun);
                _Else ->
                    {ok, {Docs2, ViewGroup, StewDicts, Seq, NotifyFun}}
            end
    end.

% Computes and writes the docs collected so far, sends a `partial`
% notification, and returns a fresh accumulator positioned at Seq.
flush_collected_docs(ViewGroup, Docs, StewDicts, Seq, NotifyFun) ->
    {ViewGroup1, Results} = view_compute(ViewGroup, Docs),
    StewDicts2 = stew_unify_query_results(Results, StewDicts),
    {ok, ViewGroup2} = write_group_changes(ViewGroup1, StewDicts2, Seq),
    garbage_collect(),
    NotifyFun2 = NotifyFun(partial, ViewGroup2),
    EmptyStewDicts = [dict:new() || _ <- ViewGroup2#view_group.views],
    {ok, {[], ViewGroup2, EmptyStewDicts, Seq, NotifyFun2}}.

%% Process a (coordinated) list of documents and mapped query results, returning a
%% list of tuples mapping Stews(/Views) to dicts of keys/summed values (of the
%% query results).
% Folds the mapped results of every document, one document at a time, into
% StewDicts (one dict per view) and returns the updated list of dicts.
stew_unify_query_results([], StewDicts) ->
    StewDicts;
stew_unify_query_results([QueryResults|RestResults], StewDicts) ->
    StewDicts2 = stew_dictify_doc_query_results(QueryResults, StewDicts, []),
    stew_unify_query_results(RestResults, StewDicts2).

%% Processes a single document's mapped Results stew-by-stew, summing them with the current
%% set of StewDicts.
%% StewDicts is a list of dicts, one per Stew(/View) and in the same order as the
%% document's per-view results, each mapping a key to its Stew-sum. (Just a number, for now.)
%%
%% A view may emit the same key more than once for a single document, so every
%% {Key, Value} pair is added to the running sum individually. Building a dict
%% from the pairs first would keep only the last value of a repeated key.
%%
%% The two lists must have the same length; anything else is a caller bug and
%% fails with function_clause.
stew_dictify_doc_query_results([], [], StewDictsAcc) ->
    lists:reverse(StewDictsAcc);
stew_dictify_doc_query_results([ResultKVs|RestResultKVs], [StewDict|RestStewDicts], StewDictsAcc) ->
    StewDict2 = lists:foldl(
        fun({Key, Value}, DictAcc) ->
            % first occurrence stores Value, later ones add to the sum
            dict:update(Key, fun(Sum) -> Value + Sum end, Value, DictAcc)
        end,
        StewDict,
        ResultKVs),
    stew_dictify_doc_query_results(RestResultKVs, RestStewDicts, [StewDict2|StewDictsAcc]).

%% Pass the given documents through to the mapping interpreter. For each
%% document, each view mapping function is applied. The result of each mapping
%% function is a list of {key, value} tuples.
%%
%% return: {updated ViewGroup, [Doc-Results+]}
%%  where Doc-Results: [View-Results+]
%%  where View-Results: [{key1, value1}, {key1, value2}, ...] aka the result of
%%        applying the view's mapping function to the Doc in question.
%%
%% With no documents the group is returned untouched and no doc map is started.
view_compute(ViewGroup, []) ->
    {ViewGroup, []};
view_compute(#view_group{query_lang=QueryLang, compiled_doc_map=DocMap}=ViewGroup, Docs) ->
    DocMap1 =
        case DocMap of
            nil -> % doc map not started
                {ok, DocMap0} = couch_query_servers:start_doc_map(QueryLang, queries(ViewGroup)),
                DocMap0;
            _ ->
                DocMap
        end,
    {ok, Results} = couch_query_servers:map_docs(DocMap1, Docs),
    % the started doc map is kept in the group so later calls reuse it
    {ViewGroup#view_group{compiled_doc_map=DocMap1}, Results}.

% the query strings of all views in the group, in view order
queries(ViewGroup) ->
    [View#view.query_string || View <- ViewGroup#view_group.views].
%% Unify the StewDicts with those in the Btree by looking up all of the current keys from
%% each StewDict in the Btree, doing our sum/unification, and then putting all of the
%% (potentially) revised key/values back in.
%% Because we do not need to touch the Group btree, we are able to sub out the actual
%% per-view logic.
%%
%% Returns {ok, ViewGroup2}, whose views carry the updated btrees and whose
%% current_seq is NewSeq.
write_group_changes(ViewGroup, StewDicts, NewSeq) ->
    % For each Stew(/View)...
    ViewsWithDicts = lists:zip(ViewGroup#view_group.views, StewDicts),
    UpdatedViews = lists:map(
        fun({View, StewDict}) -> write_view_changes(View, StewDict) end,
        ViewsWithDicts),
    {ok, ViewGroup#view_group{views=UpdatedViews, current_seq=NewSeq}}.

%% Merges StewDict into the btree of one view and returns the view with its
%% new btree.
write_view_changes(View, StewDict) ->
    % Perform the look-up of every key we are about to write
    KeysToWrite = dict:fetch_keys(StewDict),
    {ok, LookupResults, ViewBtree2} =
        couch_btree:query_modify(View#view.btree, KeysToWrite, [], []),
    % Merge the look-up results with StewDict,
    % also, keep track of found keys so that we can remove them
    {MergedDict, RemoveKeys} =
        lists:foldl(fun merge_lookup_result/2, {StewDict, []}, LookupResults),
    {ok, ViewBtree3} =
        couch_btree:add_remove(ViewBtree2, dict:to_list(MergedDict), RemoveKeys),
    View#view{btree = ViewBtree3}.

% Fold step for write_view_changes/2. A key found in the btree has its stored
% value added to the pending sum and is scheduled for removal; a key that was
% not found leaves the accumulator unchanged.
merge_lookup_result({ok, {Key, Value}}, {StewDictAcc, RemoveKeysAcc}) ->
    Summed = dict:update(Key, fun(OValue) -> Value + OValue end, StewDictAcc),
    {Summed, [Key | RemoveKeysAcc]};
merge_lookup_result({not_found, _}, Acc) ->
    Acc.