Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore(embedded): add embedded dgraph updates for namespacing, expose DoQuery #9216

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions edgraph/access_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,13 +308,13 @@ func authorizeUser(ctx context.Context, userid string, password string) (
"$password": password,
}
req := &Request{
req: &api.Request{
Req: &api.Request{
Query: queryUser,
Vars: queryVars,
},
doAuth: NoAuthorize,
DoAuth: NoAuthorize,
}
queryResp, err := (&Server{}).doQuery(ctx, req)
queryResp, err := (&Server{}).DoQuery(ctx, req)
if err != nil {
glog.Errorf("Error while query user with id %s: %v", userid, err)
return nil, err
Expand All @@ -328,16 +328,16 @@ func authorizeUser(ctx context.Context, userid string, password string) (

func refreshAclCache(ctx context.Context, ns, refreshTs uint64) error {
req := &Request{
req: &api.Request{
Req: &api.Request{
Query: queryAcls,
ReadOnly: true,
StartTs: refreshTs,
},
doAuth: NoAuthorize,
DoAuth: NoAuthorize,
}

ctx = x.AttachNamespace(ctx, ns)
queryResp, err := (&Server{}).doQuery(ctx, req)
queryResp, err := (&Server{}).DoQuery(ctx, req)
if err != nil {
return errors.Errorf("unable to retrieve acls: %v", err)
}
Expand Down Expand Up @@ -482,7 +482,7 @@ func upsertGuardian(ctx context.Context) error {
`, x.GuardiansId)
groupNQuads := acl.CreateGroupNQuads(x.GuardiansId)
req := &Request{
req: &api.Request{
Req: &api.Request{
CommitNow: true,
Query: query,
Mutations: []*api.Mutation{
Expand All @@ -492,10 +492,10 @@ func upsertGuardian(ctx context.Context) error {
},
},
},
doAuth: NoAuthorize,
DoAuth: NoAuthorize,
}

resp, err := (&Server{}).doQuery(ctx, req)
resp, err := (&Server{}).DoQuery(ctx, req)

// Structs to parse guardians group uid from query response
type groupNode struct {
Expand Down Expand Up @@ -558,7 +558,7 @@ func upsertGroot(ctx context.Context, passwd string) error {
ObjectId: "uid(guid)",
})
req := &Request{
req: &api.Request{
Req: &api.Request{
CommitNow: true,
Query: query,
Mutations: []*api.Mutation{
Expand All @@ -569,10 +569,10 @@ func upsertGroot(ctx context.Context, passwd string) error {
},
},
},
doAuth: NoAuthorize,
DoAuth: NoAuthorize,
}

resp, err := (&Server{}).doQuery(ctx, req)
resp, err := (&Server{}).DoQuery(ctx, req)
if err != nil {
return errors.Wrapf(err, "while upserting user with id %s", x.GrootId)
}
Expand Down
12 changes: 6 additions & 6 deletions edgraph/graphql.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ func ProcessPersistedQuery(ctx context.Context, gqlReq *schema.Request) error {
"$join": join,
}
req := &Request{
req: &api.Request{
Req: &api.Request{
Query: queryForSHA,
Vars: variables,
ReadOnly: true,
},
doAuth: NoAuthorize,
DoAuth: NoAuthorize,
}
storedQuery, err := (&Server{}).doQuery(ctx, req)
storedQuery, err := (&Server{}).DoQuery(ctx, req)

if err != nil {
glog.Errorf("Error while querying sha %s", sha256Hash)
Expand Down Expand Up @@ -109,7 +109,7 @@ func ProcessPersistedQuery(ctx context.Context, gqlReq *schema.Request) error {
}

req = &Request{
req: &api.Request{
Req: &api.Request{
Mutations: []*api.Mutation{
{
Set: []*api.NQuad{
Expand All @@ -129,11 +129,11 @@ func ProcessPersistedQuery(ctx context.Context, gqlReq *schema.Request) error {
},
CommitNow: true,
},
doAuth: NoAuthorize,
DoAuth: NoAuthorize,
}

ctx := context.WithValue(ctx, IsGraphql, true)
_, err := (&Server{}).doQuery(ctx, req)
_, err := (&Server{}).DoQuery(ctx, req)
return err

}
Expand Down
6 changes: 3 additions & 3 deletions edgraph/multi_tenancy_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (s *Server) ResetPassword(ctx context.Context, inp *ResetPasswordInput) err
},
}
req := &Request{
req: &api.Request{
Req: &api.Request{
CommitNow: true,
Query: query,
Mutations: []*api.Mutation{
Expand All @@ -61,10 +61,10 @@ func (s *Server) ResetPassword(ctx context.Context, inp *ResetPasswordInput) err
},
},
},
doAuth: NoAuthorize,
DoAuth: NoAuthorize,
}
ctx = x.AttachNamespace(ctx, inp.Namespace)
resp, err := (&Server{}).doQuery(ctx, req)
resp, err := (&Server{}).DoQuery(ctx, req)
if err != nil {
return errors.Wrapf(err, "Reset password for user %s in namespace %d, got error:",
inp.UserID, inp.Namespace)
Expand Down
46 changes: 23 additions & 23 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,8 @@ func parseSchemaFromAlterOperation(ctx context.Context, op *api.Operation) (
// then restoring from the incremental backup of such a DB would restore even the dropped
// data back. This is also used to capture the delete namespace operation during backup.
func InsertDropRecord(ctx context.Context, dropOp string) error {
_, err := (&Server{}).doQuery(context.WithValue(ctx, IsGraphql, true), &Request{
req: &api.Request{
_, err := (&Server{}).DoQuery(context.WithValue(ctx, IsGraphql, true), &Request{
Req: &api.Request{
Mutations: []*api.Mutation{{
Set: []*api.NQuad{{
Subject: "_:r",
Expand All @@ -368,7 +368,7 @@ func InsertDropRecord(ctx context.Context, dropOp string) error {
}},
}},
CommitNow: true,
}, doAuth: NoAuthorize})
}, DoAuth: NoAuthorize})
return err
}

Expand Down Expand Up @@ -1083,12 +1083,12 @@ type queryContext struct {
// Request represents a query request sent to the doQuery() method on the Server.
// It contains all the metadata required to execute a query.
type Request struct {
// req is the incoming gRPC request
req *api.Request
// gqlField is the GraphQL field for which the request is being sent
gqlField gqlSchema.Field
// doAuth tells whether this request needs ACL authorization or not
doAuth AuthMode
// Req is the incoming gRPC request
Req *api.Request
// GqlField is the GraphQL field for which the request is being sent
GqlField gqlSchema.Field
// DoAuth tells whether this request needs ACL authorization or not
DoAuth AuthMode
}

// Health handles /health and /health?all requests.
Expand Down Expand Up @@ -1209,7 +1209,7 @@ func (s *Server) QueryGraphQL(ctx context.Context, req *api.Request,
}
}
// no need to attach namespace here, it is already done by GraphQL layer
return s.doQuery(ctx, &Request{req: req, gqlField: field, doAuth: getAuthMode(ctx)})
return s.DoQuery(ctx, &Request{Req: req, GqlField: field, DoAuth: getAuthMode(ctx)})
}

func (s *Server) Query(ctx context.Context, req *api.Request) (*api.Response, error) {
Expand Down Expand Up @@ -1247,7 +1247,7 @@ func (s *Server) QueryNoGrpc(ctx context.Context, req *api.Request) (*api.Respon
defer cancel()
}
}
return s.doQuery(ctx, &Request{req: req, doAuth: getAuthMode(ctx)})
return s.DoQuery(ctx, &Request{Req: req, DoAuth: getAuthMode(ctx)})
}

var pendingQueries int64
Expand All @@ -1258,7 +1258,7 @@ func Init() {
maxPendingQueries = x.Config.Limit.GetInt64("max-pending-queries")
}

func (s *Server) doQuery(ctx context.Context, req *Request) (resp *api.Response, rerr error) {
func (s *Server) DoQuery(ctx context.Context, req *Request) (resp *api.Response, rerr error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
Expand All @@ -1282,7 +1282,7 @@ func (s *Server) doQuery(ctx context.Context, req *Request) (resp *api.Response,
// glog.Infof("Got a query, DQL form: %+v at %+v", req.req, l.Start.Format(time.RFC3339))
// }

isMutation := len(req.req.Mutations) > 0
isMutation := len(req.Req.Mutations) > 0
methodRequest := methodQuery
if isMutation {
methodRequest = methodMutate
Expand Down Expand Up @@ -1311,15 +1311,15 @@ func (s *Server) doQuery(ctx context.Context, req *Request) (resp *api.Response,
return
}

req.req.Query = strings.TrimSpace(req.req.Query)
isQuery := len(req.req.Query) != 0
req.Req.Query = strings.TrimSpace(req.Req.Query)
isQuery := len(req.Req.Query) != 0
if !isQuery && !isMutation {
span.Annotate(nil, "empty request")
return nil, errors.Errorf("empty request")
}

span.AddAttributes(otrace.StringAttribute("Query", req.req.Query))
span.Annotatef(nil, "Request received: %v", req.req)
span.AddAttributes(otrace.StringAttribute("Query", req.Req.Query))
span.Annotatef(nil, "Request received: %v", req.Req)
if isQuery {
ostats.Record(ctx, x.PendingQueries.M(1), x.NumQueries.M(1))
defer func() {
Expand All @@ -1330,7 +1330,7 @@ func (s *Server) doQuery(ctx context.Context, req *Request) (resp *api.Response,
ostats.Record(ctx, x.NumMutations.M(1))
}

if req.doAuth == NeedAuthorize && x.IsGalaxyOperation(ctx) {
if req.DoAuth == NeedAuthorize && x.IsGalaxyOperation(ctx) {
// Only the guardian of the galaxy can do a galaxy wide query/mutation. This operation is
// needed by live loader.
if err := AuthGuardianOfTheGalaxy(ctx); err != nil {
Expand All @@ -1341,17 +1341,17 @@ func (s *Server) doQuery(ctx context.Context, req *Request) (resp *api.Response,
}

qc := &queryContext{
req: req.req,
req: req.Req,
latency: l,
span: span,
graphql: isGraphQL,
gqlField: req.gqlField,
gqlField: req.GqlField,
}
if rerr = parseRequest(ctx, qc); rerr != nil {
return
}

if req.doAuth == NeedAuthorize {
if req.DoAuth == NeedAuthorize {
if rerr = authorizeRequest(ctx, qc); rerr != nil {
return
}
Expand All @@ -1361,9 +1361,9 @@ func (s *Server) doQuery(ctx context.Context, req *Request) (resp *api.Response,
// assigned in the processQuery function called below.
defer annotateStartTs(qc.span, qc.req.StartTs)
// For mutations, we update the startTs if necessary.
if isMutation && req.req.StartTs == 0 {
if isMutation && req.Req.StartTs == 0 {
start := time.Now()
req.req.StartTs = worker.State.GetTimestamp(false)
req.Req.StartTs = worker.State.GetTimestamp(false)
qc.latency.AssignTimestamp = time.Since(start)
}
if x.WorkerConfig.AclEnabled {
Expand Down
28 changes: 24 additions & 4 deletions worker/embedded.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/dgraph-io/dgraph/v24/conn"
"github.com/dgraph-io/dgraph/v24/protos/pb"
"github.com/dgraph-io/dgraph/v24/schema"
"github.com/golang/glog"
)

func InitForLite(ps *badger.DB) {
Expand All @@ -30,18 +31,37 @@ func ApplyCommited(ctx context.Context, delta *pb.OracleDelta) error {
return groups().Node.commitOrAbort(1, delta)
}

func ApplyInitialSchema() error {
for _, su := range schema.InitialSchema(0) {
if err := updateSchema(su, 1); err != nil {
func ApplyInitialSchema(ns, ts uint64) error {
for _, su := range schema.InitialSchema(ns) {
if err := updateSchema(su, ts); err != nil {
return err
}
}
gr.applyInitialTypes()
applyInitialTypes(ns, ts)
return nil
}

func applyInitialTypes(ns, ts uint64) {
initialTypes := schema.InitialTypes(ns)
for _, t := range initialTypes {
if _, ok := schema.State().GetType(t.TypeName); ok {
continue
}
// It is okay to write initial types at ts=1.
if err := updateType(t.GetTypeName(), *t, ts); err != nil {
glog.Errorf("Error while applying initial type: %s", err)
}
}
}

func SetMaxUID(uid uint64) {
groups().Lock()
defer groups().Unlock()
groups().state.MaxUID = uid
}

func SetMaxNsID(nsId uint64) {
groups().Lock()
defer groups().Unlock()
groups().state.MaxNsID = nsId
}
Loading