Skip to content

Commit ad0f3d2

Browse files
fix: Add admin interfaces for reservations (#159)
* fix: Add admin interfaces for reservations Also run blacken (reformatter) Also remove default python interpreter for any sessions where we don't care what interpreter is used, allowing `nox -s blacken` and others to work if you're running under 3.7, 3.9, etc. * 🦉 Updates from OwlBot * fix: add warning Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 77db700 commit ad0f3d2

26 files changed

+416
-262
lines changed

google/cloud/pubsublite/admin_client.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,13 @@
3232
TopicPath,
3333
BacklogLocation,
3434
)
35-
from google.cloud.pubsublite_v1 import AdminServiceClient, Subscription, Topic
35+
from google.cloud.pubsublite.types.paths import ReservationPath
36+
from google.cloud.pubsublite_v1 import (
37+
AdminServiceClient,
38+
Subscription,
39+
Topic,
40+
Reservation,
41+
)
3642

3743

3844
class AdminClient(AdminClientInterface, ConstructableFromServiceAccount):
@@ -98,7 +104,7 @@ def delete_topic(self, topic_path: TopicPath):
98104
return self._impl.delete_topic(topic_path)
99105

100106
@overrides
101-
def list_topic_subscriptions(self, topic_path: TopicPath):
107+
def list_topic_subscriptions(self, topic_path: TopicPath) -> List[SubscriptionPath]:
102108
return self._impl.list_topic_subscriptions(topic_path)
103109

104110
@overrides
@@ -126,3 +132,31 @@ def update_subscription(
126132
@overrides
127133
def delete_subscription(self, subscription_path: SubscriptionPath):
128134
return self._impl.delete_subscription(subscription_path)
135+
136+
@overrides
137+
def create_reservation(self, reservation: Reservation) -> Reservation:
138+
return self._impl.create_reservation(reservation)
139+
140+
@overrides
141+
def get_reservation(self, reservation_path: ReservationPath) -> Reservation:
142+
return self._impl.get_reservation(reservation_path)
143+
144+
@overrides
145+
def list_reservations(self, location_path: LocationPath) -> List[Reservation]:
146+
return self._impl.list_reservations(location_path)
147+
148+
@overrides
149+
def update_reservation(
150+
self, reservation: Reservation, update_mask: FieldMask
151+
) -> Reservation:
152+
return self._impl.update_reservation(reservation, update_mask)
153+
154+
@overrides
155+
def delete_reservation(self, reservation_path: ReservationPath):
156+
return self._impl.delete_reservation(reservation_path)
157+
158+
@overrides
159+
def list_reservation_topics(
160+
self, reservation_path: ReservationPath
161+
) -> List[TopicPath]:
162+
return self._impl.list_reservation_topics(reservation_path)

google/cloud/pubsublite/admin_client_interface.py

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
SubscriptionPath,
2323
BacklogLocation,
2424
)
25-
from google.cloud.pubsublite_v1 import Topic, Subscription
25+
from google.cloud.pubsublite.types.paths import ReservationPath
26+
from google.cloud.pubsublite_v1 import Topic, Subscription, Reservation
2627
from google.protobuf.field_mask_pb2 import FieldMask
2728

2829

@@ -60,7 +61,7 @@ def delete_topic(self, topic_path: TopicPath):
6061
"""Delete a topic and all associated messages."""
6162

6263
@abstractmethod
63-
def list_topic_subscriptions(self, topic_path: TopicPath):
64+
def list_topic_subscriptions(self, topic_path: TopicPath) -> List[SubscriptionPath]:
6465
"""List the subscriptions that exist for a given topic."""
6566

