Merge "Add more backoff functions"
This commit is contained in:
commit
0505f5ee51
@ -46,7 +46,8 @@ Webhook
|
|||||||
- 'minimum_delay' and 'maximum_delay' mean delay time in seconds.
|
- 'minimum_delay' and 'maximum_delay' mean delay time in seconds.
|
||||||
- 'retry_backoff_function' mean name of retry backoff function.
|
- 'retry_backoff_function' mean name of retry backoff function.
|
||||||
There will be a enum in Zaqar that contain all valid values.
|
There will be a enum in Zaqar that contain all valid values.
|
||||||
At first step, Zaqar only supports one function: 'linear'.
|
Zaqar now supports retry backoff function including 'linear',
|
||||||
|
'arithmetic','geometric' and 'exponential'.
|
||||||
- 'minimum_delay_retries' and 'maximum_delay_retries' mean the number of
|
- 'minimum_delay_retries' and 'maximum_delay_retries' mean the number of
|
||||||
retries with 'minimum_delay' or 'maximum_delay' delay time.
|
retries with 'minimum_delay' or 'maximum_delay' delay time.
|
||||||
|
|
||||||
|
@ -0,0 +1,7 @@
|
|||||||
|
---
|
||||||
|
features:
|
||||||
|
- Support more retry backoff function in webhook type. It will work
|
||||||
|
when Zaqar failed to send the notification to the subscriber.
|
||||||
|
Users can define the retry backoff function in metadata of queue.
|
||||||
|
There are four retry backoff functions including 'linear',
|
||||||
|
'arithmetic', 'geometric' and 'exponential'.
|
@ -13,6 +13,7 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
import math
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import json
|
import json
|
||||||
@ -27,7 +28,33 @@ LOG = logging.getLogger(__name__)
|
|||||||
def _Linear_function(minimum_delay, maximum_delay, times):
|
def _Linear_function(minimum_delay, maximum_delay, times):
|
||||||
return range(minimum_delay, maximum_delay, times)
|
return range(minimum_delay, maximum_delay, times)
|
||||||
|
|
||||||
RETRY_BACKOFF_FUNCTION_MAP = {'linear': _Linear_function}
|
|
||||||
|
def _Geometric_function(minimum_delay, maximum_delay, times):
|
||||||
|
x_max = int((maximum_delay - minimum_delay) / times)
|
||||||
|
k = math.pow(10, math.log10(maximum_delay/minimum_delay)/(x_max-1))
|
||||||
|
xarray = range(1, x_max+1)
|
||||||
|
return [int(minimum_delay*math.pow(k, a-1)) for a in xarray]
|
||||||
|
|
||||||
|
|
||||||
|
def _Exponential_function(minimum_delay, maximum_delay, times):
|
||||||
|
x_max = int((maximum_delay - minimum_delay) / times)
|
||||||
|
k = math.pow(10, math.log10(maximum_delay/minimum_delay)/(x_max-1))
|
||||||
|
p = minimum_delay/k
|
||||||
|
xarray = range(1, x_max+1)
|
||||||
|
return [int(p*math.pow(k, a)) for a in xarray]
|
||||||
|
|
||||||
|
|
||||||
|
def _Arithmetic_function(minimum_delay, maximum_delay, times):
|
||||||
|
x_max = int((maximum_delay - minimum_delay) / times)
|
||||||
|
d = 2.0 * (maximum_delay - minimum_delay) / (x_max * (x_max - 1))
|
||||||
|
xarray = range(1, x_max+1)
|
||||||
|
return [int(minimum_delay+(a-1)*a*d/2) for a in xarray]
|
||||||
|
|
||||||
|
|
||||||
|
RETRY_BACKOFF_FUNCTION_MAP = {'linear': _Linear_function,
|
||||||
|
'arithmetic': _Arithmetic_function,
|
||||||
|
'geometric': _Geometric_function,
|
||||||
|
'exponential': _Exponential_function}
|
||||||
|
|
||||||
|
|
||||||
class WebhookTask(object):
|
class WebhookTask(object):
|
||||||
@ -65,9 +92,8 @@ class WebhookTask(object):
|
|||||||
time.sleep(retry_policy.get('minimum_delay', consts.MINIMUM_DELAY))
|
time.sleep(retry_policy.get('minimum_delay', consts.MINIMUM_DELAY))
|
||||||
if self._post_request_success(subscriber, data, headers):
|
if self._post_request_success(subscriber, data, headers):
|
||||||
return
|
return
|
||||||
# Backoff Phase: Linear retry
|
# Now we support linear,arithmetic,
|
||||||
# TODO(wanghao): Now we only support the linear function, we should
|
# exponential and geometric retry backoff function.
|
||||||
# support more in Queens.
|
|
||||||
retry_function = retry_policy.get('retry_backoff_function', 'linear')
|
retry_function = retry_policy.get('retry_backoff_function', 'linear')
|
||||||
backoff_function = RETRY_BACKOFF_FUNCTION_MAP[retry_function]
|
backoff_function = RETRY_BACKOFF_FUNCTION_MAP[retry_function]
|
||||||
for i in backoff_function(retry_policy.get('minimum_delay',
|
for i in backoff_function(retry_policy.get('minimum_delay',
|
||||||
@ -75,8 +101,8 @@ class WebhookTask(object):
|
|||||||
retry_policy.get('maximum_delay',
|
retry_policy.get('maximum_delay',
|
||||||
consts.MAXIMUM_DELAY),
|
consts.MAXIMUM_DELAY),
|
||||||
consts.LINEAR_INTERVAL):
|
consts.LINEAR_INTERVAL):
|
||||||
LOG.debug('Retry with retry_backoff_function, sleep: %s seconds',
|
LOG.debug('Retry with function:%s, sleep: %s seconds',
|
||||||
i)
|
retry_function, i)
|
||||||
time.sleep(i)
|
time.sleep(i)
|
||||||
if self._post_request_success(subscriber, data, headers):
|
if self._post_request_success(subscriber, data, headers):
|
||||||
return
|
return
|
||||||
|
@ -21,6 +21,7 @@ import mock
|
|||||||
|
|
||||||
from zaqar.common import urls
|
from zaqar.common import urls
|
||||||
from zaqar.notification import notifier
|
from zaqar.notification import notifier
|
||||||
|
from zaqar.notification.tasks import webhook
|
||||||
from zaqar import tests as testing
|
from zaqar import tests as testing
|
||||||
|
|
||||||
|
|
||||||
@ -433,3 +434,16 @@ class NotifierTest(testing.TestBase):
|
|||||||
@ddt.data(False, True)
|
@ddt.data(False, True)
|
||||||
def test_send_confirm_notification_with_email(self, is_unsub):
|
def test_send_confirm_notification_with_email(self, is_unsub):
|
||||||
self._send_confirm_notification_with_email(is_unsubscribed=is_unsub)
|
self._send_confirm_notification_with_email(is_unsubscribed=is_unsub)
|
||||||
|
|
||||||
|
def test_webhook_backoff_function(self):
|
||||||
|
expect = [10, 12, 14, 18, 22, 27, 33, 40, 49, 60]
|
||||||
|
sec = webhook._Exponential_function(10, 60, 5)
|
||||||
|
self.assertEqual(expect, sec)
|
||||||
|
|
||||||
|
expect = [20, 22, 25, 29, 33, 37, 42, 48, 54, 62, 70, 80]
|
||||||
|
sec = webhook._Geometric_function(20, 80, 5)
|
||||||
|
self.assertEqual(expect, sec)
|
||||||
|
|
||||||
|
expect = [30, 30, 32, 34, 37, 41, 46, 51, 57, 64, 72, 80, 90, 100]
|
||||||
|
sec = webhook._Arithmetic_function(30, 100, 5)
|
||||||
|
self.assertEqual(expect, sec)
|
||||||
|
@ -21,8 +21,10 @@ from oslo_config import cfg
|
|||||||
from oslo_utils import timeutils
|
from oslo_utils import timeutils
|
||||||
import six
|
import six
|
||||||
|
|
||||||
|
from zaqar.common import consts
|
||||||
from zaqar.i18n import _
|
from zaqar.i18n import _
|
||||||
|
|
||||||
|
|
||||||
MIN_MESSAGE_TTL = 60
|
MIN_MESSAGE_TTL = 60
|
||||||
MIN_CLAIM_TTL = 60
|
MIN_CLAIM_TTL = 60
|
||||||
MIN_CLAIM_GRACE = 60
|
MIN_CLAIM_GRACE = 60
|
||||||
@ -249,11 +251,11 @@ class Validator(object):
|
|||||||
if retry_value and not isinstance(retry_value, str):
|
if retry_value and not isinstance(retry_value, str):
|
||||||
msg = _('retry_backoff_function must be a string.')
|
msg = _('retry_backoff_function must be a string.')
|
||||||
raise ValidationFailed(msg)
|
raise ValidationFailed(msg)
|
||||||
# TODO(wanghao): Now we only support linear function.
|
# Now we support linear, arithmetic, exponential
|
||||||
# This will be removed after we support more functions.
|
# and geometric retry backoff function.
|
||||||
if retry_value and retry_value != 'linear':
|
fun = {'linear', 'arithmetic', 'exponential', 'geometric'}
|
||||||
msg = _('retry_backoff_function only supports linear '
|
if retry_value and retry_value not in fun:
|
||||||
'now.')
|
msg = _('invalid retry_backoff_function.')
|
||||||
raise ValidationFailed(msg)
|
raise ValidationFailed(msg)
|
||||||
elif key == 'ignore_subscription_override':
|
elif key == 'ignore_subscription_override':
|
||||||
if retry_value and not isinstance(retry_value, bool):
|
if retry_value and not isinstance(retry_value, bool):
|
||||||
@ -264,6 +266,16 @@ class Validator(object):
|
|||||||
if retry_value and not isinstance(retry_value, int):
|
if retry_value and not isinstance(retry_value, int):
|
||||||
msg = _('Retry policy: %s must be a integer.') % key
|
msg = _('Retry policy: %s must be a integer.') % key
|
||||||
raise ValidationFailed(msg)
|
raise ValidationFailed(msg)
|
||||||
|
min_delay = retry_policy.get('minimum_delay',
|
||||||
|
consts.MINIMUM_DELAY)
|
||||||
|
max_delay = retry_policy.get('maximum_delay',
|
||||||
|
consts.MAXIMUM_DELAY)
|
||||||
|
if max_delay < min_delay:
|
||||||
|
msg = _('minimum_delay must less than maximum_delay.')
|
||||||
|
raise ValidationFailed(msg)
|
||||||
|
if ((max_delay - min_delay) < 2*consts.LINEAR_INTERVAL):
|
||||||
|
msg = _('invalid minimum_delay and maximum_delay.')
|
||||||
|
raise ValidationFailed(msg)
|
||||||
|
|
||||||
def queue_patching(self, request, changes):
|
def queue_patching(self, request, changes):
|
||||||
washed_changes = []
|
washed_changes = []
|
||||||
|
Loading…
x
Reference in New Issue
Block a user