From a47d1c6e9e48c1a2e53c8dee23014e384450c166 Mon Sep 17 00:00:00 2001 From: Pang Wu Date: Tue, 23 Jun 2015 11:46:58 -0700 Subject: [PATCH 1/3] WIP --- sql/sensors/tracker_motion/create.sql | 12 ++ .../pillstatus_worker.dev.yml.example | 101 ++++++++++++++++ .../pillstatus/pillstatus_worker.prod.yml | 110 +++++++++++++++++ .../pillstatus/pillstatus_worker.staging.yml | 112 ++++++++++++++++++ .../PillStatusWorkerConfiguration.java | 42 +++++++ 5 files changed, 377 insertions(+) create mode 100644 suripu-workers/configs/pillstatus/pillstatus_worker.dev.yml.example create mode 100644 suripu-workers/configs/pillstatus/pillstatus_worker.prod.yml create mode 100644 suripu-workers/configs/pillstatus/pillstatus_worker.staging.yml create mode 100644 suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusWorkerConfiguration.java diff --git a/sql/sensors/tracker_motion/create.sql b/sql/sensors/tracker_motion/create.sql index 7084ca294..3c6c00526 100644 --- a/sql/sensors/tracker_motion/create.sql +++ b/sql/sensors/tracker_motion/create.sql @@ -86,3 +86,15 @@ GRANT ALL PRIVILEGES ON pill_status_id_seq TO ingress_user; ALTER TABLE pill_status ADD COLUMN uptime BIGINT; ALTER TABLE pill_status ADD COLUMN fw_version INTEGER; + + +CREATE TABLE pill_battery_monitor ( + id BIGSERIAL PRIMARY KEY, + internal_pill_id BIGINT, + pill_id VARCHAR(255), + last_update TIMESTAMP, + pill_status INTEGER, + max_d2d_drop INTEGER +); +CREATE INDEX pill_battery_monitor_id_status on pill_battery_monitor(internal_pill_id, last_update); +CREATE UNIQUE INDEX pill_battery_monitor_id_unique on pill_battery_monitor(internal_pill_id); \ No newline at end of file diff --git a/suripu-workers/configs/pillstatus/pillstatus_worker.dev.yml.example b/suripu-workers/configs/pillstatus/pillstatus_worker.dev.yml.example new file mode 100644 index 000000000..d10f05d90 --- /dev/null +++ b/suripu-workers/configs/pillstatus/pillstatus_worker.dev.yml.example @@ -0,0 +1,101 @@ +debug: true +metrics_enabled: false +graphite: + host: carbon.hostedgraphite.com + api_key: 7509c0ff-4db5-4cae-91ee-6e78ff13b336 + reporting_interval_in_seconds: 30 + include_metrics: + - com.yammer + - com.hello + +sensors_db: + # the name of your JDBC driver + driverClass: org.postgresql.Driver + + # the username + user: your postgresql username + + # the password + password: your postgresql password + + # the JDBC URL + url: jdbc:postgresql://localhost:5432/[your postgresql username] + + # any properties specific to your JDBC driver: + properties: + charSet: UTF-8 + +common_db: + # the name of your JDBC driver + driverClass: org.postgresql.Driver + + # the username + user: your postgresql username + + # the password + password: hello ingress user + + # the JDBC URL + url: jdbc:postgresql://chanku-test.cdawj8qazvva.us-east-1.rds.amazonaws.com:5432/chanku + + # any properties specific to your JDBC driver: + properties: + charSet: UTF-8 + + # the maximum amount of time to wait on an empty pool before throwing an exception + maxWaitForConnection: 1s + + # the SQL query to run when validating a connection's liveness + validationQuery: "/* MyService Health Check */ SELECT 1" + + # the minimum number of connections to keep open + minSize: 8 + + # the maximum number of connections to keep open + maxSize: 32 + + # whether or not idle connections should be validated + checkConnectionWhileIdle: false + + # how long a connection must be held before it can be validated + checkConnectionHealthWhenIdleFor: 10s + + # the maximum lifetime of an idle connection + closeConnectionIfIdleFor: 1 minute + + +kinesis: + endpoint : https://kinesis.us-east-1.amazonaws.com + streams : + batch_pill_data : batch_pill_data + +app_name: PillStatusWorkerDev + +max_records: 1000 + +# Logging settings. +logging: + + # The default level of all loggers. Can be OFF, ERROR, WARN, INFO, DEBUG, TRACE, or ALL. + level: INFO + + # Logger-specific levels. + loggers: + + # Sets the level for 'com.example.app' to DEBUG. + com.hello.suripu.workers: DEBUG + + +dynamodb: + region: us-east-1 + tables: + features: features + endpoints: + features : http://localhost:7777 + +kinesis_logger: + stream_name: dev_logs + enabled : false + buffer_size: 100 + origin: suripu-workers + min_log_level : INFO diff --git a/suripu-workers/configs/pillstatus/pillstatus_worker.prod.yml b/suripu-workers/configs/pillstatus/pillstatus_worker.prod.yml new file mode 100644 index 000000000..c48c23be9 --- /dev/null +++ b/suripu-workers/configs/pillstatus/pillstatus_worker.prod.yml @@ -0,0 +1,110 @@ +debug: false +metrics_enabled: true +graphite: + host: carbon.hostedgraphite.com + api_key: 7509c0ff-4db5-4cae-91ee-6e78ff13b336 + reporting_interval_in_seconds: 30 + include_metrics: + - com.yammer + - com.hello + +sensors_db: + driverClass: org.postgresql.Driver + user: sensors + password: hello-sensors + url: jdbc:postgresql://sensors-1-replica-1.cdawj8qazvva.us-east-1.rds.amazonaws.com:5432/sensors1 + properties: + hibernate.dialect: org.hibernate.spatial.dialect.postgis.PostgisDialect + + # any properties specific to your JDBC driver: + properties: + charSet: UTF-8 + + # the maximum amount of time to wait on an empty pool before throwing an exception + maxWaitForConnection: 1s + + # the SQL query to run when validating a connection's liveness + validationQuery: "/* MyService Health Check */ SELECT 1" + + # the minimum number of connections to keep open + minSize: 2 + + # the maximum number of connections to keep open + maxSize: 8 + + # whether or not idle connections should be validated + checkConnectionWhileIdle: false + + # how long a connection must be held before it can be validated + checkConnectionHealthWhenIdleFor: 10s + + # the maximum lifetime of an idle connection + closeConnectionIfIdleFor: 1 minute + +common_db: + driverClass: org.postgresql.Driver + user: common + password: hello-common + url: jdbc:postgresql://common.cdawj8qazvva.us-east-1.rds.amazonaws.com:5432/common + properties: + hibernate.dialect: org.hibernate.spatial.dialect.postgis.PostgisDialect + + # any properties specific to your JDBC driver: + properties: + charSet: UTF-8 + + # the maximum amount of time to wait on an empty pool before throwing an exception + maxWaitForConnection: 1s + + # the SQL query to run when validating a connection's liveness + validationQuery: "/* MyService Health Check */ SELECT 1" + + # the minimum number of connections to keep open + minSize: 2 + + # the maximum number of connections to keep open + maxSize: 8 + + # whether or not idle connections should be validated + checkConnectionWhileIdle: false + + # how long a connection must be held before it can be validated + checkConnectionHealthWhenIdleFor: 10s + + # the maximum lifetime of an idle connection + closeConnectionIfIdleFor: 1 minute + +kinesis: + endpoint : https://kinesis.us-east-1.amazonaws.com + streams : + batch_pill_data : batch_pill_data + +app_name: PillStatusWorkerProd + +max_records: 20 + +# Logging settings. +logging: + + # The default level of all loggers. Can be OFF, ERROR, WARN, INFO, DEBUG, TRACE, or ALL. + level: INFO + + # Logger-specific levels. + loggers: + + # Sets the level for 'com.example.app' to DEBUG. + com.hello.suripu.workers: DEBUG + +dynamodb: + region: us-east-1 + tables: + features: features + endpoints: + features : http://dynamodb.us-east-1.amazonaws.com + +kinesis_logger: + stream_name: logs + enabled : true + buffer_size: 100 + origin: suripu-workers + min_log_level : INFO diff --git a/suripu-workers/configs/pillstatus/pillstatus_worker.staging.yml b/suripu-workers/configs/pillstatus/pillstatus_worker.staging.yml new file mode 100644 index 000000000..105a09a54 --- /dev/null +++ b/suripu-workers/configs/pillstatus/pillstatus_worker.staging.yml @@ -0,0 +1,112 @@ +debug: false +metrics_enabled: false +graphite: + host: carbon.hostedgraphite.com + api_key: 7509c0ff-4db5-4cae-91ee-6e78ff13b336 + reporting_interval_in_seconds: 30 + include_metrics: + - com.yammer + - com.hello + +sensors_db: + driverClass: org.postgresql.Driver + user: ingress_user + password: hello ingress user + url: jdbc:postgresql://chanku-test.cdawj8qazvva.us-east-1.rds.amazonaws.com:5432/chanku + properties: + hibernate.dialect: org.hibernate.spatial.dialect.postgis.PostgisDialect + + # any properties specific to your JDBC driver: + properties: + charSet: UTF-8 + + # the maximum amount of time to wait on an empty pool before throwing an exception + maxWaitForConnection: 1s + + # the SQL query to run when validating a connection's liveness + validationQuery: "/* MyService Health Check */ SELECT 1" + + # the minimum number of connections to keep open + minSize: 2 + + # the maximum number of connections to keep open + maxSize: 8 + + # whether or not idle connections should be validated + checkConnectionWhileIdle: false + + # how long a connection must be held before it can be validated + checkConnectionHealthWhenIdleFor: 10s + + # the maximum lifetime of an idle connection + closeConnectionIfIdleFor: 1 minute + +common_db: + driverClass: org.postgresql.Driver + user: ingress_user + password: hello ingress user + url: jdbc:postgresql://chanku-test.cdawj8qazvva.us-east-1.rds.amazonaws.com:5432/chanku + properties: + hibernate.dialect: org.hibernate.spatial.dialect.postgis.PostgisDialect + + # any properties specific to your JDBC driver: + properties: + charSet: UTF-8 + + # the maximum amount of time to wait on an empty pool before throwing an exception + maxWaitForConnection: 1s + + # the SQL query to run when validating a connection's liveness + validationQuery: "/* MyService Health Check */ SELECT 1" + + # the minimum number of connections to keep open + minSize: 2 + + # the maximum number of connections to keep open + maxSize: 8 + + # whether or not idle connections should be validated + checkConnectionWhileIdle: false + + # how long a connection must be held before it can be validated + checkConnectionHealthWhenIdleFor: 10s + + # the maximum lifetime of an idle connection + closeConnectionIfIdleFor: 1 minute + + +kinesis: + endpoint : https://kinesis.us-east-1.amazonaws.com + streams : + batch_pill_data : dev_batch_pill_data + +app_name: PillStatusWorker + +max_records: 20 + +# Logging settings. +logging: + + # The default level of all loggers. Can be OFF, ERROR, WARN, INFO, DEBUG, TRACE, or ALL. + level: INFO + + # Logger-specific levels. + loggers: + + # Sets the level for 'com.example.app' to DEBUG. + com.hello.suripu.workers: DEBUG + + +dynamodb: + region: us-east-1 + tables: + features: features + endpoints: + features : http://dynamodb.us-east-1.amazonaws.com + +kinesis_logger: + stream_name: dev_logs + enabled : false + buffer_size: 100 + origin: suripu-workers + min_log_level : INFO diff --git a/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusWorkerConfiguration.java b/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusWorkerConfiguration.java new file mode 100644 index 000000000..d02ead123 --- /dev/null +++ b/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusWorkerConfiguration.java @@ -0,0 +1,42 @@ +package com.hello.suripu.workers.pillstatus; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.hello.suripu.core.configuration.NewDynamoDBConfiguration; +import com.hello.suripu.workers.framework.WorkerConfiguration; +import com.yammer.dropwizard.db.DatabaseConfiguration; + +import javax.validation.Valid; +import javax.validation.constraints.NotNull; + +/** + * Created by pangwu on 6/22/15. + */ +public class PillStatusWorkerConfiguration extends WorkerConfiguration { + @Valid + @NotNull + @JsonProperty("dynamodb") + private NewDynamoDBConfiguration dynamoDBConfiguration; + + public NewDynamoDBConfiguration getDynamoDBConfiguration(){ + return dynamoDBConfiguration; + } + + + @Valid + @NotNull + @JsonProperty("sensors_db") + private DatabaseConfiguration sensorsDB = new DatabaseConfiguration(); + + public DatabaseConfiguration getSensorsDB() { + return sensorsDB; + } + + @Valid + @NotNull + @JsonProperty("common_db") + private DatabaseConfiguration commonDB = new DatabaseConfiguration(); + + public DatabaseConfiguration getCommonDB() { + return commonDB; + } +} From 83d56d3bca395a8c1dccb72af95b7c2bd84603b9 Mon Sep 17 00:00:00 2001 From: Pang Wu Date: Tue, 30 Jun 2015 15:31:58 -0700 Subject: [PATCH 2/3] WIP --- sql/sensors/tracker_motion/create.sql | 11 +- .../hello/suripu/api/tasks/TaskProtos.java | 910 ++++++++++++++++++ .../suripu/core/configuration/QueueName.java | 3 +- .../suripu/core/db/PillClassificationDAO.java | 34 + .../suripu/core/db/PillHeartBeatDAO.java | 9 + .../db/binders/BindPillClassification.java | 39 + .../db/mappers/PillClassificationMapper.java | 33 + .../core/models/PillClassification.java | 68 ++ .../hello/suripu/core/util/PillStatus.java | 32 + .../pillstatus_worker.dev.yml.example | 2 +- .../pillstatus/pillstatus_worker.prod.yml | 2 +- .../pillstatus/pillstatus_worker.staging.yml | 2 +- .../pillstatus/PillStatusRecordProcessor.java | 285 ++++++ .../PillStatusRecordProcessorFactory.java | 33 + .../pillstatus/PillStatusWorkerCommand.java | 90 ++ .../PillStatusWorkerConfiguration.java | 13 + 16 files changed, 1558 insertions(+), 8 deletions(-) create mode 100644 suripu-api/src/main/java/com/hello/suripu/api/tasks/TaskProtos.java create mode 100644 suripu-core/src/main/java/com/hello/suripu/core/db/PillClassificationDAO.java create mode 100644 suripu-core/src/main/java/com/hello/suripu/core/db/binders/BindPillClassification.java create mode 100644 suripu-core/src/main/java/com/hello/suripu/core/db/mappers/PillClassificationMapper.java create mode 100644 suripu-core/src/main/java/com/hello/suripu/core/models/PillClassification.java create mode 100644 suripu-core/src/main/java/com/hello/suripu/core/util/PillStatus.java create mode 100644 suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusRecordProcessor.java create mode 100644 suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusRecordProcessorFactory.java create mode 100644 suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusWorkerCommand.java diff --git a/sql/sensors/tracker_motion/create.sql b/sql/sensors/tracker_motion/create.sql index 3c6c00526..265dc46e6 100644 --- a/sql/sensors/tracker_motion/create.sql +++ b/sql/sensors/tracker_motion/create.sql @@ -88,13 +88,16 @@ ALTER TABLE pill_status ADD COLUMN uptime BIGINT; ALTER TABLE pill_status ADD COLUMN fw_version INTEGER; -CREATE TABLE pill_battery_monitor ( +CREATE TABLE pill_classification ( id BIGSERIAL PRIMARY KEY, internal_pill_id BIGINT, pill_id VARCHAR(255), - last_update TIMESTAMP, - pill_status INTEGER, - max_d2d_drop INTEGER + last_24pt_window_ts TIMESTAMP, + last_72pt_window_ts TIMESTAMP, + last_update_batt INTEGER, + max_24hr_diff INTEGER, + max_72hr_diff INTEGER, + class INTEGER ); CREATE INDEX pill_battery_monitor_id_status on pill_battery_monitor(internal_pill_id, last_update); CREATE UNIQUE INDEX pill_battery_monitor_id_unique on pill_battery_monitor(internal_pill_id); \ No newline at end of file diff --git a/suripu-api/src/main/java/com/hello/suripu/api/tasks/TaskProtos.java b/suripu-api/src/main/java/com/hello/suripu/api/tasks/TaskProtos.java new file mode 100644 index 000000000..52de331fe --- /dev/null +++ b/suripu-api/src/main/java/com/hello/suripu/api/tasks/TaskProtos.java @@ -0,0 +1,910 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: worker_task.proto + +package com.hello.suripu.api.tasks; + +public final class TaskProtos { + private TaskProtos() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + } + public interface WorkerTaskOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // optional .WorkerTask.TaskType task_type = 1; + /** + * optional .WorkerTask.TaskType task_type = 1; + */ + boolean hasTaskType(); + /** + * optional .WorkerTask.TaskType task_type = 1; + */ + WorkerTask.TaskType getTaskType(); + + // repeated string pill_ids = 2; + /** + * repeated string pill_ids = 2; + */ + java.util.List + getPillIdsList(); + /** + * repeated string pill_ids = 2; + */ + int getPillIdsCount(); + /** + * repeated string pill_ids = 2; + */ + String getPillIds(int index); + /** + * repeated string pill_ids = 2; + */ + com.google.protobuf.ByteString + getPillIdsBytes(int index); + + // repeated string sense_ids = 3; + /** + * repeated string sense_ids = 3; + */ + java.util.List + getSenseIdsList(); + /** + * repeated string sense_ids = 3; + */ + int getSenseIdsCount(); + /** + * repeated string sense_ids = 3; + */ + String getSenseIds(int index); + /** + * repeated string sense_ids = 3; + */ + com.google.protobuf.ByteString + getSenseIdsBytes(int index); + } + /** + * Protobuf type {@code WorkerTask} + */ + public static final class WorkerTask extends + com.google.protobuf.GeneratedMessage + implements WorkerTaskOrBuilder { + // Use WorkerTask.newBuilder() to construct. + private WorkerTask(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private WorkerTask(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final WorkerTask defaultInstance; + public static WorkerTask getDefaultInstance() { + return defaultInstance; + } + + public WorkerTask getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private WorkerTask( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 8: { + int rawValue = input.readEnum(); + TaskType value = TaskType.valueOf(rawValue); + if (value == null) { + unknownFields.mergeVarintField(1, rawValue); + } else { + bitField0_ |= 0x00000001; + taskType_ = value; + } + break; + } + case 18: { + if (!((mutable_bitField0_ & 0x00000002) == 0x00000002)) { + pillIds_ = new com.google.protobuf.LazyStringArrayList(); + mutable_bitField0_ |= 0x00000002; + } + pillIds_.add(input.readBytes()); + break; + } + case 26: { + if (!((mutable_bitField0_ & 0x00000004) == 0x00000004)) { + senseIds_ = new com.google.protobuf.LazyStringArrayList(); + mutable_bitField0_ |= 0x00000004; + } + senseIds_.add(input.readBytes()); + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + if (((mutable_bitField0_ & 0x00000002) == 0x00000002)) { + pillIds_ = new com.google.protobuf.UnmodifiableLazyStringList(pillIds_); + } + if (((mutable_bitField0_ & 0x00000004) == 0x00000004)) { + senseIds_ = new com.google.protobuf.UnmodifiableLazyStringList(senseIds_); + } + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return TaskProtos.internal_static_WorkerTask_descriptor; + } + + protected FieldAccessorTable + internalGetFieldAccessorTable() { + return TaskProtos.internal_static_WorkerTask_fieldAccessorTable + .ensureFieldAccessorsInitialized( + WorkerTask.class, Builder.class); + } + + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public WorkerTask parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new WorkerTask(input, extensionRegistry); + } + }; + + @Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + /** + * Protobuf enum {@code WorkerTask.TaskType} + */ + public enum TaskType + implements com.google.protobuf.ProtocolMessageEnum { + /** + * UNKNOWN = 0; + */ + UNKNOWN(0, 0), + /** + * PILL_CLASSIFICATION = 1; + */ + PILL_CLASSIFICATION(1, 1), + ; + + /** + * UNKNOWN = 0; + */ + public static final int UNKNOWN_VALUE = 0; + /** + * PILL_CLASSIFICATION = 1; + */ + public static final int PILL_CLASSIFICATION_VALUE = 1; + + + public final int getNumber() { return value; } + + public static TaskType valueOf(int value) { + switch (value) { + case 0: return UNKNOWN; + case 1: return PILL_CLASSIFICATION; + default: return null; + } + } + + public static com.google.protobuf.Internal.EnumLiteMap + internalGetValueMap() { + return internalValueMap; + } + private static com.google.protobuf.Internal.EnumLiteMap + internalValueMap = + new com.google.protobuf.Internal.EnumLiteMap() { + public TaskType findValueByNumber(int number) { + return TaskType.valueOf(number); + } + }; + + public final com.google.protobuf.Descriptors.EnumValueDescriptor + getValueDescriptor() { + return getDescriptor().getValues().get(index); + } + public final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptorForType() { + return getDescriptor(); + } + public static final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptor() { + return WorkerTask.getDescriptor().getEnumTypes().get(0); + } + + private static final TaskType[] VALUES = values(); + + public static TaskType valueOf( + com.google.protobuf.Descriptors.EnumValueDescriptor desc) { + if (desc.getType() != getDescriptor()) { + throw new IllegalArgumentException( + "EnumValueDescriptor is not for this type."); + } + return VALUES[desc.getIndex()]; + } + + private final int index; + private final int value; + + private TaskType(int index, int value) { + this.index = index; + this.value = value; + } + + // @@protoc_insertion_point(enum_scope:WorkerTask.TaskType) + } + + private int bitField0_; + // optional .WorkerTask.TaskType task_type = 1; + public static final int TASK_TYPE_FIELD_NUMBER = 1; + private TaskType taskType_; + /** + * optional .WorkerTask.TaskType task_type = 1; + */ + public boolean hasTaskType() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional .WorkerTask.TaskType task_type = 1; + */ + public TaskType getTaskType() { + return taskType_; + } + + // repeated string pill_ids = 2; + public static final int PILL_IDS_FIELD_NUMBER = 2; + private com.google.protobuf.LazyStringList pillIds_; + /** + * repeated string pill_ids = 2; + */ + public java.util.List + getPillIdsList() { + return pillIds_; + } + /** + * repeated string pill_ids = 2; + */ + public int getPillIdsCount() { + return pillIds_.size(); + } + /** + * repeated string pill_ids = 2; + */ + public String getPillIds(int index) { + return pillIds_.get(index); + } + /** + * repeated string pill_ids = 2; + */ + public com.google.protobuf.ByteString + getPillIdsBytes(int index) { + return pillIds_.getByteString(index); + } + + // repeated string sense_ids = 3; + public static final int SENSE_IDS_FIELD_NUMBER = 3; + private com.google.protobuf.LazyStringList senseIds_; + /** + * repeated string sense_ids = 3; + */ + public java.util.List + getSenseIdsList() { + return senseIds_; + } + /** + * repeated string sense_ids = 3; + */ + public int getSenseIdsCount() { + return senseIds_.size(); + } + /** + * repeated string sense_ids = 3; + */ + public String getSenseIds(int index) { + return senseIds_.get(index); + } + /** + * repeated string sense_ids = 3; + */ + public com.google.protobuf.ByteString + getSenseIdsBytes(int index) { + return senseIds_.getByteString(index); + } + + private void initFields() { + taskType_ = TaskType.UNKNOWN; + pillIds_ = com.google.protobuf.LazyStringArrayList.EMPTY; + senseIds_ = com.google.protobuf.LazyStringArrayList.EMPTY; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeEnum(1, taskType_.getNumber()); + } + for (int i = 0; i < pillIds_.size(); i++) { + output.writeBytes(2, pillIds_.getByteString(i)); + } + for (int i = 0; i < senseIds_.size(); i++) { + output.writeBytes(3, senseIds_.getByteString(i)); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeEnumSize(1, taskType_.getNumber()); + } + { + int dataSize = 0; + for (int i = 0; i < pillIds_.size(); i++) { + dataSize += com.google.protobuf.CodedOutputStream + .computeBytesSizeNoTag(pillIds_.getByteString(i)); + } + size += dataSize; + size += 1 * getPillIdsList().size(); + } + { + int dataSize = 0; + for (int i = 0; i < senseIds_.size(); i++) { + dataSize += com.google.protobuf.CodedOutputStream + .computeBytesSizeNoTag(senseIds_.getByteString(i)); + } + size += dataSize; + size += 1 * getSenseIdsList().size(); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @Override + protected Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static WorkerTask parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static WorkerTask parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static WorkerTask parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static WorkerTask parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static WorkerTask parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static WorkerTask parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static WorkerTask parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static WorkerTask parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static WorkerTask parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static WorkerTask parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(WorkerTask prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @Override + protected Builder newBuilderForType( + BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code WorkerTask} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements WorkerTaskOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return TaskProtos.internal_static_WorkerTask_descriptor; + } + + protected FieldAccessorTable + internalGetFieldAccessorTable() { + return TaskProtos.internal_static_WorkerTask_fieldAccessorTable + .ensureFieldAccessorsInitialized( + WorkerTask.class, Builder.class); + } + + // Construct using com.hello.suripu.api.tasks.TaskProtos.WorkerTask.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + taskType_ = TaskType.UNKNOWN; + bitField0_ = (bitField0_ & ~0x00000001); + pillIds_ = com.google.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000002); + senseIds_ = com.google.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000004); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return TaskProtos.internal_static_WorkerTask_descriptor; + } + + public WorkerTask getDefaultInstanceForType() { + return WorkerTask.getDefaultInstance(); + } + + public WorkerTask build() { + WorkerTask result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public WorkerTask buildPartial() { + WorkerTask result = new WorkerTask(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.taskType_ = taskType_; + if (((bitField0_ & 0x00000002) == 0x00000002)) { + pillIds_ = new com.google.protobuf.UnmodifiableLazyStringList( + pillIds_); + bitField0_ = (bitField0_ & ~0x00000002); + } + result.pillIds_ = pillIds_; + if (((bitField0_ & 0x00000004) == 0x00000004)) { + senseIds_ = new com.google.protobuf.UnmodifiableLazyStringList( + senseIds_); + bitField0_ = (bitField0_ & ~0x00000004); + } + result.senseIds_ = senseIds_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof WorkerTask) { + return mergeFrom((WorkerTask)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(WorkerTask other) { + if (other == WorkerTask.getDefaultInstance()) return this; + if (other.hasTaskType()) { + setTaskType(other.getTaskType()); + } + if (!other.pillIds_.isEmpty()) { + if (pillIds_.isEmpty()) { + pillIds_ = other.pillIds_; + bitField0_ = (bitField0_ & ~0x00000002); + } else { + ensurePillIdsIsMutable(); + pillIds_.addAll(other.pillIds_); + } + onChanged(); + } + if (!other.senseIds_.isEmpty()) { + if (senseIds_.isEmpty()) { + senseIds_ = other.senseIds_; + bitField0_ = (bitField0_ & ~0x00000004); + } else { + ensureSenseIdsIsMutable(); + senseIds_.addAll(other.senseIds_); + } + onChanged(); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + WorkerTask parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (WorkerTask) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // optional .WorkerTask.TaskType task_type = 1; + private TaskType taskType_ = TaskType.UNKNOWN; + /** + * optional .WorkerTask.TaskType task_type = 1; + */ + public boolean hasTaskType() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional .WorkerTask.TaskType task_type = 1; + */ + public TaskType getTaskType() { + return taskType_; + } + /** + * optional .WorkerTask.TaskType task_type = 1; + */ + public Builder setTaskType(TaskType value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + taskType_ = value; + onChanged(); + return this; + } + /** + * optional .WorkerTask.TaskType task_type = 1; + */ + public Builder clearTaskType() { + bitField0_ = (bitField0_ & ~0x00000001); + taskType_ = TaskType.UNKNOWN; + onChanged(); + return this; + } + + // repeated string pill_ids = 2; + private com.google.protobuf.LazyStringList pillIds_ = com.google.protobuf.LazyStringArrayList.EMPTY; + private void ensurePillIdsIsMutable() { + if (!((bitField0_ & 0x00000002) == 0x00000002)) { + pillIds_ = new com.google.protobuf.LazyStringArrayList(pillIds_); + bitField0_ |= 0x00000002; + } + } + /** + * repeated string pill_ids = 2; + */ + public java.util.List + getPillIdsList() { + return java.util.Collections.unmodifiableList(pillIds_); + } + /** + * repeated string pill_ids = 2; + */ + public int getPillIdsCount() { + return pillIds_.size(); + } + /** + * repeated string pill_ids = 2; + */ + public String getPillIds(int index) { + return pillIds_.get(index); + } + /** + * repeated string pill_ids = 2; + */ + public com.google.protobuf.ByteString + getPillIdsBytes(int index) { + return pillIds_.getByteString(index); + } + /** + * repeated string pill_ids = 2; + */ + public Builder setPillIds( + int index, String value) { + if (value == null) { + throw new NullPointerException(); + } + ensurePillIdsIsMutable(); + pillIds_.set(index, value); + onChanged(); + return this; + } + /** + * repeated string pill_ids = 2; + */ + public Builder addPillIds( + String value) { + if (value == null) { + throw new NullPointerException(); + } + ensurePillIdsIsMutable(); + pillIds_.add(value); + onChanged(); + return this; + } + /** + * repeated string pill_ids = 2; + */ + public Builder addAllPillIds( + Iterable values) { + ensurePillIdsIsMutable(); + super.addAll(values, pillIds_); + onChanged(); + return this; + } + /** + * repeated string pill_ids = 2; + */ + public Builder clearPillIds() { + pillIds_ = com.google.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000002); + onChanged(); + return this; + } + /** + * repeated string pill_ids = 2; + */ + public Builder addPillIdsBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + ensurePillIdsIsMutable(); + pillIds_.add(value); + onChanged(); + return this; + } + + // repeated string sense_ids = 3; + private com.google.protobuf.LazyStringList senseIds_ = com.google.protobuf.LazyStringArrayList.EMPTY; + private void ensureSenseIdsIsMutable() { + if (!((bitField0_ & 0x00000004) == 0x00000004)) { + senseIds_ = new com.google.protobuf.LazyStringArrayList(senseIds_); + bitField0_ |= 0x00000004; + } + } + /** + * repeated string sense_ids = 3; + */ + public java.util.List + getSenseIdsList() { + return java.util.Collections.unmodifiableList(senseIds_); + } + /** + * repeated string sense_ids = 3; + */ + public int getSenseIdsCount() { + return senseIds_.size(); + } + /** + * repeated string sense_ids = 3; + */ + public String getSenseIds(int index) { + return senseIds_.get(index); + } + /** + * repeated string sense_ids = 3; + */ + public com.google.protobuf.ByteString + getSenseIdsBytes(int index) { + return senseIds_.getByteString(index); + } + /** + * repeated string sense_ids = 3; + */ + public Builder setSenseIds( + int index, String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureSenseIdsIsMutable(); + senseIds_.set(index, value); + onChanged(); + return this; + } + /** + * repeated string sense_ids = 3; + */ + public Builder addSenseIds( + String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureSenseIdsIsMutable(); + senseIds_.add(value); + onChanged(); + return this; + } + /** + * repeated string sense_ids = 3; + */ + public Builder addAllSenseIds( + Iterable values) { + ensureSenseIdsIsMutable(); + super.addAll(values, senseIds_); + onChanged(); + return this; + } + /** + * repeated string sense_ids = 3; + */ + public Builder clearSenseIds() { + senseIds_ = com.google.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000004); + onChanged(); + return this; + } + /** + * repeated string sense_ids = 3; + */ + public Builder addSenseIdsBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + ensureSenseIdsIsMutable(); + senseIds_.add(value); + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:WorkerTask) + } + + static { + defaultInstance = new WorkerTask(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:WorkerTask) + } + + private static com.google.protobuf.Descriptors.Descriptor + internal_static_WorkerTask_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_WorkerTask_fieldAccessorTable; + + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + String[] descriptorData = { + "\n\021worker_task.proto\"\214\001\n\nWorkerTask\022\'\n\tta" + + "sk_type\030\001 \001(\0162\024.WorkerTask.TaskType\022\020\n\010p" + + "ill_ids\030\002 \003(\t\022\021\n\tsense_ids\030\003 \003(\t\"0\n\010Task" + + "Type\022\013\n\007UNKNOWN\020\000\022\027\n\023PILL_CLASSIFICATION" + + "\020\001B(\n\032com.hello.suripu.api.tasksB\nTaskPr" + + "otos" + }; + com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { + public com.google.protobuf.ExtensionRegistry assignDescriptors( + com.google.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + internal_static_WorkerTask_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_WorkerTask_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_WorkerTask_descriptor, + new String[] { "TaskType", "PillIds", "SenseIds", }); + return null; + } + }; + com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + }, assigner); + } + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/suripu-core/src/main/java/com/hello/suripu/core/configuration/QueueName.java b/suripu-core/src/main/java/com/hello/suripu/core/configuration/QueueName.java index 1a78158d1..c5a069cc0 100644 --- a/suripu-core/src/main/java/com/hello/suripu/core/configuration/QueueName.java +++ b/suripu-core/src/main/java/com/hello/suripu/core/configuration/QueueName.java @@ -10,7 +10,8 @@ public enum QueueName { ENCODE_AUDIO("encode_audio"), BATCH_PILL_DATA ("batch_pill_data"), SENSE_SENSORS_DATA("sense_sensors_data"), - LOGS("logs"); + LOGS("logs"), + WORKER_TASKS("worker_tasks"); private String value; diff --git a/suripu-core/src/main/java/com/hello/suripu/core/db/PillClassificationDAO.java b/suripu-core/src/main/java/com/hello/suripu/core/db/PillClassificationDAO.java new file mode 100644 index 000000000..a2764a734 --- /dev/null +++ b/suripu-core/src/main/java/com/hello/suripu/core/db/PillClassificationDAO.java @@ -0,0 +1,34 @@ +package com.hello.suripu.core.db; + +import com.google.common.base.Optional; +import com.hello.suripu.core.db.binders.BindPillClassification; +import com.hello.suripu.core.db.mappers.PillClassificationMapper; +import com.hello.suripu.core.models.PillClassification; +import org.skife.jdbi.v2.sqlobject.Bind; +import org.skife.jdbi.v2.sqlobject.SqlQuery; +import org.skife.jdbi.v2.sqlobject.SqlUpdate; +import org.skife.jdbi.v2.sqlobject.customizers.RegisterMapper; +import org.skife.jdbi.v2.sqlobject.customizers.SingleValueResult; + +/** + * Created by pangwu on 6/23/15. + */ +public interface PillClassificationDAO { + @RegisterMapper(PillClassificationMapper.class) + @SingleValueResult(PillClassification.class) + @SqlQuery("SELECT * FROM pill_classification WHERE internal_pill_id = :internal_pill_id;") + Optional getByInternalPillId(@Bind("internal_pill_id") final long internalPillId); + + @SqlUpdate("UPDATE pill_classification SET max_24hr_diff = 0, max_72hr_diff = 0 WHERE internal_pill_id = :internal_pill_id;") + Integer resetByInternalPillId(@Bind("internal_pill_id") final long internalPillId); + + @SqlUpdate("INSERT INTO pill_classification (" + + "internal_pill_id,pill_id,last_24pt_window_ts,last_72pt_window_ts,last_update_batt,max_24hr_diff,max_72hr_diff,class) " + + "VALUES (:internal_pill_id, :pill_id, :last_24pt_window_ts, :last_72pt_window_ts, :last_update_batt, :max_24hr_diff, :max_72hr_diff, :class);") + Integer insert(@BindPillClassification final PillClassification classification); + + @SqlUpdate("UPDATE pill_classification SET max_24hr_diff = :max_24hr_diff, max_72hr_diff = :max_72hr_diff, " + + "last_24pt_window_ts = :last_24pt_window_ts, last_72pt_window_ts = :last_72pt_window_ts, " + + "last_update_batt = :last_update_batt, class = :class") + Integer update(@BindPillClassification final PillClassification classification); +} diff --git a/suripu-core/src/main/java/com/hello/suripu/core/db/PillHeartBeatDAO.java b/suripu-core/src/main/java/com/hello/suripu/core/db/PillHeartBeatDAO.java index f6b2b6a79..054cc8865 100644 --- a/suripu-core/src/main/java/com/hello/suripu/core/db/PillHeartBeatDAO.java +++ b/suripu-core/src/main/java/com/hello/suripu/core/db/PillHeartBeatDAO.java @@ -13,6 +13,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; + public abstract class PillHeartBeatDAO { private final static Logger LOGGER = LoggerFactory.getLogger(PillHeartBeatDAO.class); @@ -26,6 +28,13 @@ public abstract class PillHeartBeatDAO { "FROM pill_status WHERE pill_id = :pill_id and last_updated > now() - interval '24 hours' ORDER BY last_updated DESC LIMIT 1") public abstract Optional getPillStatus(@Bind("pill_id") final Long pillId); + @RegisterMapper(DeviceStatusMapper.class) + @SqlQuery("SELECT * FROM pill_status WHERE pill_id = :pill_id AND last_updated > :from AND last_updated <= :to ORDER BY last_updated ASC") + public abstract List getPillStatusBetweenUTC(@Bind("pill_id") final Long pillId, @Bind("from") final DateTime from, @Bind("to") final DateTime to); + + @SqlQuery("SELECT DISTINCT ON (pill_id) pill_id FROM pill_status WHERE last_updated > now() - interval '30 hours' AND battery_level < 80;") + public abstract List getPillIdsSeenInLast24Hours(); + public void silentInsert(final Long internalPillId, final Integer batteryLevel, final Integer uptime, final Integer firmwareVersion, final DateTime lastUpdated) { try { LOGGER.debug("last updated: {}", lastUpdated); diff --git a/suripu-core/src/main/java/com/hello/suripu/core/db/binders/BindPillClassification.java b/suripu-core/src/main/java/com/hello/suripu/core/db/binders/BindPillClassification.java new file mode 100644 index 000000000..6070457eb --- /dev/null +++ b/suripu-core/src/main/java/com/hello/suripu/core/db/binders/BindPillClassification.java @@ -0,0 +1,39 @@ +package com.hello.suripu.core.db.binders; + +import com.hello.suripu.core.models.PillClassification; +import org.skife.jdbi.v2.SQLStatement; +import org.skife.jdbi.v2.sqlobject.Binder; +import org.skife.jdbi.v2.sqlobject.BinderFactory; +import org.skife.jdbi.v2.sqlobject.BindingAnnotation; + +import java.lang.annotation.Annotation; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Created by pangwu on 6/23/15. + */ +@BindingAnnotation(BindPillClassification.BindPillClassificationFactory.class) +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.PARAMETER}) +public @interface BindPillClassification { + public static class BindPillClassificationFactory implements BinderFactory { + public Binder build(final Annotation annotation) { + return new Binder() { + public void bind(final SQLStatement q, final BindPillClassification bind, final PillClassification arg) { + q.bind("id", arg.id); + q.bind("internal_pill_id", arg.internalPillId); + q.bind("pill_id", arg.pillId); + q.bind("last_24pt_window_ts", arg.last24PointWindowStartTime); + q.bind("last_72pt_window_ts", arg.last72PointWindowStartTime); + q.bind("last_update_batt", PillClassification.floatToInt(arg.lastClassificationBatteryLevel)); + q.bind("max_24hr_diff", PillClassification.floatToInt(arg.max24HoursBatteryDelta)); + q.bind("max_72hr_diff", PillClassification.floatToInt(arg.max72HoursBatteryDelta)); + q.bind("class", arg.status.toInt()); + } + }; + } + } +} diff --git a/suripu-core/src/main/java/com/hello/suripu/core/db/mappers/PillClassificationMapper.java b/suripu-core/src/main/java/com/hello/suripu/core/db/mappers/PillClassificationMapper.java new file mode 100644 index 000000000..99d881cd7 --- /dev/null +++ b/suripu-core/src/main/java/com/hello/suripu/core/db/mappers/PillClassificationMapper.java @@ -0,0 +1,33 @@ +package com.hello.suripu.core.db.mappers; + +import com.hello.suripu.core.models.PillClassification; +import org.skife.jdbi.v2.StatementContext; +import org.skife.jdbi.v2.tweak.ResultSetMapper; + +import java.sql.ResultSet; +import java.sql.SQLException; + +/** + * Created by pangwu on 6/23/15. + */ +public class PillClassificationMapper implements ResultSetMapper { + @Override + public PillClassification map(int index, final ResultSet r, final StatementContext ctx) throws SQLException { + final PillClassification classification = new PillClassification( + r.getLong("id"), + r.getLong("internal_pill_id"), + r.getString("pill_id"), + + r.getTimestamp("last_24pt_window_ts").getTime(), + r.getTimestamp("last_72pt_window_ts").getTime(), + + PillClassification.intToFloat(r.getInt("last_update_batt")), + + PillClassification.intToFloat(r.getInt("max_24hr_diff")), + PillClassification.intToFloat(r.getInt("max_72hr_diff")), + + r.getInt("class") + ); + return classification; + } +} diff --git a/suripu-core/src/main/java/com/hello/suripu/core/models/PillClassification.java b/suripu-core/src/main/java/com/hello/suripu/core/models/PillClassification.java new file mode 100644 index 000000000..33b07c0a6 --- /dev/null +++ b/suripu-core/src/main/java/com/hello/suripu/core/models/PillClassification.java @@ -0,0 +1,68 @@ +package com.hello.suripu.core.models; + +import com.google.common.base.Optional; +import com.hello.suripu.core.db.PillHeartBeatDAO; +import com.hello.suripu.core.util.PillStatus; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; + +import java.util.List; + +/** + * Created by pangwu on 6/23/15. + */ +public class PillClassification { + public final long id; + public final long internalPillId; + public final String pillId; + public final DateTime last24PointWindowStartTime; + public final DateTime last72PointWindowStartTime; + public final float lastClassificationBatteryLevel; + public final float max24HoursBatteryDelta; + public final float max72HoursBatteryDelta; + + public final PillStatus status; + + public PillClassification(final long id, + final long internalPillId, + final String pillId, + final long last24PointWindowStartMillis, + final long last72PointWindowStartMillis, + final float lastClassificationBatteryLevel, + final float max24HoursBatteryDelta, + final float max72HoursBatteryDelta, + final int classificationResult){ + this.id = id; + this.internalPillId = internalPillId; + this.pillId = pillId; + this.last24PointWindowStartTime = new DateTime(last24PointWindowStartMillis, DateTimeZone.UTC); + this.last72PointWindowStartTime = new DateTime(last72PointWindowStartMillis, DateTimeZone.UTC); + + this.lastClassificationBatteryLevel = lastClassificationBatteryLevel; + this.status = PillStatus.fromInt(classificationResult); + + this.max24HoursBatteryDelta = max24HoursBatteryDelta; + this.max72HoursBatteryDelta = max72HoursBatteryDelta; + + } + + public static int floatToInt(final float value){ + return (int)(value * 100); + } + + public static float intToFloat(final int value){ + return value / 100f; + } + + public static DateTime getQueryStartTime(final Optional lastRecord){ + if(!lastRecord.isPresent()){ + return new DateTime(0, DateTimeZone.UTC); + } + + return lastRecord.get().last24PointWindowStartTime; + } + + public static List restoreBuffer(final long internalPillId, final DateTime queryStartTime, final DateTime queryEndTime, final PillHeartBeatDAO pillHeartBeatDAO){ + return pillHeartBeatDAO.getPillStatusBetweenUTC(internalPillId, queryStartTime, queryEndTime); + } +} diff --git a/suripu-core/src/main/java/com/hello/suripu/core/util/PillStatus.java b/suripu-core/src/main/java/com/hello/suripu/core/util/PillStatus.java new file mode 100644 index 000000000..22c62c550 --- /dev/null +++ b/suripu-core/src/main/java/com/hello/suripu/core/util/PillStatus.java @@ -0,0 +1,32 @@ +package com.hello.suripu.core.util; + +/** + * Created by pangwu on 6/23/15. + */ +public enum PillStatus { + UNKNOWN, + BAD, + NORMAL; + + public static PillStatus fromInt(final int value) { + switch (value){ + case 1: + return BAD; + case 0: + return NORMAL; + default: + return UNKNOWN; + } + } + + public int toInt(){ + switch (this){ + case BAD: + return 1; + case NORMAL: + return 0; + default: + return -1; + } + } +} diff --git a/suripu-workers/configs/pillstatus/pillstatus_worker.dev.yml.example b/suripu-workers/configs/pillstatus/pillstatus_worker.dev.yml.example index d10f05d90..0de708369 100644 --- a/suripu-workers/configs/pillstatus/pillstatus_worker.dev.yml.example +++ b/suripu-workers/configs/pillstatus/pillstatus_worker.dev.yml.example @@ -67,7 +67,7 @@ common_db: kinesis: endpoint : https://kinesis.us-east-1.amazonaws.com streams : - batch_pill_data : batch_pill_data + worker_tasks : prod_worker_tasks app_name: PillStatusWorkerDev diff --git a/suripu-workers/configs/pillstatus/pillstatus_worker.prod.yml b/suripu-workers/configs/pillstatus/pillstatus_worker.prod.yml index c48c23be9..24b9ff635 100644 --- a/suripu-workers/configs/pillstatus/pillstatus_worker.prod.yml +++ b/suripu-workers/configs/pillstatus/pillstatus_worker.prod.yml @@ -77,7 +77,7 @@ common_db: kinesis: endpoint : https://kinesis.us-east-1.amazonaws.com streams : - batch_pill_data : batch_pill_data + worker_tasks : prod_worker_tasks app_name: PillStatusWorkerProd diff --git a/suripu-workers/configs/pillstatus/pillstatus_worker.staging.yml b/suripu-workers/configs/pillstatus/pillstatus_worker.staging.yml index 105a09a54..d97302940 100644 --- a/suripu-workers/configs/pillstatus/pillstatus_worker.staging.yml +++ b/suripu-workers/configs/pillstatus/pillstatus_worker.staging.yml @@ -78,7 +78,7 @@ common_db: kinesis: endpoint : https://kinesis.us-east-1.amazonaws.com streams : - batch_pill_data : dev_batch_pill_data + worker_tasks : dev_worker_tasks app_name: PillStatusWorker diff --git a/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusRecordProcessor.java b/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusRecordProcessor.java new file mode 100644 index 000000000..f80d379bf --- /dev/null +++ b/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusRecordProcessor.java @@ -0,0 +1,285 @@ +package com.hello.suripu.workers.pillstatus; + +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; +import com.amazonaws.services.kinesis.model.Record; +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.RateLimiter; +import com.google.protobuf.InvalidProtocolBufferException; +import com.hello.suripu.api.tasks.TaskProtos; +import com.hello.suripu.core.db.DeviceDAO; +import com.hello.suripu.core.db.PillClassificationDAO; +import com.hello.suripu.core.db.PillHeartBeatDAO; +import com.hello.suripu.core.models.DeviceAccountPair; +import com.hello.suripu.core.models.DeviceStatus; +import com.hello.suripu.core.models.PillClassification; +import com.hello.suripu.core.util.PillStatus; +import com.hello.suripu.workers.framework.HelloBaseRecordProcessor; +import com.yammer.metrics.Metrics; +import com.yammer.metrics.core.Meter; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Created by pangwu on 6/23/15. + */ +public class PillStatusRecordProcessor extends HelloBaseRecordProcessor { + + private static final Logger LOGGER = LoggerFactory.getLogger(PillStatusRecordProcessor.class); + private static final int PILL_BATT_LEVEL_FLUCTUATE_RANGE = 20; + + private final PillHeartBeatDAO pillHeartBeatDAO; + private final PillClassificationDAO pillClassificationDAO; + private final DeviceDAO deviceDAO; + + private final Meter notRegisteredPillMeter; + private final Meter batteryRecoverMeter; + private final Meter dyingPillMeter; + private final Meter newConvertedDyingPill; + + public PillStatusRecordProcessor(final DeviceDAO deviceDAO, + final PillHeartBeatDAO pillHeartBeatDAO, + final PillClassificationDAO pillClassificationDAO){ + this.pillHeartBeatDAO = pillHeartBeatDAO; + this.deviceDAO = deviceDAO; + this.pillClassificationDAO = pillClassificationDAO; + + this.notRegisteredPillMeter = Metrics.defaultRegistry().newMeter( + PillStatusRecordProcessor.class, + "pill-not-registered", + "pill-not-registered", + TimeUnit.SECONDS); + this.batteryRecoverMeter = Metrics.defaultRegistry().newMeter( + PillStatusRecordProcessor.class, + "pill-recover", + "pill-recover", + TimeUnit.SECONDS + ); + this.dyingPillMeter = Metrics.defaultRegistry().newMeter( + PillStatusRecordProcessor.class, + "dying-pill-count", + "dying-pill-count", + TimeUnit.SECONDS + ); + this.newConvertedDyingPill = Metrics.defaultRegistry().newMeter( + PillStatusRecordProcessor.class, + "detected-dying-pill", + "detected-dying-pill", + TimeUnit.SECONDS + ); + } + + @Override + public void initialize(final String s) { + LOGGER.info("PillStatusRecordProcessor initialized: {}", s); + } + + + private List removeDuplicate(final List deviceStatuses){ + final List copy = new ArrayList<>(); + + + DateTime lastHeartBeatTime = null; + for(final DeviceStatus deviceStatus:deviceStatuses){ + if((lastHeartBeatTime == null) || + (lastHeartBeatTime != null && !deviceStatus.lastSeen.equals(lastHeartBeatTime))){ + copy.add(deviceStatus); + } + + lastHeartBeatTime = deviceStatus.lastSeen; + } + + return copy; + } + + + private float getDrop(final List window, final int windowSize, final int smoothCount){ + if(window.size() < windowSize || window.size() < smoothCount){ + return 0f; + } + + long startSum = 0; + long endSum = 0; + for(int i = 0; i < smoothCount; i++){ + startSum += window.get(i).batteryLevel; + endSum += window.get(window.size() - i - 1).batteryLevel; + } + + final float startBatteryLevel = startSum / (float)smoothCount; + final float endBatteryLevel = endSum / (float)smoothCount; + + final float drop = startBatteryLevel - endBatteryLevel; + return drop; + } + + + private Optional getMaxDrop(final List restoredWindow, final DateTime windowStartTime, final int windowSize){ + + final List restoredWindowMutable = Lists.newArrayList(restoredWindow); + Collections.sort(restoredWindowMutable, new Comparator() { + @Override + public int compare(final DeviceStatus o1, final DeviceStatus o2) { + return o1.lastSeen.compareTo(o2.lastSeen); + } + }); + + final List bufferNoDuplication = removeDuplicate(restoredWindowMutable); + final LinkedList slidingWindow = new LinkedList<>(); + Optional maxDrop = Optional.absent(); + + for(final DeviceStatus deviceStatus:bufferNoDuplication){ + if(deviceStatus.batteryLevel > 100){ + // skip the level interrupt checker + continue; + } + + if(deviceStatus.lastSeen.isBefore(windowStartTime)){ + continue; + } + + slidingWindow.add(deviceStatus); + if(slidingWindow.size() == windowSize){ + + final float drop = getDrop(slidingWindow, windowSize, 5); + if(maxDrop.isPresent() == false || drop > maxDrop.get()){ + maxDrop = Optional.of(drop); + } + slidingWindow.removeFirst(); + } + } + + return maxDrop; + + } + + private void doProcess(final List internalPillIds){ + final RateLimiter rateLimiter = RateLimiter.create(100d); + for(final Long internalPillId:internalPillIds){ + rateLimiter.acquire(); + + final Optional pillClassificationOptional = this.pillClassificationDAO.getByInternalPillId(internalPillId); + + // Figure out where we ends the classification so we can reconstruct the sliding window. + final DateTime queryStartTimeFor24hrWindow = pillClassificationOptional.isPresent() ? + pillClassificationOptional.get().last24PointWindowStartTime : + new DateTime(0, DateTimeZone.UTC); + final DateTime queryStartTimeFor72hrWindow = pillClassificationOptional.isPresent() ? + pillClassificationOptional.get().last72PointWindowStartTime : + new DateTime(0, DateTimeZone.UTC); + final List dataFromLast72Hours = this.pillHeartBeatDAO.getPillStatusBetweenUTC(internalPillId, + queryStartTimeFor72hrWindow, + DateTime.now()); + + if(dataFromLast72Hours.isEmpty()){ + continue; + } + + // Extract the two features we need: + // day to day drop + // and drop on every 3 days. + final Optional maxDrop24hr = getMaxDrop(dataFromLast72Hours, + queryStartTimeFor24hrWindow, + 24); + final Optional maxDrop72hr = getMaxDrop(dataFromLast72Hours, + queryStartTimeFor72hrWindow, + 72); + + // Plug it into the decision boundary: + // -0.0064 * d2d + 0.1281 * d3d - 3.4344 = 0 + + if(!maxDrop24hr.isPresent() || !maxDrop72hr.isPresent()){ + LOGGER.debug("Not enough data to classify pill {}", internalPillId); + continue; + } + + final DeviceStatus lastHeartBeat = dataFromLast72Hours.get(dataFromLast72Hours.size() - 1); + final long lastHeartBeatTimeMillis = lastHeartBeat.lastSeen.getMillis(); + final PillClassification classification = new PillClassification(0L, internalPillId, + "", // we should think a way to get this. + lastHeartBeatTimeMillis, + lastHeartBeatTimeMillis, + lastHeartBeat.batteryLevel, + Math.max(maxDrop24hr.get(), pillClassificationOptional.isPresent() ? pillClassificationOptional.get().max24HoursBatteryDelta : 0f), + Math.max(maxDrop72hr.get(), pillClassificationOptional.isPresent() ? pillClassificationOptional.get().max72HoursBatteryDelta : 0f), + PillStatus.NORMAL.toInt() + ); + + + + if(!pillClassificationOptional.isPresent()){ + this.pillClassificationDAO.insert(classification); + }else{ + this.pillClassificationDAO.update(classification); + } + this.newConvertedDyingPill.mark(); + + } + } + + @Override + public void processRecords(final List list, final IRecordProcessorCheckpointer iRecordProcessorCheckpointer) { + for(final Record record:list){ + try { + final TaskProtos.WorkerTask workerTask = TaskProtos.WorkerTask.parseFrom(record.getData().array()); + if(workerTask.getTaskType() != TaskProtos.WorkerTask.TaskType.PILL_CLASSIFICATION){ + continue; + } + + final List internalPillIdsToBeClassified = new ArrayList<>(); + final List pillIds = workerTask.getPillIdsList(); + + for(final String pillId:pillIds){ + final Optional registeredPill = this.deviceDAO.getInternalPillId(pillId); + if(!registeredPill.isPresent()){ + LOGGER.info("Pill {} not found in tracker account map, pill might be unpaired."); + continue; + } + + internalPillIdsToBeClassified.add(registeredPill.get().internalDeviceId); + } + + + if(workerTask.getPillIdsCount() == 0){ + final List pillsSeenFromLast24HoursAndHaveLessThan80PercentBattery = this.pillHeartBeatDAO.getPillIdsSeenInLast24Hours(); + LOGGER.info("Found {} pills to be classified.", pillsSeenFromLast24HoursAndHaveLessThan80PercentBattery.size()); + + internalPillIdsToBeClassified.addAll(pillsSeenFromLast24HoursAndHaveLessThan80PercentBattery); + } + + doProcess(internalPillIdsToBeClassified); + + } catch (InvalidProtocolBufferException e) { + LOGGER.error("Failed to decode protobuf: {}", e.getMessage()); + } + } + + + + try { + iRecordProcessorCheckpointer.checkpoint(); + } catch (InvalidStateException e) { + LOGGER.error("checkpoint {}", e.getMessage()); + } catch (ShutdownException e) { + LOGGER.error("Received shutdown command at checkpoint, bailing. {}", e.getMessage()); + System.exit(1); + } + } + + @Override + public void shutdown(final IRecordProcessorCheckpointer iRecordProcessorCheckpointer, final ShutdownReason shutdownReason) { + LOGGER.warn("SHUTDOWN: {}", shutdownReason.toString()); + System.exit(1); + } +} diff --git a/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusRecordProcessorFactory.java b/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusRecordProcessorFactory.java new file mode 100644 index 000000000..b6303f50a --- /dev/null +++ b/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusRecordProcessorFactory.java @@ -0,0 +1,33 @@ +package com.hello.suripu.workers.pillstatus; + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; +import com.hello.suripu.core.db.DeviceDAO; +import com.hello.suripu.core.db.PillClassificationDAO; +import com.hello.suripu.core.db.PillHeartBeatDAO; + +/** + * Created by pangwu on 6/23/15. + */ +public class PillStatusRecordProcessorFactory implements IRecordProcessorFactory { + private final PillHeartBeatDAO pillHeartBeatDAO; + private final PillClassificationDAO pillClassificationDAO; + private final DeviceDAO deviceDAO; + + + public PillStatusRecordProcessorFactory(final DeviceDAO deviceDAO, + final PillClassificationDAO pillClassificationDAO, + final PillHeartBeatDAO pillHeartBeatDAO){ + this.pillHeartBeatDAO = pillHeartBeatDAO; + this.pillClassificationDAO = pillClassificationDAO; + this.deviceDAO = deviceDAO; + } + + @Override + public IRecordProcessor createProcessor() { + final IRecordProcessor recordProcessor = new PillStatusRecordProcessor(this.deviceDAO, + this.pillHeartBeatDAO, + this.pillClassificationDAO); + return recordProcessor; + } +} diff --git a/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusWorkerCommand.java b/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusWorkerCommand.java new file mode 100644 index 000000000..3f0d64ffe --- /dev/null +++ b/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusWorkerCommand.java @@ -0,0 +1,90 @@ +package com.hello.suripu.workers.pillstatus; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; +import com.google.common.collect.ImmutableMap; +import com.hello.suripu.core.configuration.QueueName; +import com.hello.suripu.core.db.DeviceDAO; +import com.hello.suripu.core.db.PillClassificationDAO; +import com.hello.suripu.core.db.PillHeartBeatDAO; +import com.hello.suripu.core.db.util.JodaArgumentFactory; +import com.hello.suripu.core.db.util.PostgresIntegerArrayArgumentFactory; +import com.hello.suripu.workers.framework.WorkerEnvironmentCommand; +import com.yammer.dropwizard.config.Environment; +import com.yammer.dropwizard.db.ManagedDataSourceFactory; +import com.yammer.dropwizard.jdbi.ImmutableListContainerFactory; +import com.yammer.dropwizard.jdbi.ImmutableSetContainerFactory; +import com.yammer.dropwizard.jdbi.OptionalContainerFactory; +import net.sourceforge.argparse4j.inf.Namespace; +import org.skife.jdbi.v2.DBI; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.util.UUID; + +/** + * Created by pangwu on 6/23/15. + */ +public class PillStatusWorkerCommand extends WorkerEnvironmentCommand { + + private final static Logger LOGGER = LoggerFactory.getLogger(PillStatusWorkerCommand.class); + + public PillStatusWorkerCommand(){ + super("pillstatus_worker", "Worker automatically classify quick discharging pills"); + } + + @Override + protected void run(final Environment environment, final Namespace namespace, final PillStatusWorkerConfiguration configuration) throws Exception { + final ManagedDataSourceFactory managedDataSourceFactory = new ManagedDataSourceFactory(); + + final DBI commonDB = new DBI(managedDataSourceFactory.build(configuration.getCommonDB())); + final DBI sensorsDB = new DBI(managedDataSourceFactory.build(configuration.getSensorsDB())); + + sensorsDB.registerArgumentFactory(new JodaArgumentFactory()); + sensorsDB.registerContainerFactory(new OptionalContainerFactory()); + sensorsDB.registerArgumentFactory(new PostgresIntegerArrayArgumentFactory()); + sensorsDB.registerContainerFactory(new ImmutableListContainerFactory()); + sensorsDB.registerContainerFactory(new ImmutableSetContainerFactory()); + + + commonDB.registerArgumentFactory(new JodaArgumentFactory()); + commonDB.registerContainerFactory(new OptionalContainerFactory()); + commonDB.registerArgumentFactory(new PostgresIntegerArrayArgumentFactory()); + commonDB.registerContainerFactory(new ImmutableListContainerFactory()); + commonDB.registerContainerFactory(new ImmutableSetContainerFactory()); + + final DeviceDAO deviceDAO = commonDB.onDemand(DeviceDAO.class); + final PillHeartBeatDAO pillHeartBeatDAO = sensorsDB.onDemand(PillHeartBeatDAO.class); + final PillClassificationDAO pillClassificationDAO = sensorsDB.onDemand(PillClassificationDAO.class); + + final ImmutableMap queueNames = configuration.getQueues(); + + LOGGER.debug("{}", queueNames); + final String queueName = queueNames.get(QueueName.WORKER_TASKS); + LOGGER.info("\n\n\n!!! This worker is using the following queue: {} !!!\n\n\n", queueName); + + final AWSCredentialsProvider awsCredentialsProvider = new DefaultAWSCredentialsProviderChain(); + final String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID(); + final KinesisClientLibConfiguration kinesisConfig = new KinesisClientLibConfiguration( + configuration.getAppName(), + queueName, + awsCredentialsProvider, + workerId); + kinesisConfig.withMaxRecords(configuration.getMaxRecords()); + kinesisConfig.withKinesisEndpoint(configuration.getKinesisEndpoint()); + kinesisConfig.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON); + + + final IRecordProcessorFactory factory = new PillStatusRecordProcessorFactory( + deviceDAO, + pillClassificationDAO, + pillHeartBeatDAO); + final Worker worker = new Worker(factory, kinesisConfig); + worker.run(); + } +} diff --git a/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusWorkerConfiguration.java b/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusWorkerConfiguration.java index d02ead123..86874d224 100644 --- a/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusWorkerConfiguration.java +++ b/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusWorkerConfiguration.java @@ -6,6 +6,7 @@ import com.yammer.dropwizard.db.DatabaseConfiguration; import javax.validation.Valid; +import javax.validation.constraints.Max; import javax.validation.constraints.NotNull; /** @@ -31,6 +32,18 @@ public DatabaseConfiguration getSensorsDB() { return sensorsDB; } + + @Valid + @NotNull + @Max(1000) + @JsonProperty("max_records") + private Integer maxRecords; + + public Integer getMaxRecords() { + return maxRecords; + } + + @Valid @NotNull @JsonProperty("common_db") From 6a6762b34815d2581bcd68361c352f7445eb7a9a Mon Sep 17 00:00:00 2001 From: Pang Wu Date: Tue, 7 Jul 2015 13:34:58 -0700 Subject: [PATCH 3/3] Plug in the classification --- .../core/models/PillClassification.java | 19 +++++++++++++++++++ .../pillstatus/PillStatusRecordProcessor.java | 13 +++++++------ 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/suripu-core/src/main/java/com/hello/suripu/core/models/PillClassification.java b/suripu-core/src/main/java/com/hello/suripu/core/models/PillClassification.java index 33b07c0a6..b3738f2a4 100644 --- a/suripu-core/src/main/java/com/hello/suripu/core/models/PillClassification.java +++ b/suripu-core/src/main/java/com/hello/suripu/core/models/PillClassification.java @@ -65,4 +65,23 @@ public static DateTime getQueryStartTime(final Optional last public static List restoreBuffer(final long internalPillId, final DateTime queryStartTime, final DateTime queryEndTime, final PillHeartBeatDAO pillHeartBeatDAO){ return pillHeartBeatDAO.getPillStatusBetweenUTC(internalPillId, queryStartTime, queryEndTime); } + + /* + * Decision boundary from SVM + * -0.0064 * d2d + 0.1281 * d3d - 3.4344 = 0 + */ + public static PillStatus classify(final float[] features){ + if(features.length < 2){ + return PillStatus.UNKNOWN; + } + + final float dayToDayDrop = features[0]; + final float threeDayDrop = features[1]; + final float result = -0.0064f * dayToDayDrop + 0.1281f * threeDayDrop - 3.4344f; + if(result > 0){ + return PillStatus.NORMAL; + } + + return PillStatus.BAD; + } } diff --git a/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusRecordProcessor.java b/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusRecordProcessor.java index f80d379bf..e30d6dbe4 100644 --- a/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusRecordProcessor.java +++ b/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusRecordProcessor.java @@ -164,7 +164,7 @@ public int compare(final DeviceStatus o1, final DeviceStatus o2) { } - private void doProcess(final List internalPillIds){ + public void classifyPills(final List internalPillIds){ final RateLimiter rateLimiter = RateLimiter.create(100d); for(final Long internalPillId:internalPillIds){ rateLimiter.acquire(); @@ -196,14 +196,15 @@ private void doProcess(final List internalPillIds){ queryStartTimeFor72hrWindow, 72); - // Plug it into the decision boundary: - // -0.0064 * d2d + 0.1281 * d3d - 3.4344 = 0 - if(!maxDrop24hr.isPresent() || !maxDrop72hr.isPresent()){ LOGGER.debug("Not enough data to classify pill {}", internalPillId); continue; } + // Plug it into the decision boundary: + // -0.0064 * d2d + 0.1281 * d3d - 3.4344 = 0 + final PillStatus status = PillClassification.classify(new float[]{maxDrop24hr.get(), maxDrop72hr.get()}); + final DeviceStatus lastHeartBeat = dataFromLast72Hours.get(dataFromLast72Hours.size() - 1); final long lastHeartBeatTimeMillis = lastHeartBeat.lastSeen.getMillis(); final PillClassification classification = new PillClassification(0L, internalPillId, @@ -213,7 +214,7 @@ private void doProcess(final List internalPillIds){ lastHeartBeat.batteryLevel, Math.max(maxDrop24hr.get(), pillClassificationOptional.isPresent() ? pillClassificationOptional.get().max24HoursBatteryDelta : 0f), Math.max(maxDrop72hr.get(), pillClassificationOptional.isPresent() ? pillClassificationOptional.get().max72HoursBatteryDelta : 0f), - PillStatus.NORMAL.toInt() + status.toInt() ); @@ -258,7 +259,7 @@ public void processRecords(final List list, final IRecordProcessorCheckp internalPillIdsToBeClassified.addAll(pillsSeenFromLast24HoursAndHaveLessThan80PercentBattery); } - doProcess(internalPillIdsToBeClassified); + classifyPills(internalPillIdsToBeClassified); } catch (InvalidProtocolBufferException e) { LOGGER.error("Failed to decode protobuf: {}", e.getMessage());