如何动态设置Solr Payload

如何动态设置Solr Payload

问题背景

Solr 从6.x开始提供了index payload的功能,(payload是指可以指定为分词之后的每个term指定一个二进制的信息,用于针对每个term作出一定的行为,例如可以用来存储指定的score用于排序,也可以用来存储指定的数据,用于检查term的一定的检查),简单的查阅官方文档之后(这里使用solr7.7.2作为样例),得知我们可以动态的设定每个term的payload(参考SolrPayload),也可以根据指定的token type设置payload数值(参考Numeric Payload Token Filter),但是上述两种方式都无法根据输入的变化而动态的改变payload的值。

具体需求

由于和具体的的业务有关,我这里将需求简单化。

  • 业务中的字段类型为多值字段
  • 同一篇文档的多值字段中不同的数值需要设置不同的payload
  • 需要更具不同的参数设定不同的payload数值

看到这里老司机肯定说这个需求太简单了,直接使用Numeric Payload Token Filter 就行了,可以将多值字段放到多个字段中,每个字段指定不同的payload的问题也可以解决啊 (只需要指定不同字段类型下面例子中的payload值就行了)

1
2
3
4
<analyzer>
<tokenizer class="solr.WhitespaceTokenizerFactory"/>
<filter class="solr.NumericPayloadTokenFilterFactory" payload="0.75" typeMatch="word"/>
</analyzer>

