%% CouchDB %% Copyright (C) 2006 Damien Katz %% %% This program is free software; you can redistribute it and/or %% modify it under the terms of the GNU General Public License %% as published by the Free Software Foundation; either version 2 %% of the License, or (at your option) any later version. %% %% This program is distributed in the hope that it will be useful, %% but WITHOUT ANY WARRANTY; without even the implied warranty of %% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the %% GNU General Public License for more details. %% %% You should have received a copy of the GNU General Public License %% along with this program; if not, write to the Free Software %% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. -module(couch_view_group). -behaviour(gen_server). -export([start_manager/4,stop/1, open/4,open/5,fold/5,fold/6, update_group/3,update_view_proc/5,free_groups/2, update_view_proc/7,get_group_async/3, less_json/2]). -export([init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2,code_change/3]). -include("couch_db.hrl"). % arbitrarily chosen amount of memory to use before flushing to disk -define(FLUSH_MAX_MEM, 4000000). -record(view_group, {db, query_lang, named_queries, views, current_seq, id_btree, update_notif_fun, compiled_doc_map = nil }). -record(view, {id_num, name, btree, columns, query_string }). -record(mgr, {db, fd, update_state_by_id = dict:new(), ids_by_pid = dict:new(), get_group_info_fun, update_group_info_fun, cached_groups=dict:new() }). start_manager(Db, Fd, GetGroupInfoFun, UpdateGroupInfoFun) -> gen_server:start_link(couch_view_group, {Db, Fd, GetGroupInfoFun, UpdateGroupInfoFun}, []). stop(MgrPid) -> gen_server:cast(MgrPid, stop). init({Db, Fd, GetGroupInfoFun, UpdateGroupInfoFun}) -> process_flag(trap_exit, true), {ok, #mgr{db=Db, fd=Fd, get_group_info_fun=GetGroupInfoFun, update_group_info_fun=UpdateGroupInfoFun}}. terminate(_Reason, _Mgr) -> ok. get_group_async(MgrPid, GroupId, OrigFrom) -> gen_server:cast(MgrPid, {get_group_async, GroupId, OrigFrom}). update_group(MgrPid, GroupId, UpdateNotifFun) -> gen_server:cast(MgrPid, {update_group, GroupId, UpdateNotifFun}). % stops any processing of the view and frees and cached values free_groups(MgrPid, GroupIds) -> gen_server:call(MgrPid, {free_groups, GroupIds}). % called from the update process handle_call({group_cache_update, GroupId, Group}, {FromPid, _FromRef}, Mgr) -> Group2 = Group#view_group{compiled_doc_map=nil}, % the process may have been killed by the free groups call % so check to make sure its alive. case is_process_alive(FromPid) of true -> #mgr{cached_groups=CachedGroups} = Mgr, CachedGroups2 = dict:store(GroupId, Group2, CachedGroups), {reply, ok, Mgr#mgr{cached_groups=CachedGroups2}}; false -> {reply, ok, Mgr} end; handle_call({free_groups, GroupIds}, _From, Mgr) -> Mgr2 = lists:foldl(fun(GroupId, MgrAcc) -> #mgr{cached_groups=CachedGroups, update_state_by_id=ProcsDict} = MgrAcc, CachedGroups2 = dict:erase(GroupId, CachedGroups), case dict:find(GroupId, ProcsDict) of {ok, {processing_request, _NotifyFuns, Pid}} -> exit(Pid, freed); {ok, {processing_and_pending_request, _NotifyFuns, _PendingUpdateNotifFuns, Pid}} -> exit(Pid, freed); _Else -> ok end, MgrAcc#mgr{cached_groups=CachedGroups2} end, Mgr, GroupIds), {reply, ok, Mgr2}. handle_cast({get_group_async, GroupId, OrigFrom}, Mgr) -> #mgr{ db=Db, fd=Fd, get_group_info_fun=GetGroupInfoFun, cached_groups=CachedGroups } = Mgr, case dict:find(GroupId, CachedGroups) of {ok, CachedGroup} -> gen_server:reply(OrigFrom, {ok, CachedGroup}), {noreply, Mgr}; error -> {Mgr2, Reply} = case GetGroupInfoFun(GroupId) of {ok, {NamedQueries, GroupState}} -> {ok, Group} = open(Db, Fd, NamedQueries, GroupState), NewMgr = Mgr#mgr{cached_groups=dict:store(GroupId, Group, CachedGroups)}, {NewMgr, {ok, Group}}; Else -> {Mgr, Else} end, gen_server:reply(OrigFrom, Reply), {noreply, Mgr2} end; handle_cast({update_group, GroupId, UpdateNotifFun}, Mgr) -> #mgr{ update_state_by_id=ProcsDict, ids_by_pid=GroupIdDict, db=Db, fd=Fd, get_group_info_fun=GetGroupInfoFun, update_group_info_fun=UpdateGroupInfoFun, cached_groups=CachedGroups } = Mgr, case dict:find(GroupId, ProcsDict) of {ok, {processing_request, NotifyFuns, Pid}} -> ProcsDict2 = dict:store( GroupId, {processing_and_pending_request, NotifyFuns, [UpdateNotifFun], Pid}, ProcsDict ), {noreply, Mgr#mgr{update_state_by_id=ProcsDict2}}; {ok, {processing_and_pending_request, NotifyFuns, PendingUpdateNotifFuns, Pid}} -> ProcsDict2 = dict:store( GroupId, {processing_and_pending_request, NotifyFuns, [UpdateNotifFun | PendingUpdateNotifFuns], Pid}, ProcsDict ), {noreply, Mgr#mgr{update_state_by_id=ProcsDict2}}; error -> case dict:find(GroupId, CachedGroups) of {ok, Group} -> Pid = spawn_link(couch_view_group, update_view_proc, [self(), Group, GroupId, UpdateGroupInfoFun, [UpdateNotifFun]]); error -> Pid = spawn_link(couch_view_group, update_view_proc, [self(), Db, Fd, GroupId, GetGroupInfoFun, UpdateGroupInfoFun, [UpdateNotifFun]]) end, ProcsDict2 = dict:store(GroupId, {processing_request, [UpdateNotifFun], Pid}, ProcsDict), GroupIdDict2 = dict:store(Pid, GroupId, GroupIdDict), {noreply, Mgr#mgr{update_state_by_id=ProcsDict2, ids_by_pid=GroupIdDict2}} end; handle_cast(stop, Mgr) -> {stop, normal, Mgr}. % causes terminate to be called handle_info({'EXIT', FromPid, Reason}, Mgr) -> #mgr{ update_state_by_id=ProcsDict, ids_by_pid=GroupIdDict, db=Db, fd=Fd, get_group_info_fun=GetGroupInfoFun, update_group_info_fun=UpdateGroupInfoFun, cached_groups=CachedGroups } = Mgr, case dict:find(FromPid, GroupIdDict) of {ok, GroupId} -> case dict:find(GroupId, ProcsDict) of {ok, {processing_request, NotifyFuns, _Pid}} -> GroupIdDict2 = dict:erase(FromPid, GroupIdDict), ProcsDict2 = dict:erase(GroupId, ProcsDict); {ok, {processing_and_pending_request, NotifyFuns, NextNotifyFuns, _Pid}} -> case dict:find(GroupId, CachedGroups) of {ok, Group} -> Pid = spawn_link(couch_view_group, update_view_proc, [self(), Group, GroupId, UpdateGroupInfoFun, NextNotifyFuns]); error -> Pid = spawn_link(couch_view_group, update_view_proc, [self(), Db, Fd, GroupId, GetGroupInfoFun, UpdateGroupInfoFun, NextNotifyFuns]) end, GroupIdDict2 = dict:store(Pid, GroupId, dict:erase(FromPid, GroupIdDict)), ProcsDict2 = dict:store(GroupId, {processing_request, NextNotifyFuns, Pid}, ProcsDict) end, case Reason of normal -> ok; {{nocatch, Error}, _Trace} -> % process returned abnormally, notify any waiting listeners [catch NotifyFun(Error) || NotifyFun <- NotifyFuns]; _Else -> % process returned abnormally, notify any waiting listeners [catch NotifyFun(Reason) || NotifyFun <- NotifyFuns] end, Mgr2 = Mgr#mgr{update_state_by_id=ProcsDict2, ids_by_pid=GroupIdDict2}, {noreply, Mgr2}; error -> % a linked process must have died, we propagate the error exit(Reason) end. code_change(_OldVsn, State, _Extra) -> {ok, State}. open(Db, Fd, NamedQueries, GroupState) -> open(Db, Fd, NamedQueries, GroupState, fun(_View) -> ok end). open(Db, Fd, {QueryLang, NamedQueries}, GroupState, UpdateNotifFun) -> {ViewBtreeStates, CurrentSeq, GroupIdBtreeState} = case GroupState of nil -> % new view group, init GroupState to nils {[ nil || _Query <- NamedQueries], 0, nil}; GroupState -> GroupState end, {Views, _N} = lists:mapfoldl(fun({{Name, QueryString}, BtreeState}, N) -> {ok, Btree} = couch_btree:open(BtreeState, Fd, fun less_json/2), {#view{name=Name, id_num=N, btree=Btree, query_string=QueryString}, N+1} end, 0, lists:zip(NamedQueries, ViewBtreeStates)), {ok, GroupIdBtree} = couch_btree:open(GroupIdBtreeState, Fd), ViewGroup = #view_group{db=Db, views=Views, current_seq=CurrentSeq, query_lang=QueryLang, id_btree=GroupIdBtree, update_notif_fun=UpdateNotifFun, named_queries=NamedQueries}, {ok, ViewGroup}. get_info(#view_group{query_lang=QueryLang, named_queries=NamedQueries, views=Views, current_seq=CurrentSeq, id_btree=GroupIdBtree} = _ViewGroup) -> ViewBtreeStates = [couch_btree:get_state(View#view.btree) || View <- Views], {{QueryLang, NamedQueries}, {ViewBtreeStates, CurrentSeq, couch_btree:get_state(GroupIdBtree)}}. fold(ViewGroup, ViewName, Dir, Fun, Acc) -> Result = fold_int(ViewGroup#view_group.views, ViewName, Dir, Fun, Acc), Result. fold_int([], _ViewName, _Dir, _Fun, _Acc) -> {not_found, missing_named_view}; fold_int([View | _RestViews], ViewName, Dir, Fun, Acc) when View#view.name == ViewName -> TotalRowCount = couch_btree:row_count(View#view.btree), WrapperFun = fun({{Key, DocId}, Value}, Offset, WrapperAcc) -> Fun(DocId, Key, Value, Offset, TotalRowCount, WrapperAcc) end, {ok, AccResult} = couch_btree:fold(View#view.btree, Dir, WrapperFun, Acc), {ok, TotalRowCount, AccResult}; fold_int([_View | RestViews], ViewName, Dir, Fun, Acc) -> fold_int(RestViews, ViewName, Dir, Fun, Acc). fold(ViewGroup, ViewName, StartKey, Dir, Fun, Acc) -> Result = fold_int(ViewGroup#view_group.views, ViewName, StartKey, Dir, Fun, Acc), Result. fold_int([], _ViewName, _StartKey, _Dir, _Fun, _Acc) -> {not_found, missing_named_view}; fold_int([View | _RestViews], ViewName, StartKey, Dir, Fun, Acc) when View#view.name == ViewName -> TotalRowCount = couch_btree:row_count(View#view.btree), WrapperFun = fun({{Key, DocId}, Value}, Offset, WrapperAcc) -> Fun(DocId, Key, Value, Offset, TotalRowCount, WrapperAcc) end, {ok, AccResult} = couch_btree:fold(View#view.btree, StartKey, Dir, WrapperFun, Acc), {ok, TotalRowCount, AccResult}; fold_int([_View | RestViews], ViewName, StartKey, Dir, Fun, Acc) -> fold_int(RestViews, ViewName, StartKey, Dir, Fun, Acc). less_json(A, B) -> TypeA = type_sort(A), TypeB = type_sort(B), if TypeA == TypeB -> less_same_type(A,B); true -> TypeA < TypeB end. type_sort(V) when is_atom(V) -> 0; type_sort(V) when is_integer(V) -> 1; type_sort(V) when is_float(V) -> 1; type_sort(V) when is_list(V) -> 2; type_sort({obj, _}) -> 4; % must come before tuple test below type_sort(V) when is_tuple(V) -> 3; type_sort(V) when is_binary(V) -> 5. atom_sort(nil) -> 0; atom_sort(null) -> 1; atom_sort(false) -> 2; atom_sort(true) -> 3. less_same_type(A,B) when is_atom(A) -> atom_sort(A) < atom_sort(B); less_same_type(A,B) when is_list(A) -> couch_util:collate(A, B) < 0; less_same_type({obj, AProps}, {obj, BProps}) -> less_props(AProps, BProps); less_same_type(A, B) when is_tuple(A) -> less_list(tuple_to_list(A),tuple_to_list(B)); less_same_type(A, B) -> A < B. ensure_list(V) when is_list(V) -> V; ensure_list(V) when is_atom(V) -> atom_to_list(V). less_props([], [_|_]) -> true; less_props(_, []) -> false; less_props([{AKey, AValue}|RestA], [{BKey, BValue}|RestB]) -> case couch_util:collate(ensure_list(AKey), ensure_list(BKey)) of -1 -> true; 1 -> false; 0 -> case less_json(AValue, BValue) of true -> true; false -> case less_json(BValue, AValue) of true -> false; false -> less_props(RestA, RestB) end end end. less_list([], [_|_]) -> true; less_list(_, []) -> false; less_list([A|RestA], [B|RestB]) -> case less_json(A,B) of true -> true; false -> case less_json(B,A) of true -> false; false -> less_list(RestA, RestB) end end. notify(MgrPid, UpdateStatus, ViewGroup, GroupId, UpdateGroupInfoFun, StatusNotifyFuns) -> GroupInfo = get_info(ViewGroup), ok = gen_server:call(MgrPid, {group_cache_update, GroupId, ViewGroup}), ok = UpdateGroupInfoFun(GroupId, UpdateStatus, GroupInfo), StatusNotifyFuns2 = lists:foldl(fun(NotifyFun, AccFuns) -> case (catch NotifyFun(UpdateStatus)) of NewNotifyFun when is_function(NewNotifyFun) -> [NewNotifyFun | AccFuns]; _Else -> AccFuns end end, [], StatusNotifyFuns), fun(UpdateStatus2, ViewGroup2) -> notify(MgrPid, UpdateStatus2, ViewGroup2, GroupId, UpdateGroupInfoFun, StatusNotifyFuns2) end. update_view_proc(MgrPid, #view_group{} = ViewGroup, GroupId, UpdateGroupInfoFun, StatusNotifyFuns) -> NotifyFun = fun(UpdateStatus, ViewGroup2) -> notify(MgrPid, UpdateStatus, ViewGroup2, GroupId, UpdateGroupInfoFun, StatusNotifyFuns) end, update_int(ViewGroup, NotifyFun). update_view_proc(MgrPid, Db, Fd, GroupId, GetGroupInfoFun, UpdateGroupInfoFun, StatusNotifyFuns) -> case GetGroupInfoFun(GroupId) of {ok, {Queries, GroupState}} -> {ok, ViewGroup} = open(Db, Fd, Queries, GroupState), update_view_proc(MgrPid, ViewGroup, GroupId, UpdateGroupInfoFun, StatusNotifyFuns); Error -> exit(Error) end. update_int(ViewGroup, NotifyFun) -> #view_group{ db=Db, views=Views, current_seq=CurrentSeq } = ViewGroup, ViewEmptyKVs = [{View, []} || View <- Views], % compute on all docs modified since we last computed. {ok, {UncomputedDocs, ViewGroup2, ViewKVsToAdd, DocIdViewIdKeys, NewSeq, NotifyFun2}} = couch_db:enum_docs_since( Db, CurrentSeq, fun(DocInfo, _, Acc) -> process_doc(Db, DocInfo, Acc) end, {[], ViewGroup, ViewEmptyKVs, [], CurrentSeq, NotifyFun} ), {ViewGroup3, Results} = view_compute(ViewGroup2, UncomputedDocs), {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys), couch_query_servers:stop_doc_map(ViewGroup3#view_group.compiled_doc_map), {ok, ViewGroup4} = write_changes(ViewGroup3, ViewKVsToAdd2, DocIdViewIdKeys2, NewSeq), NotifyFun2(complete, ViewGroup4), {ok, ViewGroup4}. %% Given a document id (via DocInfo) in a given Db, lookup and store the given %% document in the accumulator structure, accounting for deleted documents. %% Takes and returns an accumulator structure which will be processed and %% flushed when the process memory utilization exceeds FLUSH_MAX_MEM. %% Processing entails mapping the documents via view_compute, >>> <<< %% %% Accumulator structure: %% Docs: A list of (added/updated) documents to be processed. %% ViewGroup: The ViewGroup being processed, current state. %% ViewKVs: A list of tuples of {View, [{{Key, DocId}, Value}+]}. In other %% words, it maps Views to their key-value pairs, where the key also has %% the document that produced it bound to it. %% DocIdViewIdKeys: A list of tuples mapping document id's to their associated %% keys (of the form {view id, key}). Deleted documents have their document id's %% mapped to an empty list of keys. %% LastSeq: %% NotifyFun: A callback function to be invoked with (partial, ViewGroup) each %% time we process and flush a block. process_doc(Db, DocInfo, {Docs, ViewGroup, ViewKVs, DocIdViewIdKeys, _LastSeq, NotifyFun}) -> % This fun computes once for each document #doc_info{id=DocId, update_seq=Seq} = DocInfo, case couch_doc:is_special_doc(DocId) of true -> % skip this doc {ok, {Docs, ViewGroup, ViewKVs, DocIdViewIdKeys, _LastSeq, NotifyFun}}; false -> {Docs2, DocIdViewIdKeys2} = case couch_db:open_doc(Db, DocInfo, [conflicts,deleted_conflicts]) of {ok, Doc} -> {[Doc | Docs], DocIdViewIdKeys}; {not_found, deleted} -> {Docs, [{DocId, []} | DocIdViewIdKeys]} end, case process_info(self(), memory) of {memory, Mem} when Mem > ?FLUSH_MAX_MEM -> {ViewGroup1, Results} = view_compute(ViewGroup, Docs2), {ViewKVs3, DocIdViewIdKeys3} = view_insert_query_results(Docs2, Results, ViewKVs, DocIdViewIdKeys2), {ok, ViewGroup2} = write_changes(ViewGroup1, ViewKVs3, DocIdViewIdKeys3, Seq), garbage_collect(), NotifyFun2 = NotifyFun(partial, ViewGroup2), ViewEmptyKeyValues = [{View, []} || View <- ViewGroup2#view_group.views], {ok, {[], ViewGroup2, ViewEmptyKeyValues, [], Seq, NotifyFun2}}; _Else -> {ok, {Docs2, ViewGroup, ViewKVs, DocIdViewIdKeys2, Seq, NotifyFun}} end end. %% Processes a list of documents, document-by-document, updating the ViewKVs mapping and %% DocIdViewIdKeys mappings for each document. %% %% Docs: Documents to process. %% Results: Results of view_compute; key/value tuples mapped by user view mapping functions. %% We expect an exact correspondence between Docs[n] and Results[n]. %% ViewKVs: A list of tuples of {View, [{{Key, DocId}, Value}+]}. Maps Views to the %% keys/values they have produced and the document that generated the given key/value. %% DocIdViewIdKeysAcc: List of tuples of {document id, ViewIdKeys}, where ViewIdKeys is %% a list of {view id, key} tuples. Maps documents to all the keys they have produced %% and the view under which they were produced. view_insert_query_results([], [], ViewKVs, DocIdViewIdKeysAcc) -> {ViewKVs, DocIdViewIdKeysAcc}; view_insert_query_results([Doc|RestDocs], [QueryResults | RestResults], ViewKVs, DocIdViewIdKeysAcc) -> {NewViewKVs, NewViewIdKeys} = view_insert_doc_query_results(Doc, QueryResults, ViewKVs, [], []), NewDocIdViewIdKeys = [{Doc#doc.id, NewViewIdKeys} | DocIdViewIdKeysAcc], view_insert_query_results(RestDocs, RestResults, NewViewKVs, NewDocIdViewIdKeys). %% Processes a single document's mapped Results view-by-view, returning {ViewKVs, ViewIdKeys} where: %% ViewKVs is an updated version of the passed-in ViewKVsAcc, mapping Views to their %% list of associated key-doc-value nested tuples of the form {{Key, DocId}, Value}. %% ViewIdKeys is a per-document list of {view id, key} tuples. It lists all the keys %% produced from mapping the given document and the views that produced them. %% (Because the list of views is traversed left-to-right and builds up the result lists %% as it goes, the result lists need to be reversed when the function returns, hence %% the specialized final case.) %% %% Doc: The document, beloved for its id. %% Results: The list of not yet processed mapping results for this document. %% ViewKVs: The list of not yet processed views and the key-doc-value nested tuples currently %% associated with each view. %% ViewKVsAcc: ViewKVs containing the views already processed and updated for the current %% document (accumulated). %% ViewIdKeysAcc: List of one-per-document lists of tuples of {view id, key} (accumulated). view_insert_doc_query_results(_Doc, [], [], ViewKVsAcc, ViewIdKeysAcc) -> {lists:reverse(ViewKVsAcc), lists:reverse(ViewIdKeysAcc)}; view_insert_doc_query_results(#doc{id=DocId}=Doc, [ResultKVs|RestResults], [{View, KVs}|RestViewKVs], ViewKVsAcc, ViewIdKeysAcc) -> NewKVs = [{{Key, DocId}, Value} || {Key, Value} <- ResultKVs], NewViewIdKeys = [{View#view.id_num, Key} || {Key, _Value} <- ResultKVs], NewViewKVsAcc = [{View, NewKVs ++ KVs} | ViewKVsAcc], NewViewIdKeysAcc = NewViewIdKeys ++ ViewIdKeysAcc, view_insert_doc_query_results(Doc, RestResults, RestViewKVs, NewViewKVsAcc, NewViewIdKeysAcc). %% Pass the given documents through to the mapping interpreter. For each %% document, each view mapping function is applied. The result of each mapping %% function is a list of {key, value} tuples. %% %% return: {updated ViewGroup, [Doc-Results+]} %% where Doc-Results: [View-Results+] %% where View-Results: [{key1, value1}, {key1, value2}, ...] aka the result of %% applying the view's mapping function to the Doc in question. view_compute(ViewGroup, []) -> {ViewGroup, []}; view_compute(#view_group{query_lang=QueryLang, compiled_doc_map=DocMap}=ViewGroup, Docs) -> DocMap1 = case DocMap of nil -> % doc map not started {ok, DocMap0} = couch_query_servers:start_doc_map(QueryLang, queries(ViewGroup)), DocMap0; _ -> DocMap end, {ok, Results} = couch_query_servers:map_docs(DocMap1, Docs), {ViewGroup#view_group{compiled_doc_map=DocMap1}, Results}. queries(ViewGroup) -> [View#view.query_string || View <- ViewGroup#view_group.views]. dict_find(Key, DefaultValue, Dict) -> case dict:find(Key, Dict) of {ok, Value} -> Value; error -> DefaultValue end. %% Write changes to a ViewGroup by effectively removing all affected documents (either modified or deleted) %% from the Group Btrees and view Btrees and then adding all the 'current'/new key/value states (for added %% or modified documents). %% %% ViewGroup: The ViewGroup being updated. %% ViewKeyValuesToAdd: A list of tuples of {View, [{{Key, DocId}, Value}+]}. In other %% words, it maps Views to their key-value pairs, where the key also has %% the document that produced it bound to it. %% DocIdViewIdKeys: A list of tuples mapping document id's to their associated %% keys (of the form {view id, key}). Deleted documents have their document id's %% mapped to an empty list of keys. %% NewSeq: The new sequence number for this ViewGroup state. write_changes(ViewGroup, ViewKeyValuesToAdd, DocIdViewIdKeys, NewSeq) -> #view_group{id_btree=GroupIdBtree} = ViewGroup, AddDocIdViewIdKeys = [{DocId, ViewIdKeys} || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys /= []], RemoveDocIds = [DocId || {DocId, ViewIdKeys} <- DocIdViewIdKeys, ViewIdKeys == []], LookupDocIds = [DocId || {DocId, _ViewIdKeys} <- DocIdViewIdKeys], % Modify the Group Btree, whose keys are document id's and values are lists of {ViewId, Key} tuples. % Our query removes documents which no longer have any keys (RemoveDocIds), adds/replaces documents % for which we do have keys (AddDocIdViewIdKeys), and looks-up the existing values for all of the % documents we are modifying so that we can remove those values from the view maps below. (Rather % than attempt to figure out the net changes for a modified document, we instead just remove all % of its previously associated keys/values from the views, then add its new keys/values to the % views.) {ok, LookupResults, GroupIdBtree2} = couch_btree:query_modify(GroupIdBtree, LookupDocIds, AddDocIdViewIdKeys, RemoveDocIds), % Traverse LookupResults, accumulating a dictionary that maps Views % to a list of btree keys ({Key, DocId} tuples). KeysToRemoveByView = lists:foldl( fun(LookupResult, KeysToRemoveByViewAcc) -> case LookupResult of {ok, {DocId, ViewIdKeys}} -> lists:foldl( fun({ViewId, Key}, KeysToRemoveByViewAcc2) -> dict:append(ViewId, {Key, DocId}, KeysToRemoveByViewAcc2) end, KeysToRemoveByViewAcc, ViewIdKeys); {not_found, _} -> KeysToRemoveByViewAcc end end, dict:new(), LookupResults), % For each view, add the key values from ViewKeyValuesToAdd (a ViewKVs map, so all new % observed {{Key, DocId}, Value} tuples), and remove the keys just accumulated above. % (This is all done to the View's Btree, which is different from the Group Btree manipulation % performed above. Views2 = [ begin KeysToRemove = dict_find(View#view.id_num, [], KeysToRemoveByView), {ok, ViewBtree2} = couch_btree:add_remove(View#view.btree, AddKeyValues, KeysToRemove), View#view{btree = ViewBtree2} end || {View, AddKeyValues} <- ViewKeyValuesToAdd ], ViewGroup2 = ViewGroup#view_group{views=Views2, current_seq=NewSeq, id_btree=GroupIdBtree2}, {ok, ViewGroup2}.