Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RPS control with ramp-up for tuple writes #464

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions cmd/store/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import (
"context"
"errors"
"fmt"
"os"
"path"
Expand All @@ -31,7 +32,7 @@
"github.com/spf13/cobra"

"github.com/openfga/cli/cmd/model"
"github.com/openfga/cli/cmd/tuple"

Check failure on line 35 in cmd/store/import.go

View workflow job for this annotation

GitHub Actions / Lints

could not import github.com/openfga/cli/cmd/tuple (-: # github.com/openfga/cli/cmd/tuple
"github.com/openfga/cli/internal/authorizationmodel"
"github.com/openfga/cli/internal/cmdutils"
"github.com/openfga/cli/internal/fga"
Expand Down Expand Up @@ -116,6 +117,7 @@
format authorizationmodel.ModelFormat,
storeID string,
maxTuplesPerWrite, maxParallelRequests int32,
minRPS, maxRPS, rampupPeriodInSec int32,
fileName string,
) (*CreateStoreAndModelResponse, error) {
response, err := createOrUpdateStore(clientConfig, fgaClient, storeData, format, storeID, fileName)
Expand All @@ -124,7 +126,7 @@
}

if len(storeData.Tuples) != 0 {
err = importTuples(fgaClient, storeData.Tuples, maxTuplesPerWrite, maxParallelRequests)
err = importTuples(fgaClient, storeData.Tuples, maxTuplesPerWrite, maxParallelRequests, minRPS, maxRPS, rampupPeriodInSec)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -159,6 +161,7 @@
fgaClient client.SdkClient,
tuples []openfga.TupleKey,
maxTuplesPerWrite, maxParallelRequests int32,
minRPS, maxRPS, rampupPeriodInSec int32,
) error {
bar := createProgressBar(len(tuples))

Expand All @@ -171,7 +174,7 @@
writeRequest := client.ClientWriteRequest{
Writes: tuples[index:end],
}
if _, err := tuple.ImportTuples(fgaClient, writeRequest, maxTuplesPerWrite, maxParallelRequests); err != nil {
if _, err := tuple.ImportTuples(fgaClient, writeRequest, maxTuplesPerWrite, maxParallelRequests, minRPS, maxRPS, rampupPeriodInSec); err != nil {
return fmt.Errorf("failed to import tuples: %w", err)
}

Expand Down Expand Up @@ -282,6 +285,33 @@
return fmt.Errorf("failed to parse parallel requests: %w", err)
}

// Extract RPS control parameters
minRPS, err := cmd.Flags().GetInt32("min-rps")
if err != nil {
return fmt.Errorf("failed to parse min-rps: %w", err)
}

maxRPS, err := cmd.Flags().GetInt32("max-rps")
if err != nil {
return fmt.Errorf("failed to parse max-rps: %w", err)
}

rampupPeriod, err := cmd.Flags().GetInt32("rampup-period-in-sec")
if err != nil {
return fmt.Errorf("failed to parse rampup-period-in-sec: %w", err)
}

// Validate RPS parameters - if one is provided, all three should be required
if minRPS > 0 || maxRPS > 0 || rampupPeriod > 0 {
if minRPS <= 0 || maxRPS <= 0 || rampupPeriod <= 0 {
return errors.New("if any of min-rps, max-rps, or rampup-period-in-sec is provided, all three must be provided with positive values") //nolint:goerr113
}

if minRPS > maxRPS {
return errors.New("min-rps cannot be greater than max-rps") //nolint:goerr113
}
}

fileName, err := cmd.Flags().GetString("file")
if err != nil {
return fmt.Errorf("failed to get file name: %w", err)
Expand All @@ -298,7 +328,7 @@
}

createStoreAndModelResponse, err := importStore(&clientConfig, fgaClient, storeData, format,
storeID, maxTuplesPerWrite, maxParallelRequests, fileName)
storeID, maxTuplesPerWrite, maxParallelRequests, minRPS, maxRPS, rampupPeriod, fileName)
if err != nil {
return fmt.Errorf("failed to import store: %w", err)
}
Expand All @@ -317,6 +347,9 @@
importCmd.Flags().String("store-id", "", "Store ID")
importCmd.Flags().Int32("max-tuples-per-write", tuple.MaxTuplesPerWrite, "Max tuples per write chunk.")
importCmd.Flags().Int32("max-parallel-requests", tuple.MaxParallelRequests, "Max number of requests to issue to the server in parallel.") //nolint:lll
importCmd.Flags().Int32("min-rps", 0, "Minimum requests per second for writes")
importCmd.Flags().Int32("max-rps", 0, "Maximum requests per second for writes")
importCmd.Flags().Int32("rampup-period-in-sec", 0, "Period in seconds to ramp up from min-rps to max-rps")

if err := importCmd.MarkFlagRequired("file"); err != nil {
fmt.Printf("error setting flag as required - %v: %v\n", "cmd/models/write", err)
Expand Down
122 changes: 107 additions & 15 deletions cmd/tuple/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package tuple

import (
"context"
"errors"
"fmt"
"os"

Expand Down Expand Up @@ -47,18 +47,28 @@ var deleteCmd = &cobra.Command{
return fmt.Errorf("failed to parse file name due to %w", err)
}
if fileName != "" {
var tuples []client.ClientTupleKeyWithoutCondition
var tuplesWithoutCondition []client.ClientTupleKeyWithoutCondition

data, err := os.ReadFile(fileName)
if err != nil {
return fmt.Errorf("failed to read file %s due to %w", fileName, err)
}

err = yaml.Unmarshal(data, &tuples)
err = yaml.Unmarshal(data, &tuplesWithoutCondition)
if err != nil {
return fmt.Errorf("failed to parse input tuples due to %w", err)
}

// Convert ClientTupleKey to ClientTupleKeyWithoutCondition
tuples := make([]client.ClientTupleKey, len(tuplesWithoutCondition))
for i, t := range tuplesWithoutCondition {
tuples[i] = client.ClientTupleKey{
User: t.User,
Relation: t.Relation,
Object: t.Object,
}
}

maxTuplesPerWrite, err := cmd.Flags().GetInt32("max-tuples-per-write")
if err != nil {
return fmt.Errorf("failed to parse max tuples per write due to %w", err)
Expand All @@ -69,30 +79,109 @@ var deleteCmd = &cobra.Command{
return fmt.Errorf("failed to parse parallel requests due to %w", err)
}

// Extract RPS control parameters
minRPS, err := cmd.Flags().GetInt32("min-rps")
if err != nil {
return fmt.Errorf("failed to parse min-rps: %w", err)
}

maxRPS, err := cmd.Flags().GetInt32("max-rps")
if err != nil {
return fmt.Errorf("failed to parse max-rps: %w", err)
}

rampupPeriod, err := cmd.Flags().GetInt32("rampup-period-in-sec")
if err != nil {
return fmt.Errorf("failed to parse rampup-period-in-sec: %w", err)
}

// Validate RPS parameters - if one is provided, all three should be required
if minRPS > 0 || maxRPS > 0 || rampupPeriod > 0 {
if minRPS <= 0 || maxRPS <= 0 || rampupPeriod <= 0 {
return errors.New("if any of min-rps, max-rps, or rampup-period-in-sec is provided, all three must be provided with positive values") //nolint:goerr113
}

if minRPS > maxRPS {
return errors.New("min-rps cannot be greater than max-rps") //nolint:goerr113
}
}

// Convert ClientTupleKey to ClientTupleKeyWithoutCondition
tuplesWithoutCondition = make([]client.ClientTupleKeyWithoutCondition, len(tuples))
for i, t := range tuples {
tuplesWithoutCondition[i] = client.ClientTupleKeyWithoutCondition{
User: t.User,
Relation: t.Relation,
Object: t.Object,
}
}

deleteRequest := client.ClientWriteRequest{
Deletes: tuples,
Deletes: tuplesWithoutCondition,
}
response, err := ImportTuples(fgaClient, deleteRequest, maxTuplesPerWrite, maxParallelRequests)
response, err := ImportTuples(fgaClient, deleteRequest, maxTuplesPerWrite, maxParallelRequests, minRPS, maxRPS, rampupPeriod)
if err != nil {
return err
}

return output.Display(*response)
}
body := &client.ClientDeleteTuplesBody{
client.ClientTupleKeyWithoutCondition{
User: args[0],
Relation: args[1],
Object: args[2],
},

// Create a ClientTupleKeyWithoutCondition from the arguments
tupleKey := client.ClientTupleKeyWithoutCondition{
User: args[0],
Relation: args[1],
Object: args[2],
}

// Create a delete request with the tuple
deleteRequest := client.ClientWriteRequest{
Deletes: []client.ClientTupleKeyWithoutCondition{tupleKey},
}

// Extract RPS control parameters
minRPS, err := cmd.Flags().GetInt32("min-rps")
if err != nil {
return fmt.Errorf("failed to parse min-rps: %w", err)
}

maxRPS, err := cmd.Flags().GetInt32("max-rps")
if err != nil {
return fmt.Errorf("failed to parse max-rps: %w", err)
}

rampupPeriod, err := cmd.Flags().GetInt32("rampup-period-in-sec")
if err != nil {
return fmt.Errorf("failed to parse rampup-period-in-sec: %w", err)
}

// Validate RPS parameters - if one is provided, all three should be required
if minRPS > 0 || maxRPS > 0 || rampupPeriod > 0 {
if minRPS <= 0 || maxRPS <= 0 || rampupPeriod <= 0 {
return errors.New("if any of min-rps, max-rps, or rampup-period-in-sec is provided, all three must be provided with positive values") //nolint:goerr113
}

if minRPS > maxRPS {
return errors.New("min-rps cannot be greater than max-rps") //nolint:goerr113
}
}

maxTuplesPerWrite, err := cmd.Flags().GetInt32("max-tuples-per-write")
if err != nil {
return fmt.Errorf("failed to parse max tuples per write due to %w", err)
}
options := &client.ClientWriteOptions{}
_, err = fgaClient.DeleteTuples(context.Background()).Body(*body).Options(*options).Execute()

maxParallelRequests, err := cmd.Flags().GetInt32("max-parallel-requests")
if err != nil {
return fmt.Errorf("failed to parse parallel requests due to %w", err)
}

response, err := ImportTuples(fgaClient, deleteRequest, maxTuplesPerWrite, maxParallelRequests, minRPS, maxRPS, rampupPeriod)
if err != nil {
return fmt.Errorf("failed to delete tuples due to %w", err)
return err
}

return output.Display(output.EmptyStruct{})
return output.Display(*response)
},
}

Expand All @@ -101,6 +190,9 @@ func init() {
deleteCmd.Flags().String("model-id", "", "Model ID")
deleteCmd.Flags().Int32("max-tuples-per-write", MaxTuplesPerWrite, "Max tuples per write chunk.")
deleteCmd.Flags().Int32("max-parallel-requests", MaxParallelRequests, "Max number of requests to issue to the server in parallel.") //nolint:lll
deleteCmd.Flags().Int32("min-rps", 0, "Minimum requests per second for writes")
deleteCmd.Flags().Int32("max-rps", 0, "Maximum requests per second for writes")
deleteCmd.Flags().Int32("rampup-period-in-sec", 0, "Period in seconds to ramp up from min-rps to max-rps")
}

func ExactArgsOrFlag(n int, flag string) cobra.PositionalArgs {
Expand Down
Loading
Loading