Skip to content

Commit

Permalink
Merge pull request #3208 from rockwotj/snowflake-cache
Browse files Browse the repository at this point in the history
  • Loading branch information
rockwotj authored Feb 25, 2025
2 parents 6af4030 + fbd006d commit a078abc
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ All notable changes to this project will be documented in this file.
- Metadata field `kafka_is_high_watermark` added to the `redpanda_migrator_offsets` input. (@mihaitodor)
- Input `postgres_cdc` now emits logical messages to the WAL every hour by default to allow WAL reclaiming for low frequency tables, this frequency is controlled by field `heartbeat_interval`. (@rockwotj)
- Output `snowflake_streaming` now has a `commit_timeout` field to control how long to wait for a commit in Snowflake. (@rockwotj)
- Output `snowflake_streaming` now has a `url` field to override the hostname for connections to Snowflake, which is required for private link deployments. (@rockwotj)

### Fixed

Expand Down
15 changes: 15 additions & 0 deletions docs/modules/components/pages/outputs/snowflake_streaming.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ output:
label: ""
snowflake_streaming:
account: ORG-ACCOUNT # No default (required)
url: https://org-account.privatelink.snowflakecomputing.com # No default (optional)
user: "" # No default (required)
role: ACCOUNTADMIN # No default (required)
database: MY_DATABASE # No default (required)
Expand Down Expand Up @@ -302,6 +303,20 @@ The Snowflake https://docs.snowflake.com/en/user-guide/admin-account-identifier.
account: ORG-ACCOUNT
```
=== `url`
Override the default URL used to connect to Snowflake which is https://ORG-ACCOUNT.snowflakecomputing.com
*Type*: `string`
```yml
# Examples
url: https://org-account.privatelink.snowflakecomputing.com
```
=== `user`
The user to run the Snowpipe Stream as. See https://docs.snowflake.com/en/user-guide/admin-user-management[Snowflake Documentation^] on how to create a user.
Expand Down
27 changes: 26 additions & 1 deletion internal/impl/snowflake/output_snowflake_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"encoding/binary"
"errors"
"fmt"
neturl "net/url"
"strings"
"sync"
"time"
Expand All @@ -29,6 +30,7 @@ import (

const (
ssoFieldAccount = "account"
ssoFieldURL = "url"
ssoFieldUser = "user"
ssoFieldRole = "role"
ssoFieldDB = "database"
Expand Down Expand Up @@ -98,6 +100,8 @@ You can monitor the output batch size using the `+"`snowflake_compressed_output_
service.NewStringField(ssoFieldAccount).
Description(`The Snowflake https://docs.snowflake.com/en/user-guide/admin-account-identifier.html#using-an-account-locator-as-an-identifier[Account name^]. Which should be formatted as `+"`<orgname>-<account_name>`"+` where `+"`<orgname>`"+` is the name of your Snowflake organization and `+"`<account_name>`"+` is the unique name of your account within your organization.
`).Example("ORG-ACCOUNT"),
service.NewStringField(ssoFieldURL).
Description("Override the default URL used to connect to Snowflake which is https://ORG-ACCOUNT.snowflakecomputing.com").Optional().Example("https://org-account.privatelink.snowflakecomputing.com").Advanced(),
service.NewStringField(ssoFieldUser).Description("The user to run the Snowpipe Stream as. See https://docs.snowflake.com/en/user-guide/admin-user-management[Snowflake Documentation^] on how to create a user."),
service.NewStringField(ssoFieldRole).Description("The role for the `user` field. The role must have the https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview#required-access-privileges[required privileges^] to call the Snowpipe Streaming APIs. See https://docs.snowflake.com/en/user-guide/admin-user-management#user-roles[Snowflake Documentation^] for more information about roles.").Example("ACCOUNTADMIN"),
service.NewStringField(ssoFieldDB).Description("The Snowflake database to ingest data into.").Example("MY_DATABASE"),
Expand Down Expand Up @@ -386,6 +390,19 @@ func newSnowflakeStreamer(
if err != nil {
return nil, err
}
// Resolve the base URL used for Snowflake API calls: an explicit `url`
// override when one is configured (e.g. for private link deployments),
// otherwise the default public endpoint derived from the account identifier.
var url string
if conf.Contains(ssoFieldURL) {
	// Read the override from the url field itself; reading the account field
	// here would silently replace the configured URL with the account name.
	url, err = conf.FieldString(ssoFieldURL)
	if err != nil {
		return nil, err
	}
	parsedURL, parseErr := neturl.Parse(url)
	if parseErr != nil {
		return nil, fmt.Errorf("invalid url: %w", parseErr)
	}
	// url.Parse accepts relative references such as a bare hostname, which
	// would only fail later when the first HTTP request is attempted, so
	// reject anything without a scheme and host up front.
	if parsedURL.Scheme == "" || parsedURL.Host == "" {
		return nil, fmt.Errorf("invalid url %q: must include a scheme and a host", url)
	}
} else {
	url = fmt.Sprintf("https://%s.snowflakecomputing.com", account)
}
user, err := conf.FieldString(ssoFieldUser)
if err != nil {
return nil, err
Expand Down Expand Up @@ -521,14 +538,22 @@ func newSnowflakeStreamer(
return err
}
}
restClient, err := streaming.NewRestClient(account, user, mgr.EngineVersion(), rsaKey, mgr.Logger())
restClient, err := streaming.NewRestClient(streaming.RestOptions{
Account: account,
URL: url,
User: user,
Version: mgr.EngineVersion(),
PrivateKey: rsaKey,
Logger: mgr.Logger(),
})
if err != nil {
return nil, fmt.Errorf("unable to create rest API client: %w", err)
}
client, err := streaming.NewSnowflakeServiceClient(
context.Background(),
streaming.ClientOptions{
Account: account,
URL: url,
User: user,
Role: role,
PrivateKey: rsaKey,
Expand Down
16 changes: 9 additions & 7 deletions internal/impl/snowflake/streaming/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,20 @@ func setup(t *testing.T) (*streaming.SnowflakeRestClient, *streaming.SnowflakeSe
require.NoError(t, err)
clientOptions := streaming.ClientOptions{
Account: envOr("SNOWFLAKE_ACCOUNT", "WQKFXQQ-WI77362"),
URL: fmt.Sprintf("https://%s.snowflakecomputing.com", envOr("SNOWFLAKE_ACCOUNT", "WQKFXQQ-WI77362")),
User: envOr("SNOWFLAKE_USER", "ROCKWOODREDPANDA"),
Role: "ACCOUNTADMIN",
PrivateKey: parseResult.(*rsa.PrivateKey),
ConnectVersion: "",
}
restClient, err := streaming.NewRestClient(
clientOptions.Account,
clientOptions.User,
clientOptions.ConnectVersion,
clientOptions.PrivateKey,
clientOptions.Logger,
)
restClient, err := streaming.NewRestClient(streaming.RestOptions{
Account: clientOptions.Account,
User: clientOptions.User,
URL: clientOptions.URL,
Version: clientOptions.ConnectVersion,
PrivateKey: clientOptions.PrivateKey,
Logger: clientOptions.Logger,
})
require.NoError(t, err)
t.Cleanup(restClient.Close)
streamClient, err := streaming.NewSnowflakeServiceClient(ctx, clientOptions)
Expand Down
38 changes: 25 additions & 13 deletions internal/impl/snowflake/streaming/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ type (
// SnowflakeRestClient allows you to make REST API calls against Snowflake APIs.
type SnowflakeRestClient struct {
account string
url string
user string
privateKey *rsa.PrivateKey
client *http.Client
Expand All @@ -315,9 +316,19 @@ type SnowflakeRestClient struct {
cachedJWT *typed.AtomicValue[string]
}

// RestOptions is the options to create a REST client.
//
// It bundles the arguments that NewRestClient takes so that new settings
// (such as URL) can be added without changing the constructor signature.
type RestOptions struct {
// Account is the Snowflake account name.
Account string
// User is the Snowflake user the client acts as.
User string
// URL is the base URL that the REST API paths (for example
// "/api/v2/statements") are appended to when making requests.
URL string
// Version is the engine version string. NewRestClient trims leading "v"
// characters and, per its own comment, drops any "-rc" style suffix.
Version string
// PrivateKey is the RSA private key stored on the client.
// NOTE(review): presumably used to sign the JWT for authentication - confirm
// against computeJWT.
PrivateKey *rsa.PrivateKey
// Logger is used by the client for logging, e.g. reporting a failure to
// mint a JWT in the periodic refresh loop.
Logger *service.Logger
}

// NewRestClient creates a new REST client for the given parameters.
func NewRestClient(account, user, version string, privateKey *rsa.PrivateKey, logger *service.Logger) (c *SnowflakeRestClient, err error) {
version = strings.TrimLeft(version, "v")
func NewRestClient(opts RestOptions) (c *SnowflakeRestClient, err error) {
version := strings.TrimLeft(opts.Version, "v")
// Drop any -rc suffix, Snowflake doesn't like it
splits := strings.SplitN(version, "-", 2)
if len(splits) > 1 {
Expand All @@ -329,11 +340,12 @@ func NewRestClient(account, user, version string, privateKey *rsa.PrivateKey, lo
version = "99.0.0"
}
c = &SnowflakeRestClient{
account: account,
user: user,
account: opts.Account,
url: opts.URL,
user: opts.User,
client: http.DefaultClient,
privateKey: privateKey,
logger: logger,
privateKey: opts.PrivateKey,
logger: opts.Logger,
version: version,
cachedJWT: typed.NewAtomicValue(""),
authRefreshLoop: asyncroutine.NewPeriodic(
Expand All @@ -343,7 +355,7 @@ func NewRestClient(account, user, version string, privateKey *rsa.PrivateKey, lo
// We've already done this once, and there is no external component here
// so this should never fail, but log just in case...
if err != nil {
logger.Errorf("unable to mint JWT for snowflake output: %s", err)
c.logger.Errorf("unable to mint JWT for snowflake output: %s", err)
return
}
c.cachedJWT.Store(jwt)
Expand Down Expand Up @@ -386,42 +398,42 @@ func (c *SnowflakeRestClient) computeJWT() (string, error) {
// we don't have to handle async requests.
func (c *SnowflakeRestClient) RunSQL(ctx context.Context, req RunSQLRequest) (resp RunSQLResponse, err error) {
requestID := uuid.NewString()
err = c.doPost(ctx, fmt.Sprintf("https://%s.snowflakecomputing.com/api/v2/statements?requestId=%s", c.account, requestID), req, &resp)
err = c.doPost(ctx, fmt.Sprintf("%s/api/v2/statements?requestId=%s", c.url, requestID), req, &resp)
return
}

// configureClient configures a client for Snowpipe Streaming.
func (c *SnowflakeRestClient) configureClient(ctx context.Context, req clientConfigureRequest) (resp clientConfigureResponse, err error) {
requestID := uuid.NewString()
err = c.doPost(ctx, fmt.Sprintf("https://%s.snowflakecomputing.com/v1/streaming/client/configure?requestId=%s", c.account, requestID), req, &resp)
err = c.doPost(ctx, fmt.Sprintf("%s/v1/streaming/client/configure?requestId=%s", c.url, requestID), req, &resp)
return
}

// channelStatus returns the status of a given channel
func (c *SnowflakeRestClient) channelStatus(ctx context.Context, req batchChannelStatusRequest) (resp batchChannelStatusResponse, err error) {
requestID := uuid.NewString()
err = c.doPost(ctx, fmt.Sprintf("https://%s.snowflakecomputing.com/v1/streaming/channels/status?requestId=%s", c.account, requestID), req, &resp)
err = c.doPost(ctx, fmt.Sprintf("%s/v1/streaming/channels/status?requestId=%s", c.url, requestID), req, &resp)
return
}

// openChannel opens a channel for writing
func (c *SnowflakeRestClient) openChannel(ctx context.Context, req openChannelRequest) (resp openChannelResponse, err error) {
requestID := uuid.NewString()
err = c.doPost(ctx, fmt.Sprintf("https://%s.snowflakecomputing.com/v1/streaming/channels/open?requestId=%s", c.account, requestID), req, &resp)
err = c.doPost(ctx, fmt.Sprintf("%s/v1/streaming/channels/open?requestId=%s", c.url, requestID), req, &resp)
return
}

// dropChannel drops a channel when it's no longer in use.
func (c *SnowflakeRestClient) dropChannel(ctx context.Context, req dropChannelRequest) (resp dropChannelResponse, err error) {
requestID := uuid.NewString()
err = c.doPost(ctx, fmt.Sprintf("https://%s.snowflakecomputing.com/v1/streaming/channels/drop?requestId=%s", c.account, requestID), req, &resp)
err = c.doPost(ctx, fmt.Sprintf("%s/v1/streaming/channels/drop?requestId=%s", c.url, requestID), req, &resp)
return
}

// registerBlob registers a blob in object storage to be ingested into Snowflake.
func (c *SnowflakeRestClient) registerBlob(ctx context.Context, req registerBlobRequest) (resp registerBlobResponse, err error) {
requestID := uuid.NewString()
err = c.doPost(ctx, fmt.Sprintf("https://%s.snowflakecomputing.com/v1/streaming/channels/write/blobs?requestId=%s", c.account, requestID), req, &resp)
err = c.doPost(ctx, fmt.Sprintf("%s/v1/streaming/channels/write/blobs?requestId=%s", c.url, requestID), req, &resp)
return
}

Expand Down
17 changes: 10 additions & 7 deletions internal/impl/snowflake/streaming/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ const debug = false
type ClientOptions struct {
// Account name
Account string
// Account url
URL string
// username
User string
// Snowflake Role (i.e. ACCOUNTADMIN)
Expand Down Expand Up @@ -66,13 +68,14 @@ type SnowflakeServiceClient struct {

// NewSnowflakeServiceClient creates a new API client for the Snowpipe Streaming API
func NewSnowflakeServiceClient(ctx context.Context, opts ClientOptions) (*SnowflakeServiceClient, error) {
client, err := NewRestClient(
opts.Account,
opts.User,
opts.ConnectVersion,
opts.PrivateKey,
opts.Logger,
)
client, err := NewRestClient(RestOptions{
Account: opts.Account,
URL: opts.URL,
User: opts.User,
Version: opts.ConnectVersion,
PrivateKey: opts.PrivateKey,
Logger: opts.Logger,
})
if err != nil {
return nil, err
}
Expand Down

0 comments on commit a078abc

Please sign in to comment.