1# Copyright 2016 Google Inc. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import logging 16from google import protobuf 17from gcloud import bigtable 18 19_COLUMN_FAMILY_ID = 'cf1' 20 21 22class BigTableClient(object): 23 """Defines the big table client that connects to the big table. 24 25 Attributes: 26 _column_family_id: A String for family of columns. 27 _client: An instance of Client which is project specific. 28 _client_instance: Representation of a Google Cloud Bigtable Instance. 29 _start_index: Start index for the row key. It gets incremented as we 30 dequeue. 31 _end_index : End index for row key. This is incremented as we Enqueue. 32 _table_name: A string that represents the big table. 33 _table_instance: An instance of the Table that represents the big table. 34 """ 35 36 def __init__(self, table, project_id): 37 self._column_family_id = _COLUMN_FAMILY_ID 38 self._client = bigtable.Client(project=project_id, admin=True) 39 self._client_instance = None 40 self._start_index = 0 41 self._end_index = 0 42 self._table_name = table 43 self._table_instance = None 44 # Start client to enable receiving requests 45 self.StartClient() 46 47 def StartClient(self, instance_id): 48 """Starts client to prepare it to make requests.""" 49 50 # Start the client 51 if not self._client.is_started(): 52 self._client.start() 53 self._client_instance = self._client.instance(instance_id) 54 if self._table_instance is None: 55 self._table_instance = self._client_instance.table( 56 self._table_name) 57 58 def StopClient(self): 59 """Stop client to close all the open gRPC clients.""" 60 61 # stop client 62 self._client.stop() 63 64 def CreateTable(self): 65 """Creates a table in which read/write operations are performed. 66 67 Raises: 68 AbortionError: Error occurred when creating table is not successful. 69 This could be due to creating a table with a duplicate name. 70 """ 71 72 # Create a table 73 logging.debug('Creating the table %s', self._table_name) 74 75 self._table_instance.create() 76 cf1 = self._table_instance.column_family(self._column_family_id) 77 cf1.create() 78 79 def Enqueue(self, messages, column_id): 80 """Writes new rows to the given table. 81 82 Args: 83 messages: An array of strings that represents the message to be 84 written to a new row in the table. Each message is writte to a 85 new row 86 column_id: A string that represents the name of the column to which 87 data is to be written. 88 """ 89 90 # Start writing rows 91 logging.debug('Writing to the table : %s, column : %s', 92 self._table_name, column_id) 93 for value in messages: 94 row_key = str(self._end_index) 95 self._end_index = self._end_index + 1 96 row = self._table_instance.row(row_key) 97 row.set_cell(self._column_family_id, 98 column_id.encode('utf-8'), value.encode('utf-8')) 99 row.commit() 100 # End writing rows 101 102 def Dequeue(self): 103 """Removes and returns the first row from the table. 104 105 Returns: 106 row: A row object that represents the top most row. 107 """ 108 109 if self._end_index < self._start_index: 110 return 111 112 logging.debug('Getting a single row by row key.') 113 key = str(self._start_index) 114 row_cond = self._table_instance.row(key) 115 top_row = row_cond 116 row_cond.delete() 117 self._start_index = self._start_index + 1 118 119 return top_row 120 121 def DeleteTable(self): 122 """Performs delete operation for a given table.""" 123 124 # Delete the table 125 logging.debug('Deleting the table : %s', self._table_name) 126 self._table_instance.delete() 127