ML functions
 
Loading...
Searching...
No Matches
FraudDetectionFunctions.h
Go to the documentation of this file.
1
15
16#pragma once
17
18#include <time.h>
19#include <Eigen/Dense>
20#include <chrono>
21#include <cmath>
22#include <ctime>
23#include <iomanip>
24#include <iostream>
25#include <locale>
#include <cstdint>
#include <cstdlib>
#include <exception>
#include <fstream>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <vector>
26#include "velox/exec/tests/utils/AssertQueryBuilder.h"
27#include "velox/exec/tests/utils/PlanBuilder.h"
28#include "velox/exec/tests/utils/TempDirectoryPath.h"
29#include "BaseFunction.h"
30#include "velox/vector/tests/utils/VectorTestBase.h"
31
32using namespace facebook::velox;
33using namespace facebook::velox::test;
34using namespace facebook::velox::exec::test;
35using namespace facebook::velox::memory;
36
41class IsWeekday : public MLFunction {
42 public:
51 void apply(
52 const SelectivityVector& rows,
53 std::vector<VectorPtr>& args,
54 const TypePtr& type,
55 exec::EvalCtx& context,
56 VectorPtr& output) const override {
57 BaseVector::ensureWritable(rows, type, context.pool(), output);
58
59 std::vector<int> results;
60
61 BaseVector* baseVec = args[0].get();
62 exec::LocalDecodedVector vecHolder(context, *baseVec, rows);
63 auto decodedArray = vecHolder.get();
64 auto inputTimes = decodedArray->base()->as<FlatVector<int64_t>>();
65
66 const int secondsInADay = 86400;
67 for (int i = 0; i < rows.size(); i++) {
68 int64_t timestamp = inputTimes->valueAt(i);
69
70 std::time_t time = static_cast<std::time_t>(timestamp);
71 std::tm* time_info = std::localtime(&time);
72 int dayOfWeek = time_info->tm_wday;
73
74 // Return true if the day is Saturday (6) or Sunday (0)
75 if (dayOfWeek == 0 || dayOfWeek == 6) {
76 results.push_back(0);
77 } else {
78 results.push_back(1);
79 }
80 }
81
82 VectorMaker maker{context.pool()};
83 auto localResult = maker.flatVector<int>(results);
84 context.moveOrCopyResult(localResult, rows, output);
85 output = maker.flatVector<int>(results, INTEGER());
86 }
87
92 static std::vector<std::shared_ptr<exec::FunctionSignature>> signatures() {
93 return {exec::FunctionSignatureBuilder()
94 .argumentType("BIGINT")
95 .returnType("INTEGER")
96 .build()};
97 }
98
103 static std::string getName() {
104 return "is_weekday";
105 }
106
111 float* getTensor() const override {
112 // TODO: need to implement
113 return nullptr;
114 }
115
121 CostEstimate getCost(std::vector<int> inputDims) {
122 // TODO: need to implement
123 return CostEstimate(0, inputDims[0], inputDims[1]);
124 }
125};
126
131class GetAge : public MLFunction {
132 public:
141 void apply(
142 const SelectivityVector& rows,
143 std::vector<VectorPtr>& args,
144 const TypePtr& type,
145 exec::EvalCtx& context,
146 VectorPtr& output) const override {
147 BaseVector::ensureWritable(rows, type, context.pool(), output);
148
149 std::vector<int> results;
150
151 BaseVector* baseVec = args[0].get();
152 exec::LocalDecodedVector vecHolder(context, *baseVec, rows);
153 auto decodedArray = vecHolder.get();
154 auto birthYears = decodedArray->base()->as<FlatVector<int>>();
155
156 auto now = std::chrono::system_clock::now();
157 std::time_t currentTime = std::chrono::system_clock::to_time_t(now);
158 std::tm* localTime = std::localtime(&currentTime);
159 int currentYear = 1900 + localTime->tm_year;
160
161 for (int i = 0; i < rows.size(); i++) {
162 int birthYear = birthYears->valueAt(i);
163 results.push_back(currentYear - birthYear);
164 }
165
166 VectorMaker maker{context.pool()};
167 output = maker.flatVector<int>(results);
168 }
169
174 static std::vector<std::shared_ptr<exec::FunctionSignature>> signatures() {
175 return {exec::FunctionSignatureBuilder()
176 .argumentType("INTEGER")
177 .returnType("INTEGER")
178 .build()};
179 }
180
185 static std::string getName() {
186 return "get_age";
187 }
188
193 float* getTensor() const override {
194 // TODO: need to implement
195 return nullptr;
196 }
197
203 CostEstimate getCost(std::vector<int> inputDims) {
204 // TODO: need to implement
205 return CostEstimate(0, inputDims[0], inputDims[1]);
206 }
207};
208
214 public:
223 void apply(
224 const SelectivityVector& rows,
225 std::vector<VectorPtr>& args,
226 const TypePtr& type,
227 exec::EvalCtx& context,
228 VectorPtr& output) const override {
229 BaseVector::ensureWritable(rows, type, context.pool(), output);
230
231 int secondsInADay = 86400;
232 std::vector<std::vector<float>> results;
233
234 BaseVector* base0 = args[0].get();
235 BaseVector* base1 = args[1].get();
236 BaseVector* base2 = args[2].get();
237 BaseVector* base3 = args[3].get();
238
239 exec::LocalDecodedVector firstHolder(context, *base0, rows);
240 auto decodedArray0 = firstHolder.get();
241
242 exec::LocalDecodedVector secondHolder(context, *base1, rows);
243 auto decodedArray1 = secondHolder.get();
244
245 exec::LocalDecodedVector thirdHolder(context, *base2, rows);
246 auto decodedArray2 = thirdHolder.get();
247
248 exec::LocalDecodedVector fourthHolder(context, *base3, rows);
249 auto decodedArray3 = fourthHolder.get();
250
251 for (int i = 0; i < rows.size(); i++) {
252 float totalOrder = (static_cast<float>(decodedArray0->valueAt<int64_t>(i))) / 79.0;
253 float tAmount = (decodedArray1->valueAt<float>(i)) / 16048.0;
254 float timeDiff = (static_cast<float>(decodedArray2->valueAt<int64_t>(i))) / 729.0;
255 int64_t tTimestamp = decodedArray3->valueAt<int64_t>(i);
256
257 // Calculate day of week
258 std::time_t time = static_cast<std::time_t>(tTimestamp);
259 std::tm* time_info = std::localtime(&time);
260 float dayOfWeek = (static_cast<float>(time_info->tm_wday)) / 6.0;
261
262 // Calculate the number of days since Unix epoch
263 float daysSinceEpoch =
264 (static_cast<float>(tTimestamp / secondsInADay)) / 15338.0;
265
266 std::vector<float> vec;
267 vec.push_back(totalOrder);
268 vec.push_back(tAmount);
269 vec.push_back(timeDiff);
270 vec.push_back(dayOfWeek);
271 vec.push_back(daysSinceEpoch);
272
273 results.push_back(vec);
274 }
275
276 VectorMaker maker{context.pool()};
277 output = maker.arrayVector<float>(results, REAL());
278 }
279
/// Returns the function signatures. A single signature is advertised:
/// (BIGINT, REAL, BIGINT, BIGINT) -> ARRAY(REAL).
284 static std::vector<std::shared_ptr<exec::FunctionSignature>> signatures() {
285 return {exec::FunctionSignatureBuilder()
286 .argumentType("BIGINT")
287 .argumentType("REAL")
288 .argumentType("BIGINT")
289 .argumentType("BIGINT")
290 .returnType("ARRAY(REAL)")
291 .build()};
292 }
293
/// Returns the name of the function: "get_transaction_features".
298 static std::string getName() {
299 return "get_transaction_features";
300 }
301
/// Returns the tensor associated with the function.
/// Placeholder: no tensor is produced yet, so this always returns nullptr.
306 float* getTensor() const override {
307 // TODO: need to implement
308 return nullptr;
309 }
310
/// Estimates the cost of the function (placeholder implementation).
/// Returns CostEstimate(0, inputDims[0], inputDims[1]).
/// NOTE(review): inputDims[0] and inputDims[1] are read without a bounds
/// check, so callers must pass at least two entries. The meaning of the three
/// CostEstimate arguments is not visible in this file - confirm against
/// BaseFunction.h.
316 CostEstimate getCost(std::vector<int> inputDims) {
317 // TODO: need to implement
318 return CostEstimate(0, inputDims[0], inputDims[1]);
319 }
320};
321
327 public:
336 void apply(
337 const SelectivityVector& rows,
338 std::vector<VectorPtr>& args,
339 const TypePtr& type,
340 exec::EvalCtx& context,
341 VectorPtr& output) const override {
342 BaseVector::ensureWritable(rows, type, context.pool(), output);
343
344 int secondsInADay = 86400;
345 std::vector<std::vector<float>> results;
346
347 BaseVector* base0 = args[0].get();
348 BaseVector* base1 = args[1].get();
349 BaseVector* base2 = args[2].get();
350 BaseVector* base3 = args[3].get();
351
352 exec::LocalDecodedVector firstHolder(context, *base0, rows);
353 auto decodedArray0 = firstHolder.get();
354
355 exec::LocalDecodedVector secondHolder(context, *base1, rows);
356 auto decodedArray1 = secondHolder.get();
357
358 exec::LocalDecodedVector thirdHolder(context, *base2, rows);
359 auto decodedArray2 = thirdHolder.get();
360
361 exec::LocalDecodedVector fourthHolder(context, *base3, rows);
362 auto decodedArray3 = fourthHolder.get();
363
364 for (int i = 0; i < rows.size(); i++) {
365 float cAddressNum =
366 (static_cast<float>(decodedArray0->valueAt<int>(i))) / 35352.0;
367 float cCustFlag = static_cast<float>(decodedArray1->valueAt<int>(i));
368 float cBirthCountry =
369 (static_cast<float>(decodedArray2->valueAt<int>(i))) / 211.0;
370 float cAge = (static_cast<float>(decodedArray3->valueAt<int>(i))) / 94.0;
371
372 std::vector<float> vec;
373 vec.push_back(cAddressNum);
374 vec.push_back(cCustFlag);
375 vec.push_back(cBirthCountry);
376 vec.push_back(cAge);
377
378 results.push_back(vec);
379 }
380
381 VectorMaker maker{context.pool()};
382 output = maker.arrayVector<float>(results, REAL());
383 }
384
/// Returns the function signatures. A single signature is advertised:
/// (INTEGER, INTEGER, INTEGER, INTEGER) -> ARRAY(REAL).
389 static std::vector<std::shared_ptr<exec::FunctionSignature>> signatures() {
390 return {exec::FunctionSignatureBuilder()
391 .argumentType("INTEGER")
392 .argumentType("INTEGER")
393 .argumentType("INTEGER")
394 .argumentType("INTEGER")
395 .returnType("ARRAY(REAL)")
396 .build()};
397 }
398
/// Returns the name of the function: "get_customer_features".
403 static std::string getName() {
404 return "get_customer_features";
405 }
406
/// Returns the tensor associated with the function.
/// Placeholder: no tensor is produced yet, so this always returns nullptr.
411 float* getTensor() const override {
412 // TODO: need to implement
413 return nullptr;
414 }
415
/// Estimates the cost of the function (placeholder implementation).
/// Returns CostEstimate(0, inputDims[0], inputDims[1]).
/// NOTE(review): inputDims[0] and inputDims[1] are read without a bounds
/// check, so callers must pass at least two entries. The meaning of the three
/// CostEstimate arguments is not visible in this file - confirm against
/// BaseFunction.h.
421 CostEstimate getCost(std::vector<int> inputDims) {
422 // TODO: need to implement
423 return CostEstimate(0, inputDims[0], inputDims[1]);
424 }
425};
426
432 public:
441 void apply(
442 const SelectivityVector& rows,
443 std::vector<VectorPtr>& args,
444 const TypePtr& type,
445 exec::EvalCtx& context,
446 VectorPtr& output) const override {
447 BaseVector::ensureWritable(rows, type, context.pool(), output);
448
449 BaseVector* left = args[0].get();
450 BaseVector* right = args[1].get();
451
452 exec::LocalDecodedVector decodedInput1(context, *args[0], rows);
453 exec::LocalDecodedVector decodedInput2(context, *args[1], rows);
454
455 std::vector<int64_t> results;
456 int secondsInADay = 86400;
457
458 for (int i = 0; i < rows.size(); i++) {
459 if (!rows.isValid(i)) {
460 continue;
461 }
462 int64_t timestamp1 = decodedInput1->valueAt<int64_t>(i);
463 int64_t timestamp2 = decodedInput2->valueAt<int64_t>(i);
464
465 int64_t differenceInSeconds = std::abs(timestamp1 - timestamp2);
466 int64_t differenceInDays = differenceInSeconds / secondsInADay;
467 results.push_back(differenceInDays);
468 }
469
470 VectorMaker maker{context.pool()};
471 auto localResult = maker.flatVector<int64_t>(results);
472 context.moveOrCopyResult(localResult, rows, output);
473 }
474
/// Returns the function signatures. A single signature is advertised:
/// (BIGINT, BIGINT) -> BIGINT.
479 static std::vector<std::shared_ptr<exec::FunctionSignature>> signatures() {
480 return {exec::FunctionSignatureBuilder()
481 .argumentType("BIGINT")
482 .argumentType("BIGINT")
483 .returnType("BIGINT")
484 .build()};
485 }
486
/// Returns the name of the function: "time_diff_in_days".
491 static std::string getName() {
492 return "time_diff_in_days";
493 }
494
/// Returns the tensor associated with the function.
/// Placeholder: no tensor is produced yet, so this always returns nullptr.
499 float* getTensor() const override {
500 // TODO: need to implement
501 return nullptr;
502 }
503
/// Estimates the cost of the function (placeholder implementation).
/// Returns CostEstimate(0, inputDims[0], inputDims[1]).
/// NOTE(review): inputDims[0] and inputDims[1] are read without a bounds
/// check, so callers must pass at least two entries. The meaning of the three
/// CostEstimate arguments is not visible in this file - confirm against
/// BaseFunction.h.
509 CostEstimate getCost(std::vector<int> inputDims) {
510 // TODO: need to implement
511 return CostEstimate(0, inputDims[0], inputDims[1]);
512 }
513};
514
520 public:
525 DateToTimestamp(const char* dateFormat_) {
526 dateFormat = dateFormat_;
527 }
528
537 void apply(
538 const SelectivityVector& rows,
539 std::vector<VectorPtr>& args,
540 const TypePtr& type,
541 exec::EvalCtx& context,
542 VectorPtr& output) const override {
543 BaseVector::ensureWritable(rows, type, context.pool(), output);
544
545 exec::LocalDecodedVector decodedStringHolder(context, *args[0], rows);
546 auto decodedStringInput = decodedStringHolder.get();
547
548 std::vector<int64_t> results;
549 struct std::tm t = {};
550
551 for (int i = 0; i < rows.size(); i++) {
552 StringView val = decodedStringInput->valueAt<StringView>(i);
553 std::string inputStr = std::string(val);
554
555 std::istringstream ss(inputStr);
556 ss >> std::get_time(&t, dateFormat);
557
558 // Check if parsing was successful
559 if (ss.fail()) {
560 std::cerr << "Failed to parse date string " << inputStr << std::endl;
561 results.push_back(0);
562 continue;
563 }
564
565 // Convert tm struct to time_t (timestamp)
566 time_t tt = mktime(&t);
567 // Cast time_t to int64_t
568 int64_t timestamp = static_cast<int64_t>(tt);
569 results.push_back(timestamp);
570 }
571
572 VectorMaker maker{context.pool()};
573 output = maker.flatVector<int64_t>(results, BIGINT());
574 }
575
/// Returns the function signatures. A single signature is advertised:
/// (VARCHAR) -> BIGINT.
580 static std::vector<std::shared_ptr<exec::FunctionSignature>> signatures() {
581 return {exec::FunctionSignatureBuilder()
582 .argumentType("VARCHAR")
583 .returnType("BIGINT")
584 .build()};
585 }
586
/// Returns the name of the function: "date_to_timestamp".
591 static std::string getName() {
592 return "date_to_timestamp";
593 }
594
/// Returns the tensor associated with the function.
/// Placeholder: no tensor is produced yet, so this always returns nullptr.
599 float* getTensor() const override {
600 // TODO: need to implement
601 return nullptr;
602 }
603
/// Estimates the cost of the function (placeholder implementation).
/// Returns CostEstimate(0, inputDims[0], inputDims[1]).
/// NOTE(review): inputDims[0] and inputDims[1] are read without a bounds
/// check, so callers must pass at least two entries. The meaning of the three
/// CostEstimate arguments is not visible in this file - confirm against
/// BaseFunction.h.
609 CostEstimate getCost(std::vector<int> inputDims) {
610 // TODO: need to implement
611 return CostEstimate(0, inputDims[0], inputDims[1]);
612 }
613
614 private:
// Format string passed to std::get_time in apply(). The constructor stores
// the caller's pointer as-is (no copy), so the pointed-to string must stay
// alive for as long as this object is used.
615 const char* dateFormat;
616};
617
623 public:
632 void apply(
633 const SelectivityVector& rows,
634 std::vector<VectorPtr>& args,
635 const TypePtr& type,
636 exec::EvalCtx& context,
637 VectorPtr& output) const override {
638 BaseVector::ensureWritable(rows, type, context.pool(), output);
639
640 std::vector<int> results;
641
642 BaseVector* baseVec = args[0].get();
643 exec::LocalDecodedVector vecHolder(context, *baseVec, rows);
644 auto decodedArray = vecHolder.get();
645 auto inputProbs = decodedArray->base()->as<ArrayVector>();
646 auto inputProbsValues = inputProbs->elements()->asFlatVector<float>();
647
648 for (int i = 0; i < rows.size(); i++) {
649 int32_t offset = inputProbs->offsetAt(i);
650 float prob_0 = inputProbsValues->valueAt(offset);
651 float prob_1 = inputProbsValues->valueAt(offset + 1);
652 if (std::isnan(prob_0) || std::isnan(prob_1)) {
653 results.push_back(0);
654 } else {
655 int predicted_class = (prob_0 > prob_1) ? 0 : 1;
656 results.push_back(predicted_class);
657 }
658 }
659
660 VectorMaker maker{context.pool()};
661 output = maker.flatVector<int>(results);
662 }
663
/// Returns the function signatures. A single signature is advertised:
/// (ARRAY(REAL)) -> INTEGER.
668 static std::vector<std::shared_ptr<exec::FunctionSignature>> signatures() {
669 return {exec::FunctionSignatureBuilder()
670 .argumentType("ARRAY(REAL)")
671 .returnType("INTEGER")
672 .build()};
673 }
674
/// Returns the name of the function: "get_binary_class".
679 static std::string getName() {
680 return "get_binary_class";
681 }
682
/// Returns the tensor associated with the function.
/// Placeholder: no tensor is produced yet, so this always returns nullptr.
687 float* getTensor() const override {
688 // TODO: need to implement
689 return nullptr;
690 }
691
/// Estimates the cost of the function (placeholder implementation).
/// Returns CostEstimate(0, inputDims[0], inputDims[1]).
/// NOTE(review): inputDims[0] and inputDims[1] are read without a bounds
/// check, so callers must pass at least two entries. The meaning of the three
/// CostEstimate arguments is not visible in this file - confirm against
/// BaseFunction.h.
697 CostEstimate getCost(std::vector<int> inputDims) {
698 // TODO: need to implement
699 return CostEstimate(0, inputDims[0], inputDims[1]);
700 }
701};
702
/// Reads a file containing country mappings and returns an unordered_map.
///
/// Every line is expected to look like `<key>,<integer>`: the text before the
/// first comma becomes the key and the text after it is parsed with
/// std::stoi. If a key occurs more than once, the last line wins. Empty lines
/// are ignored; lines without a comma or without a parsable integer are
/// reported on stderr and skipped instead of aborting the process with an
/// uncaught exception.
///
/// Declared `inline` because this definition lives in a header that may be
/// included from several translation units.
///
/// @param filePath Mapping file to read; defaults to the deployment location
///                 used so far, so existing `getCountryMap()` calls behave
///                 as before.
/// @return Map from key to integer id.
/// @note Terminates the process with exit code 1 if the file cannot be
///       opened.
inline std::unordered_map<std::string, int> getCountryMap(
    const std::string& filePath =
        "/home/velox/resources/data/country_mapping.txt") {
  std::ifstream file(filePath);
  if (!file.is_open()) {
    std::cerr << "Error: Could not open the file!" << std::endl;
    std::exit(1);
  }

  std::unordered_map<std::string, int> countryMap;
  std::string line;
  while (std::getline(file, line)) {
    if (line.empty()) {
      continue;
    }

    const auto comma = line.find(',');
    if (comma == std::string::npos) {
      std::cerr << "Warning: skipping malformed country mapping line: " << line
                << '\n';
      continue;
    }

    int value = 0;
    try {
      // std::stoi stops at the first non-digit, so a trailing '\r' from
      // CRLF line endings is harmless.
      value = std::stoi(line.substr(comma + 1));
    } catch (const std::exception&) {
      std::cerr << "Warning: skipping malformed country mapping line: " << line
                << '\n';
      continue;
    }

    countryMap[line.substr(0, comma)] = value;
  }

  // `file` is closed by its destructor.
  return countryMap;
}
std::unordered_map< std::string, int > getCountryMap()
Reads a file containing country mappings and returns an unordered_map.
Definition FraudDetectionFunctions.h:707
static std::vector< std::shared_ptr< exec::FunctionSignature > > signatures()
Returns the function signatures.
Definition FraudDetectionFunctions.h:580
void apply(const SelectivityVector &rows, std::vector< VectorPtr > &args, const TypePtr &type, exec::EvalCtx &context, VectorPtr &output) const override
Applies the function to convert a date string to a timestamp.
Definition FraudDetectionFunctions.h:537
CostEstimate getCost(std::vector< int > inputDims)
Estimates the cost of the function.
Definition FraudDetectionFunctions.h:609
static std::string getName()
Returns the name of the function.
Definition FraudDetectionFunctions.h:591
DateToTimestamp(const char *dateFormat_)
Constructor for DateToTimestamp.
Definition FraudDetectionFunctions.h:525
float * getTensor() const override
Returns the tensor associated with the function.
Definition FraudDetectionFunctions.h:599
GetAge
Implements a function to calculate the age based on the birth year.
Definition FraudDetectionFunctions.h:131
float * getTensor() const override
Returns the tensor associated with the function.
Definition FraudDetectionFunctions.h:193
static std::string getName()
Returns the name of the function.
Definition FraudDetectionFunctions.h:185
static std::vector< std::shared_ptr< exec::FunctionSignature > > signatures()
Returns the function signatures.
Definition FraudDetectionFunctions.h:174
CostEstimate getCost(std::vector< int > inputDims)
Estimates the cost of the function.
Definition FraudDetectionFunctions.h:203
void apply(const SelectivityVector &rows, std::vector< VectorPtr > &args, const TypePtr &type, exec::EvalCtx &context, VectorPtr &output) const override
Applies the function to calculate the age.
Definition FraudDetectionFunctions.h:141
Implements a function to determine the binary class based on probabilities.
Definition FraudDetectionFunctions.h:622
void apply(const SelectivityVector &rows, std::vector< VectorPtr > &args, const TypePtr &type, exec::EvalCtx &context, VectorPtr &output) const override
Applies the function to determine the binary class.
Definition FraudDetectionFunctions.h:632
static std::vector< std::shared_ptr< exec::FunctionSignature > > signatures()
Returns the function signatures.
Definition FraudDetectionFunctions.h:668
float * getTensor() const override
Returns the tensor associated with the function.
Definition FraudDetectionFunctions.h:687
static std::string getName()
Returns the name of the function.
Definition FraudDetectionFunctions.h:679
CostEstimate getCost(std::vector< int > inputDims)
Estimates the cost of the function.
Definition FraudDetectionFunctions.h:697
Implements a function to extract features from customer data.
Definition FraudDetectionFunctions.h:326
CostEstimate getCost(std::vector< int > inputDims)
Estimates the cost of the function.
Definition FraudDetectionFunctions.h:421
static std::string getName()
Returns the name of the function.
Definition FraudDetectionFunctions.h:403
float * getTensor() const override
Returns the tensor associated with the function.
Definition FraudDetectionFunctions.h:411
static std::vector< std::shared_ptr< exec::FunctionSignature > > signatures()
Returns the function signatures.
Definition FraudDetectionFunctions.h:389
void apply(const SelectivityVector &rows, std::vector< VectorPtr > &args, const TypePtr &type, exec::EvalCtx &context, VectorPtr &output) const override
Applies the function to extract customer features.
Definition FraudDetectionFunctions.h:336
Implements a function to extract features from transaction data.
Definition FraudDetectionFunctions.h:213
static std::vector< std::shared_ptr< exec::FunctionSignature > > signatures()
Returns the function signatures.
Definition FraudDetectionFunctions.h:284
float * getTensor() const override
Returns the tensor associated with the function.
Definition FraudDetectionFunctions.h:306
static std::string getName()
Returns the name of the function.
Definition FraudDetectionFunctions.h:298
void apply(const SelectivityVector &rows, std::vector< VectorPtr > &args, const TypePtr &type, exec::EvalCtx &context, VectorPtr &output) const override
Applies the function to extract transaction features.
Definition FraudDetectionFunctions.h:223
CostEstimate getCost(std::vector< int > inputDims)
Estimates the cost of the function.
Definition FraudDetectionFunctions.h:316
IsWeekday
Implements a function to check if a given timestamp corresponds to a weekday.
Definition FraudDetectionFunctions.h:41
static std::string getName()
Returns the name of the function.
Definition FraudDetectionFunctions.h:103
float * getTensor() const override
Returns the tensor associated with the function.
Definition FraudDetectionFunctions.h:111
static std::vector< std::shared_ptr< exec::FunctionSignature > > signatures()
Returns the function signatures.
Definition FraudDetectionFunctions.h:92
CostEstimate getCost(std::vector< int > inputDims)
Estimates the cost of the function.
Definition FraudDetectionFunctions.h:121
void apply(const SelectivityVector &rows, std::vector< VectorPtr > &args, const TypePtr &type, exec::EvalCtx &context, VectorPtr &output) const override
Applies the function to check if the timestamp is a weekday.
Definition FraudDetectionFunctions.h:51
MLFunction
A base class for machine learning functions, inheriting from Velox's VectorFunction.
Definition BaseFunction.h:9
Implements a function to calculate the difference in days between two timestamps.
Definition FraudDetectionFunctions.h:431
static std::vector< std::shared_ptr< exec::FunctionSignature > > signatures()
Returns the function signatures.
Definition FraudDetectionFunctions.h:479
float * getTensor() const override
Returns the tensor associated with the function.
Definition FraudDetectionFunctions.h:499
static std::string getName()
Returns the name of the function.
Definition FraudDetectionFunctions.h:491
void apply(const SelectivityVector &rows, std::vector< VectorPtr > &args, const TypePtr &type, exec::EvalCtx &context, VectorPtr &output) const override
Applies the function to calculate the time difference in days.
Definition FraudDetectionFunctions.h:441
CostEstimate getCost(std::vector< int > inputDims)
Estimates the cost of the function.
Definition FraudDetectionFunctions.h:509