분기없는 K- 평균 (또는 기타 최적화)
참고 : 솔루션 자체보다는 이러한 종류의 솔루션에 접근하고 제안하는 방법에 대한 가이드가 더 감사하겠습니다.
특정 상황에서 최고의 프로파일 링 핫스팟으로 나타나는 내 시스템의 성능에 매우 중요한 기능이 있습니다. k- 평균 반복의 중간에 있습니다 (이미 각 작업자 스레드에서 포인트의 하위 범위를 처리하기 위해 병렬을 사용하여 다중 스레드 됨).
ClusterPoint& pt = points[j];
pt.min_index = -1;
pt.min_dist = numeric_limits<float>::max();
for (int i=0; i < num_centroids; ++i)
{
const ClusterCentroid& cent = centroids[i];
const float dist = ...;
if (dist < pt.min_dist) // <-- #1 hotspot
{
pt.min_dist = dist;
pt.min_index = i;
}
}
이 코드 섹션을 처리하는 데 필요한 시간 절약은 상당히 중요하기 때문에 나는 종종 그것을 많이 다루었습니다. 예를 들어 중심 루프를 외부에 배치하고 주어진 중심에 대해 병렬로 점을 반복하는 것이 좋습니다. 여기서 클러스터 포인트의 수는 수백만에 이릅니다. 반면 중심의 수는 수천에 이릅니다. 이 알고리즘은 몇 번의 반복 (종종 10 회 미만)에 적용됩니다. 완벽한 수렴 / 안정성을 추구하지 않고 단지 '합리적인'근사치를 추구합니다.
어떤 아이디어라도 고맙게 생각하지만, 제가 정말로 발견하고 싶은 것은이 코드가 SIMD 버전을 허용하는 것처럼 분기없이 만들 수 있는지 여부입니다. 저는 분기없는 해결책을 찾는 방법을 쉽게 파악할 수있는 정신적 능력을 개발하지 못했습니다. 초기에 재귀에 처음 노출되었을 때와 마찬가지로 제 뇌는 거기에서 실패하므로 분기없는 작성 방법에 대한 가이드 코드와 적절한 사고 방식을 개발하는 방법도 도움이 될 것입니다.
요컨대,이 코드를 마이크로 최적화하는 방법에 대한 가이드와 힌트 및 제안 (솔루션이 아닐 수도 있음)을 찾고 있습니다. 알고리즘 개선의 여지가있을 가능성이 높지만 제 맹점은 항상 마이크로 최적화 솔루션에있었습니다 (그리고 과도하게 사용하지 않고 더 효과적으로 적용하는 방법을 배우고 싶습니다). 논리에 대한 청키 한 병렬로 이미 긴밀하게 다중 스레드되어 있으므로 더 똑똑한 알고리즘없이 시도 할 수있는 더 빠른 작업 중 하나로서 마이크로 최적화 코너로 거의 밀려났습니다. 우리는 메모리 레이아웃을 완전히 자유롭게 변경할 수 있습니다.
알고리즘 제안에 대한 응답
알고리즘 수준에서 명확하게 개선 될 수있는 O (knm) 알고리즘을 미세 최적화하려는 시도에서이 모든 것을 잘못 보는 것에 대해 전적으로 동의합니다. 이것은이 특정 질문을 다소 학문적이고 비실용적 인 영역으로 밀어 붙입니다. 그러나 일화를 허용 할 수 있다면, 저는 높은 수준의 프로그래밍의 원래 배경에서 왔습니다. 광범위하고 대규모의 관점, 안전성에 중점을두고 낮은 수준의 구현 세부 사항에 대해서는 거의 강조하지 않습니다. 저는 최근에 프로젝트를 매우 다른 종류의 현대적인 스타일로 전환했으며 캐시 효율성, GPGPU, 분기없는 기술, SIMD, 실제로 malloc을 능가하는 특수 목적 mem 할당 자의 동료들로부터 모든 종류의 새로운 트릭을 배우고 있습니다. 그러나 특정 시나리오의 경우) 등
최신 성능 트렌드를 따라 잡으려고 노력하고 있습니다. 놀랍게도 90 년대에 자주 선호했던 이전 데이터 구조가 링크 / 트리 유형 구조가 실제로 훨씬 더 순진한 성능보다 훨씬 뛰어나다는 사실을 발견했습니다. 연속적인 메모리 블록에 대해 조정 된 명령을 적용하는 잔인하고 마이크로 최적화 된 병렬 코드. 동시에 우리가 알고리즘을 기계에 더 많이 맞추고 이런 방식으로 가능성을 좁히고 있다고 생각하기 때문에 (특히 GPGPU에서) 다소 실망 스럽습니다.
가장 재미있는 점은 이전에 사용했던 정교한 알고리즘과 데이터 구조보다 이러한 유형의 마이크로 최적화되고 빠른 어레이 처리 코드를 유지 관리하기가 훨씬 쉽다는 것입니다. 처음에는 일반화하기가 더 쉽습니다. 또한 동료들은 특정 지역의 특정 속도 저하에 대한 고객 불만을 종종 받아 들일 수 있으며, 그저 SIMD에 대해 평행을 이룬 다음 적절한 속도로 완료했다고 말할 수 있습니다. 알고리즘 개선은 종종 훨씬 더 많은 것을 제공 할 수 있지만, 이러한 마이크로 최적화를 적용 할 수있는 속도와 비침 입성으로 인해 더 나은 알고리즘에 대한 논문을 읽는 데 시간이 걸릴 수 있습니다 (더 많은 시간이 필요할 수 있음). 광범위한 변경). 그래서 저는 최근에 마이크로 최적화의 악 대차에 뛰어 들었습니다. 아마도이 특정한 경우에는 너무 많은 것 같습니다.
분해
참고 : 저는 조립이 정말 나쁘기 때문에 시행 착오 방식으로 더 많은 것을 조정했습니다. vtune에 표시된 핫스팟이 병목 현상 일 수있는 이유에 대해 다소 교육적인 추측을 내놓은 다음 확인을 시도합니다. 시간이 개선되면 추측에 시간이 개선되면 진실에 대한 힌트가 있다고 가정하고 그렇지 않으면 점수를 완전히 놓친다고 가정합니다.
000007FEEE3FB8A1 jl thread_partition+70h (7FEEE3FB780h)
{
ClusterPoint& pt = points[j];
pt.min_index = -1;
pt.min_dist = numeric_limits<float>::max();
for (int i = 0; i < num_centroids; ++i)
000007FEEE3FB8A7 cmp ecx,r10d
000007FEEE3FB8AA jge thread_partition+1F4h (7FEEE3FB904h)
000007FEEE3FB8AC lea rax,[rbx+rbx*2]
000007FEEE3FB8B0 add rax,rax
000007FEEE3FB8B3 lea r8,[rbp+rax*8+8]
{
const ClusterCentroid& cent = centroids[i];
const float x = pt.pos[0] - cent.pos[0];
const float y = pt.pos[1] - cent.pos[1];
000007FEEE3FB8B8 movss xmm0,dword ptr [rdx]
const float z = pt.pos[2] - cent.pos[2];
000007FEEE3FB8BC movss xmm2,dword ptr [rdx+4]
000007FEEE3FB8C1 movss xmm1,dword ptr [rdx-4]
000007FEEE3FB8C6 subss xmm2,dword ptr [r8]
000007FEEE3FB8CB subss xmm0,dword ptr [r8-4]
000007FEEE3FB8D1 subss xmm1,dword ptr [r8-8]
const float dist = x*x + y*y + z*z;
000007FEEE3FB8D7 mulss xmm2,xmm2
000007FEEE3FB8DB mulss xmm0,xmm0
000007FEEE3FB8DF mulss xmm1,xmm1
000007FEEE3FB8E3 addss xmm2,xmm0
000007FEEE3FB8E7 addss xmm2,xmm1
if (dist < pt.min_dist)
// VTUNE HOTSPOT
000007FEEE3FB8EB comiss xmm2,dword ptr [rdx-8]
000007FEEE3FB8EF jae thread_partition+1E9h (7FEEE3FB8F9h)
{
pt.min_dist = dist;
000007FEEE3FB8F1 movss dword ptr [rdx-8],xmm2
pt.min_index = i;
000007FEEE3FB8F6 mov dword ptr [rdx-10h],ecx
000007FEEE3FB8F9 inc ecx
000007FEEE3FB8FB add r8,30h
000007FEEE3FB8FF cmp ecx,r10d
000007FEEE3FB902 jl thread_partition+1A8h (7FEEE3FB8B8h)
for (int j = *irange.first; j < *irange.last; ++j)
000007FEEE3FB904 inc edi
000007FEEE3FB906 add rdx,20h
000007FEEE3FB90A cmp edi,dword ptr [rsi+4]
000007FEEE3FB90D jl thread_partition+31h (7FEEE3FB741h)
000007FEEE3FB913 mov rbx,qword ptr [irange]
}
}
}
}
우리는 SSE 2를 목표로 삼아야했습니다. 우리 시대에 약간 뒤처졌지만 SSE 4조차도 최소 요구 사항으로 괜찮다고 가정했을 때 사용자 기반이 실제로 한 번 넘어졌습니다 (사용자에게 프로토 타입 Intel 시스템이 있음).
독립형 테스트로 업데이트 : ~ 5.6 초
제공되는 모든 도움에 대해 매우 감사합니다! 코드베이스가 상당히 광범위하고 해당 코드를 트리거하는 조건이 복잡하기 때문에 (여러 스레드에서 트리거되는 시스템 이벤트) 매번 실험적인 변경을 수행하고 프로파일 링하는 것은 다소 다루기 어렵습니다. 그래서 저는 다른 사람들도 실행하고 시험해 볼 수있는 독립형 애플리케이션으로 측면에 표면 테스트를 설정하여이 모든 정중하게 제공되는 솔루션을 실험 할 수 있습니다.
#define _SECURE_SCL 0
#include <iostream>
#include <fstream>
#include <vector>
#include <limits>
#include <ctime>
#if defined(_MSC_VER)
#define ALIGN16 __declspec(align(16))
#else
#include <malloc.h>
#define ALIGN16 __attribute__((aligned(16)))
#endif
using namespace std;
// Aligned memory allocation (for SIMD).
static void* malloc16(size_t amount)
{
#ifdef _MSC_VER
return _aligned_malloc(amount, 16);
#else
void* mem = 0;
posix_memalign(&mem, 16, amount);
return mem;
#endif
}
template <class T>
static T* malloc16_t(size_t num_elements)
{
return static_cast<T*>(malloc16(num_elements * sizeof(T)));
}
// Aligned free.
static void free16(void* mem)
{
#ifdef _MSC_VER
return _aligned_free(mem);
#else
free(mem);
#endif
}
// Test parameters.
enum {num_centroids = 512};
enum {num_points = num_centroids * 2000};
enum {num_iterations = 5};
static const float range = 10.0f;
class Points
{
public:
Points(): data(malloc16_t<Point>(num_points))
{
for (int p=0; p < num_points; ++p)
{
const float xyz[3] =
{
range * static_cast<float>(rand()) / RAND_MAX,
range * static_cast<float>(rand()) / RAND_MAX,
range * static_cast<float>(rand()) / RAND_MAX
};
init(p, xyz);
}
}
~Points()
{
free16(data);
}
void init(int n, const float* xyz)
{
data[n].centroid = -1;
data[n].xyz[0] = xyz[0];
data[n].xyz[1] = xyz[1];
data[n].xyz[2] = xyz[2];
}
void associate(int n, int new_centroid)
{
data[n].centroid = new_centroid;
}
int centroid(int n) const
{
return data[n].centroid;
}
float* operator[](int n)
{
return data[n].xyz;
}
private:
Points(const Points&);
Points& operator=(const Points&);
struct Point
{
int centroid;
float xyz[3];
};
Point* data;
};
class Centroids
{
public:
Centroids(Points& points): data(malloc16_t<Centroid>(num_centroids))
{
// Naive initial selection algorithm, but outside the
// current area of interest.
for (int c=0; c < num_centroids; ++c)
init(c, points[c]);
}
~Centroids()
{
free16(data);
}
void init(int n, const float* xyz)
{
data[n].count = 0;
data[n].xyz[0] = xyz[0];
data[n].xyz[1] = xyz[1];
data[n].xyz[2] = xyz[2];
}
void reset(int n)
{
data[n].count = 0;
data[n].xyz[0] = 0.0f;
data[n].xyz[1] = 0.0f;
data[n].xyz[2] = 0.0f;
}
void sum(int n, const float* pt_xyz)
{
data[n].xyz[0] += pt_xyz[0];
data[n].xyz[1] += pt_xyz[1];
data[n].xyz[2] += pt_xyz[2];
++data[n].count;
}
void average(int n)
{
if (data[n].count > 0)
{
const float inv_count = 1.0f / data[n].count;
data[n].xyz[0] *= inv_count;
data[n].xyz[1] *= inv_count;
data[n].xyz[2] *= inv_count;
}
}
float* operator[](int n)
{
return data[n].xyz;
}
int find_nearest(const float* pt_xyz) const
{
float min_dist_squared = numeric_limits<float>::max();
int min_centroid = -1;
for (int c=0; c < num_centroids; ++c)
{
const float* cen_xyz = data[c].xyz;
const float x = pt_xyz[0] - cen_xyz[0];
const float y = pt_xyz[1] - cen_xyz[1];
const float z = pt_xyz[2] - cen_xyz[2];
const float dist_squared = x*x + y*y * z*z;
if (min_dist_squared > dist_squared)
{
min_dist_squared = dist_squared;
min_centroid = c;
}
}
return min_centroid;
}
private:
Centroids(const Centroids&);
Centroids& operator=(const Centroids&);
struct Centroid
{
int count;
float xyz[3];
};
Centroid* data;
};
// A high-precision real timer would be nice, but we lack C++11 and
// the coarseness of the testing here should allow this to suffice.
static double sys_time()
{
return static_cast<double>(clock()) / CLOCKS_PER_SEC;
}
static void k_means(Points& points, Centroids& centroids)
{
// Find the closest centroid for each point.
for (int p=0; p < num_points; ++p)
{
const float* pt_xyz = points[p];
points.associate(p, centroids.find_nearest(pt_xyz));
}
// Reset the data of each centroid.
for (int c=0; c < num_centroids; ++c)
centroids.reset(c);
// Compute new position sum of each centroid.
for (int p=0; p < num_points; ++p)
centroids.sum(points.centroid(p), points[p]);
// Compute average position of each centroid.
for (int c=0; c < num_centroids; ++c)
centroids.average(c);
}
int main()
{
Points points;
Centroids centroids(points);
cout << "Starting simulation..." << endl;
double start_time = sys_time();
for (int i=0; i < num_iterations; ++i)
k_means(points, centroids);
cout << "Time passed: " << (sys_time() - start_time) << " secs" << endl;
cout << "# Points: " << num_points << endl;
cout << "# Centroids: " << num_centroids << endl;
// Write the centroids to a file to give us some crude verification
// of consistency as we make changes.
ofstream out("centroids.txt");
for (int c=0; c < num_centroids; ++c)
out << "Centroid " << c << ": " << centroids[c][0] << "," << centroids[c][1] << "," << centroids[c][2] << endl;
}
피상적 인 테스트의 위험성을 알고 있지만 이전 실제 세션에서 이미 핫스팟으로 간주 되었기 때문에 변명 할 수 있기를 바랍니다. 또한 이러한 코드를 마이크로 최적화하는 것과 관련된 일반적인 기술에 관심이 있습니다.
나는 이것을 프로파일 링 할 때 약간 다른 결과를 얻었습니다. 시간은 여기 루프 내에서 조금 더 균등하게 분산되어 있으며 이유가 확실하지 않습니다. 아마도 데이터가 더 작기 때문일 것입니다 (멤버를 생략하고 멤버를 끌어 올려 min_dist
로컬 변수로 만들었습니다). 중심점과 점 사이의 정확한 비율도 약간 다르지만 여기서 개선 된 사항을 원래 코드로 변환 할 수있을만큼 충분히 가까워 졌으면합니다. 또한이 피상적 테스트에서는 단일 스레드이며 분해가 상당히 다르게 보이므로 원본없이이 피상적 테스트를 최적화 할 위험이있을 수 있습니다 (내 지식을 확장하는 데 더 관심이 있으므로 당분간 감수 할 위험이 있습니다. 이 정확한 경우에 대한 솔루션보다는 이러한 경우를 최적화 할 수있는 기술의 수).
Yochai Timmer의 제안으로 업데이트-~ 12.5 초
오, 어셈블리를 잘 이해하지 못한 채 마이크로 최적화의 문제에 직면했습니다. 나는 이것을 대체했다 :
-if (min_dist_squared > dist_squared)
-{
- min_dist_squared = dist_squared;
- pt.centroid = c;
-}
이것으로 :
+const bool found_closer = min_dist_squared > dist_squared;
+pt.centroid = bitselect(found_closer, c, pt.centroid);
+min_dist_squared = bitselect(found_closer, dist_squared, min_dist_squared);
.. 시간이 ~ 5.6 초에서 ~ 12.5 초로 확대 된 시간을 찾을 수 있습니다. 그럼에도 불구하고 그것은 그의 잘못이 아니며 그의 솔루션의 가치를 앗아가는 것도 아닙니다. 그것은 기계 수준에서 실제로 무슨 일이 일어나고 있는지 이해하지 못하고 어둠 속에서 찌르는 것입니다. 그것은 분명히 놓친 것 같고, 처음에 생각했던 것처럼 나는 지점의 잘못된 예측의 희생자가 아니었다. 그럼에도 불구하고 그의 제안 된 솔루션은 이러한 경우에 시도 할 수있는 훌륭하고 일반화 된 기능이며, 팁과 트릭의 도구 상자에 추가하게되어 감사합니다. 이제 2 라운드입니다.
Harold의 SIMD 솔루션-2.496 초 (주의 사항 참조)
이 솔루션은 놀랍습니다. 클러스터 담당자를 SoA로 변환 한 후 이것으로 ~ 2.5 초의 시간을 얻었습니다! 불행히도 일종의 결함이있는 것 같습니다. 0의 값을 가진 끝쪽의 일부 중심을 포함하여 약간의 정밀도 차이 이상을 제안하는 최종 출력에 대해 매우 다른 결과를 얻고 있습니다 (검색에서 찾을 수 없음을 의미 함). 나는 디버거로 SIMD 로직을 살펴보고 무슨 일이 일어나고 있는지 확인하려고 노력해 왔습니다. 이것은 단지 내 부분의 전사 오류 일 수 있지만 누군가가 오류를 발견 할 수있는 경우를 대비 한 코드입니다.
결과 속도를 늦추지 않고 오류를 수정할 수 있다면이 속도 향상은 순수한 마이크로 최적화에서 상상했던 것 이상입니다!
// New version of Centroids::find_nearest (from harold's solution):
int find_nearest(const float* pt_xyz) const
{
__m128i min_index = _mm_set_epi32(3, 2, 1, 0);
__m128 xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x));
__m128 ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y));
__m128 zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z));
__m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif),
_mm_mul_ps(ydif, ydif)),
_mm_mul_ps(zdif, zdif));
__m128i index = min_index;
for (int i=4; i < num_centroids; i += 4)
{
xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x + i));
ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y + i));
zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z + i));
__m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif),
_mm_mul_ps(ydif, ydif)),
_mm_mul_ps(zdif, zdif));
__m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
min_dist = _mm_min_ps(min_dist, dist);
min_index = _mm_or_si128(_mm_and_si128(index, mask),
_mm_andnot_si128(mask, min_index));
index = _mm_add_epi32(index, _mm_set1_epi32(4));
}
ALIGN16 float mdist[4];
ALIGN16 uint32_t mindex[4];
_mm_store_ps(mdist, min_dist);
_mm_store_si128((__m128i*)mindex, min_index);
float closest = mdist[0];
int closest_i = mindex[0];
for (int i=1; i < 4; i++)
{
if (mdist[i] < closest)
{
closest = mdist[i];
closest_i = mindex[i];
}
}
return closest_i;
}
Harold의 SIMD 솔루션 (수정 됨)-~ 2.5 초
수정 사항을 적용하고 테스트 한 후 결과는 원래 코드베이스와 유사한 개선 사항으로 손상되지 않고 올바르게 작동합니다!
이것이 내가 더 잘 이해하고자했던 지식의 성배에 도달했기 때문에 (분기없는 SIMD), 작업 속도를 두 배로 높이는 것 이상으로 몇 가지 추가 소품으로 솔루션을 제공 할 것입니다. 제 목표는 단순히이 핫스팟을 완화하는 것이 아니라 가능한 해결책에 대한 개인적인 이해를 넓히는 것이었기 때문에 그것을 이해하려고 노력하면서 숙제를 잘라 냈습니다.
그럼에도 불구하고 알고리즘 제안에서 정말 멋진 bitselect
트릭에 이르기까지 여기에 기여한 모든 기여에 감사드립니다 ! 나는 모든 대답을 받아 들일 수 있기를 바랍니다. 언젠가는 모든 것을 시도해 볼 수도 있지만, 지금은 이러한 비 수학적 SIMD 작업 중 일부를 이해하는 데 숙제가 있습니다.
int find_nearest_simd(const float* pt_xyz) const
{
__m128i min_index = _mm_set_epi32(3, 2, 1, 0);
__m128 pt_xxxx = _mm_set1_ps(pt_xyz[0]);
__m128 pt_yyyy = _mm_set1_ps(pt_xyz[1]);
__m128 pt_zzzz = _mm_set1_ps(pt_xyz[2]);
__m128 xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x));
__m128 ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y));
__m128 zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z));
__m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif),
_mm_mul_ps(ydif, ydif)),
_mm_mul_ps(zdif, zdif));
__m128i index = min_index;
for (int i=4; i < num_centroids; i += 4)
{
xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x + i));
ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y + i));
zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z + i));
__m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif),
_mm_mul_ps(ydif, ydif)),
_mm_mul_ps(zdif, zdif));
index = _mm_add_epi32(index, _mm_set1_epi32(4));
__m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
min_dist = _mm_min_ps(min_dist, dist);
min_index = _mm_or_si128(_mm_and_si128(index, mask),
_mm_andnot_si128(mask, min_index));
}
ALIGN16 float mdist[4];
ALIGN16 uint32_t mindex[4];
_mm_store_ps(mdist, min_dist);
_mm_store_si128((__m128i*)mindex, min_index);
float closest = mdist[0];
int closest_i = mindex[0];
for (int i=1; i < 4; i++)
{
if (mdist[i] < closest)
{
closest = mdist[i];
closest_i = mindex[i];
}
}
return closest_i;
}
안타깝게도 SSE4.1을 사용할 수 없지만 SSE2는 사용할 수 있습니다. 나는 이것을 테스트하지 않았고, 구문 오류가 있는지 확인하고 어셈블리가 의미가 있는지 확인하기 위해 컴파일했습니다 (GCC가 사용되지 않은 min_index
일부 xmm
레지스터로 도 유출되지만 그 이유는 확실하지 않습니다)
int find_closest(float *x, float *y, float *z,
float pt_x, float pt_y, float pt_z, int n) {
__m128i min_index = _mm_set_epi32(3, 2, 1, 0);
__m128 xdif = _mm_sub_ps(_mm_set1_ps(pt_x), _mm_load_ps(x));
__m128 ydif = _mm_sub_ps(_mm_set1_ps(pt_y), _mm_load_ps(y));
__m128 zdif = _mm_sub_ps(_mm_set1_ps(pt_z), _mm_load_ps(z));
__m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif),
_mm_mul_ps(ydif, ydif)),
_mm_mul_ps(zdif, zdif));
__m128i index = min_index;
for (int i = 4; i < n; i += 4) {
xdif = _mm_sub_ps(_mm_set1_ps(pt_x), _mm_load_ps(x + i));
ydif = _mm_sub_ps(_mm_set1_ps(pt_y), _mm_load_ps(y + i));
zdif = _mm_sub_ps(_mm_set1_ps(pt_z), _mm_load_ps(z + i));
__m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif),
_mm_mul_ps(ydif, ydif)),
_mm_mul_ps(zdif, zdif));
index = _mm_add_epi32(index, _mm_set1_epi32(4));
__m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
min_dist = _mm_min_ps(min_dist, dist);
min_index = _mm_or_si128(_mm_and_si128(index, mask),
_mm_andnot_si128(mask, min_index));
}
float mdist[4];
_mm_store_ps(mdist, min_dist);
uint32_t mindex[4];
_mm_store_si128((__m128i*)mindex, min_index);
float closest = mdist[0];
int closest_i = mindex[0];
for (int i = 1; i < 4; i++) {
if (mdist[i] < closest) {
closest = mdist[i];
closest_i = mindex[i];
}
}
return closest_i;
}
평소와 같이 포인터가 16 정렬 될 것으로 예상합니다. 또한 패딩은 무한대 지점이어야합니다 (따라서 대상에 가장 가깝지 않음).
SSE 4.1을 사용하면
min_index = _mm_or_si128(_mm_and_si128(index, mask),
_mm_andnot_si128(mask, min_index));
이로 인해
min_index = _mm_blendv_epi8(min_index, index, mask);
다음은 vsyasm 용으로 만들어진 asm 버전이며 약간 테스트되었습니다 (작동하는 것 같습니다)
bits 64
section .data
align 16
centroid_four:
dd 4, 4, 4, 4
centroid_index:
dd 0, 1, 2, 3
section .text
global find_closest
proc_frame find_closest
;
; arguments:
; ecx: number of points (multiple of 4 and at least 4)
; rdx -> array of 3 pointers to floats (x, y, z) (the points)
; r8 -> array of 3 floats (the reference point)
;
alloc_stack 0x58
save_xmm128 xmm6, 0
save_xmm128 xmm7, 16
save_xmm128 xmm8, 32
save_xmm128 xmm9, 48
[endprolog]
movss xmm0, [r8]
shufps xmm0, xmm0, 0
movss xmm1, [r8 + 4]
shufps xmm1, xmm1, 0
movss xmm2, [r8 + 8]
shufps xmm2, xmm2, 0
; pointers to x, y, z in r8, r9, r10
mov r8, [rdx]
mov r9, [rdx + 8]
mov r10, [rdx + 16]
; reference point is in xmm0, xmm1, xmm2 (x, y, z)
movdqa xmm3, [rel centroid_index] ; min_index
movdqa xmm4, xmm3 ; current index
movdqa xmm9, [rel centroid_four] ; index increment
paddd xmm4, xmm9
; calculate initial min_dist, xmm5
movaps xmm5, [r8]
subps xmm5, xmm0
movaps xmm7, [r9]
subps xmm7, xmm1
movaps xmm8, [r10]
subps xmm8, xmm2
mulps xmm5, xmm5
mulps xmm7, xmm7
mulps xmm8, xmm8
addps xmm5, xmm7
addps xmm5, xmm8
add r8, 16
add r9, 16
add r10, 16
sub ecx, 4
jna _tail
_loop:
movaps xmm6, [r8]
subps xmm6, xmm0
movaps xmm7, [r9]
subps xmm7, xmm1
movaps xmm8, [r10]
subps xmm8, xmm2
mulps xmm6, xmm6
mulps xmm7, xmm7
mulps xmm8, xmm8
addps xmm6, xmm7
addps xmm6, xmm8
add r8, 16
add r9, 16
add r10, 16
movaps xmm7, xmm6
cmpps xmm6, xmm5, 1
minps xmm5, xmm7
movdqa xmm7, xmm6
pand xmm6, xmm4
pandn xmm7, xmm3
por xmm6, xmm7
movdqa xmm3, xmm6
paddd xmm4, xmm9
sub ecx, 4
ja _loop
_tail:
; calculate horizontal minumum
pshufd xmm0, xmm5, 0xB1
minps xmm0, xmm5
pshufd xmm1, xmm0, 0x4E
minps xmm0, xmm1
; find index of the minimum
cmpps xmm0, xmm5, 0
movmskps eax, xmm0
bsf eax, eax
; index into xmm3, sort of
movaps [rsp + 64], xmm3
mov eax, [rsp + 64 + rax * 4]
movaps xmm9, [rsp + 48]
movaps xmm8, [rsp + 32]
movaps xmm7, [rsp + 16]
movaps xmm6, [rsp]
add rsp, 0x58
ret
endproc_frame
C ++에서 :
extern "C" int find_closest(int n, float** points, float* reference_point);
때때로 bitselect (condition? true : false)라고하는 분기없는 삼항 연산자를 사용할 수 있습니다.
기본적으로 아무것도하지 않는 두 명의 멤버에게만 사용하십시오.
추가 작업에 대해 걱정하지 마십시오. if 문 분기에 비해 아무것도 아닙니다.
bitselect 구현 :
inline static int bitselect(int condition, int truereturnvalue, int falsereturnvalue)
{
return (truereturnvalue & -condition) | (falsereturnvalue & ~(-condition)); //a when TRUE and b when FALSE
}
inline static float bitselect(int condition, float truereturnvalue, float falsereturnvalue)
{
//Reinterpret floats. Would work because it's just a bit select, no matter the actual value
int& at = reinterpret_cast<int&>(truereturnvalue);
int& af = reinterpret_cast<int&>(falsereturnvalue);
int res = (at & -condition) | (af & ~(-condition)); //a when TRUE and b when FALSE
return reinterpret_cast<float&>(res);
}
루프는 다음과 같아야합니다.
for (int i=0; i < num_centroids; ++i)
{
const ClusterCentroid& cent = centroids[i];
const float dist = ...;
bool isSmaeller = dist < pt.min_dist;
//use same value if not smaller
pt.min_index = bitselect(isSmaeller, i, pt.min_index);
pt.min_dist = bitselect(isSmaeller, dist, pt.min_dist);
}
C ++는 고급 언어입니다. C ++ 소스 코드의 제어 흐름이 분기 명령으로 변환된다는 가정에는 결함이 있습니다. 귀하의 예제에서 일부 유형에 대한 정의가 없으므로 유사한 조건부 할당으로 간단한 테스트 프로그램을 만들었습니다.
int g(int, int);
int f(const int *arr)
{
int min = 10000, minIndex = -1;
for ( int i = 0; i < 1000; ++i )
{
if ( arr[i] < min )
{
min = arr[i];
minIndex = i;
}
}
return g(min, minIndex);
}
정의되지 않은 "g"를 사용하는 것은 단지 최적화 프로그램이 모든 것을 삭제하는 것을 방지하기위한 것입니다. 나는 이것을 -O3 및 -S와 함께 G ++ 4.9.2로 x86_64 어셈블리로 변환했으며 (-march의 기본값을 변경할 필요도 없음) 결과는 루프 본문 에 분기가 없다는 것입니다.
movl (%rdi,%rax,4), %ecx
movl %edx, %r8d
cmpl %edx, %ecx
cmovle %ecx, %r8d
cmovl %eax, %esi
addq $1, %rax
그 외에도, 분기가 반드시 더 빠르다는 가정은 새로운 거리가 이전 거리를 "이길"확률이 더 많은 요소를 볼수록 감소하기 때문에 결함이있을 수 있습니다. 동전 던지기가 아닙니다. "bitselect"트릭은 컴파일러가 오늘날보다 "as-if"어셈블리를 생성하는 데 훨씬 덜 공격적 일 때 발명되었습니다. 컴파일러 가 코드를 더 잘 최적화 할 수 있도록 코드를 재 작업하기 전에 컴파일러 가 실제로 생성 하는 어셈블리의 종류를 살펴 보거나 그 결과를 손으로 작성한 어셈블리의 기초로 삼는 것이 좋습니다. SIMD를 살펴보고 싶다면 데이터 종속성을 줄인 "최소한의"접근 방식을 시도해 볼 것을 제안합니다 (제 예에서는 "최소"에 대한 종속성).
첫째, 코드 변경을 시도하기 전에 최적화 된 빌드에서 디스 어셈블리를 살펴 보는 것이 좋습니다. 이상적으로는 어셈블리 수준에서 프로파일 러 데이터를보고자합니다. 예를 들어 다음과 같은 다양한 항목이 표시 될 수 있습니다.
- 컴파일러가 실제 분기 명령어를 생성하지 않았을 수 있습니다.
- 병목 현상이있는 코드 줄에는 생각보다 더 많은 명령이 연관되어있을 수 있습니다 (예 : dist 계산).
그 외에도 거리를 계산할 때 종종 제곱근이 필요하다는 표준 트릭이 있습니다. 프로세스가 끝날 때 최소 제곱 값에 대해 제곱근을 수행해야합니다.
SSE는 _mm_min_ps를 사용하여 분기없이 한 번에 4 개의 값을 처리 할 수 있습니다 . 정말 속도가 필요한 경우 SSE (또는 AVX) 내장 함수를 사용하고 싶습니다. 다음은 기본적인 예입니다.
float MinimumDistance(const float *values, int count)
{
__m128 min = _mm_set_ps(FLT_MAX, FLT_MAX, FLT_MAX, FLT_MAX);
int i=0;
for (; i < count - 3; i+=4)
{
__m128 distances = _mm_loadu_ps(&values[i]);
min = _mm_min_ps(min, distances);
}
// Combine the four separate minimums to a single value
min = _mm_min_ps(min, _mm_shuffle_ps(min, min, _MM_SHUFFLE(2, 3, 0, 1)));
min = _mm_min_ps(min, _mm_shuffle_ps(min, min, _MM_SHUFFLE(1, 0, 3, 2)));
// Deal with the last 0-3 elements the slow way
float result = FLT_MAX;
if (count > 3) _mm_store_ss(&result, min);
for (; i < count; i++)
{
result = min(values[i], result);
}
return result;
}
최상의 SSE 성능을 위해 정렬 된 주소에서로드가 발생하는지 확인해야합니다. 필요한 경우 위 코드의 마지막 몇 개와 동일한 방식으로 처음 몇 개의 잘못 정렬 된 요소를 처리 할 수 있습니다.
주의해야 할 또 다른 사항은 메모리 대역폭입니다. 해당 루프 동안 사용하지 않는 ClusterCentroid 구조의 여러 멤버가있는 경우 메모리는 각각 64 바이트 인 캐시 라인 크기의 청크에서 읽으므로 실제로 필요한 것보다 훨씬 더 많은 데이터를 메모리에서 읽게됩니다.
이것은 양방향으로 진행될 수 있지만 다음 구조를 시도해 보겠습니다.
std::vector<float> centDists(num_centroids); //<-- one for each thread.
for (size_t p=0; p<num_points; ++p) {
Point& pt = points[p];
for (size_t c=0; c<num_centroids; ++c) {
const float dist = ...;
centDists[c]=dist;
}
pt.min_idx it= min_element(centDists.begin(),centDists.end())-centDists.begin();
}
분명히, 이제 메모리를 두 번 반복해야하므로 캐시 적중률이 손상 될 수 있지만 (하위 범위로 분할 할 수도 있음) 반면에 각 내부 루프는 쉽게 벡터화하고 풀 수 있어야합니다. 따라서 그만한 가치가 있는지 측정해야합니다.
그리고 당신이 당신의 버전을 고수하더라도 지역 변수 를 사용 하여 최소 인덱스와 거리를 추적하고 결과를 마지막 지점에 적용하려고합니다.
합리적인 이유는 각 읽기 또는 쓰기가 pt.min_dist
포인터를 통해 효과적으로 수행되며 컴파일러 최적화에 따라 성능이 저하되거나 저하되지 않을 수 있다는 것입니다.
벡터화에 중요한 또 다른 것은 Structs 배열 (이 경우 중심)을 배열 구조 (예 : 포인트의 각 좌표에 대해 하나 의 배열 )로 바꾸는 것입니다. 이렇게하면 추가 수집 명령이 필요하지 않습니다 . SIMD 명령어와 함께 사용할 데이터를로드합니다. 해당 주제에 대한 자세한 내용 은 Eric Brumer의 강연 을 참조하십시오.
편집 : 내 시스템에 대한 일부 숫자 (haswell, clang 3.5) :
벤치 마크로 짧은 테스트를 수행했으며 내 시스템에서 위의 코드로 인해 알고리즘이 약 10 % 느려졌습니다. 기본적으로 아무것도 벡터화 할 수 없습니다.
그러나 중심에 대해 AoS를 SoA 변환에 적용 할 때 거리 계산이 벡터화되어 AoS를 SoA 변환에 적용한 원래 구조에 비해 전체 런타임이 약 40 % 감소했습니다.
한 가지 가능한 마이크로 최적화 : min_dist 및 min_index를 지역 변수에 저장합니다. 컴파일러는 작성한 방식대로 메모리에 더 자주 써야 할 수 있습니다. 일부 아키텍처에서는 성능에 큰 영향을 미칠 수 있습니다. 다른 예는 여기 내 대답을 참조하십시오 .
한 번에 4 개의 비교를 수행하라는 Adams의 제안도 좋은 것입니다.
그러나 가장 빠른 속도 향상은 확인해야하는 중심의 수를 줄이는 것입니다. 이상적으로는 중심 주위에 kd- 트리 (또는 유사)를 만든 다음 쿼리하여 가장 가까운 지점을 찾습니다.
주변에 나무를 짓는 코드가 없다면, 제가 가장 좋아하는 "가난한 사람"의 가장 가까운 지점 검색은 다음과 같습니다.
Sort the points by one coordinate, e.g. cent.pos[0]
Pick a starting index for the query point (pt)
Iterate forwards through the candidate points until you reach the end, OR when abs(pt.pos[0] - cent.pos[0]) > min_dist
Repeat the previous step going the opposite direction.
검색을위한 추가 중지 조건은 상당한 양의 포인트를 건너 뛰어야 함을 의미합니다. 또한 이미 찾은 최고점보다 더 가까운 점을 건너 뛰지 않도록 보장합니다.
따라서 코드의 경우 다음과 같습니다.
// sort centroid by x coordinate.
min_index = -1;
min_dist = numeric_limits<float>::max();
// pick the start index. This works well if the points are evenly distributed.
float min_x = centroids[0].pos[0];
float max_x = centroids[num_centroids-1].pos[0];
float cur_x = pt.pos[0];
float t = (max_x - cur_x) / (max_x - min_x);
// TODO clamp t between 0 and 1
int start_index = int(t * float(num_centroids))
// Forward search
for (int i=start_index ; i < num_centroids; ++i)
{
const ClusterCentroid& cent = centroids[i];
if (fabs(cent.pos[0] - pt.pos[0]) > min_i)
// Everything to the right of this must be further min_dist, so break.
// This is where the savings comes from!
break;
const float dist = ...;
if (dist < min_dist)
{
min_dist = dist;
min_index = i;
}
}
// Backwards search
for (int i=start_index ; i >= 0; --i)
{
// same as above
}
pt.min_dist = min_dist
pt.min_index = min_index
(이것은 점 사이의 거리를 계산하고 있다고 가정하지만 어셈블리는 거리의 제곱임을 나타냅니다. 그에 따라 중단 조건을 조정하십시오).
트리를 구축하거나 중심을 정렬하는 데 약간의 오버 헤드가 있지만 더 큰 루프에서 계산을 더 빠르게 수행하여 (포인트 수 이상)이를 상쇄해야합니다.
참고 URL : https://stackoverflow.com/questions/30023245/branchless-k-means-or-other-optimizations
'developer tip' 카테고리의 다른 글
순수 가상 함수에는 인라인 정의가 없을 수 있습니다. (0) | 2020.12.09 |
---|---|
int에서 해제 / nullable 변환이있는 심각한 버그, 십진수 변환 허용 (0) | 2020.12.09 |
React Router 인증 (0) | 2020.12.09 |
PDF 페이지가 컬러인지 흑백인지 어떻게 알 수 있습니까? (0) | 2020.12.09 |
.NET 4에는 기본 제공 JSON 직렬 변환기 / 역 직렬 변환기가 있습니까? (0) | 2020.12.09 |