Skip to content
20 changes: 10 additions & 10 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,17 @@ jobs:
run: go test -v ./integration_test/docker_test/docker_test.go -count 1
env:
ENTERPRISE_TOKEN: ${{ secrets.ENTERPRISE_TOKEN }}
DOCKER_REGISTRY_MIRROR_USERNAME: ${{ secrets.DOCKER_REGISTRY_MIRROR_USERNAME }}
DOCKER_REGISTRY_MIRROR_USERNAME: ${{ vars.DOCKER_REGISTRY_MIRROR_USERNAME }}
DOCKER_REGISTRY_MIRROR_PASSWORD: ${{ secrets.DOCKER_REGISTRY_MIRROR_PASSWORD }}
DOCKER_REGISTRY_MIRROR: ${{ secrets.DOCKER_REGISTRY_MIRROR }}
DOCKER_REGISTRY_MIRROR: ${{ vars.DOCKER_REGISTRY_MIRROR }}
- name: oss
if: matrix.FEATURES == 'oss'
run: go test -v ./integration_test/docker_test/docker_test.go -count 1
env:
RSERVER_ENABLE_MULTITENANCY: false
DOCKER_REGISTRY_MIRROR_USERNAME: ${{ secrets.DOCKER_REGISTRY_MIRROR_USERNAME }}
DOCKER_REGISTRY_MIRROR_USERNAME: ${{ vars.DOCKER_REGISTRY_MIRROR_USERNAME }}
DOCKER_REGISTRY_MIRROR_PASSWORD: ${{ secrets.DOCKER_REGISTRY_MIRROR_PASSWORD }}
DOCKER_REGISTRY_MIRROR: ${{ secrets.DOCKER_REGISTRY_MIRROR }}
DOCKER_REGISTRY_MIRROR: ${{ vars.DOCKER_REGISTRY_MIRROR }}
warehouse-integration:
name: Warehouse Integration
runs-on: ubuntu-latest
Expand Down Expand Up @@ -122,9 +122,9 @@ jobs:
SNOWFLAKE_RBAC_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.SNOWFLAKE_RBAC_INTEGRATION_TEST_CREDENTIALS }}
SNOWFLAKE_KEYPAIR_UNENCRYPTED_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.SNOWFLAKE_KEYPAIR_UNENCRYPTED_INTEGRATION_TEST_CREDENTIALS }}
RSERVER_FILE_MANAGER_USE_AWS_SDK_V2: "true"
DOCKER_REGISTRY_MIRROR_USERNAME: ${{ secrets.DOCKER_REGISTRY_MIRROR_USERNAME }}
DOCKER_REGISTRY_MIRROR_USERNAME: ${{ vars.DOCKER_REGISTRY_MIRROR_USERNAME }}
DOCKER_REGISTRY_MIRROR_PASSWORD: ${{ secrets.DOCKER_REGISTRY_MIRROR_PASSWORD }}
DOCKER_REGISTRY_MIRROR: ${{ secrets.DOCKER_REGISTRY_MIRROR }}
DOCKER_REGISTRY_MIRROR: ${{ vars.DOCKER_REGISTRY_MIRROR }}
RACE_ENABLED: "true"
FORCE_RUN_INTEGRATION_TESTS: "true"
- name: Upload coverage report
Expand Down Expand Up @@ -155,9 +155,9 @@ jobs:
run: make test exclude="/rudder-server/(jobsdb|integration_test|processor|regulation-worker|router|services|suppression-backup-service|warehouse)"
env:
RSERVER_PROCESSOR_ENABLE_CONCURRENT_STORE: "true"
DOCKER_REGISTRY_MIRROR_USERNAME: ${{ secrets.DOCKER_REGISTRY_MIRROR_USERNAME }}
DOCKER_REGISTRY_MIRROR_USERNAME: ${{ vars.DOCKER_REGISTRY_MIRROR_USERNAME }}
DOCKER_REGISTRY_MIRROR_PASSWORD: ${{ secrets.DOCKER_REGISTRY_MIRROR_PASSWORD }}
DOCKER_REGISTRY_MIRROR: ${{ secrets.DOCKER_REGISTRY_MIRROR }}
DOCKER_REGISTRY_MIRROR: ${{ vars.DOCKER_REGISTRY_MIRROR }}
FORCE_RUN_INTEGRATION_TESTS: "true"
- name: Upload coverage report
uses: actions/upload-artifact@v4
Expand Down Expand Up @@ -232,9 +232,9 @@ jobs:
SNOWPIPE_STREAMING_KEYPAIR_ENCRYPTED_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.SNOWPIPE_STREAMING_KEYPAIR_ENCRYPTED_INTEGRATION_TEST_CREDENTIALS }}
SNOWFLAKE_PRIVILEGE_INTEGRATION_TEST_CREDENTIALS: ${{ secrets.SNOWFLAKE_PRIVILEGE_INTEGRATION_TEST_CREDENTIALS }}
RSERVER_PROCESSOR_ENABLE_CONCURRENT_STORE: "true"
DOCKER_REGISTRY_MIRROR_USERNAME: ${{ secrets.DOCKER_REGISTRY_MIRROR_USERNAME }}
DOCKER_REGISTRY_MIRROR_USERNAME: ${{ vars.DOCKER_REGISTRY_MIRROR_USERNAME }}
DOCKER_REGISTRY_MIRROR_PASSWORD: ${{ secrets.DOCKER_REGISTRY_MIRROR_PASSWORD }}
DOCKER_REGISTRY_MIRROR: ${{ secrets.DOCKER_REGISTRY_MIRROR }}
DOCKER_REGISTRY_MIRROR: ${{ vars.DOCKER_REGISTRY_MIRROR }}
RACE_ENABLED: ${{ matrix.race || false }}
RSERVER_OAUTH_TEST_CREDENTIALS: ${{ secrets.RSERVER_OAUTH_TEST_CREDENTIALS }}
FORCE_RUN_INTEGRATION_TESTS: "true"
Expand Down
165 changes: 165 additions & 0 deletions router/batchrouter/asyncdestinationmanager/salesforce-bulk/prepare.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package salesforcebulk

