Skip to content

Commit d89f6ed

Browse files
authored
Elasticsearch: base tests (#1109)
* base tests for ElasticsearchIODataset * add docker dependency in ci workflow * modify year in copyright * parametrized attrs
1 parent ae1d601 commit d89f6ed

File tree

1 file changed

+123
-0
lines changed

1 file changed

+123
-0
lines changed

tests/test_elasticsearch_eager.py

+123
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4+
# use this file except in compliance with the License. You may obtain a copy of
5+
# the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12+
# License for the specific language governing permissions and limitations under
13+
# the License.
14+
# ==============================================================================
15+
16+
"""Tests for the elasticsearch datasets"""
17+
18+
import time
19+
import json
20+
import pytest
21+
import socket
22+
import requests
23+
import tensorflow as tf
24+
import tensorflow_io as tfio
25+
26+
# COMMON VARIABLES
27+
28+
ES_CONTAINER_NAME = "tfio-elasticsearch"
29+
NODE = "http://localhost:9200"
30+
INDEX = "people"
31+
DOC_TYPE = "survivors"
32+
HEADERS = {"Content-Type": "application/json"}
33+
ATTRS = ["name", "gender", "age", "fare", "survived"]
34+
35+
36+
def is_container_running():
37+
"""Check whether the elasticsearch container is up and running
38+
with the correct port being exposed.
39+
"""
40+
41+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
42+
status = sock.connect_ex(("127.0.0.1", 9200))
43+
if status == 0:
44+
return True
45+
else:
46+
return False
47+
48+
49+
@pytest.mark.skipif(not is_container_running(), reason="The container is not running")
50+
def test_create_index():
51+
"""Create an index in the cluster"""
52+
53+
create_index_url = "{}/{}".format(NODE, INDEX)
54+
res = requests.put(create_index_url)
55+
assert res.status_code == 200
56+
57+
58+
@pytest.mark.parametrize(
59+
"record",
60+
[
61+
(("person1", "Male", 20, 80.52, 1)),
62+
(("person2", "Female", 30, 40.88, 0)),
63+
(("person3", "Male", 40, 20.73, 0)),
64+
(("person4", "Female", 50, 100.99, 1)),
65+
],
66+
)
67+
@pytest.mark.skipif(not is_container_running(), reason="The container is not running")
68+
def test_populate_data(record):
69+
"""Populate the index with data"""
70+
71+
put_item_url = "{}/{}/{}".format(NODE, INDEX, DOC_TYPE)
72+
data = {}
73+
for idx, attr in enumerate(ATTRS):
74+
data[attr] = record[idx]
75+
76+
res = requests.post(put_item_url, json=data, headers=HEADERS)
77+
78+
# The 201 status code indicates the documents have been properly indexed
79+
assert res.status_code == 201
80+
81+
# allow the cluster to index in the background.
82+
time.sleep(1)
83+
84+
85+
@pytest.mark.skipif(not is_container_running(), reason="The container is not running")
86+
def test_elasticsearch_io_dataset():
87+
"""Test the functionality of the ElasticsearchIODataset"""
88+
89+
dataset = tfio.experimental.elasticsearch.ElasticsearchIODataset(
90+
nodes=[NODE], index=INDEX, doc_type=DOC_TYPE
91+
)
92+
93+
assert issubclass(type(dataset), tf.data.Dataset)
94+
95+
for item in dataset:
96+
for attr in ATTRS:
97+
assert attr in item
98+
99+
100+
@pytest.mark.skipif(not is_container_running(), reason="The container is not running")
101+
def test_elasticsearch_io_dataset_batch():
102+
"""Test the functionality of the ElasticsearchIODataset"""
103+
104+
BATCH_SIZE = 2
105+
dataset = tfio.experimental.elasticsearch.ElasticsearchIODataset(
106+
nodes=[NODE], index=INDEX, doc_type=DOC_TYPE
107+
).batch(BATCH_SIZE)
108+
109+
assert issubclass(type(dataset), tf.data.Dataset)
110+
111+
for item in dataset:
112+
for attr in ATTRS:
113+
assert attr in item
114+
assert len(item[attr]) == BATCH_SIZE
115+
116+
117+
@pytest.mark.skipif(not is_container_running(), reason="The container is not running")
118+
def test_cleanup():
119+
"""Clean up the index"""
120+
121+
delete_index_url = "{}/{}".format(NODE, INDEX)
122+
res = requests.delete(delete_index_url)
123+
assert res.status_code == 200

0 commit comments

Comments
 (0)