|
| 1 | +From 53901aba9ead82be21f1408a601b6266dcf1e3e4 Mon Sep 17 00:00:00 2001 |
| 2 | +From: macikgozwa < [email protected]> |
| 3 | +Date: Mon, 9 Nov 2020 16:19:24 -0800 |
| 4 | +Subject: [PATCH 2/5] Adding support for subscribe mode (#1) |
| 5 | + |
| 6 | +- Adding support for subscribe mode. The code is mostly based on this patch: https://github.com/google/gnxi/pull/65 |
| 7 | +- Adding a new parameter to limit the number of updates, e.g. after a number of streaming updates the client would stop listening. It is convenient for testing purposes. |
| 8 | +- Changing the sample interval unit to millisecond. This is also required for testing cases. |
| 9 | + |
| 10 | +Co-authored-by: Murat Acikgoz < [email protected]> |
| 11 | +--- |
| 12 | + gnmi_cli_py/py_gnmicli.py | 102 +++++++++++++++++++++++++++++++++++--- |
| 13 | + 1 file changed, 95 insertions(+), 7 deletions(-) |
| 14 | + |
| 15 | +diff --git a/gnmi_cli_py/py_gnmicli.py b/gnmi_cli_py/py_gnmicli.py |
| 16 | +index 062dee7..7152f13 100644 |
| 17 | +--- a/gnmi_cli_py/py_gnmicli.py |
| 18 | ++++ b/gnmi_cli_py/py_gnmicli.py |
| 19 | +@@ -24,9 +24,7 @@ Current supported gNMI features: |
| 20 | + - Auto-loads Target cert from Target if not specified |
| 21 | + - User/password based authentication |
| 22 | + - Certifificate based authentication |
| 23 | +- |
| 24 | +-Current unsupported gNMI features: |
| 25 | +-- Subscribe |
| 26 | ++- Subscribe request |
| 27 | + """ |
| 28 | + |
| 29 | + from __future__ import absolute_import |
| 30 | +@@ -40,14 +38,16 @@ import re |
| 31 | + import ssl |
| 32 | + import sys |
| 33 | + import six |
| 34 | ++import datetime |
| 35 | + try: |
| 36 | + import gnmi_pb2 |
| 37 | + except ImportError: |
| 38 | + print('ERROR: Ensure you\'ve installed dependencies from requirements.txt\n' |
| 39 | + 'eg, pip install -r requirements.txt') |
| 40 | + import gnmi_pb2_grpc |
| 41 | ++import grpc |
| 42 | + |
| 43 | +-__version__ = '0.4' |
| 44 | ++__version__ = '0.5' |
| 45 | + |
| 46 | + _RE_PATH_COMPONENT = re.compile(r''' |
| 47 | + ^ |
| 48 | +@@ -143,6 +143,21 @@ def _create_parser(): |
| 49 | + required=False, action='store_true') |
| 50 | + parser.add_argument('-n', '--notls', help='gRPC insecure mode', |
| 51 | + required=False, action='store_true') |
| 52 | ++ parser.add_argument('--interval', default=10000, type=int, |
| 53 | ++ help='sample interval in millisecond (default: 10000ms)') |
| 54 | ++ parser.add_argument('--timeout', type=int, help='subscription' |
| 55 | ++ 'duration in seconds (default: none)') |
| 56 | ++ parser.add_argument('--heartbeat', default=0, type=int, help='heartbeat interval (default: None)') |
| 57 | ++ parser.add_argument('--aggregate', action='store_true', help='allow aggregation') |
| 58 | ++ parser.add_argument('--suppress', action='store_true', help='suppress redundant') |
| 59 | ++ parser.add_argument('--submode', default=2, type=int, |
| 60 | ++ help='subscription mode [0=TARGET_DEFINED, 1=ON_CHANGE, 2=SAMPLE]') |
| 61 | ++ parser.add_argument('--update_count', default=0, type=int, help='Max number of streaming updates to receive. 0 means no limit.') |
| 62 | ++ parser.add_argument('--subscribe_mode', default=0, type=int, help='[0=STREAM, 1=ONCE, 2=POLL]') |
| 63 | ++ parser.add_argument('--encoding', default=0, type=int, help='[0=JSON, 1=BYTES, 2=PROTO, 3=ASCII, 4=JSON_IETF]') |
| 64 | ++ parser.add_argument('--qos', default=0, type=int, help='') |
| 65 | ++ parser.add_argument('--use_alias', action='store_true', help='use alias') |
| 66 | ++ parser.add_argument('--prefix', default='', help='gRPC path prefix (default: none)') |
| 67 | + return parser |
| 68 | + |
| 69 | + |
| 70 | +@@ -353,6 +368,79 @@ def _open_certs(**kwargs): |
| 71 | + return kwargs |
| 72 | + |
| 73 | + |
| 74 | ++def gen_request(paths, opt, prefix): |
| 75 | ++ """Create subscribe request for passed xpath. |
| 76 | ++ Args: |
| 77 | ++ paths: (str) gNMI path. |
| 78 | ++ opt: (dict) Command line argument passed for subscribe reqeust. |
| 79 | ++ Returns: |
| 80 | ++ gNMI SubscribeRequest object. |
| 81 | ++ """ |
| 82 | ++ mysubs = [] |
| 83 | ++ mysub = gnmi_pb2.Subscription(path=paths, mode=opt["submode"], |
| 84 | ++ sample_interval=opt["interval"]*1000000, |
| 85 | ++ heartbeat_interval=opt['heartbeat']*1000000, |
| 86 | ++ suppress_redundant=opt['suppress']) |
| 87 | ++ mysubs.append(mysub) |
| 88 | ++ |
| 89 | ++ if prefix: |
| 90 | ++ myprefix = prefix |
| 91 | ++ elif opt["prefix"]: |
| 92 | ++ myprefix = _parse_path(_path_names(opt["prefix"])) |
| 93 | ++ else: |
| 94 | ++ myprefix = None |
| 95 | ++ |
| 96 | ++ if opt["qos"]: |
| 97 | ++ myqos = gnmi_pb2.QOSMarking(marking=opt["qos"]) |
| 98 | ++ else: |
| 99 | ++ myqos = None |
| 100 | ++ mysblist = gnmi_pb2.SubscriptionList(prefix=myprefix, mode=opt['subscribe_mode'], |
| 101 | ++ allow_aggregation=opt['aggregate'], encoding=opt['encoding'], |
| 102 | ++ subscription=mysubs, use_aliases=opt['use_alias'], qos=myqos) |
| 103 | ++ mysubreq = gnmi_pb2.SubscribeRequest(subscribe=mysblist) |
| 104 | ++ |
| 105 | ++ print('Sending SubscribeRequest\n'+str(mysubreq)) |
| 106 | ++ yield mysubreq |
| 107 | ++ |
| 108 | ++ |
| 109 | ++def subscribe_start(stub, options, req_iterator): |
| 110 | ++ """ RPC Start for Subscribe reqeust |
| 111 | ++ Args: |
| 112 | ++ stub: (class) gNMI Stub used to build the secure channel. |
| 113 | ++ options: (dict) Command line argument passed for subscribe reqeust. |
| 114 | ++ req_iterator: gNMI Subscribe Request from gen_request. |
| 115 | ++ Returns: |
| 116 | ++ Start Subscribe and printing response of gNMI Subscribe Response. |
| 117 | ++ """ |
| 118 | ++ metadata = [('username', options['username']), ('password', options['password'])] |
| 119 | ++ max_update_count = options["update_count"] |
| 120 | ++ try: |
| 121 | ++ responses = stub.Subscribe(req_iterator, options['timeout'], metadata=metadata) |
| 122 | ++ update_count = 0 |
| 123 | ++ for response in responses: |
| 124 | ++ print('{0} response received: '.format(datetime.datetime.now())) |
| 125 | ++ if response.HasField('sync_response'): |
| 126 | ++ print(str(response)) |
| 127 | ++ elif response.HasField('error'): |
| 128 | ++ print('gNMI Error '+str(response.error.code)+\ |
| 129 | ++ ' received\n'+str(response.error.message) + str(response.error)) |
| 130 | ++ elif response.HasField('update'): |
| 131 | ++ print(response) |
| 132 | ++ update_count = update_count+1 |
| 133 | ++ else: |
| 134 | ++ print('Unknown response received:\n'+str(response)) |
| 135 | ++ |
| 136 | ++ if max_update_count != 0 and update_count == max_update_count: |
| 137 | ++ print("Max update count reached {0}".format(update_count)) |
| 138 | ++ break |
| 139 | ++ except KeyboardInterrupt: |
| 140 | ++ print("Subscribe Session stopped by user.") |
| 141 | ++ except grpc.RpcError as x: |
| 142 | ++ print("grpc.RpcError received:\n%s" %x) |
| 143 | ++ except Exception as err: |
| 144 | ++ print(err) |
| 145 | ++ |
| 146 | ++ |
| 147 | + def main(): |
| 148 | + argparser = _create_parser() |
| 149 | + args = vars(argparser.parse_args()) |
| 150 | +@@ -414,9 +502,9 @@ def main(): |
| 151 | + response = _set(stub, paths, 'delete', user, password, json_value) |
| 152 | + print('The SetRequest response is below\n' + '-'*25 + '\n', response) |
| 153 | + elif mode == 'subscribe': |
| 154 | +- print('This mode not available in this version') |
| 155 | +- sys.exit() |
| 156 | ++ request_iterator = gen_request(paths, args, prefix) |
| 157 | ++ subscribe_start(stub, args, request_iterator) |
| 158 | + |
| 159 | + |
| 160 | + if __name__ == '__main__': |
| 161 | +- main() |
| 162 | ++ main() |
| 163 | +\ No newline at end of file |
| 164 | +-- |
| 165 | +2.48.1.windows.1 |
| 166 | + |
0 commit comments