import (
"fmt"

"github.com/tidwall/gjson"

"github.com/rudderlabs/rudder-go-kit/jsonrs"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common"
)

func prepareAsyncJob(eventPayload []byte, jobID int64, defaultOperation string) (*common.AsyncJob, error) {
message, err := extractMessage(eventPayload)
if err != nil {
return nil, err
}

clonedMessage := cloneMessage(message)
metadata := map[string]interface{}{
"job_id": jobID,
}

normalizedExternalIDs := collectNormalizedExternalIDs(message)
if len(normalizedExternalIDs) > 0 {
metadata["externalId"] = normalizedExternalIDs
}

operation := determineOperation(normalizedExternalIDs, defaultOperation)
clonedMessage["rudderOperation"] = operation
Comment on lines +28 to +29
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve existing rudderOperation in transformed message

The new transform helper computes operation := determineOperation(...) and unconditionally assigns it to clonedMessage["rudderOperation"]. This overwrites any rudderOperation already present in the incoming payload. Previously the transformer forwarded the payload as‑is, so callers could explicitly send rudderOperation: "delete" (or any other supported value) and have it respected. With this change, a message that already specifies a non‑default operation will now be forcibly changed—e.g., a delete request containing an externalId will be transformed into an upsert job, causing records to be inserted or updated instead of deleted. Unless the intention is to ignore all user‑provided operations, this introduces incorrect behavior for any event that explicitly sets rudderOperation.

Useful? React with 👍 / 👎.


if field, value, ok := identifierColumn(normalizedExternalIDs); ok {
clonedMessage[field] = value
}

return &common.AsyncJob{
Message: clonedMessage,
Metadata: metadata,
}, nil
}

