Skip to content

Commit cda342c

Browse files
oleksandrivantsivSviatoslav Boichuk
authored and
Sviatoslav Boichuk
committed
Fix the issue of ignoring callback calls for removed keys. (sonic-net#811)
**What I did** Fix the issue of ignoring callback calls for removed keys. **Why I did it** ConfigDBConnector.listen method has a caching mechanism (added in sonic-net#587 PR) that preloads the DB state before starting. When the notification about the changed key is received the listen method gets key data from the DB (in all cases when the key was added, updated, or removed) and compares the data with the cache. It fires the callback only if data differ from the cache. Otherwise, the callback is ignored. If the `client.hgetall(key)` is called for the removed key it returns an empty dictionary (`{}`). This can be confused with the data of the key with no attributes. For example: `{"TABLE": {"KEY": {}}}`. So if preloaded data contains a key with no attributes and the listener method receives a notification about the removal of such key the callback is not called. The listener will simply remove the key from the cache without calling the callback. This leads to the situation when the removal of the key is not handled. The solution is to get data for the added or updated keys, and for the removed keys use `None` instead. This will ensure that the comparison with the cache will work as expected. **How I verified it** Compile the package and run the unit test. Unit tests are extended to cover the expected behavior.
1 parent 233dbef commit cda342c

File tree

2 files changed

+76
-21
lines changed

2 files changed

+76
-21
lines changed

common/configdb.h

+5-2
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,11 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native
105105
try:
106106
(table, row) = key.split(self.TABLE_NAME_SEPARATOR, 1)
107107
if table in self.handlers:
108-
client = self.get_redis_client(self.db_name)
109-
data = self.raw_to_typed(client.hgetall(key))
108+
if item['data'] == 'del':
109+
data = None
110+
else:
111+
client = self.get_redis_client(self.db_name)
112+
data = self.raw_to_typed(client.hgetall(key))
110113
if table in init_data and row in init_data[table]:
111114
cache_hit = init_data[table][row] == data
112115
del init_data[table][row]

tests/test_redis_ut.py

+71-19
Original file line numberDiff line numberDiff line change
@@ -634,46 +634,98 @@ def thread_coming_entry():
634634
def test_ConfigDBInit():
635635
table_name_1 = 'TEST_TABLE_1'
636636
table_name_2 = 'TEST_TABLE_2'
637+
table_name_3 = 'TEST_TABLE_3'
637638
test_key = 'key1'
638-
test_data = {'field1': 'value1'}
639-
test_data_update = {'field1': 'value2'}
639+
test_data = {'field1': 'value1', 'field2': 'value2'}
640+
641+
queue = multiprocessing.Queue()
640642

641643
manager = multiprocessing.Manager()
642644
ret_data = manager.dict()
643645

644-
def test_handler(table, key, data, ret):
645-
ret[table] = {key: data}
646-
647-
def test_init_handler(data, ret):
646+
def test_handler(table, key, data, ret, q=None):
647+
if table not in ret:
648+
ret[table] = {}
649+
if data is None:
650+
ret[table] = {k: v for k, v in ret[table].items() if k != key}
651+
if q:
652+
q.put(ret[table])
653+
elif key not in ret[table] or ret[table][key] != data:
654+
ret[table] = {key: data}
655+
if q:
656+
q.put(ret[table])
657+
658+
def test_init_handler(data, ret, queue):
648659
ret.update(data)
660+
queue.put(ret)
649661

650-
def thread_listen(ret):
662+
def thread_listen(ret, queue):
651663
config_db = ConfigDBConnector()
652664
config_db.connect(wait_for_init=False)
653665

654-
config_db.subscribe(table_name_1, lambda table, key, data: test_handler(table, key, data, ret),
666+
config_db.subscribe(table_name_1, lambda table, key, data: test_handler(table, key, data, ret, queue),
655667
fire_init_data=False)
656-
config_db.subscribe(table_name_2, lambda table, key, data: test_handler(table, key, data, ret),
668+
config_db.subscribe(table_name_2, lambda table, key, data: test_handler(table, key, data, ret, queue),
657669
fire_init_data=True)
670+
config_db.subscribe(table_name_3, lambda table, key, data: test_handler(table, key, data, ret, queue),
671+
fire_init_data=False)
658672

659-
config_db.listen(init_data_handler=lambda data: test_init_handler(data, ret))
673+
config_db.listen(init_data_handler=lambda data: test_init_handler(data, ret, queue))
660674

661675
config_db = ConfigDBConnector()
662676
config_db.connect(wait_for_init=False)
663677
client = config_db.get_redis_client(config_db.CONFIG_DB)
664678
client.flushdb()
665679

666-
# Init table data
667-
config_db.set_entry(table_name_1, test_key, test_data)
668-
config_db.set_entry(table_name_2, test_key, test_data)
680+
# Prepare unique data per each table to track if correct data are received in the update
681+
table_1_data = {f'{table_name_1}_{k}': v for k, v in test_data.items()}
682+
config_db.set_entry(table_name_1, test_key, table_1_data)
683+
table_2_data = {f'{table_name_2}_{k}': v for k, v in test_data.items()}
684+
config_db.set_entry(table_name_2, test_key, table_2_data)
685+
config_db.set_entry(table_name_3, test_key, {})
669686

670-
thread = multiprocessing.Process(target=thread_listen, args=(ret_data,))
671-
thread.start()
672-
time.sleep(5)
673-
thread.terminate()
687+
# Run the listener in a separate process. It is not possible to stop a listener when it is running as a thread.
688+
# When it runs in a separate process we can terminate it with a signal.
689+
listener = multiprocessing.Process(target=thread_listen, args=(ret_data, queue))
690+
listener.start()
674691

675-
assert ret_data[table_name_1] == {test_key: test_data}
676-
assert ret_data[table_name_2] == {test_key: test_data}
692+
try:
693+
# During the subscription to table 2 'fire_init_data=True' is passed. The callback should be called before the listener.
694+
# Verify that callback is fired before listener initialization.
695+
data = queue.get(timeout=5)
696+
assert data == {test_key: table_2_data}
697+
698+
# Wait for init data
699+
init_data = queue.get(timeout=5)
700+
701+
# Verify that all tables initialized correctly
702+
assert init_data[table_name_1] == {test_key: table_1_data}
703+
assert init_data[table_name_2] == {test_key: table_2_data}
704+
assert init_data[table_name_3] == {test_key: {}}
705+
706+
# Remove one key-value pair from the data. Verify that the entry is updated correctly
707+
table_1_data.popitem()
708+
config_db.set_entry(table_name_1, test_key, table_1_data)
709+
data = queue.get(timeout=5)
710+
assert data == {test_key: table_1_data}
711+
712+
# Remove all key-value pairs. Verify that the table still contains key
713+
config_db.set_entry(table_name_1, test_key, {})
714+
data = queue.get(timeout=5)
715+
assert data == {test_key: {}}
716+
717+
# Remove the key
718+
config_db.set_entry(table_name_1, test_key, None)
719+
data = queue.get(timeout=5)
720+
assert test_key not in data
721+
722+
# Remove the entry (with no attributes) from the table.
723+
# Verify that the update is received and a callback is called
724+
config_db.set_entry(table_name_3, test_key, None)
725+
table_3_data = queue.get(timeout=5)
726+
assert test_key not in table_3_data
727+
finally:
728+
listener.terminate()
677729

678730

679731
def test_DBConnectFailure():

0 commit comments

Comments
 (0)