Skip to content

Commit

Permalink
Add batching support
Browse files Browse the repository at this point in the history
  • Loading branch information
GilbN committed Aug 15, 2024
1 parent 439b31f commit 36e58c6
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 5 deletions.
3 changes: 3 additions & 0 deletions .env.examples
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ INFLUXDB_V2_ORG = "geoip2influx"
INFLUXDB_V2_BUCKET = "geoip2influx"
INFLUXDB_V2_RETENTION = "604800" # seconds (7 days)
INFLUXDB_V2_DEBUG = "false"
INFLUXDB_V2_BATCHING = "true"
INFLUXDB_V2_BATCH_SIZE = "50"
INFLUXDB_V2_FLUSH_INTERVAL = "30_000" # milliseconds

GEO_MEASUREMENT = "geoip2influx"
LOG_MEASUREMENT = "nginx_access_logs"
Expand Down
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,16 @@ Add the ones that differ on your system.

| Environment Variable | Example Value | Description |
| -------------------- | ------------- | ----------- |
| USE_INFLUX_V2 | true | Required if using InfluxDB2. Defaults to false |
| INFLUXDB_V2_TOKEN | secret-token | Required |
| INFLUXDB_V2_URL | http://localhost:8086 | Optional, defaults to http://localhost:8086 |
| INFLUXDB_V2_ORG | geoip2influx | Optional, defaults to geoip2influx. Will be created if not exists. |
| INFLUXDB_V2_BUCKET | geoip2influx | Optional, defaults to geoip2influx. Will be created if not exists. |
| INFLUXDB_V2_RETENTION | 604800 | Optional, defaults to 604800. 7 days in seconds |
| INFLUXDB_V2_DEBUG | false | Optional, defaults to false |
| INFLUXDB_V2_DEBUG | false | Optional, defaults to false. Enables the debug mode for the influxdb-client package. |
| INFLUXDB_V2_BATCHING | true | Optional, defaults to false. Enables batch writing of data. |
| INFLUXDB_V2_BATCH_SIZE | 100 | Optional, defaults to 10. |
| INFLUXDB_V2_FLUSH_INTERVAL | 30000 | Optional, defaults to 15000. How often in milliseconds to write a batch |

### MaxMind Geolite2

Expand All @@ -70,7 +74,7 @@ Get your licence key here: https://www.maxmind.com/en/geolite2/signup

#### Note: The Grafana dashboard currently only supports InfluxDB v1.8.x

The InfluxDB database/bucket will be created automatically with the name you choose.
The InfluxDB database/bucket and retention rules will be created automatically with the name you choose.

```
-e INFLUX_DATABASE=geoip2influx or -e INFLUXDB_V2_BUCKET=geoip2influx
Expand Down
37 changes: 34 additions & 3 deletions geoip2influx/influxv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,23 @@
from influxdb_client.client.organizations_api import OrganizationsApi
from requests.exceptions import ConnectionError
from influxdb_client.client.exceptions import InfluxDBError
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.client.write_api import SYNCHRONOUS, WriteOptions

from .influx_base import InfluxBase

logger: Logger = logging.getLogger(__name__)

class BatchingCallback:

def success(self, conf: tuple[str, str, str], data: str) -> None:
logger.debug("Written batch: %s, data: %s", conf, data)

def error(self, conf: tuple[str, str, str], data: str, exception: InfluxDBError) -> None:
logger.error("Cannot write batch: %s, data: %s due: %s", conf, data, exception)

def retry(self, conf: tuple[str, str, str], data: str, exception: InfluxDBError) -> None:
logger.warning("Retryable error occured for batch: %s, data: %s retry: %s", conf, data, exception)

class InfluxClient(InfluxBase):
def __init__(self, auto_init: bool = True) -> None:
"""Initialize the InfluxDBClient.
Expand All @@ -37,6 +48,9 @@ def __init__(self, auto_init: bool = True) -> None:
- INFLUXDB_V2_BUCKET
- INFLUX_V2_RETENTION
- INFLUXDB_V2_DEBUG
- INFLUXDB_V2_BATCHING
- INFLUXDB_V2_BATCH_SIZE
- INFLUXDB_V2_FLUSH_INTERVAL
Args:
auto_init (bool, optional): Whether to automatically setup the InfluxDB client. Defaults to True.
Expand All @@ -51,6 +65,9 @@ def __init__(self, auto_init: bool = True) -> None:
self.org: str = os.getenv("INFLUXDB_V2_ORG", "geoip2influx")
self.version: str|None = None
self._setup_complete: bool = False
batching: bool = os.getenv("INFLUXDB_V2_BATCHING", "false").lower() == "true"
batch_size: int = int(os.getenv("INFLUXDB_V2_BATCH_SIZE", "10"))
flush_interval: int = int(os.getenv("INFLUXDB_V2_FLUSH_INTERVAL", "15000"))

self.influx: InfluxDBClient | None = self.create_influx_client(debug=self.debug)

Expand All @@ -60,8 +77,22 @@ def __init__(self, auto_init: bool = True) -> None:
self.logger.debug("InfluxDB token: %s", self.influx.token)
self.logger.debug("InfluxDB bucket: %s", self.bucket)
self.logger.debug("InfluxDB bucket retention seconds: %s", self.retention)

self.write_api: WriteApi = self.influx.write_api(write_options=SYNCHRONOUS)
self.logger.debug("InfluxDB batching enabled: %s", batching)

if batching:
self.logger.debug("InfluxDB batch size: %s", batch_size)
self.logger.debug("InfluxDB flush interval: %s", flush_interval)
callback = BatchingCallback()
write_options: WriteOptions = WriteOptions(batch_size=batch_size, flush_interval=flush_interval)
self.write_api: WriteApi = self.influx.write_api(
write_options=write_options,
success_callback=callback.success,
error_callback=callback.error,
retry_callback=callback.retry
)
else:
write_options = SYNCHRONOUS
self.write_api: WriteApi = self.influx.write_api(write_options=write_options)
self.bucket_api: BucketsApi = self.influx.buckets_api()
self.org_api: OrganizationsApi = self.influx.organizations_api()

Expand Down
4 changes: 4 additions & 0 deletions run.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
def handle_sigterm(signum, frame):
logger = logging.getLogger("g2i")
logger.info("Received SIGTERM. Exiting GeoIP2Influx.")
try:
parser.client.influx.close()
except Exception:
logger.exception("Error closing InfluxDB client.")
logger.info("Parsed %d log line(s).", parser.parsed_lines)
exit(0)

Expand Down

0 comments on commit 36e58c6

Please sign in to comment.