elasticsearch的mapping
以往采集日志都是比较简单的操作,没有过多深入es的mapping等内容,不过有时候技能都是基于需求驱动的。
现有日志内容如下:
{"sign":"test-log","@timestamp":"2020-07-05T17:43:12+08:00","type":"filter","sale_id":2084090132,"sale_uri":"2003261352dvxv50","shop_id":47516579,"shop_uri":"1910201845lawpvt","cat_id":4,"sec_cat_id":4001,"rule":"startprice","description":"拍品起拍价\u003e0","score":0,"arguments":"{\"startPrice\":2600}"}
1
因为后期会对日志中一些内容进行聚合计算,因此要求日志中score字段写入之后是float类型。但是如果什么都不指定,Elasticsearch的动态映射会根据首次写入的值(这里是整数0)自动推断字段类型,score就会被映射成long,而不是我们想要的float。
两种解决方式。
# 第一:创建索引的时候指定mapping
PUT test-index
{
  "mappings": {
    "properties": {
      "score": {
        "type": "float"
      }
    }
  }
}
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
返回结果:
{
"acknowledged" : true,
"shards_acknowledged" : true,
"index" : "test-index"
}
1
2
3
4
5
2
3
4
5
查看索引mapping:
GET test-index/_mapping
1
返回结果
{
"test-index" : {
"mappings" : {
"properties" : {
"score" : {
"type" : "float"
}
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
这样写进来之后对应的 score字段就是float类型了。
但是这样有一个问题:刚刚只是给单个索引指定了mapping,而正常情况下我们的日志索引都是按天创建的,新创建的索引不会自动沿用这个mapping。因此接下来要引入索引模板(index template)的配置。
PUT _template/template_test
{
  "index_patterns": ["test*"],
  "order": 1,
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 2
  },
  "mappings": {
    "properties": {
      "score": {
        "type": "float"
      }
    }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
创建这个索引模板之后,只要是以test开头的新索引,在创建时都会自动套用模板中的mapping,写入数据后对应的score字段就是float类型了(模板只在索引创建时生效,不会改变已经存在的索引)。查看一个按天生成的索引的mapping来验证:
GET test-index-2020-03-30/_mapping
{
"test-index-2020-03-30" : {
"mappings" : {
"properties" : {
"@timestamp" : {
"type" : "date"
},
"@version" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"arguments" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"batch" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"cat_id" : {
"type" : "long"
},
"description" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"host" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"path" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"rule" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"sale_id" : {
"type" : "long"
},
"sale_uri" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"score" : {
"type" : "float"
},
"sec_cat_id" : {
"type" : "long"
},
"shop_id" : {
"type" : "long"
},
"shop_uri" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"sign" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"type" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# 第二:在logstash层面处理
还有一种相对简便的方案是在logstash层面来做:让日志经过logstash转发的时候,把某些字段转换成指定的类型,配置如下:
input {
  # Consume JSON-encoded log events from the "test-index" Kafka topic.
  kafka {
    bootstrap_servers => "192.168.0.1:9092"
    topics            => ["test-index"]
    group_id          => "test-index"
    client_id         => "test-index"
    consumer_threads  => 6
    codec             => "json"
    check_crcs        => "false"
  }
}

filter {
  # Coerce "score" to a float before the event is indexed.
  mutate {
    convert => { "score" => "float" }
  }
}

output {
  # Ship events to Elasticsearch; the index name carries a date/hour suffix
  # produced by the %{+YYYY-MM-dd-HH} pattern.
  elasticsearch {
    hosts => ["http://192.168.0.2:9208"]
    index => "test-index-%{+YYYY-MM-dd-HH}"
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
实际生产中,也会利用这一功能,对NGINX的access日志进行一些特殊处理:
input {
  # Consume JSON-encoded NGINX access-log events from Kafka.
  kafka {
    bootstrap_servers => "192.168.0.1:9092"
    group_id => "nginx_access"
    consumer_threads => 6
    topics => ["nginx_access"]
    codec => "json"
  }
}
filter {
  mutate {
    # Split "request_uri" on "?" so it becomes an array: [path, query].
    split => ["request_uri", "?"]
    # add_field / remove_field are common options applied after the mutate
    # operations above, so request_uri is already an array at this point.
    # Use the canonical [field][index] reference syntax; the embedded form
    # "request_uri[0]" is rejected by the strict field-reference parser.
    # NOTE: when the URI has no "?", [request_uri][1] does not exist and
    # uri_query keeps the unresolved placeholder text.
    add_field => {
      "uri_path" => "%{[request_uri][0]}"
      "uri_query" => "%{[request_uri][1]}"
    }
    remove_field => ["request_uri"]
    # Store numeric fields as numbers so they can be aggregated.
    convert => {
      "response" => "integer"
      "body_bytes_sent" => "integer"
      "request_time" => "float"
      "upstream_response_time" => "float"
    }
  }
}
output {
  # Write to a daily index with a fixed "-1" suffix.
  elasticsearch {
    hosts => ["http://192.168.0.2:9208"]
    index => "nginx_access-%{+YYYY.MM.dd}-1"
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
上次更新: 2024/01/11, 14:21:50