1syntax = "proto3";
2
3package tensorflow.data;
4
5import "tensorflow/core/data/service/common.proto";
6import "tensorflow/core/framework/tensor.proto";
7
// Progress report for a single task, carried in WorkerUpdateRequest.updates.
message TaskProgress {
  // Id of the task this report describes.
  int64 task_id = 1;
  // True once the task has completed.
  bool completed = 2;
}
14
// Request for DispatcherService.WorkerHeartbeat, which a worker sends
// periodically.
message WorkerHeartbeatRequest {
  // Address of the worker sending the heartbeat.
  string worker_address = 1;
  // Address the worker exposes for data transfer. NOTE(review): presumably
  // may differ from worker_address when a separate transfer server is used —
  // confirm.
  string transfer_address = 3;
  // Ids of the tasks the worker currently holds.
  repeated int64 current_tasks = 2;
}
20
// Response to DispatcherService.WorkerHeartbeat.
message WorkerHeartbeatResponse {
  // Tasks newly assigned to the worker. NOTE(review): presumably excludes
  // tasks already listed in the request's current_tasks — confirm.
  repeated TaskDef new_tasks = 1;
  // Ids of tasks the worker should delete.
  repeated int64 tasks_to_delete = 2;
}
25
// Request for DispatcherService.WorkerUpdate, reporting per-task progress.
message WorkerUpdateRequest {
  // Address of the worker sending the update.
  string worker_address = 1;
  // One progress entry per task being reported on.
  repeated TaskProgress updates = 2;
}
30
// Response to DispatcherService.WorkerUpdate. Currently carries no fields.
message WorkerUpdateResponse {}
32
// Request for DispatcherService.GetDatasetDef.
message GetDatasetDefRequest {
  // Id of the dataset whose definition is requested.
  int64 dataset_id = 1;
}
36
// Response to DispatcherService.GetDatasetDef.
message GetDatasetDefResponse {
  // Definition of the requested dataset.
  DatasetDef dataset_def = 1;
}
40
// Request for DispatcherService.GetSplit.
message GetSplitRequest {
  // Id of the job to fetch the next split for.
  int64 job_id = 1;
  // Repetition of the job the split is requested for. NOTE(review):
  // presumably a counter of passes over the dataset — confirm with callers.
  int64 repetition = 2;
}
45
// Response to DispatcherService.GetSplit.
message GetSplitResponse {
  // The next split, encoded as a tensor. NOTE(review): presumably not
  // meaningful when end_of_splits is true — confirm.
  TensorProto split = 1;
  // True when there are no further splits.
  bool end_of_splits = 2;
}
50
// Request for DispatcherService.GetVersion. Currently carries no fields.
message GetVersionRequest {}
52
// Response to DispatcherService.GetVersion.
message GetVersionResponse {
  // API version of the server.
  int64 version = 1;
}
56
// Request for DispatcherService.GetOrRegisterDataset.
message GetOrRegisterDatasetRequest {
  // Definition of the dataset to register, or to look up if it is already
  // registered.
  DatasetDef dataset = 1;
}
61
// Response to DispatcherService.GetOrRegisterDataset.
message GetOrRegisterDatasetResponse {
  // Id of the dataset, whether it was newly registered or already known.
  int64 dataset_id = 1;
}
66
// Identifies a shared job by a (name, index) pair.
message JobKey {
  // Name of the job.
  string job_name = 1;
  // Index of the job. Several jobs may share one name as long as their
  // indices differ.
  int64 job_name_index = 2;
}
74
// Request for DispatcherService.GetOrCreateJob.
message GetOrCreateJobRequest {
  // Numbers 3, 4 and 6 are skipped by the fields below. Keep all of them
  // reserved so none can be reassigned to a new field with a different
  // meaning.
  reserved 3, 4, 6;
  // The id of the dataset to create a job for.
  int64 dataset_id = 1;
  // A mode controlling how the tf.data service produces data for the job.
  ProcessingModeDef processing_mode = 2;
  // Optional job key identifying a shared job. If not set, the RPC will always
  // create a new job.
  JobKey job_key = 5;
  // Optional number of consumers. If set, the job's tasks will provide their
  // elements to consumers round-robin. The oneof wrapper gives the field
  // explicit presence, so "unset" is distinguishable from 0.
  oneof optional_num_consumers {
    int64 num_consumers = 7;
  }
}
90
// Response to DispatcherService.GetOrCreateJob.
message GetOrCreateJobResponse {
  // Id of the client that will read from the job. Pass it to
  // ReleaseJobClient once the client is done with the job.
  int64 job_client_id = 1;
}
96
// Request for DispatcherService.ReleaseJobClient.
message ReleaseJobClientRequest {
  // Job client id previously returned in a GetOrCreateJobResponse.
  int64 job_client_id = 1;
}
100
// Response to DispatcherService.ReleaseJobClient. Currently carries no
// fields.
message ReleaseJobClientResponse {}
102
// Request for DispatcherService.ClientHeartbeat.
message ClientHeartbeatRequest {
  // Number 3 is skipped by the fields below. Reserve it so it is never
  // reassigned to a new field with a different meaning.
  reserved 3;
  // The job client id to heartbeat for.
  int64 job_client_id = 1;
  // Reports which round the client is currently reading from when doing
  // round-robin reads. The oneof wrapper gives explicit presence.
  oneof optional_current_round {
    int64 current_round = 2;
  }
  // Reports whether the client has successfully blocked the indicated round
  // from starting. This enables the dispatcher to add a new task in the
  // blocked round or later.
  oneof optional_blocked_round {
    int64 blocked_round = 4;
  }
}
118
// Response to DispatcherService.ClientHeartbeat.
message ClientHeartbeatResponse {
  // Every task the client should read from (the full list, not a delta).
  repeated TaskInfo task_info = 1;
  // When set, asks the client not to start this round if it can avoid it.
  oneof optional_block_round {
    int64 block_round = 3;
  }
  // True once the job has finished.
  bool job_finished = 2;
}
129
// Describes one worker registered with the dispatcher.
message WorkerInfo {
  // Address of the worker.
  string address = 1;
  // Id of the worker.
  int64 id = 2;
}
134
// Request for DispatcherService.GetWorkers. Currently carries no fields.
message GetWorkersRequest {}
136
// Response to DispatcherService.GetWorkers.
message GetWorkersResponse {
  // Every worker registered with the dispatcher.
  repeated WorkerInfo workers = 1;
}
141
// RPCs served by the tf.data service dispatcher.
service DispatcherService {
  // Performs a periodic worker heartbeat.
  rpc WorkerHeartbeat(WorkerHeartbeatRequest) returns (WorkerHeartbeatResponse);

  // Updates the dispatcher with information about the worker's state.
  rpc WorkerUpdate(WorkerUpdateRequest) returns (WorkerUpdateResponse);

  // Gets a dataset definition.
  rpc GetDatasetDef(GetDatasetDefRequest) returns (GetDatasetDefResponse);

  // Gets the next split for a given job.
  rpc GetSplit(GetSplitRequest) returns (GetSplitResponse);

  // Returns the API version of the server.
  rpc GetVersion(GetVersionRequest) returns (GetVersionResponse);

  // Registers a dataset with the server, or returns its id if it is already
  // registered.
  //
  // The dataset is constructed in a new graph, so it must not refer to
  // external resources or variables.
  rpc GetOrRegisterDataset(GetOrRegisterDatasetRequest)
      returns (GetOrRegisterDatasetResponse);

  // Gets a job if it already exists, otherwise creates it.
  rpc GetOrCreateJob(GetOrCreateJobRequest) returns (GetOrCreateJobResponse);

  // Releases a job client so that a job may eventually be cleaned up.
  rpc ReleaseJobClient(ReleaseJobClientRequest)
      returns (ReleaseJobClientResponse);

  // Heartbeats from the client. This lets the dispatcher know that the client
  // is still active, and gives the dispatcher a chance to notify the client
  // of new tasks.
  rpc ClientHeartbeat(ClientHeartbeatRequest) returns (ClientHeartbeatResponse);

  // Reports a list of all workers registered with the dispatcher.
  rpc GetWorkers(GetWorkersRequest) returns (GetWorkersResponse);
}
181