diff --git a/sql/sensors/tracker_motion/create.sql b/sql/sensors/tracker_motion/create.sql
index 7084ca294..265dc46e6 100644
--- a/sql/sensors/tracker_motion/create.sql
+++ b/sql/sensors/tracker_motion/create.sql
@@ -86,3 +86,18 @@ GRANT ALL PRIVILEGES ON pill_status_id_seq TO ingress_user;
ALTER TABLE pill_status ADD COLUMN uptime BIGINT;
ALTER TABLE pill_status ADD COLUMN fw_version INTEGER;
+
+
+CREATE TABLE pill_classification (
+ id BIGSERIAL PRIMARY KEY,
+ internal_pill_id BIGINT,
+ pill_id VARCHAR(255),
+ last_24pt_window_ts TIMESTAMP,
+ last_72pt_window_ts TIMESTAMP,
+ last_update_batt INTEGER,
+ max_24hr_diff INTEGER,
+ max_72hr_diff INTEGER,
+ class INTEGER
+);
+CREATE INDEX pill_battery_monitor_id_status on pill_battery_monitor(internal_pill_id, last_update);
+CREATE UNIQUE INDEX pill_battery_monitor_id_unique on pill_battery_monitor(internal_pill_id);
\ No newline at end of file
diff --git a/suripu-api/src/main/java/com/hello/suripu/api/tasks/TaskProtos.java b/suripu-api/src/main/java/com/hello/suripu/api/tasks/TaskProtos.java
new file mode 100644
index 000000000..52de331fe
--- /dev/null
+++ b/suripu-api/src/main/java/com/hello/suripu/api/tasks/TaskProtos.java
@@ -0,0 +1,910 @@
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: worker_task.proto
+
+package com.hello.suripu.api.tasks;
+
+public final class TaskProtos {
+ private TaskProtos() {}
+ public static void registerAllExtensions(
+ com.google.protobuf.ExtensionRegistry registry) {
+ }
+ public interface WorkerTaskOrBuilder
+ extends com.google.protobuf.MessageOrBuilder {
+
+ // optional .WorkerTask.TaskType task_type = 1;
+ /**
+ * optional .WorkerTask.TaskType task_type = 1;
+ */
+ boolean hasTaskType();
+ /**
+ * optional .WorkerTask.TaskType task_type = 1;
+ */
+ WorkerTask.TaskType getTaskType();
+
+ // repeated string pill_ids = 2;
+ /**
+ * repeated string pill_ids = 2;
+ */
+ java.util.List
+ getPillIdsList();
+ /**
+ * repeated string pill_ids = 2;
+ */
+ int getPillIdsCount();
+ /**
+ * repeated string pill_ids = 2;
+ */
+ String getPillIds(int index);
+ /**
+ * repeated string pill_ids = 2;
+ */
+ com.google.protobuf.ByteString
+ getPillIdsBytes(int index);
+
+ // repeated string sense_ids = 3;
+ /**
+ * repeated string sense_ids = 3;
+ */
+ java.util.List
+ getSenseIdsList();
+ /**
+ * repeated string sense_ids = 3;
+ */
+ int getSenseIdsCount();
+ /**
+ * repeated string sense_ids = 3;
+ */
+ String getSenseIds(int index);
+ /**
+ * repeated string sense_ids = 3;
+ */
+ com.google.protobuf.ByteString
+ getSenseIdsBytes(int index);
+ }
+ /**
+ * Protobuf type {@code WorkerTask}
+ */
+ public static final class WorkerTask extends
+ com.google.protobuf.GeneratedMessage
+ implements WorkerTaskOrBuilder {
+ // Use WorkerTask.newBuilder() to construct.
+ private WorkerTask(com.google.protobuf.GeneratedMessage.Builder> builder) {
+ super(builder);
+ this.unknownFields = builder.getUnknownFields();
+ }
+ private WorkerTask(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+ private static final WorkerTask defaultInstance;
+ public static WorkerTask getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public WorkerTask getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ private final com.google.protobuf.UnknownFieldSet unknownFields;
+ @Override
+ public final com.google.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ private WorkerTask(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ initFields();
+ int mutable_bitField0_ = 0;
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder();
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ done = true;
+ break;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ done = true;
+ }
+ break;
+ }
+ case 8: {
+ int rawValue = input.readEnum();
+ TaskType value = TaskType.valueOf(rawValue);
+ if (value == null) {
+ unknownFields.mergeVarintField(1, rawValue);
+ } else {
+ bitField0_ |= 0x00000001;
+ taskType_ = value;
+ }
+ break;
+ }
+ case 18: {
+ if (!((mutable_bitField0_ & 0x00000002) == 0x00000002)) {
+ pillIds_ = new com.google.protobuf.LazyStringArrayList();
+ mutable_bitField0_ |= 0x00000002;
+ }
+ pillIds_.add(input.readBytes());
+ break;
+ }
+ case 26: {
+ if (!((mutable_bitField0_ & 0x00000004) == 0x00000004)) {
+ senseIds_ = new com.google.protobuf.LazyStringArrayList();
+ mutable_bitField0_ |= 0x00000004;
+ }
+ senseIds_.add(input.readBytes());
+ break;
+ }
+ }
+ }
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(this);
+ } catch (java.io.IOException e) {
+ throw new com.google.protobuf.InvalidProtocolBufferException(
+ e.getMessage()).setUnfinishedMessage(this);
+ } finally {
+ if (((mutable_bitField0_ & 0x00000002) == 0x00000002)) {
+ pillIds_ = new com.google.protobuf.UnmodifiableLazyStringList(pillIds_);
+ }
+ if (((mutable_bitField0_ & 0x00000004) == 0x00000004)) {
+ senseIds_ = new com.google.protobuf.UnmodifiableLazyStringList(senseIds_);
+ }
+ this.unknownFields = unknownFields.build();
+ makeExtensionsImmutable();
+ }
+ }
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return TaskProtos.internal_static_WorkerTask_descriptor;
+ }
+
+ protected FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return TaskProtos.internal_static_WorkerTask_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ WorkerTask.class, Builder.class);
+ }
+
+ public static com.google.protobuf.Parser PARSER =
+ new com.google.protobuf.AbstractParser() {
+ public WorkerTask parsePartialFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return new WorkerTask(input, extensionRegistry);
+ }
+ };
+
+ @Override
+ public com.google.protobuf.Parser getParserForType() {
+ return PARSER;
+ }
+
+ /**
+ * Protobuf enum {@code WorkerTask.TaskType}
+ */
+ public enum TaskType
+ implements com.google.protobuf.ProtocolMessageEnum {
+ /**
+ * UNKNOWN = 0;
+ */
+ UNKNOWN(0, 0),
+ /**
+ * PILL_CLASSIFICATION = 1;
+ */
+ PILL_CLASSIFICATION(1, 1),
+ ;
+
+ /**
+ * UNKNOWN = 0;
+ */
+ public static final int UNKNOWN_VALUE = 0;
+ /**
+ * PILL_CLASSIFICATION = 1;
+ */
+ public static final int PILL_CLASSIFICATION_VALUE = 1;
+
+
+ public final int getNumber() { return value; }
+
+ public static TaskType valueOf(int value) {
+ switch (value) {
+ case 0: return UNKNOWN;
+ case 1: return PILL_CLASSIFICATION;
+ default: return null;
+ }
+ }
+
+ public static com.google.protobuf.Internal.EnumLiteMap
+ internalGetValueMap() {
+ return internalValueMap;
+ }
+ private static com.google.protobuf.Internal.EnumLiteMap
+ internalValueMap =
+ new com.google.protobuf.Internal.EnumLiteMap() {
+ public TaskType findValueByNumber(int number) {
+ return TaskType.valueOf(number);
+ }
+ };
+
+ public final com.google.protobuf.Descriptors.EnumValueDescriptor
+ getValueDescriptor() {
+ return getDescriptor().getValues().get(index);
+ }
+ public final com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptorForType() {
+ return getDescriptor();
+ }
+ public static final com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptor() {
+ return WorkerTask.getDescriptor().getEnumTypes().get(0);
+ }
+
+ private static final TaskType[] VALUES = values();
+
+ public static TaskType valueOf(
+ com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
+ if (desc.getType() != getDescriptor()) {
+ throw new IllegalArgumentException(
+ "EnumValueDescriptor is not for this type.");
+ }
+ return VALUES[desc.getIndex()];
+ }
+
+ private final int index;
+ private final int value;
+
+ private TaskType(int index, int value) {
+ this.index = index;
+ this.value = value;
+ }
+
+ // @@protoc_insertion_point(enum_scope:WorkerTask.TaskType)
+ }
+
+ private int bitField0_;
+ // optional .WorkerTask.TaskType task_type = 1;
+ public static final int TASK_TYPE_FIELD_NUMBER = 1;
+ private TaskType taskType_;
+ /**
+ * optional .WorkerTask.TaskType task_type = 1;
+ */
+ public boolean hasTaskType() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * optional .WorkerTask.TaskType task_type = 1;
+ */
+ public TaskType getTaskType() {
+ return taskType_;
+ }
+
+ // repeated string pill_ids = 2;
+ public static final int PILL_IDS_FIELD_NUMBER = 2;
+ private com.google.protobuf.LazyStringList pillIds_;
+ /**
+ * repeated string pill_ids = 2;
+ */
+ public java.util.List
+ getPillIdsList() {
+ return pillIds_;
+ }
+ /**
+ * repeated string pill_ids = 2;
+ */
+ public int getPillIdsCount() {
+ return pillIds_.size();
+ }
+ /**
+ * repeated string pill_ids = 2;
+ */
+ public String getPillIds(int index) {
+ return pillIds_.get(index);
+ }
+ /**
+ * repeated string pill_ids = 2;
+ */
+ public com.google.protobuf.ByteString
+ getPillIdsBytes(int index) {
+ return pillIds_.getByteString(index);
+ }
+
+ // repeated string sense_ids = 3;
+ public static final int SENSE_IDS_FIELD_NUMBER = 3;
+ private com.google.protobuf.LazyStringList senseIds_;
+ /**
+ * repeated string sense_ids = 3;
+ */
+ public java.util.List
+ getSenseIdsList() {
+ return senseIds_;
+ }
+ /**
+ * repeated string sense_ids = 3;
+ */
+ public int getSenseIdsCount() {
+ return senseIds_.size();
+ }
+ /**
+ * repeated string sense_ids = 3;
+ */
+ public String getSenseIds(int index) {
+ return senseIds_.get(index);
+ }
+ /**
+ * repeated string sense_ids = 3;
+ */
+ public com.google.protobuf.ByteString
+ getSenseIdsBytes(int index) {
+ return senseIds_.getByteString(index);
+ }
+
+ private void initFields() {
+ taskType_ = TaskType.UNKNOWN;
+ pillIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+ senseIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeEnum(1, taskType_.getNumber());
+ }
+ for (int i = 0; i < pillIds_.size(); i++) {
+ output.writeBytes(2, pillIds_.getByteString(i));
+ }
+ for (int i = 0; i < senseIds_.size(); i++) {
+ output.writeBytes(3, senseIds_.getByteString(i));
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeEnumSize(1, taskType_.getNumber());
+ }
+ {
+ int dataSize = 0;
+ for (int i = 0; i < pillIds_.size(); i++) {
+ dataSize += com.google.protobuf.CodedOutputStream
+ .computeBytesSizeNoTag(pillIds_.getByteString(i));
+ }
+ size += dataSize;
+ size += 1 * getPillIdsList().size();
+ }
+ {
+ int dataSize = 0;
+ for (int i = 0; i < senseIds_.size(); i++) {
+ dataSize += com.google.protobuf.CodedOutputStream
+ .computeBytesSizeNoTag(senseIds_.getByteString(i));
+ }
+ size += dataSize;
+ size += 1 * getSenseIdsList().size();
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @Override
+ protected Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ public static WorkerTask parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static WorkerTask parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static WorkerTask parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static WorkerTask parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static WorkerTask parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static WorkerTask parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+ public static WorkerTask parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input);
+ }
+ public static WorkerTask parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input, extensionRegistry);
+ }
+ public static WorkerTask parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static WorkerTask parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(WorkerTask prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @Override
+ protected Builder newBuilderForType(
+ BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code WorkerTask}
+ */
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessage.Builder
+ implements WorkerTaskOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return TaskProtos.internal_static_WorkerTask_descriptor;
+ }
+
+ protected FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return TaskProtos.internal_static_WorkerTask_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ WorkerTask.class, Builder.class);
+ }
+
+ // Construct using com.hello.suripu.api.tasks.TaskProtos.WorkerTask.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(
+ BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ taskType_ = TaskType.UNKNOWN;
+ bitField0_ = (bitField0_ & ~0x00000001);
+ pillIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000002);
+ senseIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000004);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return TaskProtos.internal_static_WorkerTask_descriptor;
+ }
+
+ public WorkerTask getDefaultInstanceForType() {
+ return WorkerTask.getDefaultInstance();
+ }
+
+ public WorkerTask build() {
+ WorkerTask result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ public WorkerTask buildPartial() {
+ WorkerTask result = new WorkerTask(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.taskType_ = taskType_;
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ pillIds_ = new com.google.protobuf.UnmodifiableLazyStringList(
+ pillIds_);
+ bitField0_ = (bitField0_ & ~0x00000002);
+ }
+ result.pillIds_ = pillIds_;
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ senseIds_ = new com.google.protobuf.UnmodifiableLazyStringList(
+ senseIds_);
+ bitField0_ = (bitField0_ & ~0x00000004);
+ }
+ result.senseIds_ = senseIds_;
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof WorkerTask) {
+ return mergeFrom((WorkerTask)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(WorkerTask other) {
+ if (other == WorkerTask.getDefaultInstance()) return this;
+ if (other.hasTaskType()) {
+ setTaskType(other.getTaskType());
+ }
+ if (!other.pillIds_.isEmpty()) {
+ if (pillIds_.isEmpty()) {
+ pillIds_ = other.pillIds_;
+ bitField0_ = (bitField0_ & ~0x00000002);
+ } else {
+ ensurePillIdsIsMutable();
+ pillIds_.addAll(other.pillIds_);
+ }
+ onChanged();
+ }
+ if (!other.senseIds_.isEmpty()) {
+ if (senseIds_.isEmpty()) {
+ senseIds_ = other.senseIds_;
+ bitField0_ = (bitField0_ & ~0x00000004);
+ } else {
+ ensureSenseIdsIsMutable();
+ senseIds_.addAll(other.senseIds_);
+ }
+ onChanged();
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ return true;
+ }
+
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ WorkerTask parsedMessage = null;
+ try {
+ parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ parsedMessage = (WorkerTask) e.getUnfinishedMessage();
+ throw e;
+ } finally {
+ if (parsedMessage != null) {
+ mergeFrom(parsedMessage);
+ }
+ }
+ return this;
+ }
+ private int bitField0_;
+
+ // optional .WorkerTask.TaskType task_type = 1;
+ private TaskType taskType_ = TaskType.UNKNOWN;
+ /**
+ * optional .WorkerTask.TaskType task_type = 1;
+ */
+ public boolean hasTaskType() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * optional .WorkerTask.TaskType task_type = 1;
+ */
+ public TaskType getTaskType() {
+ return taskType_;
+ }
+ /**
+ * optional .WorkerTask.TaskType task_type = 1;
+ */
+ public Builder setTaskType(TaskType value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ taskType_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * optional .WorkerTask.TaskType task_type = 1;
+ */
+ public Builder clearTaskType() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ taskType_ = TaskType.UNKNOWN;
+ onChanged();
+ return this;
+ }
+
+ // repeated string pill_ids = 2;
+ private com.google.protobuf.LazyStringList pillIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+ private void ensurePillIdsIsMutable() {
+ if (!((bitField0_ & 0x00000002) == 0x00000002)) {
+ pillIds_ = new com.google.protobuf.LazyStringArrayList(pillIds_);
+ bitField0_ |= 0x00000002;
+ }
+ }
+ /**
+ * repeated string pill_ids = 2;
+ */
+ public java.util.List
+ getPillIdsList() {
+ return java.util.Collections.unmodifiableList(pillIds_);
+ }
+ /**
+ * repeated string pill_ids = 2;
+ */
+ public int getPillIdsCount() {
+ return pillIds_.size();
+ }
+ /**
+ * repeated string pill_ids = 2;
+ */
+ public String getPillIds(int index) {
+ return pillIds_.get(index);
+ }
+ /**
+ * repeated string pill_ids = 2;
+ */
+ public com.google.protobuf.ByteString
+ getPillIdsBytes(int index) {
+ return pillIds_.getByteString(index);
+ }
+ /**
+ * repeated string pill_ids = 2;
+ */
+ public Builder setPillIds(
+ int index, String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensurePillIdsIsMutable();
+ pillIds_.set(index, value);
+ onChanged();
+ return this;
+ }
+ /**
+ * repeated string pill_ids = 2;
+ */
+ public Builder addPillIds(
+ String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensurePillIdsIsMutable();
+ pillIds_.add(value);
+ onChanged();
+ return this;
+ }
+ /**
+ * repeated string pill_ids = 2;
+ */
+ public Builder addAllPillIds(
+ Iterable values) {
+ ensurePillIdsIsMutable();
+ super.addAll(values, pillIds_);
+ onChanged();
+ return this;
+ }
+ /**
+ * repeated string pill_ids = 2;
+ */
+ public Builder clearPillIds() {
+ pillIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000002);
+ onChanged();
+ return this;
+ }
+ /**
+ * repeated string pill_ids = 2;
+ */
+ public Builder addPillIdsBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensurePillIdsIsMutable();
+ pillIds_.add(value);
+ onChanged();
+ return this;
+ }
+
+ // repeated string sense_ids = 3;
+ private com.google.protobuf.LazyStringList senseIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+ private void ensureSenseIdsIsMutable() {
+ if (!((bitField0_ & 0x00000004) == 0x00000004)) {
+ senseIds_ = new com.google.protobuf.LazyStringArrayList(senseIds_);
+ bitField0_ |= 0x00000004;
+ }
+ }
+ /**
+ * repeated string sense_ids = 3;
+ */
+ public java.util.List
+ getSenseIdsList() {
+ return java.util.Collections.unmodifiableList(senseIds_);
+ }
+ /**
+ * repeated string sense_ids = 3;
+ */
+ public int getSenseIdsCount() {
+ return senseIds_.size();
+ }
+ /**
+ * repeated string sense_ids = 3;
+ */
+ public String getSenseIds(int index) {
+ return senseIds_.get(index);
+ }
+ /**
+ * repeated string sense_ids = 3;
+ */
+ public com.google.protobuf.ByteString
+ getSenseIdsBytes(int index) {
+ return senseIds_.getByteString(index);
+ }
+ /**
+ * repeated string sense_ids = 3;
+ */
+ public Builder setSenseIds(
+ int index, String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureSenseIdsIsMutable();
+ senseIds_.set(index, value);
+ onChanged();
+ return this;
+ }
+ /**
+ * repeated string sense_ids = 3;
+ */
+ public Builder addSenseIds(
+ String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureSenseIdsIsMutable();
+ senseIds_.add(value);
+ onChanged();
+ return this;
+ }
+ /**
+ * repeated string sense_ids = 3;
+ */
+ public Builder addAllSenseIds(
+ Iterable values) {
+ ensureSenseIdsIsMutable();
+ super.addAll(values, senseIds_);
+ onChanged();
+ return this;
+ }
+ /**
+ * repeated string sense_ids = 3;
+ */
+ public Builder clearSenseIds() {
+ senseIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000004);
+ onChanged();
+ return this;
+ }
+ /**
+ * repeated string sense_ids = 3;
+ */
+ public Builder addSenseIdsBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureSenseIdsIsMutable();
+ senseIds_.add(value);
+ onChanged();
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:WorkerTask)
+ }
+
+ static {
+ defaultInstance = new WorkerTask(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:WorkerTask)
+ }
+
+ private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_WorkerTask_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_WorkerTask_fieldAccessorTable;
+
+ public static com.google.protobuf.Descriptors.FileDescriptor
+ getDescriptor() {
+ return descriptor;
+ }
+ private static com.google.protobuf.Descriptors.FileDescriptor
+ descriptor;
+ static {
+ String[] descriptorData = {
+ "\n\021worker_task.proto\"\214\001\n\nWorkerTask\022\'\n\tta" +
+ "sk_type\030\001 \001(\0162\024.WorkerTask.TaskType\022\020\n\010p" +
+ "ill_ids\030\002 \003(\t\022\021\n\tsense_ids\030\003 \003(\t\"0\n\010Task" +
+ "Type\022\013\n\007UNKNOWN\020\000\022\027\n\023PILL_CLASSIFICATION" +
+ "\020\001B(\n\032com.hello.suripu.api.tasksB\nTaskPr" +
+ "otos"
+ };
+ com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
+ new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
+ public com.google.protobuf.ExtensionRegistry assignDescriptors(
+ com.google.protobuf.Descriptors.FileDescriptor root) {
+ descriptor = root;
+ internal_static_WorkerTask_descriptor =
+ getDescriptor().getMessageTypes().get(0);
+ internal_static_WorkerTask_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_WorkerTask_descriptor,
+ new String[] { "TaskType", "PillIds", "SenseIds", });
+ return null;
+ }
+ };
+ com.google.protobuf.Descriptors.FileDescriptor
+ .internalBuildGeneratedFileFrom(descriptorData,
+ new com.google.protobuf.Descriptors.FileDescriptor[] {
+ }, assigner);
+ }
+
+ // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/suripu-core/src/main/java/com/hello/suripu/core/configuration/QueueName.java b/suripu-core/src/main/java/com/hello/suripu/core/configuration/QueueName.java
index 1a78158d1..c5a069cc0 100644
--- a/suripu-core/src/main/java/com/hello/suripu/core/configuration/QueueName.java
+++ b/suripu-core/src/main/java/com/hello/suripu/core/configuration/QueueName.java
@@ -10,7 +10,8 @@ public enum QueueName {
ENCODE_AUDIO("encode_audio"),
BATCH_PILL_DATA ("batch_pill_data"),
SENSE_SENSORS_DATA("sense_sensors_data"),
- LOGS("logs");
+ LOGS("logs"),
+ WORKER_TASKS("worker_tasks");
private String value;
diff --git a/suripu-core/src/main/java/com/hello/suripu/core/db/PillClassificationDAO.java b/suripu-core/src/main/java/com/hello/suripu/core/db/PillClassificationDAO.java
new file mode 100644
index 000000000..a2764a734
--- /dev/null
+++ b/suripu-core/src/main/java/com/hello/suripu/core/db/PillClassificationDAO.java
@@ -0,0 +1,34 @@
+package com.hello.suripu.core.db;
+
+import com.google.common.base.Optional;
+import com.hello.suripu.core.db.binders.BindPillClassification;
+import com.hello.suripu.core.db.mappers.PillClassificationMapper;
+import com.hello.suripu.core.models.PillClassification;
+import org.skife.jdbi.v2.sqlobject.Bind;
+import org.skife.jdbi.v2.sqlobject.SqlQuery;
+import org.skife.jdbi.v2.sqlobject.SqlUpdate;
+import org.skife.jdbi.v2.sqlobject.customizers.RegisterMapper;
+import org.skife.jdbi.v2.sqlobject.customizers.SingleValueResult;
+
+/**
+ * Created by pangwu on 6/23/15.
+ */
+public interface PillClassificationDAO {
+ @RegisterMapper(PillClassificationMapper.class)
+ @SingleValueResult(PillClassification.class)
+ @SqlQuery("SELECT * FROM pill_classification WHERE internal_pill_id = :internal_pill_id;")
+ Optional getByInternalPillId(@Bind("internal_pill_id") final long internalPillId);
+
+ @SqlUpdate("UPDATE pill_classification SET max_24hr_diff = 0, max_72hr_diff = 0 WHERE internal_pill_id = :internal_pill_id;")
+ Integer resetByInternalPillId(@Bind("internal_pill_id") final long internalPillId);
+
+ @SqlUpdate("INSERT INTO pill_classification (" +
+ "internal_pill_id,pill_id,last_24pt_window_ts,last_72pt_window_ts,last_update_batt,max_24hr_diff,max_72hr_diff,class) " +
+ "VALUES (:internal_pill_id, :pill_id, :last_24pt_window_ts, :last_72pt_window_ts, :last_update_batt, :max_24hr_diff, :max_72hr_diff, :class);")
+ Integer insert(@BindPillClassification final PillClassification classification);
+
+ @SqlUpdate("UPDATE pill_classification SET max_24hr_diff = :max_24hr_diff, max_72hr_diff = :max_72hr_diff, " +
+ "last_24pt_window_ts = :last_24pt_window_ts, last_72pt_window_ts = :last_72pt_window_ts, " +
+ "last_update_batt = :last_update_batt, class = :class")
+ Integer update(@BindPillClassification final PillClassification classification);
+}
diff --git a/suripu-core/src/main/java/com/hello/suripu/core/db/PillHeartBeatDAO.java b/suripu-core/src/main/java/com/hello/suripu/core/db/PillHeartBeatDAO.java
index f6b2b6a79..054cc8865 100644
--- a/suripu-core/src/main/java/com/hello/suripu/core/db/PillHeartBeatDAO.java
+++ b/suripu-core/src/main/java/com/hello/suripu/core/db/PillHeartBeatDAO.java
@@ -13,6 +13,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+
public abstract class PillHeartBeatDAO {
private final static Logger LOGGER = LoggerFactory.getLogger(PillHeartBeatDAO.class);
@@ -26,6 +28,13 @@ public abstract class PillHeartBeatDAO {
"FROM pill_status WHERE pill_id = :pill_id and last_updated > now() - interval '24 hours' ORDER BY last_updated DESC LIMIT 1")
public abstract Optional getPillStatus(@Bind("pill_id") final Long pillId);
+ @RegisterMapper(DeviceStatusMapper.class)
+ @SqlQuery("SELECT * FROM pill_status WHERE pill_id = :pill_id AND last_updated > :from AND last_updated <= :to ORDER BY last_updated ASC")
+ public abstract List getPillStatusBetweenUTC(@Bind("pill_id") final Long pillId, @Bind("from") final DateTime from, @Bind("to") final DateTime to);
+
+ @SqlQuery("SELECT DISTINCT ON (pill_id) pill_id FROM pill_status WHERE last_updated > now() - interval '30 hours' AND battery_level < 80;")
+ public abstract List getPillIdsSeenInLast24Hours();
+
public void silentInsert(final Long internalPillId, final Integer batteryLevel, final Integer uptime, final Integer firmwareVersion, final DateTime lastUpdated) {
try {
LOGGER.debug("last updated: {}", lastUpdated);
diff --git a/suripu-core/src/main/java/com/hello/suripu/core/db/binders/BindPillClassification.java b/suripu-core/src/main/java/com/hello/suripu/core/db/binders/BindPillClassification.java
new file mode 100644
index 000000000..6070457eb
--- /dev/null
+++ b/suripu-core/src/main/java/com/hello/suripu/core/db/binders/BindPillClassification.java
@@ -0,0 +1,39 @@
+package com.hello.suripu.core.db.binders;
+
+import com.hello.suripu.core.models.PillClassification;
+import org.skife.jdbi.v2.SQLStatement;
+import org.skife.jdbi.v2.sqlobject.Binder;
+import org.skife.jdbi.v2.sqlobject.BinderFactory;
+import org.skife.jdbi.v2.sqlobject.BindingAnnotation;
+
+import java.lang.annotation.Annotation;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Created by pangwu on 6/23/15.
+ */
+@BindingAnnotation(BindPillClassification.BindPillClassificationFactory.class)
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.PARAMETER})
+public @interface BindPillClassification {
+ public static class BindPillClassificationFactory implements BinderFactory {
+ public Binder build(final Annotation annotation) {
+ return new Binder() {
+ public void bind(final SQLStatement q, final BindPillClassification bind, final PillClassification arg) {
+ q.bind("id", arg.id);
+ q.bind("internal_pill_id", arg.internalPillId);
+ q.bind("pill_id", arg.pillId);
+ q.bind("last_24pt_window_ts", arg.last24PointWindowStartTime);
+ q.bind("last_72pt_window_ts", arg.last72PointWindowStartTime);
+ q.bind("last_update_batt", PillClassification.floatToInt(arg.lastClassificationBatteryLevel));
+ q.bind("max_24hr_diff", PillClassification.floatToInt(arg.max24HoursBatteryDelta));
+ q.bind("max_72hr_diff", PillClassification.floatToInt(arg.max72HoursBatteryDelta));
+ q.bind("class", arg.status.toInt());
+ }
+ };
+ }
+ }
+}
diff --git a/suripu-core/src/main/java/com/hello/suripu/core/db/mappers/PillClassificationMapper.java b/suripu-core/src/main/java/com/hello/suripu/core/db/mappers/PillClassificationMapper.java
new file mode 100644
index 000000000..99d881cd7
--- /dev/null
+++ b/suripu-core/src/main/java/com/hello/suripu/core/db/mappers/PillClassificationMapper.java
@@ -0,0 +1,33 @@
+package com.hello.suripu.core.db.mappers;
+
+import com.hello.suripu.core.models.PillClassification;
+import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.tweak.ResultSetMapper;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * Created by pangwu on 6/23/15.
+ */
+public class PillClassificationMapper implements ResultSetMapper {
+ @Override
+ public PillClassification map(int index, final ResultSet r, final StatementContext ctx) throws SQLException {
+ final PillClassification classification = new PillClassification(
+ r.getLong("id"),
+ r.getLong("internal_pill_id"),
+ r.getString("pill_id"),
+
+ r.getTimestamp("last_24pt_window_ts").getTime(),
+ r.getTimestamp("last_72pt_window_ts").getTime(),
+
+ PillClassification.intToFloat(r.getInt("last_update_batt")),
+
+ PillClassification.intToFloat(r.getInt("max_24hr_diff")),
+ PillClassification.intToFloat(r.getInt("max_72hr_diff")),
+
+ r.getInt("class")
+ );
+ return classification;
+ }
+}
diff --git a/suripu-core/src/main/java/com/hello/suripu/core/models/PillClassification.java b/suripu-core/src/main/java/com/hello/suripu/core/models/PillClassification.java
new file mode 100644
index 000000000..b3738f2a4
--- /dev/null
+++ b/suripu-core/src/main/java/com/hello/suripu/core/models/PillClassification.java
@@ -0,0 +1,87 @@
+package com.hello.suripu.core.models;
+
+import com.google.common.base.Optional;
+import com.hello.suripu.core.db.PillHeartBeatDAO;
+import com.hello.suripu.core.util.PillStatus;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+import java.util.List;
+
+/**
+ * Created by pangwu on 6/23/15.
+ */
+public class PillClassification {
+ public final long id;
+ public final long internalPillId;
+ public final String pillId;
+ public final DateTime last24PointWindowStartTime;
+ public final DateTime last72PointWindowStartTime;
+ public final float lastClassificationBatteryLevel;
+ public final float max24HoursBatteryDelta;
+ public final float max72HoursBatteryDelta;
+
+ public final PillStatus status;
+
+ public PillClassification(final long id,
+ final long internalPillId,
+ final String pillId,
+ final long last24PointWindowStartMillis,
+ final long last72PointWindowStartMillis,
+ final float lastClassificationBatteryLevel,
+ final float max24HoursBatteryDelta,
+ final float max72HoursBatteryDelta,
+ final int classificationResult){
+ this.id = id;
+ this.internalPillId = internalPillId;
+ this.pillId = pillId;
+ this.last24PointWindowStartTime = new DateTime(last24PointWindowStartMillis, DateTimeZone.UTC);
+ this.last72PointWindowStartTime = new DateTime(last72PointWindowStartMillis, DateTimeZone.UTC);
+
+ this.lastClassificationBatteryLevel = lastClassificationBatteryLevel;
+ this.status = PillStatus.fromInt(classificationResult);
+
+ this.max24HoursBatteryDelta = max24HoursBatteryDelta;
+ this.max72HoursBatteryDelta = max72HoursBatteryDelta;
+
+ }
+
+ public static int floatToInt(final float value){
+ return (int)(value * 100);
+ }
+
+ public static float intToFloat(final int value){
+ return value / 100f;
+ }
+
+ public static DateTime getQueryStartTime(final Optional lastRecord){
+ if(!lastRecord.isPresent()){
+ return new DateTime(0, DateTimeZone.UTC);
+ }
+
+ return lastRecord.get().last24PointWindowStartTime;
+ }
+
+ public static List restoreBuffer(final long internalPillId, final DateTime queryStartTime, final DateTime queryEndTime, final PillHeartBeatDAO pillHeartBeatDAO){
+ return pillHeartBeatDAO.getPillStatusBetweenUTC(internalPillId, queryStartTime, queryEndTime);
+ }
+
+ /*
+ * Decision boundary from SVM
+ * -0.0064 * d2d + 0.1281 * d3d - 3.4344 = 0
+ */
+ public static PillStatus classify(final float[] features){
+ if(features.length < 2){
+ return PillStatus.UNKNOWN;
+ }
+
+ final float dayToDayDrop = features[0];
+ final float threeDayDrop = features[1];
+ final float result = -0.0064f * dayToDayDrop + 0.1281f * threeDayDrop - 3.4344f;
+ if(result > 0){
+ return PillStatus.NORMAL;
+ }
+
+ return PillStatus.BAD;
+ }
+}
diff --git a/suripu-core/src/main/java/com/hello/suripu/core/util/PillStatus.java b/suripu-core/src/main/java/com/hello/suripu/core/util/PillStatus.java
new file mode 100644
index 000000000..22c62c550
--- /dev/null
+++ b/suripu-core/src/main/java/com/hello/suripu/core/util/PillStatus.java
@@ -0,0 +1,32 @@
+package com.hello.suripu.core.util;
+
+/**
+ * Created by pangwu on 6/23/15.
+ */
+public enum PillStatus {
+ UNKNOWN,
+ BAD,
+ NORMAL;
+
+ public static PillStatus fromInt(final int value) {
+ switch (value){
+ case 1:
+ return BAD;
+ case 0:
+ return NORMAL;
+ default:
+ return UNKNOWN;
+ }
+ }
+
+ public int toInt(){
+ switch (this){
+ case BAD:
+ return 1;
+ case NORMAL:
+ return 0;
+ default:
+ return -1;
+ }
+ }
+}
diff --git a/suripu-workers/configs/pillstatus/pillstatus_worker.dev.yml.example b/suripu-workers/configs/pillstatus/pillstatus_worker.dev.yml.example
new file mode 100644
index 000000000..0de708369
--- /dev/null
+++ b/suripu-workers/configs/pillstatus/pillstatus_worker.dev.yml.example
@@ -0,0 +1,101 @@
+debug: true
+metrics_enabled: false
+graphite:
+ host: carbon.hostedgraphite.com
+ api_key: 7509c0ff-4db5-4cae-91ee-6e78ff13b336
+ reporting_interval_in_seconds: 30
+ include_metrics:
+ - com.yammer
+ - com.hello
+
+sensors_db:
+ # the name of your JDBC driver
+ driverClass: org.postgresql.Driver
+
+ # the username
+ user: your postgresql username
+
+ # the password
+ password: your postgresql password
+
+ # the JDBC URL
+ url: jdbc:postgresql://localhost:5432/[your postgresql username]
+
+ # any properties specific to your JDBC driver:
+ properties:
+ charSet: UTF-8
+
+common_db:
+ # the name of your JDBC driver
+ driverClass: org.postgresql.Driver
+
+ # the username
+ user: your postgresql username
+
+ # the password
+ password: hello ingress user
+
+ # the JDBC URL
+ url: jdbc:postgresql://chanku-test.cdawj8qazvva.us-east-1.rds.amazonaws.com:5432/chanku
+
+ # any properties specific to your JDBC driver:
+ properties:
+ charSet: UTF-8
+
+ # the maximum amount of time to wait on an empty pool before throwing an exception
+ maxWaitForConnection: 1s
+
+ # the SQL query to run when validating a connection's liveness
+ validationQuery: "/* MyService Health Check */ SELECT 1"
+
+ # the minimum number of connections to keep open
+ minSize: 8
+
+ # the maximum number of connections to keep open
+ maxSize: 32
+
+ # whether or not idle connections should be validated
+ checkConnectionWhileIdle: false
+
+ # how long a connection must be held before it can be validated
+ checkConnectionHealthWhenIdleFor: 10s
+
+ # the maximum lifetime of an idle connection
+ closeConnectionIfIdleFor: 1 minute
+
+
+kinesis:
+ endpoint : https://kinesis.us-east-1.amazonaws.com
+ streams :
+ worker_tasks : prod_worker_tasks
+
+app_name: PillStatusWorkerDev
+
+max_records: 1000
+
+# Logging settings.
+logging:
+
+ # The default level of all loggers. Can be OFF, ERROR, WARN, INFO, DEBUG, TRACE, or ALL.
+ level: INFO
+
+ # Logger-specific levels.
+ loggers:
+
+ # Sets the level for 'com.example.app' to DEBUG.
+ com.hello.suripu.workers: DEBUG
+
+
+dynamodb:
+ region: us-east-1
+ tables:
+ features: features
+ endpoints:
+ features : http://localhost:7777
+
+kinesis_logger:
+ stream_name: dev_logs
+ enabled : false
+ buffer_size: 100
+ origin: suripu-workers
+ min_log_level : INFO
diff --git a/suripu-workers/configs/pillstatus/pillstatus_worker.prod.yml b/suripu-workers/configs/pillstatus/pillstatus_worker.prod.yml
new file mode 100644
index 000000000..24b9ff635
--- /dev/null
+++ b/suripu-workers/configs/pillstatus/pillstatus_worker.prod.yml
@@ -0,0 +1,110 @@
+debug: false
+metrics_enabled: true
+graphite:
+ host: carbon.hostedgraphite.com
+ api_key: 7509c0ff-4db5-4cae-91ee-6e78ff13b336
+ reporting_interval_in_seconds: 30
+ include_metrics:
+ - com.yammer
+ - com.hello
+
+sensors_db:
+ driverClass: org.postgresql.Driver
+ user: sensors
+ password: hello-sensors
+ url: jdbc:postgresql://sensors-1-replica-1.cdawj8qazvva.us-east-1.rds.amazonaws.com:5432/sensors1
+ properties:
+ hibernate.dialect: org.hibernate.spatial.dialect.postgis.PostgisDialect
+
+ # any properties specific to your JDBC driver:
+ properties:
+ charSet: UTF-8
+
+ # the maximum amount of time to wait on an empty pool before throwing an exception
+ maxWaitForConnection: 1s
+
+ # the SQL query to run when validating a connection's liveness
+ validationQuery: "/* MyService Health Check */ SELECT 1"
+
+ # the minimum number of connections to keep open
+ minSize: 2
+
+ # the maximum number of connections to keep open
+ maxSize: 8
+
+ # whether or not idle connections should be validated
+ checkConnectionWhileIdle: false
+
+ # how long a connection must be held before it can be validated
+ checkConnectionHealthWhenIdleFor: 10s
+
+ # the maximum lifetime of an idle connection
+ closeConnectionIfIdleFor: 1 minute
+
+common_db:
+ driverClass: org.postgresql.Driver
+ user: common
+ password: hello-common
+ url: jdbc:postgresql://common.cdawj8qazvva.us-east-1.rds.amazonaws.com:5432/common
+ properties:
+ hibernate.dialect: org.hibernate.spatial.dialect.postgis.PostgisDialect
+
+ # any properties specific to your JDBC driver:
+ properties:
+ charSet: UTF-8
+
+ # the maximum amount of time to wait on an empty pool before throwing an exception
+ maxWaitForConnection: 1s
+
+ # the SQL query to run when validating a connection's liveness
+ validationQuery: "/* MyService Health Check */ SELECT 1"
+
+ # the minimum number of connections to keep open
+ minSize: 2
+
+ # the maximum number of connections to keep open
+ maxSize: 8
+
+ # whether or not idle connections should be validated
+ checkConnectionWhileIdle: false
+
+ # how long a connection must be held before it can be validated
+ checkConnectionHealthWhenIdleFor: 10s
+
+ # the maximum lifetime of an idle connection
+ closeConnectionIfIdleFor: 1 minute
+
+kinesis:
+ endpoint : https://kinesis.us-east-1.amazonaws.com
+ streams :
+ worker_tasks : prod_worker_tasks
+
+app_name: PillStatusWorkerProd
+
+max_records: 20
+
+# Logging settings.
+logging:
+
+ # The default level of all loggers. Can be OFF, ERROR, WARN, INFO, DEBUG, TRACE, or ALL.
+ level: INFO
+
+ # Logger-specific levels.
+ loggers:
+
+ # Sets the level for 'com.example.app' to DEBUG.
+ com.hello.suripu.workers: DEBUG
+
+dynamodb:
+ region: us-east-1
+ tables:
+ features: features
+ endpoints:
+ features : http://dynamodb.us-east-1.amazonaws.com
+
+kinesis_logger:
+ stream_name: logs
+ enabled : true
+ buffer_size: 100
+ origin: suripu-workers
+ min_log_level : INFO
diff --git a/suripu-workers/configs/pillstatus/pillstatus_worker.staging.yml b/suripu-workers/configs/pillstatus/pillstatus_worker.staging.yml
new file mode 100644
index 000000000..d97302940
--- /dev/null
+++ b/suripu-workers/configs/pillstatus/pillstatus_worker.staging.yml
@@ -0,0 +1,112 @@
+debug: false
+metrics_enabled: false
+graphite:
+ host: carbon.hostedgraphite.com
+ api_key: 7509c0ff-4db5-4cae-91ee-6e78ff13b336
+ reporting_interval_in_seconds: 30
+ include_metrics:
+ - com.yammer
+ - com.hello
+
+sensors_db:
+ driverClass: org.postgresql.Driver
+ user: ingress_user
+ password: hello ingress user
+ url: jdbc:postgresql://chanku-test.cdawj8qazvva.us-east-1.rds.amazonaws.com:5432/chanku
+ properties:
+ hibernate.dialect: org.hibernate.spatial.dialect.postgis.PostgisDialect
+
+ # any properties specific to your JDBC driver:
+ properties:
+ charSet: UTF-8
+
+ # the maximum amount of time to wait on an empty pool before throwing an exception
+ maxWaitForConnection: 1s
+
+ # the SQL query to run when validating a connection's liveness
+ validationQuery: "/* MyService Health Check */ SELECT 1"
+
+ # the minimum number of connections to keep open
+ minSize: 2
+
+ # the maximum number of connections to keep open
+ maxSize: 8
+
+ # whether or not idle connections should be validated
+ checkConnectionWhileIdle: false
+
+ # how long a connection must be held before it can be validated
+ checkConnectionHealthWhenIdleFor: 10s
+
+ # the maximum lifetime of an idle connection
+ closeConnectionIfIdleFor: 1 minute
+
+common_db:
+ driverClass: org.postgresql.Driver
+ user: ingress_user
+ password: hello ingress user
+ url: jdbc:postgresql://chanku-test.cdawj8qazvva.us-east-1.rds.amazonaws.com:5432/chanku
+ properties:
+ hibernate.dialect: org.hibernate.spatial.dialect.postgis.PostgisDialect
+
+ # any properties specific to your JDBC driver:
+ properties:
+ charSet: UTF-8
+
+ # the maximum amount of time to wait on an empty pool before throwing an exception
+ maxWaitForConnection: 1s
+
+ # the SQL query to run when validating a connection's liveness
+ validationQuery: "/* MyService Health Check */ SELECT 1"
+
+ # the minimum number of connections to keep open
+ minSize: 2
+
+ # the maximum number of connections to keep open
+ maxSize: 8
+
+ # whether or not idle connections should be validated
+ checkConnectionWhileIdle: false
+
+ # how long a connection must be held before it can be validated
+ checkConnectionHealthWhenIdleFor: 10s
+
+ # the maximum lifetime of an idle connection
+ closeConnectionIfIdleFor: 1 minute
+
+
+kinesis:
+ endpoint : https://kinesis.us-east-1.amazonaws.com
+ streams :
+ worker_tasks : dev_worker_tasks
+
+app_name: PillStatusWorker
+
+max_records: 20
+
+# Logging settings.
+logging:
+
+ # The default level of all loggers. Can be OFF, ERROR, WARN, INFO, DEBUG, TRACE, or ALL.
+ level: INFO
+
+ # Logger-specific levels.
+ loggers:
+
+ # Sets the level for 'com.example.app' to DEBUG.
+ com.hello.suripu.workers: DEBUG
+
+
+dynamodb:
+ region: us-east-1
+ tables:
+ features: features
+ endpoints:
+ features : http://dynamodb.us-east-1.amazonaws.com
+
+kinesis_logger:
+ stream_name: dev_logs
+ enabled : false
+ buffer_size: 100
+ origin: suripu-workers
+ min_log_level : INFO
diff --git a/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusRecordProcessor.java b/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusRecordProcessor.java
new file mode 100644
index 000000000..e30d6dbe4
--- /dev/null
+++ b/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusRecordProcessor.java
@@ -0,0 +1,286 @@
+package com.hello.suripu.workers.pillstatus;
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
+import com.amazonaws.services.kinesis.model.Record;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.hello.suripu.api.tasks.TaskProtos;
+import com.hello.suripu.core.db.DeviceDAO;
+import com.hello.suripu.core.db.PillClassificationDAO;
+import com.hello.suripu.core.db.PillHeartBeatDAO;
+import com.hello.suripu.core.models.DeviceAccountPair;
+import com.hello.suripu.core.models.DeviceStatus;
+import com.hello.suripu.core.models.PillClassification;
+import com.hello.suripu.core.util.PillStatus;
+import com.hello.suripu.workers.framework.HelloBaseRecordProcessor;
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Meter;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by pangwu on 6/23/15.
+ */
+public class PillStatusRecordProcessor extends HelloBaseRecordProcessor {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PillStatusRecordProcessor.class);
+ private static final int PILL_BATT_LEVEL_FLUCTUATE_RANGE = 20;
+
+ private final PillHeartBeatDAO pillHeartBeatDAO;
+ private final PillClassificationDAO pillClassificationDAO;
+ private final DeviceDAO deviceDAO;
+
+ private final Meter notRegisteredPillMeter;
+ private final Meter batteryRecoverMeter;
+ private final Meter dyingPillMeter;
+ private final Meter newConvertedDyingPill;
+
+ public PillStatusRecordProcessor(final DeviceDAO deviceDAO,
+ final PillHeartBeatDAO pillHeartBeatDAO,
+ final PillClassificationDAO pillClassificationDAO){
+ this.pillHeartBeatDAO = pillHeartBeatDAO;
+ this.deviceDAO = deviceDAO;
+ this.pillClassificationDAO = pillClassificationDAO;
+
+ this.notRegisteredPillMeter = Metrics.defaultRegistry().newMeter(
+ PillStatusRecordProcessor.class,
+ "pill-not-registered",
+ "pill-not-registered",
+ TimeUnit.SECONDS);
+ this.batteryRecoverMeter = Metrics.defaultRegistry().newMeter(
+ PillStatusRecordProcessor.class,
+ "pill-recover",
+ "pill-recover",
+ TimeUnit.SECONDS
+ );
+ this.dyingPillMeter = Metrics.defaultRegistry().newMeter(
+ PillStatusRecordProcessor.class,
+ "dying-pill-count",
+ "dying-pill-count",
+ TimeUnit.SECONDS
+ );
+ this.newConvertedDyingPill = Metrics.defaultRegistry().newMeter(
+ PillStatusRecordProcessor.class,
+ "detected-dying-pill",
+ "detected-dying-pill",
+ TimeUnit.SECONDS
+ );
+ }
+
+ @Override
+ public void initialize(final String s) {
+ LOGGER.info("PillStatusRecordProcessor initialized: {}", s);
+ }
+
+
+ private List removeDuplicate(final List deviceStatuses){
+ final List copy = new ArrayList<>();
+
+
+ DateTime lastHeartBeatTime = null;
+ for(final DeviceStatus deviceStatus:deviceStatuses){
+ if((lastHeartBeatTime == null) ||
+ (lastHeartBeatTime != null && !deviceStatus.lastSeen.equals(lastHeartBeatTime))){
+ copy.add(deviceStatus);
+ }
+
+ lastHeartBeatTime = deviceStatus.lastSeen;
+ }
+
+ return copy;
+ }
+
+
+ private float getDrop(final List window, final int windowSize, final int smoothCount){
+ if(window.size() < windowSize || window.size() < smoothCount){
+ return 0f;
+ }
+
+ long startSum = 0;
+ long endSum = 0;
+ for(int i = 0; i < smoothCount; i++){
+ startSum += window.get(i).batteryLevel;
+ endSum += window.get(window.size() - i - 1).batteryLevel;
+ }
+
+ final float startBatteryLevel = startSum / (float)smoothCount;
+ final float endBatteryLevel = endSum / (float)smoothCount;
+
+ final float drop = startBatteryLevel - endBatteryLevel;
+ return drop;
+ }
+
+
+ private Optional getMaxDrop(final List restoredWindow, final DateTime windowStartTime, final int windowSize){
+
+ final List restoredWindowMutable = Lists.newArrayList(restoredWindow);
+ Collections.sort(restoredWindowMutable, new Comparator() {
+ @Override
+ public int compare(final DeviceStatus o1, final DeviceStatus o2) {
+ return o1.lastSeen.compareTo(o2.lastSeen);
+ }
+ });
+
+ final List bufferNoDuplication = removeDuplicate(restoredWindowMutable);
+ final LinkedList slidingWindow = new LinkedList<>();
+ Optional maxDrop = Optional.absent();
+
+ for(final DeviceStatus deviceStatus:bufferNoDuplication){
+ if(deviceStatus.batteryLevel > 100){
+ // skip the level interrupt checker
+ continue;
+ }
+
+ if(deviceStatus.lastSeen.isBefore(windowStartTime)){
+ continue;
+ }
+
+ slidingWindow.add(deviceStatus);
+ if(slidingWindow.size() == windowSize){
+
+ final float drop = getDrop(slidingWindow, windowSize, 5);
+ if(maxDrop.isPresent() == false || drop > maxDrop.get()){
+ maxDrop = Optional.of(drop);
+ }
+ slidingWindow.removeFirst();
+ }
+ }
+
+ return maxDrop;
+
+ }
+
+ public void classifyPills(final List internalPillIds){
+ final RateLimiter rateLimiter = RateLimiter.create(100d);
+ for(final Long internalPillId:internalPillIds){
+ rateLimiter.acquire();
+
+ final Optional pillClassificationOptional = this.pillClassificationDAO.getByInternalPillId(internalPillId);
+
+ // Figure out where we ends the classification so we can reconstruct the sliding window.
+ final DateTime queryStartTimeFor24hrWindow = pillClassificationOptional.isPresent() ?
+ pillClassificationOptional.get().last24PointWindowStartTime :
+ new DateTime(0, DateTimeZone.UTC);
+ final DateTime queryStartTimeFor72hrWindow = pillClassificationOptional.isPresent() ?
+ pillClassificationOptional.get().last72PointWindowStartTime :
+ new DateTime(0, DateTimeZone.UTC);
+ final List dataFromLast72Hours = this.pillHeartBeatDAO.getPillStatusBetweenUTC(internalPillId,
+ queryStartTimeFor72hrWindow,
+ DateTime.now());
+
+ if(dataFromLast72Hours.isEmpty()){
+ continue;
+ }
+
+ // Extract the two features we need:
+ // day to day drop
+ // and drop on every 3 days.
+ final Optional maxDrop24hr = getMaxDrop(dataFromLast72Hours,
+ queryStartTimeFor24hrWindow,
+ 24);
+ final Optional maxDrop72hr = getMaxDrop(dataFromLast72Hours,
+ queryStartTimeFor72hrWindow,
+ 72);
+
+ if(!maxDrop24hr.isPresent() || !maxDrop72hr.isPresent()){
+ LOGGER.debug("Not enough data to classify pill {}", internalPillId);
+ continue;
+ }
+
+ // Plug it into the decision boundary:
+ // -0.0064 * d2d + 0.1281 * d3d - 3.4344 = 0
+ final PillStatus status = PillClassification.classify(new float[]{maxDrop24hr.get(), maxDrop72hr.get()});
+
+ final DeviceStatus lastHeartBeat = dataFromLast72Hours.get(dataFromLast72Hours.size() - 1);
+ final long lastHeartBeatTimeMillis = lastHeartBeat.lastSeen.getMillis();
+ final PillClassification classification = new PillClassification(0L, internalPillId,
+ "", // we should think a way to get this.
+ lastHeartBeatTimeMillis,
+ lastHeartBeatTimeMillis,
+ lastHeartBeat.batteryLevel,
+ Math.max(maxDrop24hr.get(), pillClassificationOptional.isPresent() ? pillClassificationOptional.get().max24HoursBatteryDelta : 0f),
+ Math.max(maxDrop72hr.get(), pillClassificationOptional.isPresent() ? pillClassificationOptional.get().max72HoursBatteryDelta : 0f),
+ status.toInt()
+ );
+
+
+
+ if(!pillClassificationOptional.isPresent()){
+ this.pillClassificationDAO.insert(classification);
+ }else{
+ this.pillClassificationDAO.update(classification);
+ }
+ this.newConvertedDyingPill.mark();
+
+ }
+ }
+
+ @Override
+ public void processRecords(final List list, final IRecordProcessorCheckpointer iRecordProcessorCheckpointer) {
+ for(final Record record:list){
+ try {
+ final TaskProtos.WorkerTask workerTask = TaskProtos.WorkerTask.parseFrom(record.getData().array());
+ if(workerTask.getTaskType() != TaskProtos.WorkerTask.TaskType.PILL_CLASSIFICATION){
+ continue;
+ }
+
+ final List internalPillIdsToBeClassified = new ArrayList<>();
+ final List pillIds = workerTask.getPillIdsList();
+
+ for(final String pillId:pillIds){
+ final Optional registeredPill = this.deviceDAO.getInternalPillId(pillId);
+ if(!registeredPill.isPresent()){
+ LOGGER.info("Pill {} not found in tracker account map, pill might be unpaired.");
+ continue;
+ }
+
+ internalPillIdsToBeClassified.add(registeredPill.get().internalDeviceId);
+ }
+
+
+ if(workerTask.getPillIdsCount() == 0){
+ final List pillsSeenFromLast24HoursAndHaveLessThan80PercentBattery = this.pillHeartBeatDAO.getPillIdsSeenInLast24Hours();
+ LOGGER.info("Found {} pills to be classified.", pillsSeenFromLast24HoursAndHaveLessThan80PercentBattery.size());
+
+ internalPillIdsToBeClassified.addAll(pillsSeenFromLast24HoursAndHaveLessThan80PercentBattery);
+ }
+
+ classifyPills(internalPillIdsToBeClassified);
+
+ } catch (InvalidProtocolBufferException e) {
+ LOGGER.error("Failed to decode protobuf: {}", e.getMessage());
+ }
+ }
+
+
+
+ try {
+ iRecordProcessorCheckpointer.checkpoint();
+ } catch (InvalidStateException e) {
+ LOGGER.error("checkpoint {}", e.getMessage());
+ } catch (ShutdownException e) {
+ LOGGER.error("Received shutdown command at checkpoint, bailing. {}", e.getMessage());
+ System.exit(1);
+ }
+ }
+
+ @Override
+ public void shutdown(final IRecordProcessorCheckpointer iRecordProcessorCheckpointer, final ShutdownReason shutdownReason) {
+ LOGGER.warn("SHUTDOWN: {}", shutdownReason.toString());
+ System.exit(1);
+ }
+}
diff --git a/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusRecordProcessorFactory.java b/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusRecordProcessorFactory.java
new file mode 100644
index 000000000..b6303f50a
--- /dev/null
+++ b/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusRecordProcessorFactory.java
@@ -0,0 +1,33 @@
+package com.hello.suripu.workers.pillstatus;
+
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
+import com.hello.suripu.core.db.DeviceDAO;
+import com.hello.suripu.core.db.PillClassificationDAO;
+import com.hello.suripu.core.db.PillHeartBeatDAO;
+
+/**
+ * Created by pangwu on 6/23/15.
+ */
+public class PillStatusRecordProcessorFactory implements IRecordProcessorFactory {
+ private final PillHeartBeatDAO pillHeartBeatDAO;
+ private final PillClassificationDAO pillClassificationDAO;
+ private final DeviceDAO deviceDAO;
+
+
+ public PillStatusRecordProcessorFactory(final DeviceDAO deviceDAO,
+ final PillClassificationDAO pillClassificationDAO,
+ final PillHeartBeatDAO pillHeartBeatDAO){
+ this.pillHeartBeatDAO = pillHeartBeatDAO;
+ this.pillClassificationDAO = pillClassificationDAO;
+ this.deviceDAO = deviceDAO;
+ }
+
+ @Override
+ public IRecordProcessor createProcessor() {
+ final IRecordProcessor recordProcessor = new PillStatusRecordProcessor(this.deviceDAO,
+ this.pillHeartBeatDAO,
+ this.pillClassificationDAO);
+ return recordProcessor;
+ }
+}
diff --git a/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusWorkerCommand.java b/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusWorkerCommand.java
new file mode 100644
index 000000000..3f0d64ffe
--- /dev/null
+++ b/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusWorkerCommand.java
@@ -0,0 +1,90 @@
+package com.hello.suripu.workers.pillstatus;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
+import com.google.common.collect.ImmutableMap;
+import com.hello.suripu.core.configuration.QueueName;
+import com.hello.suripu.core.db.DeviceDAO;
+import com.hello.suripu.core.db.PillClassificationDAO;
+import com.hello.suripu.core.db.PillHeartBeatDAO;
+import com.hello.suripu.core.db.util.JodaArgumentFactory;
+import com.hello.suripu.core.db.util.PostgresIntegerArrayArgumentFactory;
+import com.hello.suripu.workers.framework.WorkerEnvironmentCommand;
+import com.yammer.dropwizard.config.Environment;
+import com.yammer.dropwizard.db.ManagedDataSourceFactory;
+import com.yammer.dropwizard.jdbi.ImmutableListContainerFactory;
+import com.yammer.dropwizard.jdbi.ImmutableSetContainerFactory;
+import com.yammer.dropwizard.jdbi.OptionalContainerFactory;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.skife.jdbi.v2.DBI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.UUID;
+
+/**
+ * Created by pangwu on 6/23/15.
+ */
+public class PillStatusWorkerCommand extends WorkerEnvironmentCommand {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(PillStatusWorkerCommand.class);
+
+ public PillStatusWorkerCommand(){
+ super("pillstatus_worker", "Worker automatically classify quick discharging pills");
+ }
+
+ @Override
+ protected void run(final Environment environment, final Namespace namespace, final PillStatusWorkerConfiguration configuration) throws Exception {
+ final ManagedDataSourceFactory managedDataSourceFactory = new ManagedDataSourceFactory();
+
+ final DBI commonDB = new DBI(managedDataSourceFactory.build(configuration.getCommonDB()));
+ final DBI sensorsDB = new DBI(managedDataSourceFactory.build(configuration.getSensorsDB()));
+
+ sensorsDB.registerArgumentFactory(new JodaArgumentFactory());
+ sensorsDB.registerContainerFactory(new OptionalContainerFactory());
+ sensorsDB.registerArgumentFactory(new PostgresIntegerArrayArgumentFactory());
+ sensorsDB.registerContainerFactory(new ImmutableListContainerFactory());
+ sensorsDB.registerContainerFactory(new ImmutableSetContainerFactory());
+
+
+ commonDB.registerArgumentFactory(new JodaArgumentFactory());
+ commonDB.registerContainerFactory(new OptionalContainerFactory());
+ commonDB.registerArgumentFactory(new PostgresIntegerArrayArgumentFactory());
+ commonDB.registerContainerFactory(new ImmutableListContainerFactory());
+ commonDB.registerContainerFactory(new ImmutableSetContainerFactory());
+
+ final DeviceDAO deviceDAO = commonDB.onDemand(DeviceDAO.class);
+ final PillHeartBeatDAO pillHeartBeatDAO = sensorsDB.onDemand(PillHeartBeatDAO.class);
+ final PillClassificationDAO pillClassificationDAO = sensorsDB.onDemand(PillClassificationDAO.class);
+
+ final ImmutableMap queueNames = configuration.getQueues();
+
+ LOGGER.debug("{}", queueNames);
+ final String queueName = queueNames.get(QueueName.WORKER_TASKS);
+ LOGGER.info("\n\n\n!!! This worker is using the following queue: {} !!!\n\n\n", queueName);
+
+ final AWSCredentialsProvider awsCredentialsProvider = new DefaultAWSCredentialsProviderChain();
+ final String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
+ final KinesisClientLibConfiguration kinesisConfig = new KinesisClientLibConfiguration(
+ configuration.getAppName(),
+ queueName,
+ awsCredentialsProvider,
+ workerId);
+ kinesisConfig.withMaxRecords(configuration.getMaxRecords());
+ kinesisConfig.withKinesisEndpoint(configuration.getKinesisEndpoint());
+ kinesisConfig.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
+
+
+ final IRecordProcessorFactory factory = new PillStatusRecordProcessorFactory(
+ deviceDAO,
+ pillClassificationDAO,
+ pillHeartBeatDAO);
+ final Worker worker = new Worker(factory, kinesisConfig);
+ worker.run();
+ }
+}
diff --git a/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusWorkerConfiguration.java b/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusWorkerConfiguration.java
new file mode 100644
index 000000000..86874d224
--- /dev/null
+++ b/suripu-workers/src/main/java/com/hello/suripu/workers/pillstatus/PillStatusWorkerConfiguration.java
@@ -0,0 +1,55 @@
+package com.hello.suripu.workers.pillstatus;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.hello.suripu.core.configuration.NewDynamoDBConfiguration;
+import com.hello.suripu.workers.framework.WorkerConfiguration;
+import com.yammer.dropwizard.db.DatabaseConfiguration;
+
+import javax.validation.Valid;
+import javax.validation.constraints.Max;
+import javax.validation.constraints.NotNull;
+
+/**
+ * Created by pangwu on 6/22/15.
+ */
+public class PillStatusWorkerConfiguration extends WorkerConfiguration {
+ @Valid
+ @NotNull
+ @JsonProperty("dynamodb")
+ private NewDynamoDBConfiguration dynamoDBConfiguration;
+
+ public NewDynamoDBConfiguration getDynamoDBConfiguration(){
+ return dynamoDBConfiguration;
+ }
+
+
+ @Valid
+ @NotNull
+ @JsonProperty("sensors_db")
+ private DatabaseConfiguration sensorsDB = new DatabaseConfiguration();
+
+ public DatabaseConfiguration getSensorsDB() {
+ return sensorsDB;
+ }
+
+
+ @Valid
+ @NotNull
+ @Max(1000)
+ @JsonProperty("max_records")
+ private Integer maxRecords;
+
+ public Integer getMaxRecords() {
+ return maxRecords;
+ }
+
+
+ @Valid
+ @NotNull
+ @JsonProperty("common_db")
+ private DatabaseConfiguration commonDB = new DatabaseConfiguration();
+
+ public DatabaseConfiguration getCommonDB() {
+ return commonDB;
+ }
+}