func extractMessage(eventPayload []byte) (map[string]interface{}, error) {
body := gjson.GetBytes(eventPayload, "body.JSON")
if !body.Exists() || len(body.Raw) == 0 {
return map[string]interface{}{}, nil
}

var message map[string]interface{}
if err := jsonrs.Unmarshal([]byte(body.Raw), &message); err != nil {
return nil, err
}

if message == nil {
message = map[string]interface{}{}
}

return message, nil
}

func cloneMessage(message map[string]interface{}) map[string]interface{} {
cloned := make(map[string]interface{}, len(message))
for key, value := range message {
cloned[key] = value
}
return cloned
}

func collectNormalizedExternalIDs(message map[string]interface{}) []map[string]string {
var normalized []map[string]string

if contextRaw, ok := message["context"].(map[string]interface{}); ok {
normalized = append(normalized, normalizeExternalIDArray(contextRaw["externalId"])...)
}

normalized = append(normalized, normalizeExternalIDArray(message["externalId"])...)

Comment thread
etsenake marked this conversation as resolved.
return normalized
}

func normalizeExternalIDArray(raw interface{}) []map[string]string {
array, ok := raw.([]interface{})
if !ok || len(array) == 0 {
return nil
}

result := make([]map[string]string, 0, len(array))
for _, entry := range array {
if normalized, ok := normalizeExternalIDEntry(entry); ok {
result = append(result, normalized)
}
}

return result
}

func normalizeExternalIDEntry(raw interface{}) (map[string]string, bool) {
entry, ok := raw.(map[string]interface{})
if !ok {
return nil, false
}

normalized := map[string]string{
"type": "",
"id": "",
"identifierType": "",
}

if typeVal, ok := entry["type"]; ok {
normalized["type"] = fmt.Sprint(typeVal)
}

if idVal, ok := entry["id"]; ok {
normalized["id"] = fmt.Sprint(idVal)
}

if identifierVal, ok := entry["identifierType"]; ok {
normalized["identifierType"] = fmt.Sprint(identifierVal)
}

if normalized["identifierType"] == "" && normalized["id"] != "" {
normalized["identifierType"] = "Id"
}

return normalized, true
}

func determineOperation(externalIDs []map[string]string, defaultOperation string) string {
if defaultOperation == "" {
defaultOperation = "insert"
}

if len(externalIDs) == 0 {
return defaultOperation
}

for _, externalID := range externalIDs {
if externalID["id"] != "" {
return "upsert"
}
}

for _, externalID := range externalIDs {
if externalID["identifierType"] != "" {
return "upsert"
}
Comment thread
etsenake marked this conversation as resolved.
}

return defaultOperation
}

