diff --git a/.env.examples b/.env.examples index 52414c8..5573eca 100644 --- a/.env.examples +++ b/.env.examples @@ -14,6 +14,9 @@ INFLUXDB_V2_ORG = "geoip2influx" INFLUXDB_V2_BUCKET = "geoip2influx" INFLUXDB_V2_RETENTION = "604800" # seconds (7 days) INFLUXDB_V2_DEBUG = "false" +INFLUXDB_V2_BATCHING = "true" +INFLUXDB_V2_BATCH_SIZE = "50" +INFLUXDB_V2_FLUSH_INTERVAL = "30_000" # milliseconds GEO_MEASUREMENT = "geoip2influx" LOG_MEASUREMENT = "nginx_access_logs" diff --git a/README.md b/README.md index 8b8cbc7..1f8168c 100644 --- a/README.md +++ b/README.md @@ -51,12 +51,16 @@ Add the ones that differ on your system. | Environment Variable | Example Value | Description | | -------------------- | ------------- | ----------- | +| USE_INFLUX_V2 | true | Required if using InfluxDB2. Defaults to false | | INFLUXDB_V2_TOKEN | secret-token | Required | | INFLUXDB_V2_URL | http://localhost:8086 | Optional, defaults to http://localhost:8086 | | INFLUXDB_V2_ORG | geoip2influx | Optional, defaults to geoip2influx. Will be created if not exists. | | INFLUXDB_V2_BUCKET | geoip2influx | Optional, defaults to geoip2influx. Will be created if not exists. | | INFLUXDB_V2_RETENTION | 604800 | Optional, defaults to 604800. 7 days in seconds | -| INFLUXDB_V2_DEBUG | false | Optional, defaults to false | +| INFLUXDB_V2_DEBUG | false | Optional, defaults to false. Enables the debug mode for the influxdb-client package. | +| INFLUXDB_V2_BATCHING | true | Optional, defaults to false. Enables batch writing of data. | +| INFLUXDB_V2_BATCH_SIZE | 100 | Optional, defaults to 10. | +| INFLUXDB_V2_FLUSH_INTERVAL | 30000 | Optional, defaults to 15000. How often in milliseconds to write a batch | ### MaxMind Geolite2 @@ -70,7 +74,7 @@ Get your licence key here: https://www.maxmind.com/en/geolite2/signup #### Note: The Grafana dashboard currently only supports InfluxDB v1.8.x -The InfluxDB database/bucket will be created automatically with the name you choose. 
+The InfluxDB database/bucket and retention rules will be created automatically with the name you choose. ``` -e INFLUX_DATABASE=geoip2influx or -e INFLUXDB_V2_BUCKET=geoip2influx diff --git a/geoip2influx/influxv2.py b/geoip2influx/influxv2.py index 64a9aac..4c63cbf 100644 --- a/geoip2influx/influxv2.py +++ b/geoip2influx/influxv2.py @@ -9,12 +9,23 @@ from influxdb_client.client.organizations_api import OrganizationsApi from requests.exceptions import ConnectionError from influxdb_client.client.exceptions import InfluxDBError -from influxdb_client.client.write_api import SYNCHRONOUS +from influxdb_client.client.write_api import SYNCHRONOUS, WriteOptions from .influx_base import InfluxBase logger: Logger = logging.getLogger(__name__) +class BatchingCallback: + """Callbacks that log the outcome (success, error, retry) of each batched write.""" + def success(self, conf: tuple[str, str, str], data: str) -> None: + logger.debug("Written batch: %s, data: %s", conf, data) + + def error(self, conf: tuple[str, str, str], data: str, exception: InfluxDBError) -> None: + logger.error("Cannot write batch: %s, data: %s due: %s", conf, data, exception) + + def retry(self, conf: tuple[str, str, str], data: str, exception: InfluxDBError) -> None: + logger.warning("Retryable error occurred for batch: %s, data: %s retry: %s", conf, data, exception) + class InfluxClient(InfluxBase): def __init__(self, auto_init: bool = True) -> None: """Initialize the InfluxDBClient. @@ -37,6 +48,9 @@ def __init__(self, auto_init: bool = True) -> None: - INFLUXDB_V2_BUCKET - INFLUX_V2_RETENTION - INFLUXDB_V2_DEBUG + - INFLUXDB_V2_BATCHING + - INFLUXDB_V2_BATCH_SIZE + - INFLUXDB_V2_FLUSH_INTERVAL Args: auto_init (bool, optional): Whether to automatically setup the InfluxDB client. Defaults to True. 
@@ -51,6 +65,9 @@ def __init__(self, auto_init: bool = True) -> None: self.org: str = os.getenv("INFLUXDB_V2_ORG", "geoip2influx") self.version: str|None = None self._setup_complete: bool = False + batching: bool = os.getenv("INFLUXDB_V2_BATCHING", "false").lower() == "true" + batch_size: int = int(os.getenv("INFLUXDB_V2_BATCH_SIZE", "10")) + flush_interval: int = int(os.getenv("INFLUXDB_V2_FLUSH_INTERVAL", "15000")) self.influx: InfluxDBClient | None = self.create_influx_client(debug=self.debug) @@ -60,8 +77,22 @@ def __init__(self, auto_init: bool = True) -> None: self.logger.debug("InfluxDB token: %s", self.influx.token) self.logger.debug("InfluxDB bucket: %s", self.bucket) self.logger.debug("InfluxDB bucket retention seconds: %s", self.retention) - - self.write_api: WriteApi = self.influx.write_api(write_options=SYNCHRONOUS) + self.logger.debug("InfluxDB batching enabled: %s", batching) + + if batching: + self.logger.debug("InfluxDB batch size: %s", batch_size) + self.logger.debug("InfluxDB flush interval: %s", flush_interval) + callback = BatchingCallback() + write_options: WriteOptions = WriteOptions(batch_size=batch_size, flush_interval=flush_interval) + self.write_api: WriteApi = self.influx.write_api( + write_options=write_options, + success_callback=callback.success, + error_callback=callback.error, + retry_callback=callback.retry + ) + else: + write_options = SYNCHRONOUS + self.write_api: WriteApi = self.influx.write_api(write_options=write_options) self.bucket_api: BucketsApi = self.influx.buckets_api() self.org_api: OrganizationsApi = self.influx.organizations_api() diff --git a/run.py b/run.py index 0209262..cc6f3b1 100644 --- a/run.py +++ b/run.py @@ -11,6 +11,14 @@ def handle_sigterm(signum, frame): logger = logging.getLogger("g2i") logger.info("Received SIGTERM. 
Exiting GeoIP2Influx.") + try: + # The client may not expose a write API, so look it up defensively; closing it first lets a batching writer flush buffered points before the connection is shut down. + write_api = getattr(parser.client, "write_api", None) + if write_api is not None: + write_api.close() + parser.client.influx.close() + except Exception: + logger.exception("Error closing InfluxDB client.") logger.info("Parsed %d log line(s).", parser.parsed_lines) exit(0)