ML functions
 
Loading...
Searching...
No Matches
SequencePooling.h
Go to the documentation of this file.
1
15
16#pragma once
17
18#include <Eigen/Dense>
19#include <cmath>
20#include <cstring>
21#include <iostream>
22#include <stdexcept>
23#include "BaseFunction.h"
24#include "velox/exec/tests/utils/AssertQueryBuilder.h"
25#include "velox/exec/tests/utils/PlanBuilder.h"
26#include "velox/exec/tests/utils/TempDirectoryPath.h"
27#include "velox/vector/tests/utils/VectorTestBase.h"
28
29using namespace facebook::velox;
30using namespace facebook::velox::test;
31using namespace facebook::velox::exec::test;
32using namespace facebook::velox::memory;
33
39 public:
45 SequencePooling(std::string mode, int embeddingDims) {
46 transform(mode.begin(), mode.end(), mode.begin(), ::toupper);
47
48 if (mode != "MIN" && mode != "MAX" && mode != "MEAN") {
49 throw std::runtime_error(
50 "[Error]: The input mode: " + mode +
51 " is not supported. Supported mode: MIN, MAX, MEAN");
52 }
53 mode_ = mode;
54 embeddingDims_ = embeddingDims;
55 dims.push_back(embeddingDims);
56 }
57
66 void apply(
67 const SelectivityVector& rows,
68 std::vector<VectorPtr>& args,
69 const TypePtr& type,
70 exec::EvalCtx& context,
71 VectorPtr& output) const override {
72 BaseVector::ensureWritable(rows, type, context.pool(), output);
73 output->clearNulls(rows);
74 auto arrayOutput = output->as<ArrayVector>();
75 auto sizes = arrayOutput->mutableSizes(rows.end());
76 auto rawSizes = sizes->asMutable<int32_t>();
77 auto offsets = arrayOutput->mutableOffsets(rows.end());
78 auto rawOffsets = offsets->asMutable<int32_t>();
79
80 // Initialize sizes and offsets to zero.
81 std::fill(rawSizes, rawSizes + rows.end(), 0);
82 std::fill(rawOffsets, rawOffsets + rows.end(), 0);
83
84 auto elementsOutput = arrayOutput->elements();
85 auto elementsPool = context.pool();
86
87 exec::DecodedArgs decodedArgs(rows, args, context);
88 auto decodedInput = decodedArgs.at(0);
89 auto numRows = rows.size();
90
91 auto inputArray = decodedInput->base()->as<ArrayVector>();
92 auto inputElements = inputArray->elements();
93 float* inputValues = inputElements->values()->asMutable<float>();
94 auto inputOffsets = inputArray->rawOffsets();
95 auto inputSizes = inputArray->rawSizes();
96
97 std::map<vector_size_t, vector_size_t> rowMap;
98 std::unordered_set<vector_size_t> uniqueRawIndexeSet;
99 std::vector<vector_size_t> uniqueRawIndexeVector;
100 vector_size_t numUniqueRows = 0;
101 rows.applyToSelected([&](vector_size_t row) {
102 auto mappedIndexInRowData = decodedInput->index(row);
103 if (uniqueRawIndexeSet.find(mappedIndexInRowData) ==
104 uniqueRawIndexeSet.end()) {
105 // add it
106 rowMap[row] = numUniqueRows;
107 uniqueRawIndexeSet.insert(mappedIndexInRowData);
108 uniqueRawIndexeVector.push_back(mappedIndexInRowData);
109 ++numUniqueRows;
110 } else {
111 // already added
112 rowMap[row] = rowMap[mappedIndexInRowData];
113 }
114 });
115
116 int numResultMatrixRows = numUniqueRows;
117 Eigen::MatrixXf resultMatix(numResultMatrixRows, dims[0]);
118 int rowIndex = 0;
119 for (auto rawIndex : uniqueRawIndexeVector) {
120 int numEmbeddingValues = inputSizes[rawIndex];
121 // int valueOffset = inputOffsets[rawIndex];
122 int numEmbeddingToCombie = numEmbeddingValues / embeddingDims_;
123 if (numEmbeddingToCombie == 1) {
124 Eigen::Map<const Eigen::VectorXf> rowVector(
125 inputValues + inputOffsets[rawIndex], embeddingDims_);
126 resultMatix.row(rowIndex) = rowVector;
127 } else {
128 Eigen::Map<
129 Eigen::
130 Matrix<float, Eigen::Dynamic, Eigen::Dynamic, Eigen::RowMajor>>
131 varaidicEmbedding(
132 inputValues + inputOffsets[rawIndex],
133 numEmbeddingToCombie,
134 embeddingDims_);
135
136 Eigen::VectorXf mergedValues;
137 if (mode_ == "MIN") {
138 mergedValues = varaidicEmbedding.colwise().minCoeff();
139 } else if (mode_ == "MAX") {
140 mergedValues = varaidicEmbedding.colwise().maxCoeff();
141 } else if (mode_ == "MEAN") {
142 mergedValues = varaidicEmbedding.colwise().mean();
143 }
144 resultMatix.row(rowIndex) = mergedValues;
145 }
146 rowIndex++;
147 }
148
149 auto baseOffset = elementsOutput->size();
150 elementsOutput->resize(baseOffset + rows.end() * dims[0]);
151 float* outputValues = elementsOutput->values()->asMutable<float>();
152
153 vector_size_t outputOffset = 0;
154
155 rows.applyToSelected([&](vector_size_t row) {
156 if (rowMap.find(row) == rowMap.end()) {
157 throw std::runtime_error(
158 "Mapped index not found for the result matrix.");
159 }
160 auto mappedIndexInResultMatrix = rowMap[row];
161 rawOffsets[row] = outputOffset;
162 rawSizes[row] = dims[0];
163
164 std::memcpy(
165 outputValues + outputOffset,
166 resultMatix.row(mappedIndexInResultMatrix).data(),
167 dims[0] * sizeof(float));
168
169 outputOffset += dims[0];
170 });
171 arrayOutput->setElements(elementsOutput);
172 }
173
178 static std::vector<std::shared_ptr<exec::FunctionSignature>> signatures() {
179 return {exec::FunctionSignatureBuilder()
180 .argumentType("array(REAL)")
181 .returnType("array(REAL)")
182 .build()};
183 }
184
/// Returns the name of the function, "sequence_pooling".
static std::string getName() {
  return "sequence_pooling";
}
192
197 float* getTensor() const override {
198 // TODO: need to implement
199 return nullptr;
200 }
201
207 CostEstimate getCost(std::vector<int> inputDims) {
208 // TODO: need to implement
209 return CostEstimate(0, inputDims[0], inputDims[1]);
210 }
211
212 private:
213 std::string mode_;
214 int embeddingDims_;
215};
A base class for machine learning functions, inheriting from Velox's VectorFunction.
Definition BaseFunction.h:9
std::vector< int > dims
Dimensions of the function.
Definition BaseFunction.h:61
void apply(const SelectivityVector &rows, std::vector< VectorPtr > &args, const TypePtr &type, exec::EvalCtx &context, VectorPtr &output) const override
Applies the sequence pooling function to the input data.
Definition SequencePooling.h:66
static std::string getName()
Returns the name of the function.
Definition SequencePooling.h:189
static std::vector< std::shared_ptr< exec::FunctionSignature > > signatures()
Returns the function signatures.
Definition SequencePooling.h:178
SequencePooling(std::string mode, int embeddingDims)
Constructor for SequencePooling.
Definition SequencePooling.h:45
CostEstimate getCost(std::vector< int > inputDims)
Estimates the cost of the function.
Definition SequencePooling.h:207
float * getTensor() const override
Returns the tensor associated with the function.
Definition SequencePooling.h:197