Snowpipe is the right answer when files land in S3 continuously and you want them in Snowflake without running a scheduled job. The mechanism is straightforward: S3 sends an event to an SQS queue when a file arrives, and Snowpipe picks it up and runs the COPY. You get near-real-time ingestion with nothing to schedule, and Snowpipe's load metadata keeps the same file from being loaded twice. Five pieces need to be in place: an IAM role, a storage integration, a stage, a pipe, and an S3 event notification pointing at the SQS queue Snowflake creates for you.
1. Create the IAM role
Snowflake reads from S3 using an IAM role in your account. Create it with the S3 read permissions below. AWS requires a trust policy when the role is created, so use a placeholder for now (for example, trusting your own account ID); the real values come from step 2.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:GetObjectVersion",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::YOUR_BUCKET",
        "arn:aws:s3:::YOUR_BUCKET/*"
      ]
    }
  ]
}
2. Create the storage integration
The storage integration is how Snowflake talks to S3 without hardcoding credentials. STORAGE_ALLOWED_LOCATIONS limits which prefixes this integration can reach, so keep it as specific as you can. Run the DESC immediately after, because the output contains two values you need for the trust policy.
CREATE OR REPLACE STORAGE INTEGRATION s3_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::ACCOUNT_ID:role/YOUR_ROLE'
STORAGE_ALLOWED_LOCATIONS = ('s3://YOUR_BUCKET/YOUR_PREFIX/');
DESC INTEGRATION s3_int;
From the output, copy STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID.
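Picking two rows out of the DESC output by eye is easy to get wrong. A minimal sketch that filters the output down to the two values, assuming it runs in the same session immediately after the DESC (RESULT_SCAN reads the previous statement's result, and the DESC column names are lowercase, so they need double quotes):

```sql
DESC INTEGRATION s3_int;

-- Must be the very next statement in the same session as the DESC
SELECT "property", "property_value"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
WHERE "property" IN ('STORAGE_AWS_IAM_USER_ARN', 'STORAGE_AWS_EXTERNAL_ID');
```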
3. Update the IAM trust policy
Add this trust relationship to the role from step 1, using the two values from step 2. The external ID protects against the confused deputy problem. If it is missing or does not match, AWS rejects Snowflake's AssumeRole call, and the error message you get back is not helpful.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "STORAGE_AWS_IAM_USER_ARN"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "STORAGE_AWS_EXTERNAL_ID"
        }
      }
    }
  ]
}
4. Create the file format and stage
The stage points to the S3 prefix where files land. The file format tells Snowflake how to parse them. This example uses CSV, but you can define a Parquet or JSON format the same way. Run LIST @s3_stage afterwards to confirm Snowflake can see the files already there.
CREATE OR REPLACE FILE FORMAT csv_fmt
TYPE = CSV
SKIP_HEADER = 1
FIELD_DELIMITER = ','
FIELD_OPTIONALLY_ENCLOSED_BY = '"'
NULL_IF = ('', 'NULL')
EMPTY_FIELD_AS_NULL = TRUE;
CREATE OR REPLACE STAGE s3_stage
URL = 's3://YOUR_BUCKET/YOUR_PREFIX/'
STORAGE_INTEGRATION = s3_int
FILE_FORMAT = csv_fmt;
LIST @s3_stage;
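LIST only proves Snowflake can see the files, not that the file format parses them the way you expect. A quick sketch of reading a few rows straight from the stage without loading anything, assuming the files have the five columns used in steps 5 and 6:

```sql
-- $1..$5 are the positional CSV columns; METADATA$FILENAME is the path of the source file
SELECT METADATA$FILENAME AS file_name, $1, $2, $3, $4, $5
FROM @s3_stage (FILE_FORMAT => 'csv_fmt')
LIMIT 10;
```

If the fifth column does not look like JSON here, PARSE_JSON in the pipe will fail on it later, so this is a cheap place to catch that.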
5. Create the target table
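Snowpipe does not infer schema, so the table needs to exist before the pipe runs. The COPY statement in step 6 names its target columns explicitly, so what has to line up is the SELECT list in that statement with its column list, not the physical column order of the table.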
CREATE OR REPLACE TABLE raw.events (
event_id STRING,
event_ts TIMESTAMP_NTZ,
user_id STRING,
event_type STRING,
properties VARIANT
);
6. Create the pipe
AUTO_INGEST = TRUE is the flag that makes Snowpipe listen for S3 events instead of waiting for a manual trigger. The COPY loads everything under the stage prefix. After creating the pipe, run SHOW PIPES and copy the notification_channel value. That is the SQS ARN you need in the next step.
Snowpipe tracks the files it has loaded and keeps that history for 14 days. If you drop the same filename twice within that window it will not reload it, even if the contents changed, which is useful to know when testing.
CREATE OR REPLACE PIPE raw.events_pipe
AUTO_INGEST = TRUE
AS
COPY INTO raw.events (event_id, event_ts, user_id, event_type, properties)
FROM (
SELECT $1, $2, $3, $4, PARSE_JSON($5)
FROM @s3_stage
);
SHOW PIPES LIKE 'events_pipe';
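Auto-ingest only reacts to events for files that arrive after the notification exists, so files already sitting in the prefix (the ones LIST showed in step 4) are not picked up on their own. A sketch of queueing them by hand with ALTER PIPE ... REFRESH. As far as I know this only considers files staged in the last seven days, so an older backlog needs a regular COPY INTO instead. The sub-path in the second statement is a hypothetical example:

```sql
-- Queue files already in the stage that the pipe has not loaded yet
ALTER PIPE raw.events_pipe REFRESH;

-- Optionally limit the refresh to a sub-path under the stage (hypothetical path)
ALTER PIPE raw.events_pipe REFRESH PREFIX = '2024/01/';
```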
7. Configure the S3 event notification
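In the AWS console, go to the bucket, open Properties, and create an event notification: event type s3:ObjectCreated:*, filter to your prefix, destination SQS queue, and paste the notification_channel ARN from step 6. Snowflake uses the same SQS queue for every pipe on a given bucket and routes each event to the pipes whose stage path matches, so pipes on different prefixes do not interfere with each other. S3 does not accept overlapping prefix filters for the same event type, so keep it to one notification per prefix.
If you are loading from multiple prefixes into separate tables, repeat steps 4 through 7 for each. The storage integration can be shared, as long as STORAGE_ALLOWED_LOCATIONS covers every prefix.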
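Sharing the integration across prefixes means widening STORAGE_ALLOWED_LOCATIONS. A sketch, assuming a hypothetical second prefix called OTHER_PREFIX. Use ALTER here rather than re-running CREATE OR REPLACE: recreating the integration generates a new external ID, and the trust policy from step 3 would stop matching.

```sql
-- SET replaces the whole list, so repeat the existing prefix too
ALTER STORAGE INTEGRATION s3_int SET
STORAGE_ALLOWED_LOCATIONS = (
's3://YOUR_BUCKET/YOUR_PREFIX/',
's3://YOUR_BUCKET/OTHER_PREFIX/'  -- hypothetical second prefix
);

-- Confirm the change; the IAM user ARN and external ID should be the same as before
DESC INTEGRATION s3_int;
```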
8. Verify
Drop a test file into the prefix and check the pipe status. SYSTEM$PIPE_STATUS tells you whether the pipe is running (executionState) and how many files are queued for loading (pendingFileCount). COPY_HISTORY tells you what actually landed in the table, including any files that failed and why.
SELECT SYSTEM$PIPE_STATUS('raw.events_pipe');
SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
TABLE_NAME => 'EVENTS',
START_TIME => DATEADD('hour', -1, CURRENT_TIMESTAMP())
));
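If nothing shows up at all (pendingFileCount stays at 0 and COPY_HISTORY has no row for the test file), the event notification is probably misconfigured, so check the queue ARN and the prefix filter. If pendingFileCount stays above zero, the events are arriving and the files are still waiting to be loaded. If a file appears in COPY_HISTORY with a status of Load failed, the FIRST_ERROR_MESSAGE column tells you what is wrong with the data.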
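SYSTEM$PIPE_STATUS returns one JSON string, which is awkward to read in a result grid. A sketch that pulls out a few fields and then narrows COPY_HISTORY to the files that reported an error. The JSON field names are the ones I understand the function to return, so treat them as assumptions and compare against the raw output first:

```sql
-- Parse the status JSON and pick out individual fields
SELECT
s.status:executionState::STRING AS execution_state,
s.status:pendingFileCount::NUMBER AS pending_files,
s.status:lastReceivedMessageTimestamp::STRING AS last_event_received
FROM (SELECT PARSE_JSON(SYSTEM$PIPE_STATUS('raw.events_pipe')) AS status) s;

-- Only the files from the last hour that reported an error
SELECT file_name, status, first_error_message, last_load_time
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
TABLE_NAME => 'EVENTS',
START_TIME => DATEADD('hour', -1, CURRENT_TIMESTAMP())
))
WHERE first_error_message IS NOT NULL;
```

A last_event_received value that is empty or does not move after you drop a file points at the S3 notification rather than at the data.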
The pipe is running.
S3 sends the event.
Snowflake loads the file.
Full script
-- Step 2: storage integration
CREATE OR REPLACE STORAGE INTEGRATION s3_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::ACCOUNT_ID:role/YOUR_ROLE'
STORAGE_ALLOWED_LOCATIONS = ('s3://YOUR_BUCKET/YOUR_PREFIX/');
DESC INTEGRATION s3_int;
-- Copy STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID
-- Update IAM trust policy before continuing
-- Step 4: file format and stage
CREATE OR REPLACE FILE FORMAT csv_fmt
TYPE = CSV
SKIP_HEADER = 1
FIELD_DELIMITER = ','
FIELD_OPTIONALLY_ENCLOSED_BY = '"'
NULL_IF = ('', 'NULL')
EMPTY_FIELD_AS_NULL = TRUE;
CREATE OR REPLACE STAGE s3_stage
URL = 's3://YOUR_BUCKET/YOUR_PREFIX/'
STORAGE_INTEGRATION = s3_int
FILE_FORMAT = csv_fmt;
-- Step 5: target table
CREATE OR REPLACE TABLE raw.events (
event_id STRING,
event_ts TIMESTAMP_NTZ,
user_id STRING,
event_type STRING,
properties VARIANT
);
-- Step 6: pipe
CREATE OR REPLACE PIPE raw.events_pipe
AUTO_INGEST = TRUE
AS
COPY INTO raw.events (event_id, event_ts, user_id, event_type, properties)
FROM (
SELECT $1, $2, $3, $4, PARSE_JSON($5)
FROM @s3_stage
);
SHOW PIPES LIKE 'events_pipe';
-- Copy notification_channel ARN and configure S3 event notification in AWS console
-- Step 8: verify
SELECT SYSTEM$PIPE_STATUS('raw.events_pipe');
SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
TABLE_NAME => 'EVENTS',
START_TIME => DATEADD('hour', -1, CURRENT_TIMESTAMP())
));