1#!/usr/bin/env python2.7
2# Copyright 2015 gRPC authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import argparse
17import json
18import uuid
19import httplib2
20
21from apiclient import discovery
22from apiclient.errors import HttpError
23from oauth2client.client import GoogleCredentials
24
25# 30 days in milliseconds
26_EXPIRATION_MS = 30 * 24 * 60 * 60 * 1000
27NUM_RETRIES = 3
28
29
30def create_big_query():
31    """Authenticates with cloud platform and gets a BiqQuery service object
32  """
33    creds = GoogleCredentials.get_application_default()
34    return discovery.build(
35        'bigquery', 'v2', credentials=creds, cache_discovery=False)
36
37
38def create_dataset(biq_query, project_id, dataset_id):
39    is_success = True
40    body = {
41        'datasetReference': {
42            'projectId': project_id,
43            'datasetId': dataset_id
44        }
45    }
46
47    try:
48        dataset_req = biq_query.datasets().insert(
49            projectId=project_id, body=body)
50        dataset_req.execute(num_retries=NUM_RETRIES)
51    except HttpError as http_error:
52        if http_error.resp.status == 409:
53            print 'Warning: The dataset %s already exists' % dataset_id
54        else:
55            # Note: For more debugging info, print "http_error.content"
56            print 'Error in creating dataset: %s. Err: %s' % (dataset_id,
57                                                              http_error)
58            is_success = False
59    return is_success
60
61
62def create_table(big_query, project_id, dataset_id, table_id, table_schema,
63                 description):
64    fields = [{
65        'name': field_name,
66        'type': field_type,
67        'description': field_description
68    } for (field_name, field_type, field_description) in table_schema]
69    return create_table2(big_query, project_id, dataset_id, table_id, fields,
70                         description)
71
72
73def create_partitioned_table(big_query,
74                             project_id,
75                             dataset_id,
76                             table_id,
77                             table_schema,
78                             description,
79                             partition_type='DAY',
80                             expiration_ms=_EXPIRATION_MS):
81    """Creates a partitioned table. By default, a date-paritioned table is created with
82  each partition lasting 30 days after it was last modified.
83  """
84    fields = [{
85        'name': field_name,
86        'type': field_type,
87        'description': field_description
88    } for (field_name, field_type, field_description) in table_schema]
89    return create_table2(big_query, project_id, dataset_id, table_id, fields,
90                         description, partition_type, expiration_ms)
91
92
93def create_table2(big_query,
94                  project_id,
95                  dataset_id,
96                  table_id,
97                  fields_schema,
98                  description,
99                  partition_type=None,
100                  expiration_ms=None):
101    is_success = True
102
103    body = {
104        'description': description,
105        'schema': {
106            'fields': fields_schema
107        },
108        'tableReference': {
109            'datasetId': dataset_id,
110            'projectId': project_id,
111            'tableId': table_id
112        }
113    }
114
115    if partition_type and expiration_ms:
116        body["timePartitioning"] = {
117            "type": partition_type,
118            "expirationMs": expiration_ms
119        }
120
121    try:
122        table_req = big_query.tables().insert(
123            projectId=project_id, datasetId=dataset_id, body=body)
124        res = table_req.execute(num_retries=NUM_RETRIES)
125        print 'Successfully created %s "%s"' % (res['kind'], res['id'])
126    except HttpError as http_error:
127        if http_error.resp.status == 409:
128            print 'Warning: Table %s already exists' % table_id
129        else:
130            print 'Error in creating table: %s. Err: %s' % (table_id,
131                                                            http_error)
132            is_success = False
133    return is_success
134
135
136def patch_table(big_query, project_id, dataset_id, table_id, fields_schema):
137    is_success = True
138
139    body = {
140        'schema': {
141            'fields': fields_schema
142        },
143        'tableReference': {
144            'datasetId': dataset_id,
145            'projectId': project_id,
146            'tableId': table_id
147        }
148    }
149
150    try:
151        table_req = big_query.tables().patch(
152            projectId=project_id,
153            datasetId=dataset_id,
154            tableId=table_id,
155            body=body)
156        res = table_req.execute(num_retries=NUM_RETRIES)
157        print 'Successfully patched %s "%s"' % (res['kind'], res['id'])
158    except HttpError as http_error:
159        print 'Error in creating table: %s. Err: %s' % (table_id, http_error)
160        is_success = False
161    return is_success
162
163
164def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
165    is_success = True
166    body = {'rows': rows_list}
167    try:
168        insert_req = big_query.tabledata().insertAll(
169            projectId=project_id,
170            datasetId=dataset_id,
171            tableId=table_id,
172            body=body)
173        res = insert_req.execute(num_retries=NUM_RETRIES)
174        if res.get('insertErrors', None):
175            print 'Error inserting rows! Response: %s' % res
176            is_success = False
177    except HttpError as http_error:
178        print 'Error inserting rows to the table %s' % table_id
179        is_success = False
180
181    return is_success
182
183
184def sync_query_job(big_query, project_id, query, timeout=5000):
185    query_data = {'query': query, 'timeoutMs': timeout}
186    query_job = None
187    try:
188        query_job = big_query.jobs().query(
189            projectId=project_id,
190            body=query_data).execute(num_retries=NUM_RETRIES)
191    except HttpError as http_error:
192        print 'Query execute job failed with error: %s' % http_error
193        print http_error.content
194    return query_job
195
196
197    # List of (column name, column type, description) tuples
198def make_row(unique_row_id, row_values_dict):
199    """row_values_dict is a dictionary of column name and column value.
200  """
201    return {'insertId': unique_row_id, 'json': row_values_dict}
202