# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from google import protobuf
from gcloud import bigtable

_COLUMN_FAMILY_ID = 'cf1'


class BigTableClient(object):
    """Defines the big table client that connects to the big table.

    The table is used as a simple FIFO queue: rows are keyed by a
    monotonically increasing integer (stored as its decimal string).
    The indexes are kept in memory only and both start at 0 for every
    new client object.

    Attributes:
        _column_family_id: A String for family of columns.
        _client: An instance of Client which is project specific.
        _client_instance: Representation of a Google Cloud Bigtable
            Instance. None until StartClient() has been called.
        _start_index: Row key index of the next row to dequeue. It gets
            incremented as we dequeue.
        _end_index: Row key index of the next row to write. This is
            incremented as we enqueue.
        _table_name: A string that represents the big table.
        _table_instance: An instance of the Table that represents the
            big table. None until StartClient() has been called.
    """

    def __init__(self, table, project_id, instance_id=None):
        """Creates the client and optionally starts it.

        Args:
            table: A string with the name of the table to operate on.
            project_id: A string with the project that owns the table.
            instance_id: Optional string with the Bigtable instance id.
                When given, the client is started immediately; otherwise
                the caller must call StartClient(instance_id) before any
                table operation.
        """
        self._column_family_id = _COLUMN_FAMILY_ID
        self._client = bigtable.Client(project=project_id, admin=True)
        self._client_instance = None
        self._start_index = 0
        self._end_index = 0
        self._table_name = table
        self._table_instance = None
        # StartClient() needs an instance id, so it can only be started
        # here when the caller supplied one.
        if instance_id is not None:
            self.StartClient(instance_id)

    def _GetTable(self):
        """Returns the table handle, failing clearly if not started.

        Raises:
            RuntimeError: StartClient() has not been called yet.
        """
        if self._table_instance is None:
            raise RuntimeError(
                'BigTableClient is not started; call StartClient(instance_id) '
                'first.')
        return self._table_instance

    def StartClient(self, instance_id):
        """Starts client to prepare it to make requests.

        Args:
            instance_id: A string with the id of the Bigtable instance
                that holds the table.
        """
        if not self._client.is_started():
            self._client.start()
        self._client_instance = self._client.instance(instance_id)
        # The table handle is created once and reused on later calls.
        if self._table_instance is None:
            self._table_instance = self._client_instance.table(
                self._table_name)

    def StopClient(self):
        """Stop client to close all the open gRPC clients."""
        self._client.stop()

    def CreateTable(self):
        """Creates a table in which read/write operations are performed.

        The table is created together with the single column family
        used by this client.

        Raises:
            AbortionError: Error occurred when creating table is not
                successful. This could be due to creating a table with a
                duplicate name.
            RuntimeError: StartClient() has not been called yet.
        """
        table = self._GetTable()
        logging.debug('Creating the table %s', self._table_name)

        table.create()
        column_family = table.column_family(self._column_family_id)
        column_family.create()

    def Enqueue(self, messages, column_id):
        """Writes new rows to the given table.

        Args:
            messages: An array of strings that represents the message to
                be written to a new row in the table. Each message is
                written to a new row.
            column_id: A string that represents the name of the column
                to which data is to be written.

        Raises:
            RuntimeError: StartClient() has not been called yet.
        """
        table = self._GetTable()
        logging.debug('Writing to the table : %s, column : %s',
                      self._table_name, column_id)
        column = column_id.encode('utf-8')
        for value in messages:
            row = table.row(str(self._end_index))
            row.set_cell(self._column_family_id, column,
                         value.encode('utf-8'))
            row.commit()
            # Advance only after a successful commit so that a failed
            # write does not leave a gap in the key sequence.
            self._end_index += 1

    def Dequeue(self):
        """Removes and returns the first row from the table.

        Returns:
            The row data read for the oldest row key, or None when the
            queue is empty.

        Raises:
            RuntimeError: StartClient() has not been called yet.
        """
        # _end_index is the next key to be written, so equal indexes
        # mean there is nothing left to dequeue.
        if self._start_index >= self._end_index:
            return None

        table = self._GetTable()
        logging.info('Getting a single row by row key.')
        key = str(self._start_index)
        # Read the stored cells first: table.row() only builds a
        # mutation object and carries no data from the table.
        top_row = table.read_row(key)

        # delete() only queues the mutation; commit() sends it.
        doomed_row = table.row(key)
        doomed_row.delete()
        doomed_row.commit()
        self._start_index += 1

        return top_row

    def DeleteTable(self):
        """Performs delete operation for a given table.

        Raises:
            RuntimeError: StartClient() has not been called yet.
        """
        table = self._GetTable()
        logging.debug('Deleting the table : %s', self._table_name)
        table.delete()