diff --git a/kubernetes/base/watch/watch_test.py b/kubernetes/base/watch/watch_test.py index c832f625f..570dbce91 100644 --- a/kubernetes/base/watch/watch_test.py +++ b/kubernetes/base/watch/watch_test.py @@ -12,15 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -import unittest +import json -import os +import unittest import time from unittest.mock import Mock, call -from kubernetes import client,config +from kubernetes import client from .watch import Watch @@ -497,7 +497,7 @@ def test_watch_with_error_event_and_timeout_param(self): amt=None, decode_content=False) fake_resp.close.assert_called_once() fake_resp.release_conn.assert_called_once() - + @classmethod def setUpClass(cls): cls.api = Mock() @@ -505,7 +505,7 @@ def setUpClass(cls): def test_pod_log_empty_lines(self): pod_name = "demo-bug" - + try: self.api.create_namespaced_pod = Mock() self.api.read_namespaced_pod = Mock() @@ -514,12 +514,12 @@ def test_pod_log_empty_lines(self): #pod creating step self.api.create_namespaced_pod.return_value = None - + #Checking pod status mock_pod = Mock() mock_pod.status.phase = "Running" self.api.read_namespaced_pod.return_value = mock_pod - + # Printing at pod output self.api.read_namespaced_pod_log.return_value = iter(["Hello from Docker\n"]) @@ -553,7 +553,7 @@ def test_pod_log_empty_lines(self): print(event) # Print outputs - print(f"Captured logs: {log_output}") + print(f"Captured logs: {log_output}") # self.assertTrue(any("Hello from Docker" in line for line in log_output)) # self.assertTrue(any(line.strip() == "" for line in log_output), "No empty lines found in logs") expected_log = [ @@ -566,7 +566,7 @@ def test_pod_log_empty_lines(self): "\n", "Final log" ] - + self.assertEqual(log_output, expected_log, "Captured logs do not match expected logs") except ApiException as e: @@ -576,44 +576,44 @@ def test_pod_log_empty_lines(self): self.api.delete_namespaced_pod(name=pod_name, namespace=self.namespace) self.api.delete_namespaced_pod.assert_called_once_with(name=pod_name, namespace=self.namespace) -if __name__ == '__main__': -def test_watch_with_deserialize_param(self): - """test watch.stream() deserialize param""" - # prepare test data - test_json = '{"type": "ADDED", "object": {"metadata": {"name": "test1", "resourceVersion": "1"}, "spec": {}, "status": {}}}' - fake_resp = Mock() - fake_resp.close = Mock() - fake_resp.release_conn = Mock() - fake_resp.stream = Mock(return_value=[test_json + '\n']) - - fake_api = Mock() - fake_api.get_namespaces = Mock(return_value=fake_resp) - fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList' - - # test case with deserialize=True - w = Watch() - for e in w.stream(fake_api.get_namespaces, deserialize=True): - self.assertEqual("ADDED", e['type']) - # Verify that the object is deserialized correctly - self.assertTrue(hasattr(e['object'], 'metadata')) - self.assertEqual("test1", e['object'].metadata.name) - self.assertEqual("1", e['object'].metadata.resource_version) - # Verify that the original object is saved - self.assertEqual(json.loads(test_json)['object'], e['raw_object']) - - # test case with deserialize=False - w = Watch() - for e in w.stream(fake_api.get_namespaces, deserialize=False): - self.assertEqual("ADDED", e['type']) - # The validation object remains in the original dictionary format - self.assertIsInstance(e['object'], dict) - self.assertEqual("test1", e['object']['metadata']['name']) - self.assertEqual("1", e['object']['metadata']['resourceVersion']) - - # verify the api is called twice - fake_api.get_namespaces.assert_has_calls([ - call(_preload_content=False, watch=True), - call(_preload_content=False, watch=True) - ]) + def test_watch_with_deserialize_param(self): + """test watch.stream() deserialize param""" + # prepare test data + test_json = '{"type": "ADDED", "object": {"metadata": {"name": "test1", "resourceVersion": "1"}, "spec": {}, "status": {}}}' + fake_resp = Mock() + fake_resp.close = Mock() + fake_resp.release_conn = Mock() + fake_resp.stream = Mock(return_value=[test_json + '\n']) + + fake_api = Mock() + fake_api.get_namespaces = Mock(return_value=fake_resp) + fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList' + + # test case with deserialize=True + w = Watch() + for e in w.stream(fake_api.get_namespaces, deserialize=True): + self.assertEqual("ADDED", e['type']) + # Verify that the object is deserialized correctly + self.assertTrue(hasattr(e['object'], 'metadata')) + self.assertEqual("test1", e['object'].metadata.name) + self.assertEqual("1", e['object'].metadata.resource_version) + # Verify that the original object is saved + self.assertEqual(json.loads(test_json)['object'], e['raw_object']) + + # test case with deserialize=False + w = Watch() + for e in w.stream(fake_api.get_namespaces, deserialize=False): + self.assertEqual("ADDED", e['type']) + # The validation object remains in the original dictionary format + self.assertIsInstance(e['object'], dict) + self.assertEqual("test1", e['object']['metadata']['name']) + self.assertEqual("1", e['object']['metadata']['resourceVersion']) + + # verify the api is called twice + fake_api.get_namespaces.assert_has_calls([ + call(_preload_content=False, watch=True), + call(_preload_content=False, watch=True) + ]) + if __name__ == '__main__': unittest.main() diff --git a/requirements.txt b/requirements.txt index bc6dc4dc0..ce7092ab3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,5 +8,5 @@ websocket-client>=0.32.0,!=0.40.0,!=0.41.*,!=0.42.* # LGPLv2+ requests # Apache-2.0 requests-oauthlib # ISC oauthlib>=3.2.2 # BSD -urllib3>=1.24.2 # MIT +urllib3>=1.24.2,<2.4.0 # MIT durationpy>=0.7 # MIT