6667
@abstractmethod
@@ -90,3 +91,49 @@ def update_subscription(
9091
@abstractmethod
9192
def delete_subscription(self, subscription_path: SubscriptionPath):
9293
"""Delete a subscription and all associated messages."""
94+
95+
@abstractmethod
96+
def create_reservation(self, reservation: Reservation) -> Reservation:
97+
"""Create a reservation, returns the created reservation.
98+
99+
warning:: This may not be implemented in the backend, it is a pre-release feature.
100+
"""
101+
102+
@abstractmethod
103+
def get_reservation(self, reservation_path: ReservationPath) -> Reservation:
104+
"""Get the reservation object from the server.
105+
106+
warning:: This may not be implemented in the backend, it is a pre-release feature.
107+
"""
108+
109+
@abstractmethod
110+
def list_reservations(self, location_path: LocationPath) -> List[Reservation]:
111+
"""List the Pub/Sub lite reservations that exist for a project in a given location.
112+
113+
warning:: This may not be implemented in the backend, it is a pre-release feature.
114+
"""
115+
116+
@abstractmethod
117+
def update_reservation(
118+
self, reservation: Reservation, update_mask: FieldMask
119+
) -> Reservation:
120+
"""Update the masked fields of the provided reservation.
121+
122+
warning:: This may not be implemented in the backend, it is a pre-release feature.
123+
"""
124+
125+
@abstractmethod
126+
def delete_reservation(self, reservation_path: ReservationPath):
127+
"""Delete a reservation and all associated messages.
128+
129+
warning:: This may not be implemented in the backend, it is a pre-release feature.
130+
"""
131+
132+
@abstractmethod
133+
def list_reservation_topics(
134+
self, reservation_path: ReservationPath
135+
) -> List[TopicPath]:
136+
"""List the subscriptions that exist for a given reservation.
137+
138+
warning:: This may not be implemented in the backend, it is a pre-release feature.
139+
"""

google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker.py

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,39 +18,39 @@
1818

1919
class AckSetTracker(AsyncContextManager):
2020
"""
21-
An AckSetTracker tracks disjoint acknowledged messages and commits them when a contiguous prefix of tracked offsets
22-
is aggregated.
23-
"""
21+
An AckSetTracker tracks disjoint acknowledged messages and commits them when a contiguous prefix of tracked offsets
22+
is aggregated.
23+
"""
2424

2525
@abstractmethod
2626
def track(self, offset: int):
2727
"""
28-
Track the provided offset.
28+
Track the provided offset.
2929
30-
Args:
31-
offset: the offset to track.
30+
Args:
31+
offset: the offset to track.
3232
33-
Raises:
34-
GoogleAPICallError: On an invalid offset to track.
35-
"""
33+
Raises:
34+
GoogleAPICallError: On an invalid offset to track.
35+
"""
3636

3737
@abstractmethod
3838
async def ack(self, offset: int):
3939
"""
40-
Acknowledge the message with the provided offset. The offset must have previously been tracked.
40+
Acknowledge the message with the provided offset. The offset must have previously been tracked.
4141
42-
Args:
43-
offset: the offset to acknowledge.
42+
Args:
43+
offset: the offset to acknowledge.
4444
45-
Returns:
46-
GoogleAPICallError: On a commit failure.
47-
"""
45+
Returns:
46+
GoogleAPICallError: On a commit failure.
47+
"""
4848

4949
@abstractmethod
5050
async def clear_and_commit(self):
5151
"""
52-
Discard all outstanding acks and wait for the commit offset to be acknowledged by the server.
52+
Discard all outstanding acks and wait for the commit offset to be acknowledged by the server.
5353
54-
Raises:
55-
GoogleAPICallError: If the committer has shut down due to a permanent error.
56-
"""
54+
Raises:
55+
GoogleAPICallError: If the committer has shut down due to a permanent error.
56+
"""

google/cloud/pubsublite/cloudpubsub/internal/make_publisher.py

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -49,22 +49,22 @@ def make_async_publisher(
4949
metadata: Optional[Mapping[str, str]] = None,
5050
) -> AsyncSinglePublisher:
5151
"""
52-
Make a new publisher for the given topic.
53-
54-
Args:
55-
topic: The topic to publish to.
56-
transport: The transport type to use.
57-
per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases.
58-
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
59-
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
60-
metadata: Additional metadata to send with the RPC.
61-
62-
Returns:
63-
A new AsyncPublisher.
64-
65-
Throws:
66-
GoogleApiCallException on any error determining topic structure.
67-
"""
52+
Make a new publisher for the given topic.
53+
54+
Args:
55+
topic: The topic to publish to.
56+
transport: The transport type to use.
57+
per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases.
58+
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
59+
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
60+
metadata: Additional metadata to send with the RPC.
61+
62+
Returns:
63+
A new AsyncPublisher.
64+
65+
Throws:
66+
GoogleApiCallException on any error determining topic structure.
67+
"""
6868
metadata = merge_metadata(pubsub_context(framework="CLOUD_PUBSUB_SHIM"), metadata)
6969

7070
def underlying_factory():
@@ -89,22 +89,22 @@ def make_publisher(
8989
metadata: Optional[Mapping[str, str]] = None,
9090
) -> SinglePublisher:
9191
"""
92-
Make a new publisher for the given topic.
93-
94-
Args:
95-
topic: The topic to publish to.
96-
transport: The transport type to use.
97-
per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases.
98-
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
99-
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
100-
metadata: Additional metadata to send with the RPC.
101-
102-
Returns:
103-
A new Publisher.
104-
105-
Throws:
106-
GoogleApiCallException on any error determining topic structure.
107-
"""
92+
Make a new publisher for the given topic.
93+
94+
Args:
95+
topic: The topic to publish to.
96+
transport: The transport type to use.
97+
per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases.
98+
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
99+
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
100+
metadata: Additional metadata to send with the RPC.
101+
102+
Returns:
103+
A new Publisher.
104+
105+
Throws:
106+
GoogleApiCallException on any error determining topic structure.
107+
"""
108108
return SinglePublisherImpl(
109109
make_async_publisher(
110110
topic=topic,

google/cloud/pubsublite/cloudpubsub/internal/make_subscriber.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -175,23 +175,23 @@ def make_async_subscriber(
175175
metadata: Optional[Mapping[str, str]] = None,
176176
) -> AsyncSingleSubscriber:
177177
"""
178-
Make a Pub/Sub Lite AsyncSubscriber.
179-
180-
Args:
181-
subscription: The subscription to subscribe to.
182-
transport: The transport type to use.
183-
per_partition_flow_control_settings: The flow control settings for each partition subscribed to. Note that these
184-
settings apply to each partition individually, not in aggregate.
185-
nack_handler: An optional handler for when nack() is called on a Message. The default will fail the client.
186-
message_transformer: An optional transformer from Pub/Sub Lite messages to Cloud Pub/Sub messages.
187-
fixed_partitions: A fixed set of partitions to subscribe to. If not present, will instead use auto-assignment.
188-
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
189-
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
190-
metadata: Additional metadata to send with the RPC.
191-
192-
Returns:
193-
A new AsyncSubscriber.
194-
"""
178+
Make a Pub/Sub Lite AsyncSubscriber.
179+
180+
Args:
181+
subscription: The subscription to subscribe to.
182+
transport: The transport type to use.
183+
per_partition_flow_control_settings: The flow control settings for each partition subscribed to. Note that these
184+
settings apply to each partition individually, not in aggregate.
185+
nack_handler: An optional handler for when nack() is called on a Message. The default will fail the client.
186+
message_transformer: An optional transformer from Pub/Sub Lite messages to Cloud Pub/Sub messages.
187+
fixed_partitions: A fixed set of partitions to subscribe to. If not present, will instead use auto-assignment.
188+
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
189+
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
190+
metadata: Additional metadata to send with the RPC.
191+
192+
Returns:
193+
A new AsyncSubscriber.
194+
"""
195195
metadata = merge_metadata(pubsub_context(framework="CLOUD_PUBSUB_SHIM"), metadata)
196196
if client_options is None:
197197
client_options = ClientOptions(

google/cloud/pubsublite/cloudpubsub/internal/single_publisher.py

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,54 +19,54 @@
1919

2020
class AsyncSinglePublisher(AsyncContextManager):
2121
"""
22-
An AsyncPublisher publishes messages similar to Google Pub/Sub, but must be used in an
23-
async context. Any publish failures are permanent.
22+
An AsyncPublisher publishes messages similar to Google Pub/Sub, but must be used in an
23+
async context. Any publish failures are permanent.
2424
25-
Must be used in an `async with` block or have __aenter__() awaited before use.
26-
"""
25+
Must be used in an `async with` block or have __aenter__() awaited before use.
26+
"""
2727

2828
@abstractmethod
2929
async def publish(
3030
self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str]
3131
) -> str:
3232
"""
33-
Publish a message.
33+
Publish a message.
3434
35-
Args:
36-
data: The bytestring payload of the message
37-
ordering_key: The key to enforce ordering on, or "" for no ordering.
38-
**attrs: Additional attributes to send.
35+
Args:
36+
data: The bytestring payload of the message
37+
ordering_key: The key to enforce ordering on, or "" for no ordering.
38+
**attrs: Additional attributes to send.
3939
40-
Returns:
41-
An ack id, which can be decoded using MessageMetadata.decode.
40+
Returns:
41+
An ack id, which can be decoded using MessageMetadata.decode.
4242
43-
Raises:
44-
GoogleApiCallError: On a permanent failure.
45-
"""
43+
Raises:
44+
GoogleApiCallError: On a permanent failure.
45+
"""
4646

4747

4848
class SinglePublisher(ContextManager):
4949
"""
50-
A Publisher publishes messages similar to Google Pub/Sub. Any publish failures are permanent.
50+
A Publisher publishes messages similar to Google Pub/Sub. Any publish failures are permanent.
5151
52-
Must be used in a `with` block or have __enter__() called before use.
53-
"""
52+
Must be used in a `with` block or have __enter__() called before use.
53+
"""
5454

5555
@abstractmethod
5656
def publish(
5757
self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str]
5858
) -> "futures.Future[str]":
5959
"""
60-
Publish a message.
60+
Publish a message.
6161
62-
Args:
63-
data: The bytestring payload of the message
64-
ordering_key: The key to enforce ordering on, or "" for no ordering.
65-
**attrs: Additional attributes to send.
62+
Args:
63+
data: The bytestring payload of the message
64+
ordering_key: The key to enforce ordering on, or "" for no ordering.
65+
**attrs: Additional attributes to send.
6666
67-
Returns:
68-
A future completed with an ack id, which can be decoded using MessageMetadata.decode.
67+
Returns:
68+
A future completed with an ack id, which can be decoded using MessageMetadata.decode.
6969
70-
Raises:
71-
GoogleApiCallError: On a permanent failure.
72-
"""
70+
Raises:
71+
GoogleApiCallError: On a permanent failure.
72+
"""

0 commit comments

Comments
 (0)