Merge "Replace pymongo deprecated api"
This commit is contained in:
commit
13414c1bc9
@ -24,6 +24,7 @@ Field Mappings:
|
||||
import datetime
|
||||
import time
|
||||
|
||||
from bson import errors as bsonerror
|
||||
from bson import objectid
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import timeutils
|
||||
@ -661,9 +662,10 @@ class MessageController(storage.Message):
|
||||
for index, message in enumerate(messages)
|
||||
]
|
||||
|
||||
ids = collection.insert(prepared_messages, check_keys=False)
|
||||
res = collection.insert_many(prepared_messages,
|
||||
bypass_document_validation=True)
|
||||
|
||||
return [str(id_) for id_ in ids]
|
||||
return [str(id_) for id_ in res.inserted_ids]
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_autoreconnect
|
||||
@ -854,7 +856,8 @@ class FIFOMessageController(MessageController):
|
||||
# before the operation is abandoned is 49.95 seconds.
|
||||
for attempt in self._retry_range:
|
||||
try:
|
||||
ids = collection.insert(prepared_messages, check_keys=False)
|
||||
res = collection.insert_many(prepared_messages,
|
||||
bypass_document_validation=True)
|
||||
|
||||
# Log a message if we retried, for debugging perf issues
|
||||
if attempt != 0:
|
||||
@ -865,7 +868,7 @@ class FIFOMessageController(MessageController):
|
||||
LOG.debug(msgtmpl,
|
||||
dict(queue=queue_name,
|
||||
attempts=attempt + 1,
|
||||
num_messages=len(ids),
|
||||
num_messages=len(res.inserted_ids),
|
||||
project=project))
|
||||
|
||||
# Update the counter in preparation for the next batch
|
||||
@ -876,7 +879,8 @@ class FIFOMessageController(MessageController):
|
||||
# such that the competing marker's will start at a
|
||||
# unique number, 1 past the max of the messages just
|
||||
# inserted above.
|
||||
self._inc_counter(queue_name, project, amount=len(ids))
|
||||
self._inc_counter(queue_name, project,
|
||||
amount=len(res.inserted_ids))
|
||||
|
||||
# NOTE(kgriffs): Finalize the insert once we can say that
|
||||
# all the messages made it. This makes bulk inserts
|
||||
@ -887,9 +891,10 @@ class FIFOMessageController(MessageController):
|
||||
{'$set': {'tx': None}},
|
||||
upsert=False)
|
||||
|
||||
return [str(id_) for id_ in ids]
|
||||
return [str(id_) for id_ in res.inserted_ids]
|
||||
|
||||
except pymongo.errors.DuplicateKeyError as ex:
|
||||
except (pymongo.errors.DuplicateKeyError,
|
||||
pymongo.errors.BulkWriteError) as ex:
|
||||
# TODO(kgriffs): Record stats of how often retries happen,
|
||||
# and how many attempts, on average, are required to insert
|
||||
# messages.
|
||||
@ -971,7 +976,9 @@ class FIFOMessageController(MessageController):
|
||||
|
||||
for index, message in enumerate(prepared_messages):
|
||||
message['k'] = next_marker + index
|
||||
|
||||
except bsonerror.InvalidDocument as ex:
|
||||
LOG.exception(ex)
|
||||
raise
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
raise
|
||||
|
@ -108,14 +108,14 @@ class SubscriptionController(base.Subscription):
|
||||
confirmed = False
|
||||
|
||||
try:
|
||||
subscription_id = self._collection.insert({'s': source,
|
||||
'u': subscriber,
|
||||
't': ttl,
|
||||
'e': expires,
|
||||
'o': options,
|
||||
'p': project,
|
||||
'c': confirmed})
|
||||
return subscription_id
|
||||
res = self._collection.insert_one({'s': source,
|
||||
'u': subscriber,
|
||||
't': ttl,
|
||||
'e': expires,
|
||||
'o': options,
|
||||
'p': project,
|
||||
'c': confirmed})
|
||||
return res.inserted_id
|
||||
except pymongo.errors.DuplicateKeyError:
|
||||
return None
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user