diff --git a/cmd/store/import.go b/cmd/store/import.go index d73b347f..048c90c5 100644 --- a/cmd/store/import.go +++ b/cmd/store/import.go @@ -18,6 +18,7 @@ package store import ( "context" + "errors" "fmt" "os" "path" @@ -116,6 +117,7 @@ func importStore( format authorizationmodel.ModelFormat, storeID string, maxTuplesPerWrite, maxParallelRequests int32, + minRPS, maxRPS, rampupPeriodInSec int32, fileName string, ) (*CreateStoreAndModelResponse, error) { response, err := createOrUpdateStore(clientConfig, fgaClient, storeData, format, storeID, fileName) @@ -124,7 +126,7 @@ func importStore( } if len(storeData.Tuples) != 0 { - err = importTuples(fgaClient, storeData.Tuples, maxTuplesPerWrite, maxParallelRequests) + err = importTuples(fgaClient, storeData.Tuples, maxTuplesPerWrite, maxParallelRequests, minRPS, maxRPS, rampupPeriodInSec) if err != nil { return nil, err } @@ -159,6 +161,7 @@ func importTuples( fgaClient client.SdkClient, tuples []openfga.TupleKey, maxTuplesPerWrite, maxParallelRequests int32, + minRPS, maxRPS, rampupPeriodInSec int32, ) error { bar := createProgressBar(len(tuples)) @@ -171,7 +174,7 @@ func importTuples( writeRequest := client.ClientWriteRequest{ Writes: tuples[index:end], } - if _, err := tuple.ImportTuples(fgaClient, writeRequest, maxTuplesPerWrite, maxParallelRequests); err != nil { + if _, err := tuple.ImportTuples(fgaClient, writeRequest, maxTuplesPerWrite, maxParallelRequests, minRPS, maxRPS, rampupPeriodInSec); err != nil { return fmt.Errorf("failed to import tuples: %w", err) } @@ -282,6 +285,33 @@ var importCmd = &cobra.Command{ return fmt.Errorf("failed to parse parallel requests: %w", err) } + // Extract RPS control parameters + minRPS, err := cmd.Flags().GetInt32("min-rps") + if err != nil { + return fmt.Errorf("failed to parse min-rps: %w", err) + } + + maxRPS, err := cmd.Flags().GetInt32("max-rps") + if err != nil { + return fmt.Errorf("failed to parse max-rps: %w", err) + } + + rampupPeriod, err := cmd.Flags().GetInt32("rampup-period-in-sec") + if err != nil { + return fmt.Errorf("failed to parse rampup-period-in-sec: %w", err) + } + + // Validate RPS parameters - if one is provided, all three should be required + if minRPS > 0 || maxRPS > 0 || rampupPeriod > 0 { + if minRPS <= 0 || maxRPS <= 0 || rampupPeriod <= 0 { + return errors.New("if any of min-rps, max-rps, or rampup-period-in-sec is provided, all three must be provided with positive values") //nolint:goerr113 + } + + if minRPS > maxRPS { + return errors.New("min-rps cannot be greater than max-rps") //nolint:goerr113 + } + } + fileName, err := cmd.Flags().GetString("file") if err != nil { return fmt.Errorf("failed to get file name: %w", err) @@ -298,7 +328,7 @@ var importCmd = &cobra.Command{ } createStoreAndModelResponse, err := importStore(&clientConfig, fgaClient, storeData, format, - storeID, maxTuplesPerWrite, maxParallelRequests, fileName) + storeID, maxTuplesPerWrite, maxParallelRequests, minRPS, maxRPS, rampupPeriod, fileName) if err != nil { return fmt.Errorf("failed to import store: %w", err) } @@ -317,6 +347,9 @@ func init() { importCmd.Flags().String("store-id", "", "Store ID") importCmd.Flags().Int32("max-tuples-per-write", tuple.MaxTuplesPerWrite, "Max tuples per write chunk.") importCmd.Flags().Int32("max-parallel-requests", tuple.MaxParallelRequests, "Max number of requests to issue to the server in parallel.") //nolint:lll + importCmd.Flags().Int32("min-rps", 0, "Minimum requests per second for writes") + importCmd.Flags().Int32("max-rps", 0, "Maximum requests per second for writes") + importCmd.Flags().Int32("rampup-period-in-sec", 0, "Period in seconds to ramp up from min-rps to max-rps") if err := importCmd.MarkFlagRequired("file"); err != nil { fmt.Printf("error setting flag as required - %v: %v\n", "cmd/models/write", err) diff --git a/cmd/tuple/delete.go b/cmd/tuple/delete.go index 81b46b26..3978852f 100644 --- a/cmd/tuple/delete.go +++ b/cmd/tuple/delete.go @@ -17,7 +17,7 @@ limitations under the License. package tuple import ( - "context" + "errors" "fmt" "os" @@ -47,18 +47,28 @@ var deleteCmd = &cobra.Command{ return fmt.Errorf("failed to parse file name due to %w", err) } if fileName != "" { - var tuples []client.ClientTupleKeyWithoutCondition + var tuplesWithoutCondition []client.ClientTupleKeyWithoutCondition data, err := os.ReadFile(fileName) if err != nil { return fmt.Errorf("failed to read file %s due to %w", fileName, err) } - err = yaml.Unmarshal(data, &tuples) + err = yaml.Unmarshal(data, &tuplesWithoutCondition) if err != nil { return fmt.Errorf("failed to parse input tuples due to %w", err) } + // Convert ClientTupleKey to ClientTupleKeyWithoutCondition + tuples := make([]client.ClientTupleKey, len(tuplesWithoutCondition)) + for i, t := range tuplesWithoutCondition { + tuples[i] = client.ClientTupleKey{ + User: t.User, + Relation: t.Relation, + Object: t.Object, + } + } + maxTuplesPerWrite, err := cmd.Flags().GetInt32("max-tuples-per-write") if err != nil { return fmt.Errorf("failed to parse max tuples per write due to %w", err) @@ -69,30 +79,109 @@ var deleteCmd = &cobra.Command{ return fmt.Errorf("failed to parse parallel requests due to %w", err) } + // Extract RPS control parameters + minRPS, err := cmd.Flags().GetInt32("min-rps") + if err != nil { + return fmt.Errorf("failed to parse min-rps: %w", err) + } + + maxRPS, err := cmd.Flags().GetInt32("max-rps") + if err != nil { + return fmt.Errorf("failed to parse max-rps: %w", err) + } + + rampupPeriod, err := cmd.Flags().GetInt32("rampup-period-in-sec") + if err != nil { + return fmt.Errorf("failed to parse rampup-period-in-sec: %w", err) + } + + // Validate RPS parameters - if one is provided, all three should be required + if minRPS > 0 || maxRPS > 0 || rampupPeriod > 0 { + if minRPS <= 0 || maxRPS <= 0 || rampupPeriod <= 0 { + return errors.New("if any of min-rps, max-rps, or rampup-period-in-sec is provided, all three must be provided with positive values") //nolint:goerr113 + } + + if minRPS > maxRPS { + return errors.New("min-rps cannot be greater than max-rps") //nolint:goerr113 + } + } + + // Convert ClientTupleKey to ClientTupleKeyWithoutCondition + tuplesWithoutCondition = make([]client.ClientTupleKeyWithoutCondition, len(tuples)) + for i, t := range tuples { + tuplesWithoutCondition[i] = client.ClientTupleKeyWithoutCondition{ + User: t.User, + Relation: t.Relation, + Object: t.Object, + } + } + deleteRequest := client.ClientWriteRequest{ - Deletes: tuples, + Deletes: tuplesWithoutCondition, } - response, err := ImportTuples(fgaClient, deleteRequest, maxTuplesPerWrite, maxParallelRequests) + response, err := ImportTuples(fgaClient, deleteRequest, maxTuplesPerWrite, maxParallelRequests, minRPS, maxRPS, rampupPeriod) if err != nil { return err } return output.Display(*response) } - body := &client.ClientDeleteTuplesBody{ - client.ClientTupleKeyWithoutCondition{ - User: args[0], - Relation: args[1], - Object: args[2], - }, + + // Create a ClientTupleKeyWithoutCondition from the arguments + tupleKey := client.ClientTupleKeyWithoutCondition{ + User: args[0], + Relation: args[1], + Object: args[2], + } + + // Create a delete request with the tuple + deleteRequest := client.ClientWriteRequest{ + Deletes: []client.ClientTupleKeyWithoutCondition{tupleKey}, + } + + // Extract RPS control parameters + minRPS, err := cmd.Flags().GetInt32("min-rps") + if err != nil { + return fmt.Errorf("failed to parse min-rps: %w", err) + } + + maxRPS, err := cmd.Flags().GetInt32("max-rps") + if err != nil { + return fmt.Errorf("failed to parse max-rps: %w", err) + } + + rampupPeriod, err := cmd.Flags().GetInt32("rampup-period-in-sec") + if err != nil { + return fmt.Errorf("failed to parse rampup-period-in-sec: %w", err) + } + + // Validate RPS parameters - if one is provided, all three should be required + if minRPS > 0 || maxRPS > 0 || rampupPeriod > 0 { + if minRPS <= 0 || maxRPS <= 0 || rampupPeriod <= 0 { + return errors.New("if any of min-rps, max-rps, or rampup-period-in-sec is provided, all three must be provided with positive values") //nolint:goerr113 + } + + if minRPS > maxRPS { + return errors.New("min-rps cannot be greater than max-rps") //nolint:goerr113 + } + } + + maxTuplesPerWrite, err := cmd.Flags().GetInt32("max-tuples-per-write") + if err != nil { + return fmt.Errorf("failed to parse max tuples per write due to %w", err) } - options := &client.ClientWriteOptions{} - _, err = fgaClient.DeleteTuples(context.Background()).Body(*body).Options(*options).Execute() + + maxParallelRequests, err := cmd.Flags().GetInt32("max-parallel-requests") + if err != nil { + return fmt.Errorf("failed to parse parallel requests due to %w", err) + } + + response, err := ImportTuples(fgaClient, deleteRequest, maxTuplesPerWrite, maxParallelRequests, minRPS, maxRPS, rampupPeriod) if err != nil { - return fmt.Errorf("failed to delete tuples due to %w", err) + return err } - return output.Display(output.EmptyStruct{}) + return output.Display(*response) }, } @@ -101,6 +190,9 @@ func init() { deleteCmd.Flags().String("model-id", "", "Model ID") deleteCmd.Flags().Int32("max-tuples-per-write", MaxTuplesPerWrite, "Max tuples per write chunk.") deleteCmd.Flags().Int32("max-parallel-requests", MaxParallelRequests, "Max number of requests to issue to the server in parallel.") //nolint:lll + deleteCmd.Flags().Int32("min-rps", 0, "Minimum requests per second for writes") + deleteCmd.Flags().Int32("max-rps", 0, "Maximum requests per second for writes") + deleteCmd.Flags().Int32("rampup-period-in-sec", 0, "Period in seconds to ramp up from min-rps to max-rps") } func ExactArgsOrFlag(n int, flag string) cobra.PositionalArgs { diff --git a/cmd/tuple/import.go b/cmd/tuple/import.go index 1b0d2efa..81f087e3 100644 --- a/cmd/tuple/import.go +++ b/cmd/tuple/import.go @@ -18,9 +18,11 @@ package tuple import ( "context" + "errors" "fmt" "os" "strings" + "time" openfga "github.com/openfga/go-sdk" "github.com/openfga/go-sdk/client" @@ -40,8 +42,8 @@ const ( ) type failedWriteResponse struct { - TupleKey client.ClientTupleKey `json:"tuple_key"` - Reason string `json:"reason"` + TupleKey interface{} `json:"tuple_key"` + Reason string `json:"reason"` } type ImportResponse struct { @@ -59,6 +61,9 @@ func ImportTuples( body client.ClientWriteRequest, maxTuplesPerWrite int32, maxParallelRequests int32, + minRPS int32, + maxRPS int32, + rampupPeriodInSec int32, ) (*ImportResponse, error) { options := client.ClientWriteOptions{ Transaction: &client.TransactionOptions{ @@ -68,13 +73,150 @@ func ImportTuples( }, } - response, err := fgaClient.Write(context.Background()).Body(body).Options(options).Execute() - if err != nil { - return nil, fmt.Errorf("failed to import tuples due to %w", err) + // If RPS control is not enabled, use the standard write method + if minRPS <= 0 || maxRPS <= 0 || rampupPeriodInSec <= 0 { + response, err := fgaClient.Write(context.Background()).Body(body).Options(options).Execute() + if err != nil { + return nil, fmt.Errorf("failed to import tuples due to %w", err) + } + + successfulWrites, failedWrites := processWrites(response.Writes) + successfulDeletes, failedDeletes := processDeletes(response.Deletes) + + result := ImportResponse{ + Successful: append(successfulWrites, successfulDeletes...), + Failed: append(failedWrites, failedDeletes...), + } + + return &result, nil + } + + // RPS control is enabled, implement rate-limited writes with ramp-up + return importTuplesWithRateLimit(fgaClient, body, options, minRPS, maxRPS, rampupPeriodInSec) +} + +// importTuplesWithRateLimit imports tuples with rate limiting and ramp-up +func importTuplesWithRateLimit( + fgaClient client.SdkClient, + body client.ClientWriteRequest, + options client.ClientWriteOptions, + minRPS int32, + maxRPS int32, + rampupPeriodInSec int32, +) (*ImportResponse, error) { + ctx := context.Background() + + // Prepare result containers + var successfulWrites []client.ClientTupleKey + var failedWrites []failedWriteResponse + var successfulDeletes []client.ClientTupleKey + var failedDeletes []failedWriteResponse + + // Calculate total number of tuples to write + totalTuples := len(body.Writes) + len(body.Deletes) + if totalTuples == 0 { + return &ImportResponse{ + Successful: []client.ClientTupleKey{}, + Failed: []failedWriteResponse{}, + }, nil + } + + // Create batches of tuples to write + // Each batch will contain a single tuple for now + // This could be optimized in the future to use maxTuplesPerWrite + var writeBatches []client.ClientWriteRequest + + // Add write tuples to batches + for _, tuple := range body.Writes { + // We need to handle ClientTupleKey directly + writeBatches = append(writeBatches, client.ClientWriteRequest{ + Writes: []client.ClientTupleKey{tuple}, + }) } - successfulWrites, failedWrites := processWrites(response.Writes) - successfulDeletes, failedDeletes := processDeletes(response.Deletes) + // Add delete tuples to batches + for _, tuple := range body.Deletes { + // We need to handle ClientTupleKeyWithoutCondition directly + // Convert ClientTupleKey to ClientTupleKeyWithoutCondition + tupleWithoutCondition := client.ClientTupleKeyWithoutCondition{ + User: tuple.User, + Relation: tuple.Relation, + Object: tuple.Object, + } + writeBatches = append(writeBatches, client.ClientWriteRequest{ + Deletes: []client.ClientTupleKeyWithoutCondition{tupleWithoutCondition}, + }) + } + + // Calculate ramp-up parameters + rampupPeriod := time.Duration(rampupPeriodInSec) * time.Second + startTime := time.Now() + endRampupTime := startTime.Add(rampupPeriod) + + // Process each batch with rate limiting + for i, batch := range writeBatches { + // Calculate current RPS based on elapsed time and ramp-up period + currentTime := time.Now() + if currentTime.After(endRampupTime) { + // Ramp-up period has passed, use max RPS + time.Sleep(time.Second / time.Duration(maxRPS)) + } else { + // Still in ramp-up period, calculate current RPS + elapsedRatio := float64(currentTime.Sub(startTime)) / float64(rampupPeriod) + currentRPS := minRPS + int32(float64(maxRPS-minRPS)*elapsedRatio) + if currentRPS < minRPS { + currentRPS = minRPS + } + if currentRPS > maxRPS { + currentRPS = maxRPS + } + + // Sleep to maintain the current RPS + time.Sleep(time.Second / time.Duration(currentRPS)) + } + + // Execute the write request + response, err := fgaClient.Write(ctx).Body(batch).Options(options).Execute() + if err != nil { + // If the entire request failed, add all tuples in this batch to failed + if len(batch.Writes) > 0 { + for _, tuple := range batch.Writes { + failedWrites = append(failedWrites, failedWriteResponse{ + TupleKey: tuple, + Reason: extractErrMssg(err), + }) + } + } + if len(batch.Deletes) > 0 { + for _, tuple := range batch.Deletes { + // Convert ClientTupleKeyWithoutCondition to interface{} + failedDeletes = append(failedDeletes, failedWriteResponse{ + TupleKey: map[string]string{ + "user": tuple.User, + "relation": tuple.Relation, + "object": tuple.Object, + }, + Reason: extractErrMssg(err), + }) + } + } + continue + } + + // Process successful and failed writes/deletes + sw, fw := processWrites(response.Writes) + successfulWrites = append(successfulWrites, sw...) + failedWrites = append(failedWrites, fw...) + + sd, fd := processDeletes(response.Deletes) + successfulDeletes = append(successfulDeletes, sd...) + failedDeletes = append(failedDeletes, fd...) + + // Log progress (optional) + if i > 0 && i%100 == 0 { + fmt.Fprintf(os.Stderr, "Processed %d/%d tuples\n", i, totalTuples) + } + } result := ImportResponse{ Successful: append(successfulWrites, successfulDeletes...), @@ -130,7 +272,8 @@ func processDeletes( ) for _, delete := range deletes { - deletedTupleKey := openfga.TupleKey{ + // Convert to ClientTupleKey + deletedTupleKey := client.ClientTupleKey{ Object: delete.TupleKey.Object, Relation: delete.TupleKey.Relation, User: delete.TupleKey.User, @@ -156,7 +299,19 @@ var importCmd = &cobra.Command{ Short: "Import Relationship Tuples", Deprecated: "use the write/delete command with the flag --file instead", Long: "Imports Relationship Tuples to the store. " + - "This will write the tuples in chunks and at the end will report the tuple chunks that failed.", + "This will write the tuples in chunks and at the end will report the tuple chunks that failed.\n\n" + + "Rate Limiting:\n" + + "You can control the rate at which tuples are written using the following flags:\n" + + "- min-rps: Minimum requests per second for writes\n" + + "- max-rps: Maximum requests per second for writes\n" + + "- rampup-period-in-sec: Period in seconds to ramp up from min-rps to max-rps\n\n" + + "If any of these flags are provided, all three must be provided with positive values. " + + "The command will start writing tuples at the min-rps rate and gradually increase to " + + "max-rps over the specified rampup period. If all tuples are written before the rampup " + + "period ends, the command will exit. If the rampup period ends and there are still tuples " + + "to write, the command will continue writing at the max-rps rate until all tuples are written.", + Example: ` fga tuple import --store-id=01H0H015178Y2V4CX10C2KGHF4 --file tuples.yaml + fga tuple import --store-id=01H0H015178Y2V4CX10C2KGHF4 --file tuples.yaml --min-rps=10 --max-rps=50 --rampup-period-in-sec=60`, RunE: func(cmd *cobra.Command, _ []string) error { clientConfig := cmdutils.GetClientConfig(cmd) @@ -180,6 +335,33 @@ var importCmd = &cobra.Command{ return fmt.Errorf("failed to parse parallel requests due to %w", err) } + // Extract RPS control parameters + minRPS, err := cmd.Flags().GetInt32("min-rps") + if err != nil { + return fmt.Errorf("failed to parse min-rps: %w", err) + } + + maxRPS, err := cmd.Flags().GetInt32("max-rps") + if err != nil { + return fmt.Errorf("failed to parse max-rps: %w", err) + } + + rampupPeriod, err := cmd.Flags().GetInt32("rampup-period-in-sec") + if err != nil { + return fmt.Errorf("failed to parse rampup-period-in-sec: %w", err) + } + + // Validate RPS parameters - if one is provided, all three should be required + if minRPS > 0 || maxRPS > 0 || rampupPeriod > 0 { + if minRPS <= 0 || maxRPS <= 0 || rampupPeriod <= 0 { + return errors.New("if any of min-rps, max-rps, or rampup-period-in-sec is provided, all three must be provided with positive values") //nolint:goerr113 + } + + if minRPS > maxRPS { + return errors.New("min-rps cannot be greater than max-rps") //nolint:goerr113 + } + } + tuples := []client.ClientTupleKey{} data, err := os.ReadFile(fileName) @@ -196,7 +378,7 @@ var importCmd = &cobra.Command{ Writes: tuples, } - result, err := ImportTuples(fgaClient, writeRequest, maxTuplesPerWrite, maxParallelRequests) + result, err := ImportTuples(fgaClient, writeRequest, maxTuplesPerWrite, maxParallelRequests, minRPS, maxRPS, rampupPeriod) if err != nil { return err } @@ -210,4 +392,7 @@ func init() { importCmd.Flags().String("file", "", "Tuples file") importCmd.Flags().Int32("max-tuples-per-write", MaxTuplesPerWrite, "Max tuples per write chunk.") importCmd.Flags().Int32("max-parallel-requests", MaxParallelRequests, "Max number of requests to issue to the server in parallel.") //nolint:lll + importCmd.Flags().Int32("min-rps", 0, "Minimum requests per second for writes") + importCmd.Flags().Int32("max-rps", 0, "Maximum requests per second for writes") + importCmd.Flags().Int32("rampup-period-in-sec", 0, "Period in seconds to ramp up from min-rps to max-rps") } diff --git a/cmd/tuple/write.go b/cmd/tuple/write.go index cce9a4da..6674cdae 100644 --- a/cmd/tuple/write.go +++ b/cmd/tuple/write.go @@ -63,13 +63,24 @@ var writeCmd = &cobra.Command{ "For example, a valid CSV file might start with a row like:\n" + "user_type,user_id,user_relation,relation,object_type,object_id,condition_name,condition_context\n\n" + "This command is flexible in accepting data inputs, making it easier to add multiple " + - "relationship tuples in various convenient formats.", + "relationship tuples in various convenient formats.\n\n" + + "Rate Limiting:\n" + + "You can control the rate at which tuples are written using the following flags:\n" + + "- min-rps: Minimum requests per second for writes\n" + + "- max-rps: Maximum requests per second for writes\n" + + "- rampup-period-in-sec: Period in seconds to ramp up from min-rps to max-rps\n\n" + + "If any of these flags are provided, all three must be provided with positive values. " + + "The command will start writing tuples at the min-rps rate and gradually increase to " + + "max-rps over the specified rampup period. If all tuples are written before the rampup " + + "period ends, the command will exit. If the rampup period ends and there are still tuples " + + "to write, the command will continue writing at the max-rps rate until all tuples are written.", Args: ExactArgsOrFlag(writeCommandArgumentsCount, "file"), Example: ` fga tuple write --store-id=01H0H015178Y2V4CX10C2KGHF4 user:anne can_view document:roadmap fga tuple write --store-id=01H0H015178Y2V4CX10C2KGHF4 user:anne can_view document:roadmap --condition-name inOffice --condition-context '{"office_ip":"10.0.1.10"}' fga tuple write --store-id=01H0H015178Y2V4CX10C2KGHF4 --file tuples.json fga tuple write --store-id=01H0H015178Y2V4CX10C2KGHF4 --file tuples.yaml - fga tuple write --store-id=01H0H015178Y2V4CX10C2KGHF4 --file tuples.csv`, + fga tuple write --store-id=01H0H015178Y2V4CX10C2KGHF4 --file tuples.csv + fga tuple write --store-id=01H0H015178Y2V4CX10C2KGHF4 --file tuples.json --min-rps=10 --max-rps=50 --rampup-period-in-sec=60`, RunE: func(cmd *cobra.Command, args []string) error { clientConfig := cmdutils.GetClientConfig(cmd) @@ -92,13 +103,25 @@ func writeTuplesFromArgs(cmd *cobra.Command, args []string, fgaClient *client.Op return err //nolint:wrapcheck } - body := client.ClientWriteTuplesBody{ - client.ClientTupleKey{ + var body client.ClientWriteTuplesBody + + if condition == nil { + // Use ClientTupleKeyWithoutCondition if there's no condition + tupleWithoutCondition := client.ClientTupleKeyWithoutCondition{ + User: args[0], + Relation: args[1], + Object: args[2], + } + body = []client.ClientTupleKeyWithoutCondition{tupleWithoutCondition} + } else { + // Use ClientTupleKey if there's a condition + tupleKey := client.ClientTupleKey{ User: args[0], Relation: args[1], Object: args[2], Condition: condition, - }, + } + body = []client.ClientTupleKey{tupleKey} } _, err = fgaClient. @@ -139,6 +162,33 @@ func writeTuplesFromFile(flags *flag.FlagSet, fgaClient *client.OpenFgaClient) e return fmt.Errorf("failed to parse parallel requests: %w", err) } + // Extract RPS control parameters + minRPS, err := flags.GetInt32("min-rps") + if err != nil { + return fmt.Errorf("failed to parse min-rps: %w", err) + } + + maxRPS, err := flags.GetInt32("max-rps") + if err != nil { + return fmt.Errorf("failed to parse max-rps: %w", err) + } + + rampupPeriod, err := flags.GetInt32("rampup-period-in-sec") + if err != nil { + return fmt.Errorf("failed to parse rampup-period-in-sec: %w", err) + } + + // Validate RPS parameters - if one is provided, all three should be required + if minRPS > 0 || maxRPS > 0 || rampupPeriod > 0 { + if minRPS <= 0 || maxRPS <= 0 || rampupPeriod <= 0 { + return errors.New("if any of min-rps, max-rps, or rampup-period-in-sec is provided, all three must be provided with positive values") //nolint:goerr113 + } + + if minRPS > maxRPS { + return errors.New("min-rps cannot be greater than max-rps") //nolint:goerr113 + } + } + tuples, err := tuplefile.ReadTupleFile(fileName) if err != nil { return err //nolint:wrapcheck @@ -148,7 +198,7 @@ func writeTuplesFromFile(flags *flag.FlagSet, fgaClient *client.OpenFgaClient) e Writes: tuples, } - response, err := ImportTuples(fgaClient, writeRequest, maxTuplesPerWrite, maxParallelRequests) + response, err := ImportTuples(fgaClient, writeRequest, maxTuplesPerWrite, maxParallelRequests, minRPS, maxRPS, rampupPeriod) if err != nil { return err } @@ -182,4 +232,7 @@ func init() { writeCmd.Flags().Int32("max-tuples-per-write", MaxTuplesPerWrite, "Max tuples per write chunk.") writeCmd.Flags().Int32("max-parallel-requests", MaxParallelRequests, "Max number of requests to issue to the server in parallel.") writeCmd.Flags().BoolVar(&hideImportedTuples, "hide-imported-tuples", false, "Hide successfully imported tuples from output") + writeCmd.Flags().Int32("min-rps", 0, "Minimum requests per second for writes") + writeCmd.Flags().Int32("max-rps", 0, "Maximum requests per second for writes") + writeCmd.Flags().Int32("rampup-period-in-sec", 0, "Period in seconds to ramp up from min-rps to max-rps") } diff --git a/cmd/tuple/write_test.go b/cmd/tuple/write_test.go index c2742cf4..24d5aeb0 100644 --- a/cmd/tuple/write_test.go +++ b/cmd/tuple/write_test.go @@ -2,13 +2,18 @@ package tuple import ( "testing" + "time" openfga "github.com/openfga/go-sdk" "github.com/openfga/go-sdk/client" + "github.com/spf13/cobra" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/openfga/cli/internal/cmdutils" + "github.com/openfga/cli/internal/mocks" "github.com/openfga/cli/internal/tuplefile" + "github.com/stretchr/testify/mock" ) func TestParseTuplesFileData(t *testing.T) { //nolint:funlen @@ -223,3 +228,69 @@ func TestParseTuplesFileData(t *testing.T) { //nolint:funlen }) } } + +func TestWriteTuplesWithRateLimit(t *testing.T) { + // Create a mock FGA client + mockClient := &mocks.SdkClient{} + + // Create a test write request with multiple tuples + tuples := []client.ClientTupleKey{ + { + User: "user:anne", + Relation: "can_view", + Object: "document:roadmap", + }, + { + User: "user:bob", + Relation: "can_view", + Object: "document:roadmap", + }, + { + User: "user:charlie", + Relation: "can_view", + Object: "document:roadmap", + }, + } + + writeRequest := client.ClientWriteRequest{ + Writes: tuples, + } + + // Set up the mock client to return a successful response for each tuple + mockWriteAPI := &mocks.WriteAPI{} + mockBodyAPI := &mocks.WriteBodyAPI{} + mockOptionsAPI := &mocks.WriteOptionsAPI{} + mockExecuteAPI := &mocks.WriteExecuteAPI{} + + mockClient.On("Write", mock.Anything).Return(mockWriteAPI) + mockWriteAPI.On("Body", mock.Anything).Return(mockBodyAPI) + mockBodyAPI.On("Options", mock.Anything).Return(mockOptionsAPI) + mockOptionsAPI.On("Execute").Return(&client.ClientWriteResponse{ + Writes: []client.ClientWriteRequestWriteResponse{ + { + TupleKey: tuples[0], + Status: client.SUCCESS, + }, + }, + }, nil) + + // Call ImportTuples with rate limiting + minRPS := int32(10) + maxRPS := int32(20) + rampupPeriod := int32(1) // 1 second for faster test + maxTuplesPerWrite := int32(1) + maxParallelRequests := int32(1) + + startTime := time.Now() + response, err := ImportTuples(mockClient, writeRequest, maxTuplesPerWrite, maxParallelRequests, minRPS, maxRPS, rampupPeriod) + duration := time.Since(startTime) + + // Verify the results + assert.NoError(t, err) + assert.NotNil(t, response) + + // The test should take at least 1/maxRPS * len(tuples) seconds + // but we'll be lenient in the test to avoid flakiness + minExpectedDuration := time.Second / time.Duration(maxRPS) * time.Duration(len(tuples)) / 2 + assert.True(t, duration >= minExpectedDuration, "Expected duration to be at least %v, but got %v", minExpectedDuration, duration) +}