Skip to content

Commit

Permalink
fix: fix align day
Browse files Browse the repository at this point in the history
  • Loading branch information
xzchaoo committed Jan 29, 2024
1 parent 7e8a3a8 commit 3a7795e
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
package io.holoinsight.server.agg.v1.core;

import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Date;
import java.util.List;
Expand All @@ -17,6 +20,7 @@
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import io.holoinsight.server.agg.v1.core.conf.Window;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -32,6 +36,15 @@ public final class Utils {

private static final ThreadLocal<SimpleDateFormat> SHORT_SDF_TL =
ThreadLocal.withInitial(() -> new SimpleDateFormat("HH:mm:ss"));
private static final long ALIGN_DAY_ADJUST;
private static final long DAY_MILLS = 86400000L;

static {
Instant now = Instant.now();
ALIGN_DAY_ADJUST =
now.atZone(ZoneId.systemDefault()).truncatedTo(ChronoUnit.DAYS).toInstant().toEpochMilli()
- now.toEpochMilli() / DAY_MILLS * DAY_MILLS;
}

private Utils() {}

Expand All @@ -51,8 +64,24 @@ public static String formatTimeShort(long ts) {
return formatTimeShort(new Date(ts));
}

public static long align(long ts, long window) {
return ts / window * window;
/**
* <p>
* Align timestamp to window.interval .
* <p>
* If window.interval equals to {@link #DAY_MILLS}; then timestamp is aligned using local
* timezone.
*
* @param ts
* @param window
* @return
*/
public static long align(long ts, Window window) {
long i = window.getInterval();
if (i == DAY_MILLS) {
return ts / i * i + ALIGN_DAY_ADJUST;
} else {
return ts / i * i;
}
}

public static ExecutorService createThreadPool(String namePrefix, int size) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/
package io.holoinsight.server.agg.v1.core;

import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;

import org.junit.Assert;
import org.junit.Test;

import io.holoinsight.server.agg.v1.core.conf.Window;

/**
* <p>
* created at 2024/1/29
*
* @author xzchaoo
*/
public class UtilsTest {

@Test
public void test_align() {
long now = System.currentTimeMillis();
Window w = new Window();
w.setInterval(86400000);
long aligned = Utils.align(now, w);
long aligned2 = Instant.ofEpochMilli(now).atZone(ZoneId.systemDefault())
.truncatedTo(ChronoUnit.DAYS).toInstant().toEpochMilli();
Assert.assertEquals(aligned2, aligned);
// System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(aligned)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,7 @@ private void processData2(XAggTask latestAggTask, AggProtos.AggTaskValue aggTask
TableRowDataAccessor.Meta meta = new TableRowDataAccessor.Meta(table.getHeader());
TableRowDataAccessor da = new TableRowDataAccessor();
for (AggProtos.Table.Row row : table.getRowList()) {
long alignedDataTs =
Utils.align(row.getTimestamp(), latestAggTask.getInner().getWindow().getInterval());
long alignedDataTs = Utils.align(row.getTimestamp(), latestAggTask.getInner().getWindow());

if (alignedDataTs > now) {
log.error("[agg] [{}] invalid data {}]", key(), row);
Expand Down Expand Up @@ -194,9 +193,7 @@ private void processData(XAggTask latestAggTask, AggProtos.AggTaskValue aggTaskV
AggWindowState lastWindowState = null;
InDataNodeDataAccessor da = new InDataNodeDataAccessor();
for (AggProtos.InDataNode in : aggTaskValue.getInDataNodesList()) {

long alignedDataTs =
Utils.align(in.getTimestamp(), latestAggTask.getInner().getWindow().getInterval());
long alignedDataTs = Utils.align(in.getTimestamp(), latestAggTask.getInner().getWindow());

if (alignedDataTs > now) {
log.error("[agg] [{}] invalid data {}]", key(), in);
Expand Down Expand Up @@ -435,9 +432,9 @@ private void maybeFillZero(long watermark) {
long interval = lastUsedAggTask.getInner().getWindow().getInterval();
long lastEmitTimestamp = Math.max( //
// This value may be small if the state is restored from a very old state.
(state.getWatermark() - 1) / interval * interval,
Utils.align(state.getWatermark() - 1, lastUsedAggTask.getInner().getWindow()),
// So we restrict it to be within the last 60 periods of the watermark
(((watermark - 1) / interval) - 60) * interval);
Utils.align(watermark - 1, lastUsedAggTask.getInner().getWindow()) - 60 * interval);

for (long ts = lastEmitTimestamp + interval; ts < watermark; ts += interval) {
AggWindowState w = state.getAggWindowState(ts);
Expand Down

0 comments on commit 3a7795e

Please sign in to comment.