确实,在多值字段的值的个数较少的时候我也推荐这种做法,因为相对比较简单,而且容易操作。但是如果我有多值字段有20个值,那么这样我的schema里面必须得有20个字段类型,20个字段来对应的多个payload ,那么管理这20个字段就变成了一件蛋疼的事情。
当然这里也有人会说为啥不用solr的DelimitedPayloadTokenFilter(用法可以参考,我们可以手动指定每个term的payload,例如 (分隔符|后面的数字为payload)

1
2
3
id,vals_dpf
1,one|1.0 two|2.0 three|3.0
2,weighted|50.0 weighted|100.0

是的,确实可以,但是前提是你的文档内容比较简单(内容简单的话推荐使用DelimitedPayloadTokenFilter,可以在index代码中根据业务动态控制),如果文档比较复杂,比如就是一段文本,那么相当于你得在index代码中将文本进行分词然后拼接成上述的例子,但是如果拼接的话solr的分词功能就形同摆设了,那么有没有办法既能保持index代码的简单性,也可以使用solr的分词功能来完成上述的需求呢?

解决方案

解决方案其实只有两步

  • 可以动态的设置某个字段的某个值的某个属性
  • 自定Tokenizer或者filter根据某个属性设定对用的payload

(我去,这简直就是正确的废话)
那么既然我们的“解决方案”已经有了,我们该怎么一步步的实现这个“解决方案呢”,我们来一步步分析下,

  • 我们怎么动态的设置某个字段的某个值的某个属性?

由于在index的代码中,Solr的给我们的API比较简单,直接就是 SolrInputDocument.set(fieldname,fieldvalue),我们能控制的要么就是fieldname,要么就是fieldvalue, 控制fieldname的方案我前面已经介绍过,可以使用Numeric Payload Token Filter 来解决我们的问题,这里我们可能设法在fieldname上做文章了,那么可以选择的方案只能是在fieldvalue上下功夫了,显然,我们只能在fieldvalue的头上加一些特殊的标记来解决问题,当然这些特殊的标记不能和你的字段值的某个或者某些特征发生冲突,例如我们可以在字段值的前段加上”[|xxx|]”,那么我们就可以在更具xxx的内容设定某个字段的某个属性就行了

  • 如何在solr端读取xxx的内容?

solr的index功能是基于Lucene提供的,那么Lucene给我的api也比较简单

1
2
Document doc = new Document();
doc.add(new StringField("fieldName","document context", Field.Store.YES));

也就是所有的文档的字段到lucene这一层肯定都是转成一个document,然后向文档里面添加对应的字段,不同的字段使用不同的分词器来进行分词,对,我们可以自定义字段,在生成这个字段的时候解析xxx的内容,并且根据xxx的内容设定某个属性的值问题不就能解决了吗?
简单的翻看了Solr的某个字段类型的源码(这里以Solr.TextField为例子)
Solr.TextField Method List ,都是一些get方法,貌似没啥用,看看父类org.apache.solr.schema.FieldType
FieldType Method List
貌似createField就是我们要找的方法,我们可以在这里解析fieldvalue ,并且根据xxx设置这个字段的某个属性,然后拿掉xxx,还原原始的fieldvalue

  • “某个属性”到底是哪个属性

在上文中我们一直提到“某个属性”,但是我们到目前为止还是没有找到这个可以设置的属性到底是啥,但是如果我们不找到这个属性,上面的分析过程全是扯淡的,为了找到这个属性,我们必须知道这个属性满足什么样的特征呢,我们分析下这个属性需要满足的特征

  • 这个属性不能对原始的fieldvalue产生变更

为什么?按照我们的想法,这个”某个属性“不能对原始的fieldvalue参数影响,如果改变了原始value,

  • 这个属性可以必须是跟着term走的,并且可以在tokenizer和filter中可以获取

为什么?按照我们的做法,我们必须在某个tokenizer或者filter或者这个属性,才能设置payload的值,其他的地方没发接触到payload

明确了上述两个特征,我们找个filter/tokenizer的源代码看下 ,找个最简单的LowerCaseFilter 看看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class LowerCaseFilter extends TokenFilter {
private final CharTermAttribute termAtt = addAttribute(CharTermAttribute.class);

/**
* Create a new LowerCaseFilter, that normalizes token text to lower case.
*
* @param in TokenStream to filter
*/
public LowerCaseFilter(TokenStream in) {
super(in);
}

@Override
public final boolean incrementToken() throws IOException {
if (input.incrementToken()) {
CharacterUtils.toLowerCase(termAtt.buffer(), 0, termAtt.length());
return true;
} else
return false;
}
}

看到这里我们有了答案,原来CharTermAttribute就是Lucene/Solr原来在设定某个term的值的,那么我们可以自己造一个,放入根据xxx得到的属性,然后在filter里面或者这个属性,不就可以动态的设定payload的值了嘛, 对头, 上代码

  • 自定义Solr的Field
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class DynamicPayloadsField extends TextField {

private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Override
public IndexableField createField(SchemaField field, Object value) {
if (!field.indexed() && !field.stored()) {
if (log.isTraceEnabled())
log.trace("Ignoring unindexed/unstored field: " + field);
return null;
}

String val;
try {
val = toInternal(value.toString());
} catch (RuntimeException e) {
throw new SolrException( SolrException.ErrorCode.SERVER_ERROR, "Error while creating field '" + field + "' from value '" + value + "'", e);
}
if (val==null) return null;
String target = null;
if (val.startsWith("[|") && val.contains("|]")) {
target = val.substring(2,val.indexOf("|]"));
if (DynamicPayloadAttribute.FIELD_MAPPINGS.containsKey(target)) {
String parsedData = val.substring(val.indexOf("|]")+2);
TokenizerChain tokenizerChain = (TokenizerChain)field.getType().getIndexAnalyzer();
Tokenizer tk = tokenizerChain.getTokenizerFactory().create(TokenStream.DEFAULT_TOKEN_ATTRIBUTE_FACTORY);
TokenStream ts = tk;
for (TokenFilterFactory filter : tokenizerChain.getTokenFilterFactories()) {
ts = filter.create(ts);
}
Analyzer.TokenStreamComponents components = new Analyzer.TokenStreamComponents(tk, ts);
Reader stringReader = tokenizerChain.initReader(field.getName(),new StringReader(parsedData));
tk.setReader(stringReader);
TokenStream stream = components.getTokenStream();
stream.getAttribute(DynamicPayloadAttribute.class).setPayloadType(target);
return new Field(field.getName(), stream, field);
} else {
return super.createField(field.getName(), val, field);
}
} else {
return super.createField(field.getName(), val, field);
}
}
}
  • 自定义Attribute
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface DynamicPayloadAttribute extends Attribute {
Map<String,Integer> FIELD_MAPPINGS = new HashMap<String,Integer>(){
{
put("A",0);
put("B",1);
put("C",2);
put("D",3);
}
};
String DEFAULT_PAYLOAD_TYPE = "UNKNOW_PAYLOAD_TYPE";
String payloadType();
void setPayloadType(String type);
byte getPayLoad();
}
  • 自定义filter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class DynamicPayloadTokenFilter extends TokenFilter {
private final PayloadAttribute payAtt = addAttribute(PayloadAttribute.class);
private final DynamicPayloadAttribute dypayAtt = addAttribute(DynamicPayloadAttribute.class);

protected DynamicPayloadTokenFilter(TokenStream input) {
super(input);
}

@Override
public boolean incrementToken() throws IOException {
if (input.incrementToken()) {
byte payload = dypayAtt.getPayLoad();
if(payload==Byte.MAX_VALUE){
payAtt.setPayload(null);
}else {
byte array [] = {payload};
payAtt.setPayload(new BytesRef(array));
}
return true;
} else return false;
}
}

这里我只放上了重要的代码片段,完整的代码请参考apache-solr-plugins/dynamic-payload

爬坑指南

当然如果所有的细节都如上述描述那么简单,那么各位看官肯定觉得无聊死了,所有的程序员朋友们肯定更加关注这个过程中有没有什么坑,当然坑是存在的,坑是痛苦的,爬坑的过程是痛苦的,爬坑是需要分享的。
遇到的坑:
在Solr的document页面上添加文档是成功的,但是一旦用代码上多线程立马报错(主要的错误就是TokenStream 在调用前没有调用reset方法)
在没有修正这个问题之前的createField的方法实现为(部分)

1
2
3
4
5
6
7
8
if (val.startsWith("[|") && val.contains("|]")) {
target = val.substring(2,val.indexOf("|]"));
if (DynamicPayloadAttribute.FIELD_MAPPINGS.containsKey(target)) {
String parsedData = val.substring(val.indexOf("|]")+2);

TokenStream stream = field.getType().getIndexAnalyzer().tokenStream(field.getName(),parsedData);
stream.addAttributeImpl(new DynamicPayloadAttributeImpl(target));
return new Field(field.getName(),stream,field);

而问题就出现在tokenStream方法上

1
field.getType().getIndexAnalyzer().tokenStream(field.getName(),parsedData);

进入实现可以看到TokenStreamComponents是会重用的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public final TokenStream tokenStream(String fieldName, String text) {
Analyzer.TokenStreamComponents components = this.reuseStrategy.getReusableComponents(this, fieldName);
ReusableStringReader strReader = components != null && components.reusableStringReader != null?components.reusableStringReader:new ReusableStringReader();
strReader.setValue(text);
Reader r = this.initReader(fieldName, strReader);
if(components == null) {
components = this.createComponents(fieldName);
this.reuseStrategy.setReusableComponents(this, fieldName, components);
}

components.setReader(r);
components.reusableStringReader = strReader;
return components.getTokenStream();
}

这样做的好处就是不用重复生成tokenstream compents,并且不同的只是输入,其他的都是一样的,但是为啥在我们的应用场景就会报错呢,原来错误不再我们的使用方式上,而是在solr的后续的流程中会调用Field.tokenStream方法,并且在tokenStream属性不为空的情况下,就会直接return,所以想象下,如果同一个字段的不同值公用了同一个tokenstream属性,那么针对这个tokenstream设定的不同值,只会在最后一个调用的属性会成功。(显然这和我们的期望相违背)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public TokenStream tokenStream(Analyzer analyzer, TokenStream reuse) {
if(this.fieldType().indexOptions() == IndexOptions.NONE) {
return null;
} else if(!this.fieldType().tokenized()) {
if(this.stringValue() != null) {
if(!(reuse instanceof Field.StringTokenStream)) {
reuse = new Field.StringTokenStream();
}

((Field.StringTokenStream)reuse).setValue(this.stringValue());
return (TokenStream)reuse;
} else if(this.binaryValue() != null) {
if(!(reuse instanceof Field.BinaryTokenStream)) {
reuse = new Field.BinaryTokenStream();
}

((Field.BinaryTokenStream)reuse).setValue(this.binaryValue());
return (TokenStream)reuse;
} else {
throw new IllegalArgumentException("Non-Tokenized Fields must have a String value");
}
} else if(this.tokenStream != null) {
return this.tokenStream;
} else if(this.readerValue() != null) {
return analyzer.tokenStream(this.name(), this.readerValue());
} else if(this.stringValue() != null) {
return analyzer.tokenStream(this.name(), this.stringValue());
} else {
throw new IllegalArgumentException("Field must have either TokenStream, String, Reader or Number value; got " + this);
}
}

但是即使这样,也不应该会报错,原来如果在create field字段中调用tokenstream方法,那么针对多值字段,那么同一个tokenstream对象就有可能可能共享,那么当Solr处理完第一个值结束的时候会将tokenstream进行重制,那么原有的reader就会失效,那么当Solr开始处理第二个值的时候由于共享的同一个tokenstream,那么对用的reader已经失效,就会抛出错误。所以我们需要每次生成不同的tokenstream就可以解决问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
if (val.startsWith("[|") && val.contains("|]")) {
target = val.substring(2,val.indexOf("|]"));
if (DynamicPayloadAttribute.FIELD_MAPPINGS.containsKey(target)) {
String parsedData = val.substring(val.indexOf("|]")+2);
TokenizerChain tokenizerChain = (TokenizerChain)field.getType().getIndexAnalyzer();
Tokenizer tk = tokenizerChain.getTokenizerFactory().create(TokenStream.DEFAULT_TOKEN_ATTRIBUTE_FACTORY);
TokenStream ts = tk;
for (TokenFilterFactory filter : tokenizerChain.getTokenFilterFactories()) {
ts = filter.create(ts);
}
Analyzer.TokenStreamComponents components = new Analyzer.TokenStreamComponents(tk, ts);
Reader stringReader = tokenizerChain.initReader(field.getName(),new StringReader(parsedData));
tk.setReader(stringReader);
TokenStream stream = components.getTokenStream();
stream.getAttribute(DynamicPayloadAttribute.class).setPayloadType(target);
return new Field(field.getName(), stream, field);

参考的项目或者文章

SolrPayload 功能 Example
Solr Numeric Payload Token Filter