Redis-0x1c-serverCron任务

1 serverCron大任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
/**
* @brief 事件管理器eventLoop面向的是事件的管理
* - 时间事件是自己亲自管理
* - 文件事件的管理主要委托给OS的IO复用器 而自己本身工作中心在于就绪事件的调度
* 因此需要明确知道时间事件的id
* eventLoop才能根据时间事件id在自己管理范围内找到时间事件
* 当初注册时间事件的时候指定过私有数据
* 此刻eventLoop回调函数的时候就可以使用私有数据了
*
* 这个serverCron是redis服务端的一个大的定时任务 这个大的任务执行线程仍然是main线程 其中定义了很多小的任务
* - 看门狗
* - 默认不开启
* - 服务端性能指标采集
* - 每秒执行的命令个数 main线程执行 每100ms执行一次
* - 每秒读流量 main线程执行 每100ms执行一次
* - 每秒写流量 main线程执行 每100ms执行一次
* - 内存相关信息采集
* - 内存使用峰值 main线程执行 大任务执行频率
* - 使用使用信息 main线程执行 每100ms执行一次
* @param eventLoop 事件管理器
* @param id 时间事件的id
* @param clientData 向eventLoop注册时间事件时候指定的私有数据 就是用在函数回调的时候的
* @return 该函数是时间事件的处理器 其返回值语义是告知eventLoop事件管理器在调度执行完一次时间事件之后 后续如何管理这个事件
* - 返回1 标识时间事件是个定时事件 只执行一次 以后不用再执行
* - 返回n 标识这个时间事件是个周期性事件 期待等个n毫秒之后再执行一次
*/
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int j;
UNUSED(eventLoop);
UNUSED(id);
UNUSED(clientData);

/* Software watchdog: deliver the SIGALRM that will reach the signal
* handler if we don't return here fast enough. */
/**
* 看门狗
* 默认不开启
*/
if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);

/* Update the time cache. */
/**
* 将系统时间缓存在服务端
* 很多地方都要用到这个时间 缓存起来 避免了每次使用都要一次系统调用的开销
*/
updateCachedTime(1);

/**
* 动态调整serverCron的运行频率
* 跟当前服务端通信的客户端越多 定时任务执行的频率越快
*/
server.hz = server.config_hz;
/* Adapt the server.hz value to the number of configured clients. If we have
* many clients, we want to call serverCron() with an higher frequency. */
if (server.dynamic_hz) { // 默认false 不开启动态调整serverCron执行频率
while (listLength(server.clients) / server.hz >
MAX_CLIENTS_PER_CLOCK_TICK)
{
server.hz *= 2;
if (server.hz > CONFIG_MAX_HZ) {
server.hz = CONFIG_MAX_HZ;
break;
}
}
}

run_with_period(100) { // 每100ms执行一次
long long stat_net_input_bytes, stat_net_output_bytes;
atomicGet(server.stat_net_input_bytes, stat_net_input_bytes);
atomicGet(server.stat_net_output_bytes, stat_net_output_bytes);

// main线程执行 记录每秒执行的命令个数
trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
// main线程执行 记录读流量
trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
stat_net_input_bytes);
// main线程执行 记录写流量
trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
stat_net_output_bytes);
}

/* We have just LRU_BITS bits per object for LRU information.
* So we use an (eventually wrapping) LRU clock.
*
* Note that even if the counter wraps it's not a big problem,
* everything will still work but some object will appear younger
* to Redis. However for this to happen a given object should never be
* touched for all the time needed to the counter to wrap, which is
* not likely.
*
* Note that you can change the resolution altering the
* LRU_CLOCK_RESOLUTION define. */
unsigned int lruclock = getLRUClock();
atomicSet(server.lruclock,lruclock);

/**
* 采集内存使用相关信息
* - 记录内存使用峰值 main线程 serverCron大任务执行频率
* - 内存的使用信息 main线程 每隔100ms
*/
cronUpdateMemoryStats();

/* We received a SIGTERM, shutting down here in a safe way, as it is
* not ok doing so inside the signal handler. */
if (server.shutdown_asap) {
if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
server.shutdown_asap = 0;
}

/* Show some info about non-empty databases */
if (server.verbosity <= LL_VERBOSE) {
run_with_period(5000) {
for (j = 0; j < server.dbnum; j++) {
long long size, used, vkeys;

size = dictSlots(server.db[j].dict);
used = dictSize(server.db[j].dict);
vkeys = dictSize(server.db[j].expires);
if (used || vkeys) {
serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
}
}
}
}

/* Show information about connected clients */
if (!server.sentinel_mode) {
run_with_period(5000) {
serverLog(LL_DEBUG,
"%lu clients connected (%lu replicas), %zu bytes in use",
listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves),
zmalloc_used_memory());
}
}

/* We need to do a few operations on clients asynchronously. */
clientsCron();

/* Handle background operations on Redis databases. */
databasesCron();

/* Start a scheduled AOF rewrite if this was requested by the user while
* a BGSAVE was in progress. */
if (!hasActiveChildProcess() &&
server.aof_rewrite_scheduled)
{
rewriteAppendOnlyFileBackground();
}

