Snowflake: a cloud analytics solution
Snowflake is a cloud analytics solution that takes advantage of the cloud's flexibility and cost benefits without compromising on a familiar, database-like environment: the user can still use SQL to query data. Snowflake is a single, integrated platform delivered as-a-service. It features storage, compute, and global services layers that are physically separated but logically integrated. Data workloads scale independently from one another, making it an ideal platform for data warehousing, data lakes, data engineering, data science, modern data sharing, and developing data applications.
Getting Started
For almost all public cloud solutions, you need to be logged into the public cloud's administrative portal to provision an instance of a database service. Snowflake takes a different approach, as it has been designed to work across cloud platforms. Snowflake offers a single, consistent entry point to its users, reducing administrative overhead. Each user is provided with a unique URL that always ends with snowflakecomputing.com.
Once the instance has been created, theoretically, you do not need to know about or access the underlying public cloud platform at all since the mapping is managed by Snowflake.
Creating a tailored multi-cluster virtual warehouse:
USE ROLE SYSADMIN;
CREATE WAREHOUSE ETL_WH
WAREHOUSE_SIZE = XSMALL
MAX_CLUSTER_COUNT = 3
MIN_CLUSTER_COUNT = 1
SCALING_POLICY = ECONOMY
AUTO_SUSPEND = 300 -- suspend after 5 minutes (300 seconds) of inactivity
AUTO_RESUME = TRUE
INITIALLY_SUSPENDED = TRUE
COMMENT = 'Virtual Warehouse for ETL workloads. Auto scales between 1 and 3 clusters depending on the workload';
Setting MIN_CLUSTER_COUNT and MAX_CLUSTER_COUNT to different values means the multi-cluster virtual warehouse initially starts with a number of clusters equal to MIN_CLUSTER_COUNT. It will then scale out to MAX_CLUSTER_COUNT if the number of concurrent queries exceeds the warehouse's capacity and queries start to queue.
Setting SCALING_POLICY to ECONOMY (as opposed to STANDARD) ensures that the cluster is only scaled up if there are enough queries to keep the additional cluster busy for at least 6 minutes.
Furthermore, the AUTO_SUSPEND setting ensures that the warehouse is suspended automatically after 300 seconds of inactivity.
The WAREHOUSE_SIZE parameter corresponds to the number of nodes in the virtual warehouse. Each node is a compute unit.
However, more nodes do not always mean faster performance: some queries benefit from a larger virtual warehouse size, while others do not.
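If a particular workload needs more per-query horsepower rather than more concurrency, the warehouse size itself can be changed. A minimal sketch, assuming the ETL_WH warehouse created above:
ALTER WAREHOUSE ETL_WH SET WAREHOUSE_SIZE = MEDIUM;
-- scale back down once the heavy job has finished
ALTER WAREHOUSE ETL_WH SET WAREHOUSE_SIZE = XSMALL;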
Creating a new database and table:
CREATE DATABASE COOKBOOK;
USE DATABASE COOKBOOK;
CREATE TABLE MY_FIRST_TABLE
(
ID STRING,
NAME STRING
);
SELECT * FROM MY_FIRST_TABLE;
Using SnowSQL to connect to Snowflake
SnowSQL is the next-generation command line client for connecting to Snowflake to execute SQL queries and perform all DDL and DML operations, including loading data into and unloading data out of database tables.
You can find the URL that provides the latest version of the SnowSQL download by navigating to https://sfc-repo.snowflakecomputing.com/snowsql/bootstrap/index.html and browsing to find the version of SnowSQL that’s the most relevant for you.
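Once SnowSQL is installed, you connect by supplying your account identifier and user name on the command line and then run SQL as usual. A minimal sketch; the account identifier and user name are placeholders:
-- connect from a terminal with: snowsql -a <account_identifier> -u <user_name>
-- then verify the session from the SnowSQL prompt:
SELECT CURRENT_VERSION(), CURRENT_USER(), CURRENT_ROLE();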
Creating a new account admin user and understanding built-in roles
By default, a new Snowflake instance comes with a single user that has the account administrator (or ACCOUNTADMIN) role. The recommended best practice is to have at least two users with the ACCOUNTADMIN role and protect the users with multi-factor authentication (MFA).
USE ROLE SECURITYADMIN;
CREATE USER secondary_account_admin
PASSWORD = 'password123'
DEFAULT_ROLE = "ACCOUNTADMIN"
MUST_CHANGE_PASSWORD = TRUE;
GRANT ROLE "ACCOUNTADMIN" TO USER secondary_account_admin;
To create a new user and grant them a role, you need at least SECURITYADMIN privileges, so we first switch our role to SECURITYADMIN. We then use standard SQL syntax to create a new user, specifying ACCOUNTADMIN as the user's default role. However, specifying a default role is not enough, as it is simply the role the user starts with – the role must also be granted to the user.
There are five built-in system roles in Snowflake, and you can create new custom roles as well. The built-in roles are set up in a hierarchy, so the ACCOUNTADMIN role automatically inherits the privileges of the other roles.
If you create new custom roles, it is recommended that you grant those custom roles to the SYSTEMADMIN role since it is a best practice to complete the role hierarchy. Granting new custom roles to the SYSTEMADMIN role ensures that your system administrators have the required privileges to operate on all the objects in the system, regardless of who has created those objects.
Managing the Data Life Cycle
Snowflake is not very different from traditional databases and provides similar capabilities. However, since Snowflake has been designed for the cloud from the ground up, it offers configuration options that control how data is managed in a database or a table and how temporary data is maintained and destroyed when no longer required. These capabilities are needed when designing an ETL system or structuring data according to a data model.
Creating a new database
USE ROLE SYSADMIN;
CREATE DATABASE our_first_database
COMMENT = 'Our first database';
Verify the database has been created successfully
SHOW DATABASES LIKE 'our_first_database';
The query should return one row showing information about the newly created database, such as the database name, owner, comments, and retention time.
Creating database with time travel option
CREATE DATABASE production_database
DATA_RETENTION_TIME_IN_DAYS = 15
COMMENT = 'Critical production database';
SHOW DATABASES LIKE 'production_database';
Creating temporary database
While Time Travel is normally required for production databases, you would not normally need Time Travel and fail-safe for temporary databases, such as databases used in ETL processing. Removing Time Travel and fail-safe helps reduce storage costs.
CREATE TRANSIENT DATABASE temporary_database
DATA_RETENTION_TIME_IN_DAYS = 0
COMMENT = 'Temporary database for ETL processing';
SHOW DATABASES LIKE 'temporary_database';
Alter database
ALTER DATABASE temporary_database
SET DATA_RETENTION_TIME_IN_DAYS = 1;
SHOW DATABASES LIKE 'temporary_database';
The default for Time Travel is 1 day, and a new database will also have fail-safe enabled automatically. Both of these options incur storage costs, and in certain cases you may want to reduce them. If temporary tables are not specifically configured, you will unnecessarily incur costs for the Time Travel and fail-safe data that is stored with every change made to those tables. We therefore create such databases as transient (with TRANSIENT) so that fail-safe is not the default for the tables in that database. This does mean that such databases are not protected by fail-safe if a data loss event occurs, but for temporary databases and tables this should not be an issue. We have also set Time Travel to zero so that there is no Time Travel storage either.
Note that even though we have set the database to have no Time Travel and made it transient, we can still override the Time Travel retention for individual tables within it, since setting these options at the database level only changes the defaults for the objects created within that database. Keep in mind, however, that tables created in a transient database are themselves transient, so they are not protected by fail-safe.
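As an illustration, the retention period can be overridden for an individual table inside the transient database. A minimal sketch, using a hypothetical staging table:
CREATE TABLE temporary_database.public.etl_staging
(
ID STRING
)
DATA_RETENTION_TIME_IN_DAYS = 1
COMMENT = 'Staging table that keeps 1 day of Time Travel despite the database default of 0';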
Managing a schema
There can be multiple schemas in a database, but one schema belongs to a single database. Schemas help in grouping tables and views together that are logically related.
Checking current schemas in the database
SHOW SCHEMAS IN DATABASE testing_schema_creation;
Two schemas, INFORMATION_SCHEMA and PUBLIC, are automatically available with every database.
Create a new schema
CREATE SCHEMA a_custom_schema
COMMENT = 'A new custom schema';
Verifying newly created schema
SHOW SCHEMAS LIKE 'a_custom_schema' IN DATABASE testing_schema_creation;
Time travel and fail-safe options on schema creation
CREATE TRANSIENT SCHEMA temporary_data
DATA_RETENTION_TIME_IN_DAYS = 0
COMMENT = 'Schema containing temporary data used by ETL processes';
It is important to note that creating a new schema sets the current schema of the session to the newly created schema. The implication of this behavior is that any subsequent DDL command, such as CREATE TABLE, would create the object under the new schema. This is similar to issuing the USE SCHEMA command to change the current schema.
Managing tables
Table creation
CREATE TABLE customers (
id INT NOT NULL,
last_name VARCHAR(100),
first_name VARCHAR(100),
email VARCHAR(100),
company VARCHAR(100),
phone VARCHAR(100),
address1 VARCHAR(150),
address2 VARCHAR(150),
city VARCHAR(100),
state VARCHAR(100),
postal_code VARCHAR(15),
country VARCHAR(50)
);
Describe table
DESCRIBE TABLE customers;
Create or Replace Table
CREATE OR REPLACE TABLE customers (
id INT NOT NULL,
last_name VARCHAR(100),
first_name VARCHAR(100),
email VARCHAR(100),
company VARCHAR(100),
phone VARCHAR(100),
address1 STRING,
address2 STRING,
city VARCHAR(100),
state VARCHAR(100),
postal_code VARCHAR(15),
country VARCHAR(50)
);
Load sample data to the table
COPY INTO customers
FROM s3://xxxxxxx/customer.csv
FILE_FORMAT = (TYPE = csv SKIP_HEADER = 1 FIELD_OPTIONALLY_ENCLOSED_BY = '"');
CTAS (Create table .. as select)
The following CTAS statement creates a new table with the same structure as the CUSTOMERS table and copies all of its data into the new table via the SELECT query (a deep copy).
CREATE OR REPLACE TABLE
customers_deep_copy
AS
SELECT *
FROM customers;
In certain situations, a new table needs to be created with the same structure as an existing table, but without copying the data (a shallow copy):
CREATE OR REPLACE TABLE
customers_shallow_copy
LIKE customers;
Creating temporary table
CREATE TEMPORARY TABLE customers_temp AS SELECT * FROM customers WHERE TRY_TO_NUMBER(postal_code) IS NOT NULL;
CREATE TRANSIENT TABLE customers_trans AS SELECT * FROM customers WHERE TRY_TO_NUMBER(postal_code) IS NULL;
This SQL script creates two tables, customers_temp and customers_trans. Neither table is permanent, and each has its limitations. If you end the session at this point, the customers_temp table will not be recoverable after a re-login. Transient tables, on the other hand, remain available after a session has been closed and retain their data in subsequent sessions; however, they do not consume fail-safe storage. This is an important mechanism for retaining data across sessions and has applications in scenarios that require state management or in ETL jobs.
Managing external tables and stages
Creating a stage
A stage is a logical concept or an abstraction of a filesystem location that is external or internal to Snowflake. The following statement creates a stage named sfuser_ext_stage.
create or replace stage sfuser_ext_stage
url='s3://xxxxxxx/';
Listing the contents of the stage:
LIST @SFUSER_EXT_STAGE;
An external table is different from the usual database tables because, unlike tables that point to data inside a database, external tables provide a view on top of files stored in a stage. These are read-only tables that maintain metadata which helps interpret the contents of a file, which could be formatted as Parquet, CSV, and so on.
Load external table with parquet file
create or replace external table ext_tbl_userdata1
with location = @sfuser_ext_stage
file_format = (type = parquet);
Load external table with csv file
create or replace external table ext_card_data
with location = @sfuser_ext_stage/csv
file_format = (type = csv)
pattern = '.*headless[.]csv';
An external table exposes the file contents as semi-structured (JSON-like) data in a single VALUE column. This step shows how meaningful names can be given to the automatically created columns and how the data can be flattened to generate column-oriented rows. A sample query against the external table:
select top 5 value:c3::float as card_sum,
value:c2::string as period
from ext_card_data;
Drop table
drop table ext_card_data;
drop table ext_tbl_userdata1;
Managing views in Snowflake
CREATE DATABASE test_view_creation;
CREATE VIEW test_view_creation.public.date_wise_orders
AS
SELECT L_COMMITDATE AS ORDER_DATE,
SUM(L_QUANTITY) AS TOT_QTY,
SUM(L_EXTENDEDPRICE) AS TOT_PRICE
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1000.LINEITEM
GROUP BY L_COMMITDATE;
Creating materialized view
CREATE MATERIALIZED VIEW test_view_creation.public.date_wise_orders_fast
AS
SELECT L_COMMITDATE AS ORDER_DATE,
SUM(L_QUANTITY) AS TOT_QTY,
SUM(L_EXTENDEDPRICE) AS TOT_PRICE
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1000.LINEITEM
GROUP BY L_COMMITDATE;
Loading and Extracting Data into and out of Snowflake
Configuring Snowflake access to private S3 buckets
First, log in to the AWS console and navigate to Identity & Access Management (IAM) under the Services dropdown. Within Account Settings under the Security Token Service (STS), locate the endpoint corresponding to the region where your Snowflake instance is located and ensure that the endpoint is activated.
Next, choose Policies from the left panel and create a new policy. Select the JSON tab on the Create policy page. If you want to grant read-only access, remove the Put and Delete actions from the configuration. The JSON document:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:GetObjectVersion",
"s3:PutObject",
"s3:DeleteObject",
"s3:DeleteObjectVersion"
],
"Resource": "arn:aws:s3:::<bucket>/*"
},
{
"Effect": "Allow",
"Action": "s3:ListBucket",
"Resource": "arn:aws:s3:::<bucket>"
}
]
}
Click on Review Policy, give the policy a name, for example, ETL_Access, and save it. Now click on Roles and select Create Role from the resulting screen. Select Another AWS account when prompted to select a type. For the Account ID parameter, enter your own account ID temporarily. Under Require External ID, enter 00000. We will modify the trust relationship and replace these values later.
In the Permissions step, search for the newly created policy, check the checkbox against it, click Next, and then give the role a name.
Once the role is created, click on it, and copy the value shown in Role ARN. This value will be used to create the integration between Snowflake and the cloud storage.
Now log in to Snowflake and create a cloud storage integration object. Note that your role must be ACCOUNTADMIN in order to create a storage integration object:
CREATE STORAGE INTEGRATION S3_INTEGRATION
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = '<arn:aws:iam::123456789123:role/Role_For_Snowflake>'
STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>');
Now run the describe statement. Note the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID values in the output; they will be needed in the next step.
DESC INTEGRATION S3_INTEGRATION;
Return to the AWS console, select IAM, and click Roles in the left-side menu. Select the role that we created earlier, click the Trust relationships tab, and click Edit trust relationship.
In the policy document, replace the text and change the relevant parameters then click Update Trust Policy.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "<STORAGE_AWS_IAM_USER_ARN>"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "<STORAGE_AWS_EXTERNAL_ID>"
}
}
}
]
}
Go back to Snowflake and grant usage on the integration object to the SYSADMIN role. This step must be run as ACCOUNTADMIN.
USE ROLE ACCOUNTADMIN;
GRANT USAGE ON INTEGRATION S3_INTEGRATION TO ROLE SYSADMIN;
Creating stage for private S3 bucket:
USE ROLE SYSADMIN;
CREATE STAGE S3_RESTRICTED_STAGE
STORAGE_INTEGRATION = S3_INTEGRATION
URL = 's3://<bucket>';
To list the files:
LIST @S3_RESTRICTED_STAGE;
With this, you have completed the configuration needed to access your private S3 bucket from Snowflake with proper authorization.
Loading delimited bulk data into Snowflake from cloud storage
CREATE DATABASE C3_R2;
USE C3_R2;
-- create the table into which sample data will be loaded
CREATE TABLE CREDIT_CARDS
(
CUSTOMER_NAME STRING,
CREDIT_CARD STRING,
TYPE STRING,
CCV INTEGER,
EXP_DATE STRING
);
Define the format of the file we want to load into the table. The file format describes a standard CSV file; therefore, we will be expecting a comma-separated file. Since the first row in the file is a header row, we will skip the header in our definition, and we will specify that the individual fields in the file may be enclosed by double quotes. We are also creating the file format as a named file format so that it can be reused multiple times:
-- define the CSV file format
CREATE FILE FORMAT GEN_CSV
TYPE = CSV
SKIP_HEADER = 1
FIELD_OPTIONALLY_ENCLOSED_BY = '"';
-- create an external stage using a sample S3 bucket
CREATE OR REPLACE STAGE C3_R2_STAGE url='s3://xxxxxxxx'
FILE_FORMAT = GEN_CSV;
-- list the files in the stage
LIST @C3_R2_STAGE;
-- copy the data from the stage into the table
COPY INTO CREDIT_CARDS
FROM @C3_R2_STAGE;
-- validate that the data loaded successfully
USE C3_R2;
SELECT COUNT(*) FROM CREDIT_CARDS;
There are two methods that you can use to load data from storage. The recommended method is to use the external stage method. The alternate method of loading from cloud storage is directly referencing the cloud storage in your COPY commands.
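A minimal sketch of the direct-reference approach, assuming the GEN_CSV file format defined above and placeholder credentials (an external stage or storage integration remains the recommended method):
COPY INTO CREDIT_CARDS
FROM 's3://<bucket>/<path>/'
CREDENTIALS = (AWS_KEY_ID = '<aws_key_id>' AWS_SECRET_KEY = '<aws_secret_key>')
FILE_FORMAT = (FORMAT_NAME = 'GEN_CSV');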
Loading delimited bulk data into Snowflake from your local machine
CREATE DATABASE C4_LD_EX;
-- create the table where the data will be loaded
CREATE TABLE CUSTOMER
(
FName STRING,
LName STRING,
Email STRING,
Date_Of_Birth DATE,
City STRING,
Country STRING
);
Define the format of the file that we want to load into the table. Here we are creating a named file format so that it can be reused multiple times. The file format describes a delimited file in which columns are delimited by pipe (|) character. It also describes that when a file is loaded using this file format, the header of the file is to be skipped:
-- define the file format which is pipe delimited in case of our sample file
CREATE FILE FORMAT PIPE_DELIM
TYPE = CSV
FIELD_DELIMITER = '|'
FIELD_OPTIONALLY_ENCLOSED_BY = '"'
SKIP_HEADER = 1
DATE_FORMAT = 'YYYY-MM-DD';
USE C4_LD_EX;
-- Create an internal stage
CREATE STAGE CUSTOMER_STAGE
FILE_FORMAT = PIPE_DELIM;
Upload the file into the stage that we just created. This step must be run through SnowSQL from the machine where the file is located.
USE C4_LD_EX;
PUT file://customers.csv @CUSTOMER_STAGE;
-- validate that the file is successfully loaded
LIST @CUSTOMER_STAGE;
Now that the data is in a Snowflake internal stage, we can simply run the COPY command and load the data to the customer table.
USE C4_LD_EX;
-- copy from internal stage into the table
COPY INTO CUSTOMER
FROM @CUSTOMER_STAGE;
-- validate that the data is successfully loaded
USE C4_LD_EX;
SELECT * FROM CUSTOMER;
Once the data is loaded into the target table, it is generally not required to keep the data in the internal stage. It is a good idea to clear the internal stage, as data stored in an internal stage also contributes toward storage costs:
-- remove files from internal stage as they are already loaded
REMOVE @CUSTOMER_STAGE;
There are three types of stages that you can use for loading files from an on-premises system into a Snowflake instance: the internal named stage, the user stage, and the table stage. We used the internal named stage method.
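For comparison, here is a minimal sketch of the other two options, which need no CREATE STAGE statement at all (the file path is a placeholder):
-- user stage: one per user, referenced with @~
PUT file://customers.csv @~/customer_files/;
-- table stage: one per table, referenced with @%<table_name>
PUT file://customers.csv @%CUSTOMER;
COPY INTO CUSTOMER FROM @%CUSTOMER FILE_FORMAT = (FORMAT_NAME = 'PIPE_DELIM');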
Before creating an internal stage, it is useful to identify the format and file structure of the files that will be landing in this stage. For that purpose, we created a file format that specifies that we are loading CSV format and has a header that we want to skip. Additionally, we specified the format of the date field in this file.
The file is automatically compressed when it is moved to the internal stage, and it is automatically encrypted as well. Once the table has been successfully loaded, it is useful to clear the stage so that you do not incur any storage costs for the files present in the internal stage.
Loading Parquet files into Snowflake
CREATE DATABASE C3_R4;
-- create the table in which we will load parquet data
CREATE TABLE TRANSACTIONS
(
TRANSACTION_DATE DATE,
CUSTOMER_ID NUMBER(38,0),
TRANSACTION_ID NUMBER(38,0),
AMOUNT NUMBER(38,0)
);
-- define the file format to be Parquet
CREATE FILE FORMAT GEN_PARQ
TYPE = PARQUET
COMPRESSION = AUTO
NULL_IF = ('MISSING','');
-- create external stage over public S3 bucket
CREATE OR REPLACE STAGE C3_R4_STAGE url='s3://xxxx'
FILE_FORMAT = GEN_PARQ;
-- try and list the files in stage
LIST @C3_R4_STAGE;
-- select the data in the stage
SELECT $1 FROM @C3_R4_STAGE;
We can now access the individual fields in the JSON through the syntax shown and load them into the table. The syntax used is $1:<fieldname>::<datatype_to_cast_to>:
-- use the special syntax to access fields in the data
-- convert them to the proper data type & insert into target table
INSERT INTO TRANSACTIONS
SELECT
$1:_COL_0::Date,
$1:_COL_1::NUMBER,
$1:_COL_2::NUMBER,
$1:_COL_3::NUMBER
FROM @C3_R4_STAGE;
-- Validate data is successfully loaded
USE C3_R4;
SELECT * FROM TRANSACTIONS;
When Parquet data is loaded, Snowflake reads it as semi-structured data into a single VARIANT column, so we use the same syntax that is reserved for processing semi-structured data: the column is accessed using the $1 syntax. To access individual fields, the syntax is $1:fieldname, and we can then cast the data into the desired data type by adding ::<desired_data_type> at the end.
Making sense of JSON semi-structured data and transforming to a relational view
CREATE DATABASE JSON_EX;
-- create external stage pointing to
-- the public bucket where we have placed a sample JSON file
CREATE OR REPLACE STAGE JSON_STG url='s3://xxxxxx'
FILE_FORMAT = (TYPE = JSON);
-- validate that you can access the bucket
LIST @JSON_STG;
-- check that you can load and parse the JSON
SELECT PARSE_JSON($1)
FROM @JSON_STG;
Let’s create a new table and load the JSON data into that table. Please note that the table has only one field called MY_JSON_DATA and the data type for that is VARIANT. Next, we run the COPY command and load the JSON data into this table:
-- create a new table in which we will load the JSON data
CREATE TABLE CREDIT_CARD_TEMP
(
MY_JSON_DATA VARIANT
);
-- copy the JSON data into the table
COPY INTO CREDIT_CARD_TEMP
FROM @JSON_STG;
-- parse and start making sense of JSON fields
SELECT MY_JSON_DATA:data_set,MY_JSON_DATA:extract_date FROM CREDIT_CARD_TEMP;
-- access the credit_cards array in JSON
SELECT MY_JSON_DATA:credit_cards FROM CREDIT_CARD_TEMP;
-- access specific values in credit_cards array in JSON
SELECT MY_JSON_DATA:credit_cards[0].CreditCardNo,MY_JSON_DATA:credit_cards[0].CreditCardHolder FROM CREDIT_CARD_TEMP;
The easiest method to convert that array into a relational view is to use the FLATTEN function. We give the credit_cards array as input to the FLATTEN function, which transposes the JSON array data into rows.
-- use the FLATTEN function to convert JSON into relational format
SELECT
MY_JSON_DATA:extract_date,
value:CreditCardNo::String,
value:CreditCardHolder::String,
value:CardPin::Integer,
value:CardCVV::String,
value:CardExpiry::String
FROM
CREDIT_CARD_TEMP
, lateral flatten( input => MY_JSON_DATA:credit_cards );
Processing newline delimited JSON (or NDJSON) into a Snowflake table
CREATE DATABASE NDJSON_EX;
-- create a stage pointing to the S3 bucket containing our example file
CREATE OR REPLACE STAGE NDJSON_STG url='s3://xxxxxx'
FILE_FORMAT = (TYPE = JSON STRIP_OUTER_ARRAY = TRUE);
-- list & validate that you can see the json file
LIST @NDJSON_STG;
-- parse the JSON
SELECT PARSE_JSON($1)
FROM @NDJSON_STG;
-- parse and convert the JSON into relational format
SELECT PARSE_JSON($1):CreditCardNo::String AS CreditCardNo
,PARSE_JSON($1):CreditCardHolder::String AS CreditCardHolder
,PARSE_JSON($1):CardPin::Integer AS CardPin
,PARSE_JSON($1):CardExpiry::String AS CardExpiry
,PARSE_JSON($1):CardCVV::String AS CardCVV
FROM @NDJSON_STG;
-- create a new table with the JSON data
CREATE TABLE CREDIT_CARD_DATA AS
SELECT PARSE_JSON($1):CreditCardNo::String AS CreditCardNo
,PARSE_JSON($1):CreditCardHolder::String AS CreditCardHolder
,PARSE_JSON($1):CardPin::Integer AS CardPin
,PARSE_JSON($1):CardExpiry::String AS CardExpiry
,PARSE_JSON($1):CardCVV::String AS CardCVV
FROM @NDJSON_STG;
-- validate data inserted successfully
SELECT * FROM CREDIT_CARD_DATA;
Processing near real-time data into a Snowflake table using Snowpipe
CREATE DATABASE SP_EX;
--create the table where data will be loaded
CREATE TABLE TRANSACTIONS
(
Transaction_Date DATE,
Customer_ID NUMBER,
Transaction_ID NUMBER,
Amount NUMBER
);
-- create external stage
-- use an S3 integration object for connecting to the bucket
CREATE OR REPLACE STAGE SP_TRX_STAGE
url='s3://<bucket>'
STORAGE_INTEGRATION = S3_INTEGRATION;
-- list the stage to validate everything works
LIST @SP_TRX_STAGE;
Snowpipe is used to enable near real-time loading of data. The CREATE PIPE command makes use of the same COPY command. Notice that we have set AUTO_INGEST to true while creating the Snowpipe. Once we configure the events on AWS, the Snowpipe will automatically load files as they arrive in the bucket:
-- create a Snowpipe which will be used to load the data
CREATE OR REPLACE PIPE TX_LD_PIPE
AUTO_INGEST = true
AS COPY INTO TRANSACTIONS FROM @SP_TRX_STAGE
FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1);
Before we proceed to the AWS console, we need to perform one step. Run the SHOW PIPES command and copy the ARN value in the notification_channel field. We will use that ARN value to configure event notification in AWS:
-- Show pipe to see the notification channel
SHOW PIPES LIKE '%TX_LD_PIPE%';
Now proceed to the AWS console and click the S3 bucket. Select the Properties tab, and within the tab, click on Events. Click Add Notification on the Events screen. Enter a name for the event, then, in the Events section, select All object create events, which means that the event will be triggered every time a new object is created. In the Send to section, select SQS Queue, select Add SQS queue ARN, and paste the ARN.
There are two ways to trigger the Snowpipe and thus load the data present in the S3 bucket. You can either invoke the Snowpipe through a REST API call or you can rely on the notification sent by the cloud storage platforms to configure automatic triggering of the Snowpipe.
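If you do not want to wait for (or cannot configure) the S3 event notification, the pipe can also be nudged manually and its state inspected. A minimal sketch using the pipe created above:
-- queue any files already sitting in the stage for ingestion
ALTER PIPE TX_LD_PIPE REFRESH;
-- check the pipe's execution state and pending file count
SELECT SYSTEM$PIPE_STATUS('TX_LD_PIPE');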
Extracting data from Snowflake
--create a database
CREATE DATABASE EXPORT_EX;
--Create an internal stage
CREATE OR REPLACE STAGE EXPORT_INTERNAL_STG
FILE_FORMAT = (TYPE = CSV COMPRESSION=GZIP);
Extracting data from a table into a stage is as simple as running the COPY command, but the direction of the copy is reversed; that is, we are now copying into the stage from the table.
--Extract data from a table into the internal stage
COPY INTO @EXPORT_INTERNAL_STG/customer.csv.gz
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER;
Snowflake automatically splits large data into multiple files.
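The splitting behavior can be tuned on the COPY command itself. A minimal sketch (the size threshold is an arbitrary example):
COPY INTO @EXPORT_INTERNAL_STG/customer_single.csv.gz
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER
SINGLE = TRUE -- force a single output file
MAX_FILE_SIZE = 1073741824; -- allow that file to grow to roughly 1 GB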
--Validate files are extracted
LIST @EXPORT_INTERNAL_STG;
--Use SnowSQL to download the files to a local directory
GET @EXPORT_INTERNAL_STG 'file://C:/Downloads/';
Let’s extract the same table out to an external stage.
-- Create an External Stage
-- Use the storage integration you would have previously created
CREATE OR REPLACE STAGE EXPORT_EXTERNAL_STG
url='s3://<bucket>'
STORAGE_INTEGRATION = S3_INTEGRATION
FILE_FORMAT = (TYPE = PARQUET COMPRESSION=AUTO);
--Extract data from a table into the external stage
COPY INTO @EXPORT_EXTERNAL_STG/customer.parquet
FROM (SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER SAMPLE (10));
Building Data Pipelines in Snowflake
Creating and scheduling a task
Scenario: we will create an aggregation query that we assume is being used in a report. We assume that the query takes a long time to run, so we will save the results of the query to a physical table and refresh it periodically through a scheduled task (we will use the Snowflake sample data).
SELECT C.C_NAME,SUM(L_EXTENDEDPRICE),SUM(L_TAX)
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER C
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS O
ON O.O_CUSTKEY = C.C_CUSTKEY
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.LINEITEM LI
ON LI.L_ORDERKEY = O.O_ORDERKEY
GROUP BY C.C_NAME;
--create a target table for this query
CREATE DATABASE task_demo;
USE DATABASE task_demo;
CREATE TABLE ordering_customers
(
Reporting_Time TIMESTAMP,
Customer_Name STRING,
Revenue NUMBER(16,2),
Tax NUMBER(16,2)
);
--create a task using the preceding SQL statement
CREATE TASK refresh_ordering_customers
WAREHOUSE = COMPUTE_WH
SCHEDULE = '30 MINUTE'
COMMENT = 'Update Ordering_Customers Table with latest data'
AS
INSERT INTO ordering_customers
SELECT CURRENT_TIMESTAMP, C.C_NAME,
SUM(L_EXTENDEDPRICE), SUM(L_TAX)
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER C
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS O
ON O.O_CUSTKEY = C.C_CUSTKEY
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.LINEITEM LI
ON LI.L_ORDERKEY = O.O_ORDERKEY
GROUP BY CURRENT_TIMESTAMP, C.C_NAME;
--validate that the Task has been created correctly
DESC TASK refresh_ordering_customers;
Note that, by default, a new task is created in a suspended state. If you are using a role other than ACCOUNTADMIN, you must grant that role the privilege to execute tasks.
-- grant privileges to the SYSADMIN
USE ROLE ACCOUNTADMIN;
GRANT EXECUTE TASK ON ACCOUNT TO ROLE SYSADMIN;
We must now set the task status to Resumed so that it can start executing on schedule.
--set the task status to Resumed so that it can start executing on schedule.
ALTER TASK refresh_ordering_customers RESUME;
DESC TASK refresh_ordering_customers;
For monitoring the task execution, we need to query task_history.
--run the following to keep an eye on the task execution to validate that it runs successfully.
SELECT name, state,
completed_time, scheduled_time,
error_code, error_message
FROM TABLE(information_schema.task_history())
WHERE name = 'REFRESH_ORDERING_CUSTOMERS';
Conjugating pipelines through a task tree
We will use the same scenario, but this time we will create an initialization task to clean up the table before we insert new data into it.
--create an initialization task to clean up the table before we insert new data into the table.
USE DATABASE task_demo;
CREATE TASK clear_ordering_customers
WAREHOUSE = COMPUTE_WH
COMMENT = 'Delete from Ordering_Customers'
AS
DELETE FROM task_demo.public.ordering_customers;
We have created two tasks, one to delete the data and one to insert the data. Now, we will connect the two together to create a small pipeline by making the insert task run after the clear task.
--We will now make the insert task run after the clear task.
ALTER TASK insert_ordering_customers
ADD AFTER clear_ordering_customers;
--run a describe on the task to validate the tasks have been connected.
DESC TASK insert_ordering_customers;
--schedule our clear_ordering_customers task to execute on a schedule.
ALTER TASK clear_ordering_customers
SET SCHEDULE = '10 MINUTE';
--If you are running your code through a role other than ACCOUNTADMIN you must grant that role the privilege to execute task.
GRANT EXECUTE TASK ON ACCOUNT TO ROLE SYSADMIN;
--set the tasks to Resume since tasks are created as suspended by default and would not work unless we set them to resume.
ALTER TASK insert_ordering_customers RESUME;
ALTER TASK clear_ordering_customers RESUME;
--keep an eye on the task execution to validate that it runs successfully.
SELECT name, state,
completed_time, scheduled_time,
error_code, error_message
FROM TABLE(information_schema.task_history())
WHERE name IN ('CLEAR_ORDERING_CUSTOMERS','INSERT_ORDERING_CUSTOMERS');
Snowflake provides functionality to connect multiple tasks together in a parent-child relationship. This feature allows the building of pipelines that consist of multiple steps of execution.
A predecessor is set up for a given task by specifying the AFTER configuration for that task. The task specified in the AFTER clause becomes the parent of the task, and through this a hierarchy of tasks is maintained. The predecessor task must complete before the child tasks can execute.
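The predecessor can also be declared at creation time instead of via ALTER TASK. A minimal sketch, reusing the tasks from this recipe:
CREATE OR REPLACE TASK insert_ordering_customers
WAREHOUSE = COMPUTE_WH
AFTER clear_ordering_customers
AS
INSERT INTO ordering_customers
SELECT CURRENT_TIMESTAMP, C.C_NAME, SUM(L_EXTENDEDPRICE), SUM(L_TAX)
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER C
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS O ON O.O_CUSTKEY = C.C_CUSTKEY
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.LINEITEM LI ON LI.L_ORDERKEY = O.O_ORDERKEY
GROUP BY CURRENT_TIMESTAMP, C.C_NAME;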
Querying and viewing the task history
We will use the task_history table function, which can be used to query the history of task executions.
SELECT * FROM TABLE(information_schema.task_history())
ORDER BY SCHEDULED_TIME;
--query the view to return task history between two timestamps.
SELECT * FROM
TABLE(information_schema.task_history(
scheduled_time_range_start=>to_timestamp_ltz('2020-08-13 14:00:00.000 -0700'),
scheduled_time_range_end=>to_timestamp_ltz('2020-08-13 14:10:00.000 -0700')
))
ORDER BY SCHEDULED_TIME;
--use the RESULT_LIMIT parameter.
SELECT * FROM
TABLE(information_schema.task_history(
result_limit => 5
))
ORDER BY SCHEDULED_TIME;
--query the TASK_HISTORY view based on the task name itself. This can be performed by using the TASK_NAME parameter.
SELECT * FROM
TABLE(information_schema.task_history(
task_name => 'CLEAR_ORDERING_CUSTOMERS'
))
ORDER BY SCHEDULED_TIME;
--combine parameters into a single query as well to narrow down our results
SELECT * FROM
TABLE(information_schema.task_history(
task_name => 'CLEAR_ORDERING_CUSTOMERS',
result_limit => 2
))
ORDER BY SCHEDULED_TIME;
Exploring the concept of streams to capture table-level changes
We will configure a stream on a table and capture the changes that occur at the table level. Streams are Snowflake's way of performing change data capture on Snowflake tables and can be useful in data pipeline implementations.
--create a staging table to simulate data arriving from outside Snowflake and being processed further through a stream object.
CREATE DATABASE stream_demo;
USE DATABASE stream_demo;
CREATE TABLE customer_staging
(
ID INTEGER,
Name STRING,
State STRING,
Country STRING
);
-- create a stream
CREATE STREAM customer_changes ON TABLE customer_staging;
--describe the stream to see what has been created:
DESC STREAM customer_changes;
Notice that the mode of the stream is set to DEFAULT, which indicates it will track inserts, updates, and deletes performed on the table.
--insert some data into the staging table to simulate data arriving into Snowflake:
INSERT INTO customer_staging VALUES (1,'Jane Doe','NSW','AU');
INSERT INTO customer_staging VALUES (2,'Alpha','VIC','AU');
--validate that the data is indeed inserted into the staging table
SELECT * FROM customer_staging;
--view how the changing data has been captured through the stream.
SELECT * FROM customer_changes;
--we can now process the data from the stream into another table. Create a table first in which we will insert the recorded data.
CREATE TABLE customer
(
ID INTEGER,
Name STRING,
State STRING,
Country STRING
);
--Retrieve data from a stream and insert into table
INSERT INTO customer
SELECT ID,Name,State,Country
FROM customer_changes
WHERE metadata$action = 'INSERT';
--validate that correct data is inserted
SELECT * FROM customer;
--find out what happens to the stream after data has been processed from it.
SELECT * FROM customer_changes;
If we perform a SELECT now, zero rows will be returned since the data has already been processed.
--update a row in the staging table.
UPDATE customer_staging SET name = 'John Smith' WHERE ID = 1;
--Select the data from the stream to see how an UPDATE appears in a stream
SELECT * FROM customer_changes;
An update operation is essentially captured as a DELETE followed by an INSERT, with the METADATA$ISUPDATE column set to TRUE on both rows. Therefore, you will see both a DELETE and an INSERT row appear in the result. If you are processing the stream for updates and deletes as well, you will need additional logic in the consuming code to handle them correctly.
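Where inserts, updates, and deletes all need to be applied to the target, a single MERGE driven by the stream's metadata columns is a common pattern. A minimal sketch against the customer and customer_changes objects from this recipe; the filter drops the DELETE half of each update so every key appears only once in the source:
MERGE INTO customer tgt
USING (
SELECT ID, Name, State, Country,
metadata$action AS action, metadata$isupdate AS is_update
FROM customer_changes
WHERE NOT (metadata$action = 'DELETE' AND metadata$isupdate)
) src
ON tgt.ID = src.ID
WHEN MATCHED AND src.action = 'DELETE' THEN DELETE
WHEN MATCHED AND src.action = 'INSERT' AND src.is_update
THEN UPDATE SET tgt.Name = src.Name, tgt.State = src.State, tgt.Country = src.Country
WHEN NOT MATCHED AND src.action = 'INSERT'
THEN INSERT (ID, Name, State, Country) VALUES (src.ID, src.Name, src.State, src.Country);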
Combining the concept of streams and tasks to build pipelines that process changed data on schedule
This time, we will create a stream on the table that captures only inserts. The insert-only mode is achieved by setting APPEND_ONLY to TRUE.
--create a stream on the table that captures only the inserts.
CREATE STREAM customer_changes ON TABLE customer_staging APPEND_ONLY = TRUE;
Let’s now create a task that we will use to insert any new data that appears in the stream.
--create a task which we will use to insert any new data that appears in the stream.
CREATE TASK process_new_customers
WAREHOUSE = COMPUTE_WH
COMMENT = 'Process new data into customer'
AS
INSERT INTO customer
SELECT ID,Name,State,Country
FROM customer_changes
WHERE metadata$action = 'INSERT';
--Please note that to RESUME a task you will need to run the command as ACCOUNTADMIN or another role with the appropriate privilege.
ALTER TASK process_new_customers
SET SCHEDULE = '10 MINUTE';
ALTER TASK process_new_customers RESUME;
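Optionally, the task can be made conditional so that it only runs (and only spins up the warehouse) when the stream actually contains new rows, using the WHEN clause. A minimal sketch:
CREATE OR REPLACE TASK process_new_customers
WAREHOUSE = COMPUTE_WH
SCHEDULE = '10 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('CUSTOMER_CHANGES')
AS
INSERT INTO customer
SELECT ID,Name,State,Country
FROM customer_changes
WHERE metadata$action = 'INSERT';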
Converting data types and Snowflake’s failure management
Snowflake provides a structured way of handling data type conversion failures and recovering from them. This allows the Snowflake user to build high-quality data processing pipelines that avoid failures and, when failures do occur, handle them, recover, and leave the system in a stable state.
--convert a number stored as a string to a numeric value.
SELECT '100.2' AS input,
TO_NUMBER(input),
TO_NUMBER(input, 12, 2);
--TO_NUMBER function works great, until it encounters a non-numeric value.
SELECT 'not a number' AS input,
TO_NUMBER(input);
--use one of the TRY_ functions on a non-numeric input so that it fails gracefully
SELECT 'not a number' AS input,
TRY_TO_NUMBER(input);
--perform the type conversion as per normal when a proper numeric input is provided.
SELECT '100.2' AS input,
TRY_TO_NUMBER(input);
--conversion of string values into Boolean data type.
--in the following query, the string values True, true, tRuE, T, yes, on, and 1 are all converted to the Boolean value TRUE
SELECT TO_BOOLEAN('True'),
TO_BOOLEAN('true'),
TO_BOOLEAN('tRuE'),
TO_BOOLEAN('T'),
TO_BOOLEAN('yes'),
TO_BOOLEAN('on'),
TO_BOOLEAN('1');
--Conversely string values False, false, FalsE, f, no, off and 0 all convert into FALSE.
SELECT TO_BOOLEAN('False'),
TO_BOOLEAN('false'),
TO_BOOLEAN('FalsE'),
TO_BOOLEAN('f'),
TO_BOOLEAN('no'),
TO_BOOLEAN('off'),
TO_BOOLEAN('0');
--convert a string value that contains a date.
SELECT TO_DATE('2020-08-15'),
DATE('2020-08-15'),
TO_DATE('15/08/2020','DD/MM/YYYY');
--try and convert to a timestamp.
SELECT TO_TIMESTAMP_NTZ ('2020-08-15'),
TO_TIMESTAMP_NTZ ('2020-08-15 14:30:50');
Managing context using different utility functions
--Snowflake provides the function CURRENT_DATE which as the name suggests returns the current date in the default date format.
SELECT CURRENT_DATE();
--combine the output of CURRENT_DATE with other processing logic.
SELECT IFF ( DAYNAME( CURRENT_DATE() ) IN ( 'Sat', 'Sun'), TRUE, FALSE) as week_end_processing_flag;
--Snowflake provides the CURRENT_TIMESTAMP function, which in addition to the date also provides the time component.
SELECT CURRENT_TIMESTAMP();
--detect the client that a query is running from, using the CURRENT_CLIENT context function.
SELECT CURRENT_CLIENT();
--find out the region of your Snowflake instance.
SELECT CURRENT_REGION();
-- use security specific contextual functions, for example the current role function.
SELECT CURRENT_ROLE();
--combine CURRENT_ROLE() in your view definitions to provide specific security processing, for example creating views that limit the number of rows based on which role is being used to query.
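A minimal sketch of the idea, assuming a hypothetical EMPLOYEE_PAY table and a hypothetical HR_ROLE; the view returns rows only when queried under the privileged roles:
CREATE OR REPLACE VIEW employee_pay_restricted AS
SELECT *
FROM employee_pay
WHERE CURRENT_ROLE() IN ('HR_ROLE', 'ACCOUNTADMIN'); -- other roles see zero rows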
--Similar to the CURRENT_ROLE() is the CURRENT_USER() function which as the name describes returns the current user.
SELECT CURRENT_USER();
--Snowflake provides the current database function which returns the database selected for the session. If there is no database selected the function returns NULL.
USE DATABASE SNOWFLAKE_SAMPLE_DATA;
SELECT CURRENT_DATABASE();
--Snowflake provides the current schema function which returns the schema selected for the session. If there is no schema selected the function returns NULL.
USE DATABASE SNOWFLAKE_SAMPLE_DATA;
USE SCHEMA INFORMATION_SCHEMA;
SELECT CURRENT_SCHEMA();
--find out the current warehouse that has been selected to run the query by using the current warehouse function.
SELECT CURRENT_WAREHOUSE();
Data Protection and Security in Snowflake
Setting up custom roles and completing the role hierarchy
--create a database DEV.
USE ROLE SYSADMIN;
CREATE DATABASE DEV;
-- create a table for testing purposes
USE DATABASE DEV;
CREATE TABLE CUSTOMER
( ID STRING,
NAME STRING);
--change your role to SECURITYADMIN so that you have the required privileges to create a new user & a new role.
USE ROLE SECURITYADMIN;
--create a new user which we will use to demonstrate the role privileges.
CREATE USER dev_dba_user1 PASSWORD='password123' MUST_CHANGE_PASSWORD = TRUE;
USE ROLE SECURITYADMIN;
CREATE ROLE DEV_DBA;
SHOW GRANTS TO ROLE DEV_DBA;
--provide the new role some privileges on the DEV database.
GRANT ALL ON DATABASE DEV TO ROLE DEV_DBA;
GRANT ALL ON ALL SCHEMAS IN DATABASE DEV TO ROLE DEV_DBA;
GRANT ALL ON TABLE DEV.PUBLIC.CUSTOMER TO ROLE DEV_DBA;
SHOW GRANTS TO ROLE DEV_DBA;
--grant the DEV_DBA role to dev_dba_user1.
USE ROLE SECURITYADMIN;
GRANT ROLE DEV_DBA TO USER dev_dba_user1;
The SELECT query will work since dev_dba_user1 has access to DEV_DBA role, and DEV_DBA role has rights on the DEV database.
When creating a new custom role, it is recommended to complete the role hierarchy. Completing the role hierarchy refers to ensuring that all custom roles are granted to the SYSADMIN role.
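A minimal sketch of completing the hierarchy for the role created in this recipe:
USE ROLE SECURITYADMIN;
GRANT ROLE DEV_DBA TO ROLE SYSADMIN;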
Configuring and assigning a default role to a user
To create a new user and grant a default role, we need to log in as a user who has access to the SECURITYADMIN role.
--Create a new user which we will call marketing_user1.
USE ROLE SECURITYADMIN;
CREATE USER marketing_user1 PASSWORD='password123' MUST_CHANGE_PASSWORD = TRUE;
--login as the marketing_user1 and run the following query to view what is the default role for the user.
SELECT CURRENT_ROLE();
--login as a user with access to the SECURITYADMIN role and assign a new role to the marketing_user1.
USE ROLE SECURITYADMIN;
CREATE ROLE MKT_USER;
GRANT ROLE MKT_USER TO USER marketing_user1;
--Let us now make the role the default role for the user.
ALTER USER marketing_user1 SET DEFAULT_ROLE = 'MKT_USER';
--re-login as the marketing_user1 and run the following query to view what is the default role for the user.
SELECT CURRENT_ROLE();
Configuring custom roles for managing access to highly secure data
-- create the database which will hold the sensitive data. In this database we will also create a table that contains salary information.
USE ROLE SYSADMIN;
CREATE DATABASE sensitive_data;
CREATE TABLE SALARY
(
EMP_ID INTEGER,
SALARY NUMBER
);
--create a role which will have access to this data.
USE ROLE SECURITYADMIN;
CREATE ROLE HR_ROLE;
--grant the necessary privileges to this role. We will transfer the ownership of the database & the table that we previously created to this role.
GRANT OWNERSHIP ON TABLE sensitive_data.PUBLIC.SALARY TO ROLE HR_ROLE;
GRANT OWNERSHIP ON SCHEMA sensitive_data.PUBLIC TO ROLE HR_ROLE;
GRANT OWNERSHIP ON DATABASE sensitive_data TO ROLE HR_ROLE;
--validate that no other role can access the data. As the SYSADMIN role try to access the data in this table.
USE ROLE SYSADMIN;
SELECT * FROM sensitive_data.PUBLIC.SALARY;
--check if ACCOUNTADMIN role can access the data.
USE ROLE ACCOUNTADMIN;
SELECT * FROM sensitive_data.PUBLIC.SALARY;
--only way to access this data now is to add specific user(s) to the HR_ROLE. We will now create a new user and add that user to the HR_ROLE.
USE ROLE SECURITYADMIN;
CREATE USER hr_user1 PASSWORD='password123';
GRANT ROLE HR_ROLE to USER hr_user1;
--Let us now login as the hr_user1 and see if we can access the salary data.
USE ROLE HR_ROLE;
SELECT * FROM sensitive_data.PUBLIC.SALARY;
Snowflake recommends that you complete the role hierarchy when creating a new custom role. Completing the role hierarchy means granting the newly created role to the SYSADMIN role, so that the role's privileges are automatically inherited by SYSADMIN and, in turn, by ACCOUNTADMIN.
However, if there is sensitive data that you would not like even the administrators to see, you can break the role hierarchy and not grant the new role to SYSADMIN.
Setting up development, testing, pre-production, and production database hierarchies and roles
-- create a new user which will act as database administrator for the development environment.
USE ROLE SECURITYADMIN;
CREATE USER dev_dba_1
PASSWORD = 'password123'
DEFAULT_ROLE = DEV_DBA_ROLE
MUST_CHANGE_PASSWORD = TRUE;
--create a role for the development DBA and grant it to the development user.
CREATE ROLE DEV_DBA_ROLE;
GRANT ROLE DEV_DBA_ROLE TO USER dev_dba_1;
--switch to the SYSADMIN role and create the development database.
USE ROLE SYSADMIN;
CREATE DATABASE DEV_DB;
--grant full access to the DEV_DBA_ROLE role.
GRANT ALL ON DATABASE DEV_DB TO ROLE DEV_DBA_ROLE;
--create the production database, roles & user using the same approach.
USE ROLE SECURITYADMIN;
CREATE USER prod_dba_1
PASSWORD = 'password123'
DEFAULT_ROLE = PROD_DBA_ROLE
MUST_CHANGE_PASSWORD = TRUE;
--create a role for the production DBA and grant it to the production user.
CREATE ROLE PROD_DBA_ROLE;
GRANT ROLE PROD_DBA_ROLE TO USER prod_dba_1;
--switch to the SYSADMIN role and create the production database.
USE ROLE SYSADMIN;
CREATE DATABASE PROD_DB;
--grant full access to the PROD_DBA_ROLE role.
GRANT ALL ON DATABASE PROD_DB TO ROLE PROD_DBA_ROLE;
Snowflake’s role-based access control security model allows security administrators to configure object security at a role level. Using role-based security, we have set up a development database and production database segregated from each other in terms of access.
Safeguarding the ACCOUNTADMIN role and users in the ACCOUNTADMIN role
--Create a new user that we will give the ACCOUNTADMIN role using the syntax below.
--Make sure that you provide a valid email address for the user.
--The email address will be used in next steps to set up multi factor authentication.
USE ROLE SECURITYADMIN;
CREATE USER second_account_admin
PASSWORD = 'password123'
EMAIL = 'john@doe.com'
MUST_CHANGE_PASSWORD = TRUE;
--grant the ACCOUNTADMIN role to the newly created user.
GRANT ROLE ACCOUNTADMIN TO USER second_account_admin;
--configure the default role for the newly created user.
--We will setup the default role of the new user to be SECURITYADMIN rather than ACCOUNTADMIN, to ensure that there is no inadvertent use of the ACCOUNTADMIN role.
ALTER USER second_account_admin
SET DEFAULT_ROLE = SECURITYADMIN;
When you sign up for a new Snowflake instance, the first user created automatically gets the ACCOUNTADMIN role. Since that is the only user with account administration privileges, the recommended practice is to create an additional user with account administration privileges. This is done to ensure that if one account administrator cannot log in for any reason, there is always an alternative.
Once a new account admin user is created, it is crucial to enable MFA for the user. MFA ensures that if the password is compromised, there is an additional security layer that stops any unauthorized access.
Performance and Cost Optimization
Examining table schemas and deriving an optimal structure for a table
Assigning proper data types to your columns can help ensure that the table is structured and stored optimally. Storing numeric and date values in character data types takes up more storage and is less efficient during query processing. It is advisable to type your date and numeric columns correctly. The resultant storage savings can be quite significant for large tables.
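As an illustration (hypothetical table names), compare a loosely typed table with a properly typed version of the same data:
-- dates and amounts stored as strings: larger storage footprint, slower filtering
CREATE TABLE sales_raw (sale_date STRING, amount STRING, country_cd STRING);
-- properly typed columns: more compact storage and more efficient query processing
CREATE TABLE sales_typed (sale_date DATE, amount NUMBER(12,2), country_cd STRING);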
Identifying query plans and bottlenecks
We will be running a sample query against the sample dataset provided by Snowflake. The intent is to run an inefficient query, review its query plan, and identify which steps use the most compute and contribute most to the overall query execution.
USE SCHEMA SNOWFLAKE_SAMPLE_DATA.TPCDS_SF10TCL;
SELECT *
FROM store_returns,date_dim
WHERE sr_returned_date_sk = d_date_sk;
You will see that the query has started executing. If you click on Query ID, you will be shown the unique query ID for that execution. Clicking on the query ID value will take you to the query profile page.
The query profile page shows the execution plan of query. It also shows information about the most expensive steps in the execution plan, which you can review to identify bottlenecks and issues.
The TableScan step occurs when data is read from the table or, in other words, from storage. Reading from storage is usually an expensive operation, so any improvement you can make here will result in an overall query improvement. When we analyze the TableScan step, we notice that the query scanned 113 GB of data and almost all the partitions of the table, which is far from ideal. You will also notice that the query profile shows 21 columns being accessed. To optimize query performance, we need to reduce the number of columns to just the columns that we actually need and, if possible, add a filter on the base table that reduces the number of rows returned to only the ones needed.
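A possible rewrite along those lines (the column and filter choices are illustrative, not prescriptive):
SELECT sr_item_sk, sr_customer_sk, sr_return_amt, d_date
FROM store_returns
JOIN date_dim ON sr_returned_date_sk = d_date_sk
WHERE d_year = 2002;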
Weeding out inefficient queries through analysis
We will be querying the QUERY_HISTORY view under the SNOWFLAKE database and ACCOUNT_USAGE schema to identify queries that have taken a long time or scanned a lot of data. Based on that result set, we can identify which queries are potentially inefficient.
USE ROLE ACCOUNTADMIN;
USE SNOWFLAKE;
SELECT QUERY_ID, QUERY_TEXT, EXECUTION_TIME,USER_NAME
FROM SNOWFLAKE.ACCOUNT_USAGE.query_history
ORDER BY EXECUTION_TIME DESC;
You will need to focus on the EXECUTION_TIME column and analyze queries that have taken too long to execute. Copy the query ID, then switch your role in the top-right corner and click on the History button.
The History button will take you to the query history page.
Click Query ID on the result set, which will result in the query profile being displayed.
We focused on queries that have a long execution time. When improving performance, it is also beneficial to look at queries that execute very frequently, even if they do not take much time for a single execution.
USE ROLE ACCOUNTADMIN;
USE SNOWFLAKE;
SELECT QUERY_TEXT, USER_NAME, HASH(QUERY_TEXT) AS PSEUDO_QUERY_ID ,
COUNT(*) AS NUM_OF_QUERIES, SUM(EXECUTION_TIME) AS AGG_EXECUTION_TIME
FROM SNOWFLAKE.ACCOUNT_USAGE.query_history
GROUP BY QUERY_TEXT, USER_NAME
ORDER BY AGG_EXECUTION_TIME DESC;
-- find the query id
USE ROLE ACCOUNTADMIN;
USE SNOWFLAKE;
SELECT QUERY_ID, QUERY_TEXT, USER_NAME, HASH(QUERY_TEXT) AS PSEUDO_QUERY_ID
FROM SNOWFLAKE.ACCOUNT_USAGE.query_history
WHERE PSEUDO_QUERY_ID = <PSEUDO query id from previous step>;
Identifying and reducing unnecessary Fail-safe and Time Travel storage usage
There are three types of tables in Snowflake: permanent, temporary, and transient. By default, a new table is created as a permanent table, which means that it has the Time Travel and fail-safe functionalities enabled; both contribute to storage costs. For a table used to store data temporarily, it is much better to create it as a transient table so that fail-safe is disabled and Time Travel can be minimized.
It is advisable to keep a check on the tables in your Snowflake system. Tables with a large number of bytes in the Time Travel and fail-safe bytes columns but a low number (or even 0) in the active bytes column are good candidates for review.
Sample query:
USE ROLE ACCOUNTADMIN;
SELECT * FROM C6_R4.INFORMATION_SCHEMA.TABLE_STORAGE_METRICS
WHERE TABLE_CATALOG='C6_R4';
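If such a table turns out to be a staging or otherwise re-creatable table, one remediation is to rebuild it as a transient table (a permanent table cannot simply be altered to transient). A minimal sketch with a hypothetical table name:
CREATE TRANSIENT TABLE etl_staging_tr DATA_RETENTION_TIME_IN_DAYS = 0
AS SELECT * FROM etl_staging;
DROP TABLE etl_staging;
ALTER TABLE etl_staging_tr RENAME TO etl_staging;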
Projections in Snowflake for performance
Snowflake offers the concept of Materialized Views (MVs), which can be constructed from a table to reduce query latency. Snowflake manages MVs and maintains a dynamic link to the original table, reflecting changes consistently and in near real time.
The link between a transactional table and its MV is maintained using a log. The log keeps track of the changes that happen to the source table and, in near real time, runs the process that updates the MV. This is done as consistent transactions and at scale. The logic or aggregation function defined in the MV is applied to any changes in the data of the MV's source table.
Sample MV:
--create a materialized view
CREATE OR REPLACE MATERIALIZED VIEW MV_SENSOR_READING(CREATE_TS, SENSOR_ID, SENSOR_READING)
CLUSTER BY (SENSOR_ID) AS
SELECT CREATE_TS, SENSOR_ID, SENSOR_READING
FROM SENSOR_DATA;
Clustering is a well-known technique for distributing data into small, related storage blocks; that is, the data is clustered by the values of a particular column. Clustering generally involves sorting data so that each micro-partition holds sorted data. This helps the engine get to the required micro-partitions quickly, and the sorting allows it to efficiently locate individual rows within a micro-partition.
In our example, we created a one-to-one corresponding MV over our table, which is as good as creating a CREATE TABLE AS SELECT (CTAS) copy of the original table. The difference is that a table created using the CTAS technique is not dynamically linked to the original table. As soon as a Data Manipulation Language (DML) operation is executed on the original table, the CTAS (or derived) table becomes stale and no longer reflects the original table's changes. MVs, on the other hand, are updated systematically and automatically.
Another benefit is that Snowflake has smart routing or query tuning. The Snowflake query engine can intercept queries that are written against the main table, and if an MV is present for that table and the MV can answer the query, it automatically routes the query to the MV.
Another consideration when creating an MV is to use Snowflake's elastic virtual warehouse sizing capability. MVs take time to create as data is redistributed and cached; to make the process faster, the virtual warehouse can be temporarily resized. Sample query:
ALTER WAREHOUSE <NAME> SET WAREHOUSE_SIZE=XXXLARGE;
CREATE OR REPLACE MATERIALIZED VIEW MV_SENSOR_READING(CREATE_TS, SENSOR_ID, SENSOR_READING)
CLUSTER BY (SENSOR_ID) AS
SELECT CREATE_TS, SENSOR_ID, SENSOR_READING
FROM SENSOR_DATA;
ALTER WAREHOUSE <NAME> SET WAREHOUSE_SIZE=MEDIUM;
Reviewing query plans to modify table clustering
Snowflake provides the option to configure clustering keys for tables so that larger tables can benefit from partition pruning.
Sample scenario:
--Create a new database
CREATE DATABASE C6_R6;
-- create table that will hold the transaction data:
CREATE TABLE TRANSACTIONS
(
TXN_ID STRING,
TXN_DATE DATE,
CUSTOMER_ID STRING,
QUANTITY DECIMAL(20),
PRICE DECIMAL(30,2),
COUNTRY_CD STRING
);
--Populate this table with dummy data using the SQL given in the code block that follows.
--Run this step 8-10 times repeatedly to ensure that a large amount of data is inserted into the TRANSACTIONS table and many micro partitions are created.
INSERT INTO TRANSACTIONS
SELECT
UUID_STRING() AS TXN_ID
,DATEADD(DAY,UNIFORM(1, 500, RANDOM()) * -1, '2020-10-15') AS TXN_DATE
,UUID_STRING() AS CUSTOMER_ID
,UNIFORM(1, 10, RANDOM()) AS QUANTITY
,UNIFORM(1, 200, RANDOM()) AS PRICE
,RANDSTR(2,RANDOM()) AS COUNTRY_CD
FROM TABLE(GENERATOR(ROWCOUNT => 10000000));
We will run this query enough times to ensure that a large amount of data is inserted into the table. Now that enough data has been populated, we will run a sample query simulating a report that needs to access the last 30 days of data.
SELECT * FROM TRANSACTIONS
WHERE TXN_DATE BETWEEN DATEADD(DAY, -31, '2020-10-15') AND '2020-10-15';
You will observe that the number of total partitions and the number of partitions scanned is the same. This indicates that there is no partition pruning occurring, and there is a likelihood that a better clustering key will provide us with the desired partition pruning.
Since our query filters on TXN_DATE, we can change the clustering key to TXN_DATE. In production scenarios, you will need to analyze many queries to find the most common filter clause for large tables and use it as the clustering key.
--change the clustering key to TXN_DATE.
ALTER TABLE TRANSACTIONS CLUSTER BY ( TXN_DATE );
Now that the re-clustering has been performed, we will rerun the same query and investigate whether the clustering key has improved performance.
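To check how well the data now lines up with the clustering key, the clustering quality can also be inspected. A minimal sketch using a system function:
SELECT SYSTEM$CLUSTERING_INFORMATION('TRANSACTIONS', '(TXN_DATE)');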
Snowflake automatically partitions tables, with partitions added as new data is loaded into a table. Over time, as data is added, the partitions may no longer be contiguous, resulting in similar values for a given column being spread over several micro-partitions.
Optimizing virtual warehouse scale
Snowflake has different ways to handle the challenges of big data, offering both horizontal and vertical scalability. Admitting a large number of queries for concurrent execution requires horizontal scaling: if half of the queries go into a waiting state, the warehouse needs roughly double the number of clusters. This scaling is linear.
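As a minimal sketch (the warehouse name is illustrative, and multi-cluster warehouses require Enterprise Edition or above), horizontal scaling is configured by raising the cluster count on a multi-cluster virtual warehouse.
--allow the hypothetical REPORTING_WH warehouse to scale out when queries start to queue
ALTER WAREHOUSE REPORTING_WH SET
MIN_CLUSTER_COUNT = 1
MAX_CLUSTER_COUNT = 4;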
The other aspect of scaling is the memory available for processing. When the data being processed cannot fit into memory, it spills to disk. The amount of data spilled provides insight into the gap between the required and provisioned resources; this gap leads to slow I/O operations.
Spilling to the local disk involves writing data to the storage provisioned within the virtual warehouse. This storage is slower than memory, and the spill operation itself consumes additional compute resources to transfer data from memory to disk.
The next level of disk spill involves storage that is not local to the processing unit. It is remote, shared storage available to the different compute nodes over the network. This is an even slower tier than the local disk available to the processing units, and because the network is now involved as well, it adds further latency to data transfer.
It is important to provision a virtual warehouse that can handle its data in memory, with only limited spilling to local disk. Spilling to remote storage should be avoided as much as possible: it is slow, and the extra network I/O lengthens query times, which adds to the warehouse’s billing, and time is money in Snowflake’s case.
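To identify queries that are spilling, a sketch such as the following queries the SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY view (which requires appropriate privileges, typically the ACCOUNTADMIN role) and surfaces the worst offenders; these are candidates for a larger virtual warehouse.
--find recent queries that spilled to remote storage, the slowest tier
SELECT QUERY_ID, WAREHOUSE_NAME,
BYTES_SPILLED_TO_LOCAL_STORAGE,
BYTES_SPILLED_TO_REMOTE_STORAGE
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE BYTES_SPILLED_TO_REMOTE_STORAGE > 0
ORDER BY BYTES_SPILLED_TO_REMOTE_STORAGE DESC
LIMIT 10;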
Time Travel
Using Time Travel to return to the state of data at a particular time
We can use the AT and BEFORE syntax to retrieve historical data at or before a specific timestamp through a SQL extension referred to as Time Travel, which provides the capability to access data that has since been changed or deleted. Time Travel enables users to access and restore previous versions of data in tables, schemas, and databases. The functionality even allows restoring complete databases and schemas after they have been dropped.
By default, when you make changes to a table’s data, historical data is retained for 1 day. However, for Enterprise Edition and above versions of Snowflake, you can set the retention to be up to 90 days, therefore allowing you to undo changes up to 90 days in the past.
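For example, on Enterprise Edition and above, the retention period can be raised at the table level through the DATA_RETENTION_TIME_IN_DAYS parameter; the CUSTOMER table here matches the example that follows.
--extend Time Travel retention for the CUSTOMER table to 90 days
ALTER TABLE CUSTOMER SET DATA_RETENTION_TIME_IN_DAYS = 90;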
--make note of the current time before running an update on the customer table. We will use this time stamp to see the data as it existed before our update.
SELECT CURRENT_TIMESTAMP;
--now select all rows from the table as they existed at the noted timestamp and use them as per your requirements. Replace <time_stamp> with the timestamp you captured.
SELECT *
FROM CUSTOMER AT
(TIMESTAMP => '<time_stamp>'::timestamp_tz);
--if you are not 100% sure of the time when the update was made, you can use the BEFORE syntax and provide an approximate timestamp.
-- replace <time_stamp> with an approximate timestamp of your choosing
SELECT *
FROM CUSTOMER BEFORE
(TIMESTAMP => '<time_stamp>'::timestamp_tz);
Using Time Travel to recover from the accidental loss of table data
We can use the STATEMENT syntax to retrieve deleted data as it existed before a specified query was executed. Using this approach, we can access and retrieve data that was deleted by a DELETE query or changed through other queries, such as an UPDATE query. The STATEMENT syntax allows you to provide a QUERY_ID value, and the data as it existed before that query was executed is retrieved.
Finding QUERY_ID:
--query the query history to identify which query deleted all the rows.
SELECT QUERY_ID, QUERY_TEXT, DATABASE_NAME, SCHEMA_NAME, QUERY_TYPE
FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY())
WHERE QUERY_TYPE = 'DELETE'
AND EXECUTION_STATUS = 'SUCCESS'
AND DATABASE_NAME = 'C8_R2';
--use the query ID and the BEFORE syntax to travel back to how the table looked before the delete was executed.
--replace the query_id with the appropriate query_id from the above statement
SELECT *
FROM CUSTOMER BEFORE
(STATEMENT => '<query_id>');
--undo the delete by inserting this data back into the table by using time travel.
INSERT INTO CUSTOMER
SELECT *
FROM CUSTOMER BEFORE
(STATEMENT => '<query_id>');
Identifying dropped databases, tables, and other objects and restoring them using Time Travel
When a table is dropped in Snowflake, it is marked as deleted in the metadata, but the table’s underlying data remains. When the UNDROP command is run on a dropped table, Snowflake changes the table’s deleted status back to undeleted, in effect restoring the table immediately.
--drop the CUSTOMER table from the SCHEMA1 schema.
USE SCHEMA SCHEMA1;
DROP TABLE CUSTOMER;
--programmatically find out the tables that may have been dropped
--Tables which have been dropped will have a non-NULL date value in the TABLE_DROPPED column.
USE ROLE ACCOUNTADMIN;
SELECT TABLE_CATALOG, TABLE_SCHEMA,TABLE_NAME,
ID,CLONE_GROUP_ID, TABLE_CREATED, TABLE_DROPPED
FROM INFORMATION_SCHEMA.TABLE_STORAGE_METRICS WHERE TABLE_CATALOG = 'C8_R3';
--Let us now restore the dropped table.
USE SCHEMA SCHEMA1;
UNDROP TABLE CUSTOMER;
--Validate that the table is indeed available now
SELECT COUNT(*) FROM CUSTOMER;
--drop the whole SCHEMA1 schema.
DROP SCHEMA SCHEMA1;
--Restore the schema.
UNDROP SCHEMA SCHEMA1;
Using Time Travel in conjunction with cloning to improve debugging
In Snowflake, within the cloud services layer, information for each table is stored in the metadata. However, the table’s actual data is stored in object storage. The metadata for each table maintains references to the actual data in the object storage. This architecture allows Snowflake to track changes to a table transparently to the user; therefore, when you update a table, the data that existed before the update is retained for some time, allowing Snowflake to perform Time Travel.
Since references to the actual data are maintained in the metadata, Snowflake can configure new tables by setting up metadata references that point to existing data, a capability known as cloning. A cloned table and its parent table in effect point to the same physical data. Snowflake also allows mixing cloning with Time Travel: you can create clones of objects as they existed at a point in time. This feature can be convenient when development teams are trying to debug production issues and require the data to be in the state it existed in on a particular day.
--clone the PRODUCTION_DB database into DEV_1 database, and while doing so also go back in time when the table only had the initial set of rows.
CREATE DATABASE DEV_1 CLONE PRODUCTION_DB AT(TIMESTAMP => '<replace with the timestamp from step#7>'::timestamp_tz);
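A more targeted variation of the same idea clones just a single table as it existed before a specific statement; the new table name is illustrative, and <query_id> is a placeholder for the query ID you identified earlier.
--clone only the CUSTOMER table as it existed before the specified query ran
CREATE TABLE CUSTOMER_DEBUG CLONE CUSTOMER
BEFORE (STATEMENT => '<query_id>');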
Using cloning to set up new environments based on the production environment rapidly
In Snowflake, each database, schema, and table is maintained in the cloud services layer’s metadata. A table’s actual data is maintained in object storage. This approach allows Snowflake to make clones of existing objects without requiring the actual copying of data. Each cloned object points to the same data in the object storage. Snowflake ensures that if data is updated in the original tables, the change is not reflected in the cloned tables, and vice versa: if you modify the cloned data in the development database, it does not impact the production database.
--create a new database called PRD, which signifies that the database contains production data. We will also create three schemas called SRC_DATA, INTEGRATED_DATA & REPORTING_DATA.
CREATE DATABASE PRD;
CREATE SCHEMA SRC_DATA;
CREATE SCHEMA INTEGRATED_DATA;
CREATE SCHEMA REPORTING_DATA;
--create a series of tables in these databases.
USE SCHEMA SRC_DATA;
CREATE TABLE CUSTOMER AS
SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER;
USE SCHEMA SRC_DATA;
CREATE TABLE LINEITEM AS
SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.LINEITEM;
USE SCHEMA INTEGRATED_DATA;
CREATE TABLE ORDERS AS
SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS;
--create a reporting view to demonstrate that views also get cloned.
USE SCHEMA REPORTING_DATA;
CREATE VIEW REVENUE_REPORT AS
SELECT
L_RETURNFLAG,
L_LINESTATUS,
SUM(L_QUANTITY) AS SUM_QTY,
SUM(L_EXTENDEDPRICE) AS SUM_BASE_PRICE,
SUM(L_EXTENDEDPRICE * (1-L_DISCOUNT)) AS SUM_DISC_PRICE,
SUM(L_EXTENDEDPRICE * (1-L_DISCOUNT) * (1+L_TAX)) AS SUM_CHARGE,
AVG(L_QUANTITY) AS AVG_QTY,
AVG(L_EXTENDEDPRICE) AS AVG_PRICE,
AVG(L_DISCOUNT) AS AVG_DISC,
COUNT(*) AS COUNT_ORDER
FROM PRD.SRC_DATA.LINEITEM
WHERE L_SHIPDATE <= DATEADD(DAY, -90, TO_DATE('1998-12-01'))
GROUP BY L_RETURNFLAG,L_LINESTATUS;
--create a brand-new development environment for this PRD database, and we will create it with data.
CREATE DATABASE DEV_DB_1 CLONE PRD;
--validate that the new environment has all the required objects. To do so, expand the database tree on the left side of the Snowflake Web UI; you should see the same structure of database, schemas, tables & views as in PRD.
--validate that there is actual data in the cloned tables.
SELECT COUNT(*) FROM DEV_DB_1.SRC_DATA.CUSTOMER;
--validate that there is actual data in the cloned views.
SELECT COUNT(*) FROM DEV_DB_1.REPORTING_DATA.REVENUE_REPORT;
--create a testing environment from the production environment.
CREATE DATABASE TEST_1 CLONE PRD;
--create a new development environment from the existing development environment. To do so run the following SQL:
CREATE DATABASE DEV_DB_2 CLONE DEV_DB_1;
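To confirm that clones are independent of their source, a hedged check such as the following can be run; the update touches only the development copy, and the production table (C_COMMENT is a column in the sample CUSTOMER table) remains unchanged.
--update data in the development clone only
UPDATE DEV_DB_1.SRC_DATA.CUSTOMER SET C_COMMENT = 'updated in dev';
--the production table is unaffected; this query should return 0
SELECT COUNT(*) FROM PRD.SRC_DATA.CUSTOMER WHERE C_COMMENT = 'updated in dev';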
