Basic tutorial for BigQuery with Python


BigQuery is a fully-managed enterprise data warehouse for analytics.
It is cost-effective and highly scalable. In this article, I would like to share a basic tutorial for BigQuery with Python.

🐹 Installation

pip install google-cloud-bigquery

😎 Create credentials

Please see https://cloud.google.com/bigquery/docs/reference/libraries .

Additionally, set the path to the downloaded credentials file in the GOOGLE_APPLICATION_CREDENTIALS environment variable:

export GOOGLE_APPLICATION_CREDENTIALS="/home/user/Downloads/[FILE_NAME].json"
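
If you prefer not to set an environment variable, the client can also be built directly from the service-account key file in Python. This is a minimal sketch, assuming the same key file as above:

from google.cloud import bigquery

# Builds a client from the service-account key file instead of relying on
# the GOOGLE_APPLICATION_CREDENTIALS environment variable.
bigquery_client = bigquery.Client.from_service_account_json(
    '/home/user/Downloads/[FILE_NAME].json')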

🐝 Create a dataset if it does not exist

Create the dataset only when it is missing:

from google.cloud import bigquery
from google.cloud.exceptions import NotFound

def bq_create_dataset():
    bigquery_client = bigquery.Client()
    dataset_ref = bigquery_client.dataset('my_datasset_id')

    try:
        bigquery_client.get_dataset(dataset_ref)
    except NotFound:
        dataset = bigquery.Dataset(dataset_ref)
        dataset = bigquery_client.create_dataset(dataset)
        print('Dataset {} created.'.format(dataset.dataset_id))

🐯 Create a table if it does not exist

Create the table only when it is missing:

from google.cloud import bigquery
from google.cloud.exceptions import NotFound

def bq_create_table():
    bigquery_client = bigquery.Client()
    dataset_ref = bigquery_client.dataset('my_datasset_id')

    # Prepares a reference to the table
    table_ref = dataset_ref.table('my_table_name')

    try:
        bigquery_client.get_table(table_ref)
    except NotFound:
        schema = [
            bigquery.SchemaField('name', 'STRING', mode='REQUIRED'),
            bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED'),
        ]
        table = bigquery.Table(table_ref, schema=schema)
        table = bigquery_client.create_table(table)
        print('Table {} created.'.format(table.table_id))

🚜 Schema info

You can add a description or a REQUIRED mode to each schema field.

BQ_TABLE_SCHEMA = [
    bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED', description='Age'),
    bigquery.SchemaField('name', 'STRING', description='Name'),
    bigquery.SchemaField('created_at', 'TIMESTAMP', mode='REQUIRED',
                         description='Date and time when the record was created'),
]

If you want to know more about the SchemaField class, please see bigquery.schema.SchemaField.
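
Descriptions can also be added to an existing table by updating its schema. This is a minimal sketch, assuming the table created earlier and a hypothetical new nullable email column:

from google.cloud import bigquery

def bq_add_column():
    bigquery_client = bigquery.Client()
    table_ref = bigquery_client.dataset('my_datasset_id').table('my_table_name')
    table = bigquery_client.get_table(table_ref)  # API call

    # Existing fields must be kept as-is; BigQuery only allows adding
    # nullable (or repeated) columns to an existing schema.
    new_schema = list(table.schema)
    new_schema.append(
        bigquery.SchemaField('email', 'STRING', description='Email address'))

    table.schema = new_schema
    table = bigquery_client.update_table(table, ['schema'])  # API request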

🎳 Insert rows

Upload tuple objects to BigQuery. This uses the streaming buffer, so I don't recommend it.

from google.cloud import bigquery

def export_items_to_bigquery():
    # Instantiates a client
    bigquery_client = bigquery.Client()

    # Prepares a reference to the dataset
    dataset_ref = bigquery_client.dataset('my_datasset_id')

    table_ref = dataset_ref.table('my_table_id')
    table = bigquery_client.get_table(table_ref)  # API call

    rows_to_insert = [
        (u'Phred Phlyntstone', 32),
        (u'Wylma Phlyntstone', 29),
    ]
    errors = bigquery_client.insert_rows(table, rows_to_insert)  # API request
    assert errors == []
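
If your rows are dictionaries rather than tuples, the client also accepts JSON-style rows. This is a minimal sketch, assuming the same dataset and table placeholders; it still goes through the streaming buffer:

from google.cloud import bigquery

def export_dicts_to_bigquery():
    bigquery_client = bigquery.Client()
    table_ref = bigquery_client.dataset('my_datasset_id').table('my_table_id')
    table = bigquery_client.get_table(table_ref)  # API call

    rows_to_insert = [
        {'name': 'Phred Phlyntstone', 'age': 32},
        {'name': 'Wylma Phlyntstone', 'age': 29},
    ]
    errors = bigquery_client.insert_rows_json(table, rows_to_insert)  # API request
    assert errors == []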

🗻 Check whether data exists

from google.cloud import bigquery

def exist_record():
    bigquery_client = bigquery.Client()

    query = ('SELECT id FROM `{}.{}.{}` WHERE id = "{}" LIMIT 1'
             .format('my_project_id', 'my_datasset_id', 'my_table_id', 'my_selected_id'))

    try:
        query_job = bigquery_client.query(query)
        is_exist = len(list(query_job.result())) >= 1
        print('Exist id: {}'.format('my_selected_id') if is_exist
              else 'Not exist id: {}'.format('my_selected_id'))
        return is_exist
    except Exception as e:
        print("Error")
        print(e)

    return False
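
Formatting the id directly into the SQL string works for a quick check, but query parameters are safer. This is a minimal sketch, assuming the same placeholders and a STRING id column:

from google.cloud import bigquery

def exist_record_with_params(selected_id):
    bigquery_client = bigquery.Client()

    query = ('SELECT id FROM `my_project_id.my_datasset_id.my_table_id` '
             'WHERE id = @selected_id LIMIT 1')
    job_config = bigquery.QueryJobConfig()
    job_config.query_parameters = [
        bigquery.ScalarQueryParameter('selected_id', 'STRING', selected_id),
    ]

    query_job = bigquery_client.query(query, job_config=job_config)
    return len(list(query_job.result())) >= 1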

🏈 Upload a CSV to Google Cloud Storage and load it into BigQuery

This is a sample that uploads a CSV file to Google Cloud Storage and then loads it into BigQuery.

Before coding, install the Cloud Storage client library:

pip install google-cloud-storage

After installing google-cloud-storage, add the following functions:

from google.cloud import bigquery
from google.cloud import storage

def gcs_upload_blob():
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.get_bucket('my_bucket_name')
    # The blob name is the object path inside the bucket (no gs:// prefix).
    blob = bucket.blob('path/to/upload.csv')

    blob.upload_from_filename('/path/to/source.csv')

    print('File {} uploaded to {}.'.format(
        '/path/to/source.csv',
        'gs://my_bucket_name/path/to/upload.csv'))


def bq_load_csv_in_gcs():
    bigquery_client = bigquery.Client()
    dataset_ref = bigquery_client.dataset('my_datasset_id')

    job_config = bigquery.LoadJobConfig()
    schema = [
        bigquery.SchemaField('name', 'STRING', mode='REQUIRED'),
        bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED'),
    ]
    job_config.schema = schema
    job_config.skip_leading_rows = 1

    load_job = bigquery_client.load_table_from_uri(
        'gs://my_bucket_name/path/to/upload.csv',
        dataset_ref.table('my_table_id'),
        job_config=job_config)

    assert load_job.job_type == 'load'

    load_job.result()  # Waits for table load to complete.

    assert load_job.state == 'DONE'
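
This is a minimal sketch of calling the two functions and confirming the load, assuming the placeholders above:

gcs_upload_blob()
bq_load_csv_in_gcs()

bigquery_client = bigquery.Client()
table = bigquery_client.get_table(
    bigquery_client.dataset('my_datasset_id').table('my_table_id'))  # API call
print('Table {} now has {} rows.'.format(table.table_id, table.num_rows))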

🎃 Appendix

Web Console / Enabling standard SQL

If you want to delete records from the web console, add the #standardSQL prefix, like this:

#standardSQL
DELETE articles WHERE title = 'sample'

More Detail: Setting a query prefix
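
From Python, the client library runs standard SQL by default, so no prefix is needed. This is a minimal sketch, assuming a hypothetical articles table in the placeholder project and dataset:

from google.cloud import bigquery

def delete_sample_articles():
    bigquery_client = bigquery.Client()
    query = ("DELETE FROM `my_project_id.my_datasset_id.articles` "
             "WHERE title = 'sample'")
    query_job = bigquery_client.query(query)
    query_job.result()  # Waits for the DML statement to finish.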

🖥 Recommended VPS Service

VULTR provides a high-performance cloud compute environment. Vultr has 15 data centers strategically placed around the globe, and you can use a VPS with 512 MB of memory for just $2.50/month ($0.004/hour). In addition, Vultr is up to 4 times faster than the competition, so please check it out => Check Benchmark Results!!