Сервис занимаеться обогощением данных, получаемых по средствам
gRPC запросов.
# чтобы запустить файл
go run cmd/des/des.go
# чтобы собрать исполняемый файл
go build cmd/des/des.go
# запуск исполняемого файла
./des
package config
type AppConfig struct {
WorkerConfig WorkerConfig `json:"worker"`
ServiceConfig ServiceConfig `json:"service"`
ChannelSize int `json:"queue_task_size"`
CacheClearInterval int `json:"cache_clear_interval"`
}
type ServiceConfig struct {
IP string `json:"IP"`
PORT string `json:"PORT"`
}
type WorkerConfig struct {
RemoteHTTPServer struct {
IP string `json:"IP"`
PORT string `json:"PORT"`
} `json:"remote_http_server"`
Authentication struct {
Login string `json:"login"`
Password string `json:"password"`
} `json:"authentication"`
MaxWorkers int `json:"max_workers"`
MaxTimeForResponse int `json:"max_time_for_response"`
TimeoutConnection int `json:"timeout_connection"`
}
AppConfig
: Основная структура, содержащая все параметры конфигурации.
ServiceConfig
: Подструктура, описывающая параметры конфигурации сервиса.
WorkerConfig
: Подструктура, описывающая параметры конфигурации рабочего процесса, включая параметры для удаленного HTTP-сервера и аутентификации.
-path-to-config
: Флаг для указания пути к файлу конфигурации. Если флаг не указан, будет использован путь по умолчанию (config/app.conf).
- Конфигурация загружается из файла JSON. Убедитесь, что файл конфигурации существует и имеет правильный формат.
{
"channel_size":20,
"cache_clear_interval":20,
"service": {
"IP": "127.0.0.1",
"PORT": ":8080"
},
"worker": {
"_comments": {
"description": "this is a configuration for worker pull"
},
"max_workers": 6,
"timeout_connection": 0,
"max_time_for_response": 2,
"remote_http_server": {
"_comments": {
"description": "this is a remote http server config"
},
"IP": "127.0.0.1",
"PORT": "4557"
},
"authentication": {
"_comments": {
"description": "this is a data for authentication"
},
"login": "admin",
"password": "admin"
}
}
channel_size
- максимальный размер очереди задачcache_clear_interval
- интервал очистки кэшаservice
- адрес и порт, на которых работает сервисworker
-max_workers
- максимальное количесвто параллельно работающих воркеровtimeout_connection
- таймаут подключения к удаленному HTTP(0 - keep alive соединение)max_time_for_responce
- максимальное время для ответа воркера, если время выйдет, будет отменён контекстremote_http_server
- адресс и порт сервера при помощик которого, будут обогощаться данныеauthentication
- данные для аунтефикации на удаленном HTTP (передаються в заголовкеAuthorization
в base64)
Этот пакет предоставляет простой и гибкий механизм логгирования на основе библиотеки Zap.
-
Консольный вывод: Логгер выводит сообщения на консоль с использованием консольного кодировщика для более читаемого вывода.
-
Файловый вывод: Логгер также записывает сообщения в файлы для отслеживания деталей работы приложения. Различаются логи по уровням:
debug.log
для отладочных сообщений,warning_error.log
для предупреждений и ошибок, а такжеhttp.log
для отслеживания всех запросов к стороннему HTTP иgrpc.log
для отслеживания запросов по grpc. -
Уровни логгирования: Логгер настроен для разделения сообщений на уровни отладки, информации, предупреждений и ошибок.
Этот пакет предоставляет стандартный способ создания и настройки логгеров в вашем приложении, обеспечивая удобство использования и гибкость в конфигурации.
Этот пакет предоставляет простой механизм кэширования данных с автоматической очисткой по истечении указанного времени.
-
Создание кэша: Используйте функцию
NewCache
для создания нового экземпляра кэша:c := cache.NewCache(ctx, duration)
ctx
- контекст в случае отмены которого, будет завершенв очистка кэша.durtion
- интервал очистки кэша
-
Загрузка данных в кэш: Используйте метод
Load
для добавления данных в кэш:c.Load(title, data)
-
Поиск данных в кэше: Используйте метод
Search
для поиска данных в кэше:data, ok := c.Search(title)
-
Автоматическая очистка: Кэш автоматически очищается от данных по истечении указанного времени с момента последней загрузки(
duration
). Время указывается при создании кэша.
Давайте разберем каждую функцию и структуру в вашем коде:
- структура
ExtServer
:
type ExtServer struct {
pb.UnimplementedUserExtensionServiceServer
logger *zap.Logger
grpcLogger *zap.Logger
cache Cacher
}
-
logger
: Экземпляр логгера из библиотекиgo.uber.org/zap
для логирования общих событий сервиса. -
grpcLogger
: Еще один экземпляр логгера для логирования событий gRPC, таких как начало и завершение соединения, а также информации о запросах. -
cache
: ИнтерфейсCacher
, который представляет кэш для хранения данных. Этот интерфейс предоставляет два метода:Load
для добавления данных в кэш иSearch
для поиска данных по ключу.
- Интерфейс
Cacher
:
type Cacher interface {
Load(title string, data any)
Search(title string) (data any, ok bool)
}
Интерфейс призван уменьшить связность пакетов и отвязать сервис от конеретной реализации кэширования.
3. функция-конструктор NewExtServer
:
func NewExtServer(channel chan chan []byte, logger, grpcLogger *zap.Logger, cache Cacher) *ExtServer
-
Создает новый экземпляр
ExtServer
. -
Принимает канал
channel
для взаимодействия с воркерами и логгеры для логирования. -
Возвращает указатель на новый экземпляр
ExtServer
.
- метод-обработчик
GetUserExtension
:
func (es *ExtServer) GetUserExtension(ctx context.Context, in *pb.GetRequest) (out *pb.GetResponse, err error)
-
Метод, удовлетворяющий интерфейсу
UserExtensionServiceServer
сгенерированного gRPC кода. -
Получает запрос от клиента и отправляет его в канал
ch
для воркеров. Если канал занят, возвращает ошибкуResourceExhausted
. -
Ожидает ответ от воркеров в виде JSON-подобной строки, затем десериализует и возвращает результат. Загружает результат в кэш.
- метод-перехватчик
LogUnaryRPCInterceptor
:
func (es *ExtServer) LogUnaryRPCInterceptor(ctx context.Context, req interface{},
_ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error)
-
Перехватчик для логирования информации о каждом gRPC вызове.
-
Измеряет время выполнения вызова и логирует различную информацию о вызове, включая наличие данных в кэше.
-
Если данные найдены в кэше, возвращает их сразу, иначе передает выполнение обработчику.
- метод
StartServer
:
func (es *ExtServer) StartServer(addr, port string) (*grpc.Server, error)
-
Метод для запуска gRPC сервера.
-
Создает слушатель на указанном адресе и порту.
-
Создает новый экземпляр gRPC сервера с указанными опциями и регистрирует его методы.
-
Запускает сервер в горутине.
-
Возвращает указатель на сервер и ошибку (если есть).
Этот код представляет собой реализацию gRPC-сервиса, взаимодействующего с воркерами через канал и использующего логгирование с использованием библиотеки zap.
syntax = "proto3";
package service;
option go_package = "github.com/EgorKo25/DES/internal/server";
service UserExtensionService {
rpc GetUserExtension(GetRequest) returns (GetResponse);
}
message GetRequest {
UserData user_data = 1;
}
message GetResponse {
string status = 1;
UserData users = 2;
}
message UserData {
int32 ids = 1;
string name = 2;
string email = 3;
string phone_number = 4;
string date_to = 5;
string date_from = 6;
}
-
Функция-конструктор
NewWorkerPull
:func NewWorkerPull(ctx context.Context, channel chan chan []byte, maxWorkers, timeOutConn, maxResponseTime int, login, password string, logger, htpLogger *zap.Logger) *WorkerPull
- Инициализирует и возвращает новый экземпляр
WorkerPull
. - Запускает указанное количество воркеров в горутинах.
- Устанавливает контекст, таймаут подключения, логин, пароль и другие параметры.
- Проверяет, что контекст не содержит ошибку.
- Инициализирует и возвращает новый экземпляр
-
Структура
WorkerPull
:
type WorkerPull struct {
*Auth
channel chan chan []byte
logger *zap.Logger
httpLogger *zap.Logger
client *http.Client
urlExtData string
urlStatusAbv string
workerPullSize int
maxResponseTime int
}
- Содержит информацию о воркере:
- аутентификацию
(*Auth)
- логгеры
logger, httpLogger
- клиент HTTP
client
- адреса обработчиков удаленного HTTP
urlExtData, urlStatusAbv
- Максимальное количесвто воркеров
workerPullSize
- Максимально время для ответа в канал
maxResponseTime
- аутентификацию
- Метод
worker
- выполняет обработку запросов из канала. Обрабатывает полученные данные, отправляет запросы и возвращает результаты.
Auth
структура:
type Auth struct {
login string
password string
}
- Хранит логин и пароль для аутентификации.
- Методы
worker
иprocessRequest
:
worker
: Обрабатывает данные из канала, отправляет запросы и возвращает результаты.
func (wp *WorkerPull) worker(ctx context.Context)
processRequest
: Отправляет HTTP-запрос с предоставленными данными и возвращает тело ответа.
func (wp *WorkerPull) processRequest(ctx context.Context, url string, data *fastjson.Value, body []byte) ([]byte, error)
setSmile
функция:
func (wp *WorkerPull) setSmile(reasonId int) string
- Возвращает эмоджи в зависимости от переданного
reasonId
.
getAuthorization
функция:
func (wp *WorkerPull) getAuthorization() string
- Возвращает строку для заголовка авторизации, созданную на основе логина и пароля.
-
Логирование:
- Используется пакет
go.uber.org/zap
для логирования. - Различные логи, такие как информация о начале работы воркера, отправке запросов и получении ответов, сохранены для отслеживания действий приложения.
- Используется пакет
-
Использование emoji:
- Используется пакет
github.com/enescakir/emoji
для добавления эмоджи в строку в зависимости отreasonId
.
- Используется пакет
Этот код представляет собой воркер-пул для обработки запросов. Каждый воркер получает данные из канала, отправляет запросы на указанные URL-ы, обрабатывает ответы и возвращает результаты.
- Добавьте файл
systemd/des.service
в/etc/systemd/system/
- Добавьте папку
systemd/DES
в/home
sudo systemctl start des.service
sudo systemctl stop des.service
sudo systemctl enable des.service