1
2# Copyright 2016 Google Inc.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import logging
17from google import protobuf
18from gcloud import bigtable
19
# ID of the single column family that BigTableClient creates (CreateTable)
# and writes message cells into (Enqueue).
_COLUMN_FAMILY_ID = 'cf1'
21
22
class BigTableClient(object):
    """Defines the big table client that uses a Bigtable table as a queue.

    Row keys are consecutive integers rendered as strings. The read and write
    cursors (_start_index / _end_index) live only in memory on this object and
    both start at 0 for every new client.

    Attributes:
        _column_family_id: A String for family of columns.
        _client: An instance of Client which is project specific.
        _client_instance: Representation of a Google Cloud Bigtable Instance.
        _start_index: Start index for the row key. It gets incremented as we
            dequeue.
        _end_index: End index for row key. This is incremented as we enqueue.
        _table_name: A string that represents the big table.
        _table_instance: An instance of the Table that represents the big table.
    """

    def __init__(self, table, project_id, instance_id=None):
        """Builds the project-level client and optionally starts it.

        Args:
            table: A string naming the table this client operates on.
            project_id: A string with the Google Cloud project ID.
            instance_id: Optional string ID of the Bigtable instance. When
                given, the client is started right away. When omitted, the
                caller must invoke StartClient(instance_id) before calling any
                method that touches the table.
        """
        self._column_family_id = _COLUMN_FAMILY_ID
        self._client = bigtable.Client(project=project_id, admin=True)
        self._client_instance = None
        self._start_index = 0
        self._end_index = 0
        self._table_name = table
        self._table_instance = None
        # StartClient needs an instance ID; calling it without one (as this
        # constructor used to) always raised TypeError, so only start here
        # when the ID is actually known.
        if instance_id is not None:
            self.StartClient(instance_id)

    def StartClient(self, instance_id):
        """Starts client to prepare it to make requests.

        Args:
            instance_id: A string ID of the Bigtable instance to bind to.
        """

        # Start the client
        if not self._client.is_started():
            self._client.start()
        self._client_instance = self._client.instance(instance_id)
        # The table handle is created once and reused on later calls.
        if self._table_instance is None:
            self._table_instance = self._client_instance.table(self._table_name)

    def StopClient(self):
        """Stop client to close all the open gRPC clients."""

        # stop client
        self._client.stop()

    def CreateTable(self):
        """Creates a table in which read/write operations are performed.

        The table is created first, then the column family named by
        _column_family_id is created inside it.

        Raises:
            AbortionError: Error occurred when creating table is not successful.
                This could be due to creating a table with a duplicate name.
        """

        # Create a table
        logging.debug('Creating the table %s', self._table_name)

        self._table_instance.create()
        cf1 = self._table_instance.column_family(self._column_family_id)
        cf1.create()

    def Enqueue(self, messages, column_id):
        """Writes new rows to the given table.

        Args:
            messages: An array of strings that represents the message to be
                written to a new row in the table. Each message is written to
                a new row.
            column_id: A string that represents the name of the column to which
                data is to be written.
        """

        # Start writing rows
        logging.debug('Writing to the table : %s, column : %s', self._table_name,
                      column_id)
        # Encode the column name once; it is the same for every message.
        column = column_id.encode('utf-8')
        for value in messages:
            row = self._table_instance.row(str(self._end_index))
            row.set_cell(self._column_family_id, column,
                         value.encode('utf-8'))
            row.commit()
            # Advance the write cursor only after the commit went through, so
            # a failed write does not leave a permanently skipped row key.
            self._end_index += 1
        # End writing rows

    def Dequeue(self):
        """Removes the first row from the table and returns its row handle.

        Returns:
            row: The row object for the removed (top most) row, or None when
                the queue is empty.
                NOTE(review): this is the handle obtained from table.row(key);
                this method never reads the row's cells. If callers need the
                stored message, it presumably has to be read before the
                delete - confirm against callers.
        """

        # The queue is empty once the read cursor catches up with the write
        # cursor (both start at 0), so equality must also bail out.
        if self._start_index >= self._end_index:
            return None

        logging.info('Getting a single row by row key.')
        key = str(self._start_index)
        top_row = self._table_instance.row(key)
        # delete() only queues the mutation on the row object; commit() is
        # what sends it, mirroring the set_cell()/commit() pair in Enqueue.
        top_row.delete()
        top_row.commit()
        self._start_index += 1

        return top_row

    def DeleteTable(self):
        """Performs delete operation for a given table."""

        # Delete the table
        logging.debug('Deleting the table : %s', self._table_name)
        self._table_instance.delete()
127