Skip to content

Commit

Permalink
add single key call
Browse files Browse the repository at this point in the history
  • Loading branch information
Harshil goel committed Jul 15, 2024
1 parent c22acb6 commit af560bd
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 35 deletions.
48 changes: 48 additions & 0 deletions posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,54 @@ func TestAddMutation_mrjn1(t *testing.T) {
require.Equal(t, 0, ol.Length(txn.StartTs, 0))
}

func TestReadSingleValue(t *testing.T) {
defer setMaxListSize(maxListSize)
maxListSize = math.MaxInt32

// We call pl.Iterate and then stop iterating in the first loop when we are reading
// single values. This test confirms that the two functions, getFirst from this file
// and GetSingeValueForKey works without an issue.

key := x.DataKey(x.GalaxyAttr("value"), 1240)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
N := int(10000)
for i := 2; i <= N; i += 2 {
edge := &pb.DirectedEdge{
Value: []byte("ho hey there" + strconv.Itoa(i)),
}
txn := Txn{StartTs: uint64(i)}
addMutationHelper(t, ol, edge, Set, &txn)
require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1))
kData := ol.getMutation(uint64(i))
writer := NewTxnWriter(pstore)
if err := writer.SetAt(key, kData, BitDeltaPosting, uint64(i)); err != nil {
require.NoError(t, err)
}
writer.Flush()

if i%10 == 0 {
// Do frequent rollups, and store data in old timestamp
kvs, err := ol.Rollup(nil, txn.StartTs-3)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
}

j := 2
if j < int(ol.minTs) {
j = int(ol.minTs)
}
for ; j < i+6; j++ {
tx := NewTxn(uint64(j))
k, err := tx.cache.GetSinglePosting(key)
require.NoError(t, err)
checkValue(t, ol, string(k.Postings[0].Value), uint64(j))
}
}
}

func TestRollupMaxTsIsSet(t *testing.T) {
defer setMaxListSize(maxListSize)
maxListSize = math.MaxInt32
Expand Down
64 changes: 64 additions & 0 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,70 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error)
return lc.SetIfAbsent(skey, pl), nil
}

// GetSinglePosting retrieves the cached version of the first item in the list associated with the
// given key. This is used for retrieving the value of a scalar predicats.
func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
getDeltas := func() *pb.PostingList {
lc.RLock()
defer lc.RUnlock()

pl := &pb.PostingList{}
if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
err := pl.Unmarshal(delta)
if err != nil {
return pl
}
}

return nil
}

getPostings := func() (*pb.PostingList, error) {
pl := getDeltas()
if pl != nil {
return pl, nil
}

pl = &pb.PostingList{}
txn := pstore.NewTransactionAt(lc.startTs, false)
item, err := txn.Get(key)
if err != nil {
return nil, err
}

err = item.Value(func(val []byte) error {
if err := pl.Unmarshal(val); err != nil {
return err
}
return nil
})

return pl, err
}

pl, err := getPostings()
if err == badger.ErrKeyNotFound {
return nil, nil
}
if err != nil {
return nil, err
}

// Filter and remove STAR_ALL and OP_DELETE Postings
idx := 0
for _, postings := range pl.Postings {
if hasDeleteAll(postings) {
return nil, nil
}
if postings.Op != Del {
pl.Postings[idx] = postings
idx++
}
}
pl.Postings = pl.Postings[:idx]
return pl, nil
}