func identifierColumn(externalIDs []map[string]string) (string, string, bool) {
for _, externalID := range externalIDs {
if externalID["id"] == "" {
continue
}

field := externalID["identifierType"]
if field == "" {
field = "Id"
}

return field, externalID["id"], true
}

return "", "", false
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,23 @@ import (
"os"
"strings"

"github.com/tidwall/gjson"

"github.com/rudderlabs/rudder-go-kit/jsonrs"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common"
)

func (s *SalesforceBulkUploader) Transform(job *jobsdb.JobT) (string, error) {
return common.GetMarshalledData(
gjson.GetBytes(job.EventPayload, "body.JSON").String(),
job.JobID,
)
asyncJob, err := prepareAsyncJob(job.EventPayload, job.JobID, s.config.Operation)
if err != nil {
return "", err
}

responsePayload, err := jsonrs.Marshal(asyncJob)
if err != nil {
return "", err
}

return string(responsePayload), nil
}

func (s *SalesforceBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStruct) common.AsyncUploadOutput {
Expand All @@ -37,16 +42,6 @@ func (s *SalesforceBulkUploader) Upload(asyncDestStruct *common.AsyncDestination
}
}

objectInfo, err := extractObjectInfo(input, s.config)
if err != nil {
return common.AsyncUploadOutput{
FailedJobIDs: append(failedJobIDs, importingJobIDs...),
FailedReason: fmt.Sprintf("Error extracting object info: %v", err),
FailedCount: len(failedJobIDs) + len(importingJobIDs),
DestinationID: destinationID,
}
}

jobsByOperation := groupJobsByOperation(input, s.config.Operation)

var allImportingJobIDs []int64
Expand All @@ -61,18 +56,51 @@ func (s *SalesforceBulkUploader) Upload(asyncDestStruct *common.AsyncDestination
remainingJobs := jobs

for len(remainingJobs) > 0 {
var (
currentObjectInfo *ObjectInfo
matchingJobs []common.AsyncJob
nonMatchingJobs []common.AsyncJob
)

for _, job := range remainingJobs {
info, err := extractObjectInfoFromJob(job, s.config)
if err != nil {
if jobID, ok := job.Metadata["job_id"].(float64); ok {
s.logger.Errorf("Error extracting object info for job %d: %v", int64(jobID), err)
allFailedJobIDs = append(allFailedJobIDs, int64(jobID))
} else {
s.logger.Errorf("Error extracting object info for job: %v", err)
}
continue
}

if currentObjectInfo == nil {
currentObjectInfo = info
}

if info.ObjectType == currentObjectInfo.ObjectType && info.ExternalIDField == currentObjectInfo.ExternalIDField {
matchingJobs = append(matchingJobs, job)
} else {
nonMatchingJobs = append(nonMatchingJobs, job)
}
}

if currentObjectInfo == nil || len(matchingJobs) == 0 {
break
}

s.hashMapMutex.Lock()
csvFilePath, csvHeaders, insertedJobIDs, overflowedJobs, err := createCSVFile(
destinationID,
remainingJobs,
matchingJobs,
s.dataHashToJobID,
operation,
)
s.hashMapMutex.Unlock()

if err != nil {
s.logger.Errorf("Error creating CSV for operation %s: %v", operation, err)
for _, job := range remainingJobs {
for _, job := range matchingJobs {
if jobID, ok := job.Metadata["job_id"].(float64); ok {
allFailedJobIDs = append(allFailedJobIDs, int64(jobID))
}
Expand All @@ -85,7 +113,7 @@ func (s *SalesforceBulkUploader) Upload(asyncDestStruct *common.AsyncDestination
s.logger.Debugf("Failed to remove empty CSV file %s: %v", csvFilePath, err)
}
s.logger.Errorf("No jobs fit in CSV for operation %s, marking as failed", operation)
for _, job := range remainingJobs {
for _, job := range matchingJobs {
if jobID, ok := job.Metadata["job_id"].(float64); ok {
allFailedJobIDs = append(allFailedJobIDs, int64(jobID))
}
Expand All @@ -97,9 +125,9 @@ func (s *SalesforceBulkUploader) Upload(asyncDestStruct *common.AsyncDestination
len(insertedJobIDs), operation, len(allImportingJobIDs)/100+1, len(jobs))

sfJobID, apiError := s.apiService.CreateJob(
objectInfo.ObjectType,
currentObjectInfo.ObjectType,
operation,
objectInfo.ExternalIDField,
currentObjectInfo.ExternalIDField,
)
if apiError != nil {
s.logger.Errorf("Error creating Salesforce job for operation %s: %v", operation, apiError)
Expand Down Expand Up @@ -144,7 +172,7 @@ func (s *SalesforceBulkUploader) Upload(asyncDestStruct *common.AsyncDestination
s.logger.Debugf("Failed to remove CSV file %s: %v", csvFilePath, err)
}

remainingJobs = overflowedJobs
remainingJobs = append(overflowedJobs, nonMatchingJobs...)
}
}

Expand Down
Loading
Loading