/* Check if a background saving or AOF rewrite in progress terminated. */
if (hasActiveChildProcess() || ldbPendingChildren())
{
run_with_period(1000) receiveChildInfo();
checkChildrenDone();
} else {
/* If there is not a background saving/rewrite in progress check if
* we have to save/rewrite now. */
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;

/* Save if we reached the given amount of changes,
* the given amount of seconds, and if the latest bgsave was
* successful or if, in case of an error, at least
* CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
if (server.dirty >= sp->changes &&
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds);
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSaveBackground(server.rdb_filename,rsiptr);
break;
}
}

/* Trigger an AOF rewrite if needed. */
if (server.aof_state == AOF_ON &&
!hasActiveChildProcess() &&
server.aof_rewrite_perc &&
server.aof_current_size > server.aof_rewrite_min_size)
{
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
long long growth = (server.aof_current_size*100/base) - 100;
if (growth >= server.aof_rewrite_perc) {
serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
}
}
/* Just for the sake of defensive programming, to avoid forgeting to
* call this function when need. */
updateDictResizePolicy();


/* AOF postponed flush: Try at every cron cycle if the slow fsync
* completed. */
if (server.aof_state == AOF_ON && server.aof_flush_postponed_start)
flushAppendOnlyFile(0);

/* AOF write errors: in this case we have a buffer to flush as well and
* clear the AOF error in case of success to make the DB writable again,
* however to try every second is enough in case of 'hz' is set to
* a higher frequency. */
run_with_period(1000) {
if (server.aof_state == AOF_ON && server.aof_last_write_status == C_ERR)
flushAppendOnlyFile(0);
}

/* Clear the paused clients state if needed. */
checkClientPauseTimeoutAndReturnIfPaused();

/* Replication cron function -- used to reconnect to master,
* detect transfer failures, start background RDB transfers and so forth.
*
* If Redis is trying to failover then run the replication cron faster so
* progress on the handshake happens more quickly. */
if (server.failover_state != NO_FAILOVER) {
run_with_period(100) replicationCron();
} else {
run_with_period(1000) replicationCron();
}

/* Run the Redis Cluster cron. */
run_with_period(100) {
if (server.cluster_enabled) clusterCron();
}

/* Run the Sentinel timer if we are in sentinel mode. */
if (server.sentinel_mode) sentinelTimer();

/* Cleanup expired MIGRATE cached sockets. */
run_with_period(1000) {
migrateCloseTimedoutSockets();
}

/* Stop the I/O threads if we don't have enough pending work. */
stopThreadedIOIfNeeded();

/* Resize tracking keys table if needed. This is also done at every
* command execution, but we want to be sure that if the last command
* executed changes the value via CONFIG SET, the server will perform
* the operation even if completely idle. */
if (server.tracking_clients) trackingLimitUsedSlots();

/* Start a scheduled BGSAVE if the corresponding flag is set. This is
* useful when we are forced to postpone a BGSAVE because an AOF
* rewrite is in progress.
*
* Note: this code must be after the replicationCron() call above so
* make sure when refactoring this file to keep this order. This is useful
* because we want to give priority to RDB savings for replication. */
if (!hasActiveChildProcess() &&
server.rdb_bgsave_scheduled &&
(server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK)
server.rdb_bgsave_scheduled = 0;
}

/* Fire the cron loop modules event. */
RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,server.hz};
moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP,
0,
&ei);

// 记录serverCron这个大定时任务执行了多少次
server.cronloops++;
// hz配置在redis.conf配置文件中 默认值是10 也就是这个定时任务1s执行10次 即每隔100ms执行一次
return 1000/server.hz;
}

2 定制化运行间隔时长

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* @brief 这个宏用来控制serverCron里面小任务多久执行一次
* 首先界定一件事情 hz在redis.conf配置文件中定义的默认值是10 也就意味着serverCron这个大定时任务每隔100ms会被main线程执行一次
* 这个宏函数就是用来控制某个小任务间隔多久执行一次的
* - 场景1 ms==10 间隔<serverCron的运行间隔 也就是说所有小任务的间隔时间下限就是serverCron的运行间隔时间 单独设置小任务的间隔时间 只有更大才有客制化意义
* - 场景2 ms==100 间隔==serverCron的运行间隔 跟场景1一样
* - 场景3 ms==1000 代入表达式 即意味着cronloops得是10的整数倍才能运行小任务 也就是小任务运行间隔是10轮大任务的间隔时间 即10*100=1000ms
* - 场景4 ms=1001 代入表达式 向下取整 同场景3
* - 场景5 ms=5000 代入表达式 小任务的运行间隔是50轮大任务运行间隔 即50轮*100ms=5000ms
* 那也就意味着
* 当我们觉得小任务运行间隔时间需要客制化时候 并且明显不需要像serverCron大任务一样频繁的时候 就传递一个大任务运行间隔的整数倍的间隔参数
* @param ms 希望小任务多久执行一次 单位ms
*/
#define run_with_period(_ms_) if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))

Redis-0x1c-serverCron任务
https://bannirui.github.io/2023/04/14/Redis-0x1c-serverCron任务/
作者
dingrui
发布于
2023年4月14日
许可协议