// Get retrieves the cached version of the list associated with the given key.
func (lc *LocalCache) Get(key []byte) (*List, error) {
return lc.getInternal(key, true)
Expand Down
104 changes: 69 additions & 35 deletions worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,14 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
outputs := make([]*pb.Result, numGo)
listType := schema.State().IsList(q.Attr)

// These are certain special cases where we can get away with reading only the latest value
// Lang doesn't work because we would be storing various different languages at various
// time. So when we go to read the latest value, we might get a different language.
// Similarly with DoCount and ExpandAll and Facets. List types are also not supported
// because list is stored by time, and we combine all the list items at various timestamps.
hasLang := schema.State().HasLang(q.Attr)
getMultiplePosting := q.DoCount || q.ExpandAll || listType || hasLang || q.FacetParam != nil

calculate := func(start, end int) error {
x.AssertTrue(start%width == 0)
out := &pb.Result{}
Expand All @@ -434,49 +442,75 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
key := x.DataKey(q.Attr, q.UidList.Uids[i])

// Get or create the posting list for an entity, attribute combination.
pl, err := qs.cache.Get(key)
if err != nil {
return err
}

// If count is being requested, there is no need to populate value and facets matrix.
if q.DoCount {
count, err := countForValuePostings(args, pl, facetsTree, listType)
if err != nil && err != posting.ErrNoValue {
var vals []types.Val
fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored

if !getMultiplePosting {
pl, err := qs.cache.GetSinglePosting(key)
if err != nil {
return err
}
if pl == nil || len(pl.Postings) == 0 {
out.UidMatrix = append(out.UidMatrix, &pb.List{})
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
out.ValueMatrix = append(out.ValueMatrix,
&pb.ValueList{Values: []*pb.TaskValue{}})
continue
}
vals = make([]types.Val, len(pl.Postings))
for i, p := range pl.Postings {
vals[i] = types.Val{
Tid: types.TypeID(p.ValType),
Value: p.Value,
}
}
} else {
pl, err := qs.cache.Get(key)
if err != nil {
return err
}
out.Counts = append(out.Counts, uint32(count))
// Add an empty UID list to make later processing consistent.
out.UidMatrix = append(out.UidMatrix, &pb.List{})
continue
}

vals, fcs, err := retrieveValuesAndFacets(args, pl, facetsTree, listType)
switch {
case err == posting.ErrNoValue || (err == nil && len(vals) == 0):
// This branch is taken when the value does not exist in the pl or
// the number of values retrieved is zero (there could still be facets).
// We add empty lists to the UidMatrix, FaceMatrix, ValueMatrix and
// LangMatrix so that all these data structure have predictable layouts.
out.UidMatrix = append(out.UidMatrix, &pb.List{})
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
out.ValueMatrix = append(out.ValueMatrix,
&pb.ValueList{Values: []*pb.TaskValue{}})
if q.ExpandAll {
// To keep the cardinality same as that of ValueMatrix.
out.LangMatrix = append(out.LangMatrix, &pb.LangList{})
// If count is being requested, there is no need to populate value and facets matrix.
if q.DoCount {
count, err := countForValuePostings(args, pl, facetsTree, listType)
if err != nil && err != posting.ErrNoValue {
return err
}
out.Counts = append(out.Counts, uint32(count))
// Add an empty UID list to make later processing consistent.
out.UidMatrix = append(out.UidMatrix, &pb.List{})
continue
}
continue
case err != nil:
return err
}

if q.ExpandAll {
langTags, err := pl.GetLangTags(args.q.ReadTs)
if err != nil {
vals, fcs, err = retrieveValuesAndFacets(args, pl, facetsTree, listType)

switch {
case err == posting.ErrNoValue || (err == nil && len(vals) == 0):
// This branch is taken when the value does not exist in the pl or
// the number of values retrieved is zero (there could still be facets).
// We add empty lists to the UidMatrix, FaceMatrix, ValueMatrix and
// LangMatrix so that all these data structure have predictable layouts.
out.UidMatrix = append(out.UidMatrix, &pb.List{})
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
out.ValueMatrix = append(out.ValueMatrix,
&pb.ValueList{Values: []*pb.TaskValue{}})
if q.ExpandAll {
// To keep the cardinality same as that of ValueMatrix.
out.LangMatrix = append(out.LangMatrix, &pb.LangList{})
}
continue
case err != nil:
return err
}
out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags})

if q.ExpandAll {
langTags, err := pl.GetLangTags(args.q.ReadTs)
if err != nil {
return err
}
out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags})
}
}

uidList := new(pb.List)
Expand Down

0 comments on commit af560bd

Please sign in to comment.