�����JFIF��XX����������    $.' ",#(7),01444'9=82<.342  2!!22222222222222222222222222222222222222222222222222�����"����4���������������������������� ���������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������,�PG"Z_�4�˷����kjز�Z�,F+��_z�,�© �����zh6�٨�ic�fu������������������������������������#ډb���_�N��?�����������wQ���5-�~�I���8���������������������������������TK<5o�Iv-������������������k�_U_������������������������������~b�M��d��������Ӝ�U�Hh��?]��E�w��Q���k�{��_}qFW7HTՑ��Y��F�����?_�'ϔ��_�Ջt������������������������=||I �����6�έ"�����D���/[�k�9����Y�8������ds|\���Ҿp6�Ҵ���]��.����6���z<�v��@]�i%������������������������$j��~����g��J>��no����pM[me�i$[�����������s�o�ᘨ�˸ nɜG-�ĨU�ycP���3.DB�li�;���������������������hj���x����7Z^�N�h��������N3u{�:j�����x�힞��#M��&��jL P@��_���� P�������������������&��o8��������9������@Sz���6�t7#O�ߋ �����s}Yf�T������lmr����Z)'N��k�۞p�����w\�T���������������ȯ?�8`���O��i{wﭹW�[�r�� ��Q4F�׊������3m&L�=��h3�������z~��#����\�l :�F,j@�� ʱ�wQT����8�"kJO����6�֚l������������������}����R�>ډK���]��y����&����p�}b������;N�1�m�r$����|��7�>e�@���B�TM*-i�H��g�D�)� E�m�|�ؘbҗ�a���Ҿ����������������t4�����o���G��*oCN�rP���Q��@z,|?W[0���������:�n,j���WiE��W������$~/�hp\��?��{(�0���+�Y8rΟ�+����>S-S���������������VN;���}�s?.����� w��9��˟<���Mq4�Wv'������{)0�1mB����V����W[��������8�/<� �%���wT^�5���b��)iM� p�g�N�&ݝ������������VO~��q���u���9��� ����!��J27�����$����O-���! 
�:���%H��� ـ�������y�ΠM=t{!S�� �oK8�������t<����è��������:a��������[������ա�H���~��w��Qz`�p����o�^ ������Q��n����� �,uu�C��$ ^���,�������8�#��:�6��e�|~�����������!�3��3.�\0�����q��o�4`.|� ����y�Q�`~;�d�ׯ,��O�Zw�������`73�v�܋�<�����Ȏ�� ـ4k��5�K�a�u�=9Yd��$>x�A�&�� j0� ���vF��� Y���|�y��� ~�6�@c��1vOp��������Ig�����4��l�OD�����L����� R���c���j�_�uX�6��3?nk��Wy�f;^*B� ��@���~a�`��Eu�������+�����6�L��.ü>��}y���}_�O�6�͐�:�Yr���G�X��kG������l^w����������~㒶sy���Iu�!���� W ��X��N�7BV��O��!X�2����wvG�R�f�T#�����t�/?���%8�^�W�aT����G�cL�M���I��(J����1~�8�?aT ���]����AS�E��(��*E}� 2������#I/�׍qz��^t�̔���������b�Yz4x����t�){ OH�����+(E��A&�N�������XT��o��"�XC����'���)}�J�z�p� ����~5�}�^����+�6����w��c��Q�|�Lp�d�H��}�(�.|����k��c4^�����"�����Z?ȕ ��a<�������L�!0�39C� �Eu�����C�F�Ew�ç ;�n?�*o���B�8�bʝ���'#Rqf����M}7����]�������s2tcS{�\icTx;�\��7K���P������ʇ Z O-��~�������c>"��?��������P�����E��O�8��@�8��G��Q�g�a�Վ���󁶠��䧘��_%#r�>�����1�z�a���eb��qcP��ѵ��n���#L��� =��׀t� L�7�`�����V����A{�C:�g���e@�����w1 Xp�3�c3�ġ�������p��M"'-�@n4���fG���B3�DJ�8[Jo�ߐ���gK)ƛ��$���� �������8�3�����+���� �����6�ʻ���� ���S�kI�*KZlT _`�������?��K�����QK�d���������B`�s}�>���`������*�>��,*@J�d�oF*�����弝��O}�k��s��]��y�ߘ�������c1G�V���<=�7��7����6��q�PT��tXԀ�!9*4�4Tހ���3XΛex�46�������Y��D ����� ����BdemDa����\�_l,����G�/���֌7���Y�](�xTt^%�GE�����4�}bT����ڹ�����;��Y)���B�Q��u��>J/J ���⮶.�XԄ��j�ݳ������+E��d ���r�5�_D�����1 ���o�� �B�x�΢�#����<��W�����8���R6�@���g�M�.��� dr�D��>(otU��@�x=��~v���2� ӣ�d�oBd�����3�eO�6�㣷����������ݜ�6��6Y��Qz`����S��{���\P��~z m5{J/L��1������<�e�ͅPu���b�]�ϔ��������'�������f�b� Zpw��c`"��i���BD@:)ִ�:�]��h���v�E��w���T�l�������P����"Ju�}��وV ��J��G6��. 
J/�Qgl߭�e�����@�z�Zev2u����)]կ���������7x�������s�M�-<ɯ�c��r��v�����@��$�ޮ}lk���a����'����>x��O\�Z������Fu>������ck#��&:��`�$��ai�>2Δ����l���oF[h�������lE�ܺ�Π���k:)���`������� $[6�����9�����kOw�\|�����8}������ބ:��񶐕��������I�A1/���=�2[�,�!��.}gN#�u����b���� ~���������݊��}34q�����d�E��L��������c��$���"�[q�U�硬g^��%B ��z���r�p�������J�ru%v\h�����1Y�ne`������ǥ:g����pQM~�^��Xi� ��`S�:V2������9.�P���V������?B�k�� ��������AEvw%�_�9C�Q����wKekP�ؠ�\������;Io d�{ ߞo�c1eP�����\� `����E=���@K<�Y��������eڼ�J����w����{av�F�'�M�@��������������/J��+9p����|]���������Iw &`���8���&�M�hg���[�{�������Xj���%��Ӓ�������������������$��(�����ʹN�������<>�I���RY�����K2�NPlL�ɀ�)��&e��������B+ь����(������������������� � �JTx����_?EZ� }@���� 6�U���뙢ط�z��dWI��n` D����噥�[��uV��"�G&�����Ú����2�g�}&m���������������������?ċ���"����Om#�������������������������� ��{���������������������ON��"S�X���Ne��ysQ���@�������������Fn��Vg�����dX�~nj����������������������]J�<�K]:����FW���b�������62����������=��5f����JKw����bf�X������������������������55��~J �%^�������:�-�QIE��P��v�nZum� z � ~ə ���� ���ة����;�f��\v�������g�8�1��f2�������������������������4;�V���ǔ�)�������������������9���1\������������������������������c��v�/'Ƞ�w������������������$�4�R-��t����������������������������������� e�6�/�ġ �̕Ecy�J���u�B���<�W�ַ~�w[B1L۲�-JS΂�{���΃�������������������������������������������A��20�c#���������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������@���� 0!1@AP"#2Q`$3V�%45a6�FRUq����� ������^7ׅ,$n��������+��F�`��2X'��0vM��p�L=�������5��8������u�p~���.�`r�����\����O��,ư�0oS 
��_�M�����l���4�kv\JSd���x���SW�<��Ae�IX����������$I���w�:S���y���›R��9�Q[���,�5�;�@]�%���u�@ *ro�lbI �� ��+���%m:�͇ZV�����u�̉����θau<�fc�.����{�4Ա� �Q����*�Sm��8\ujqs]{kN���)qO�y�_*dJ�b�7���yQqI&9�ԌK!�M}�R�;�������S�T���1���i[U�ɵz�]��U)V�S6���3$K{��ߊ<�(� E]Զ[ǼENg�����'�\?#)Dkf��J���o��v���'�%ƞ�&K�u��!��b�35LX�Ϸ��63$K�a�;�9>,R��W��3�3� d�JeTYE.Mϧ��-�o�j3+y��y^�c�������VO�9NV\nd�1 ��!͕_)a�v;����թ�M�lWR1��)El��P;��yوÏ�u 3�k�5Pr6<�⒲l�!˞*��u־�n�!�l:����UNW ��%��Chx8vL'��X�@��*��)���̮��ˍ��� ����D-M�+J�U�kvK����+�x8��cY������?�Ԡ��~3mo��|�u@[XeY�C�\Kp�x8�oC�C�&����N�~3-H���� ��MX�s�u<`���~"WL��$8ξ��3���a�)|:@�m�\���^�`�@ҷ)�5p+��6���p�%i)P M���ngc�����#0Aruz���RL+xSS?���ʮ}()#�t��mˇ!��0}}y����<�e� �-ή�Ԩ��X������ MF���ԙ~l L.3���}�V뽺�v������멬��Nl�)�2����^�Iq��a��M��qG��T�����c3#������3U�Ǎ���}��לS�|qa��ڃ�+���-��2�f����/��bz��ڐ�� �ݼ[2�ç����k�X�2�* �Z�d���J�G����M*9W���s{��w���T��x��y,�in�O�v��]���n����P�$��JB@=4�OTI�n��e�22a\����q�d���%�$��(���:���: /*�K[PR�fr\nڙdN���F�n�$�4��[�� U�zƶ����� �mʋ���,�ao�u 3�z� �x��Kn����\[��VFmbE;�_U��&V�Gg�]L�۪&#n%�$ɯ��dG���D�TI=�%+AB�Ru#��b4�1�»x�cs�YzڙJG��f��Il���d�eF'T� iA��T���uC�$����Y��H?����[!G`}���ͪ� �纤Hv\������j�Ex�K���!���OiƸ�Yj�+u-<���'q����uN�*�r\��+�]���<�wOZ.fp�ێ��,-*)V?j-kÊ#�`�r��dV����(�ݽBk�����G�ƛk�QmUڗe��Z���f}|����8�8��a���i��3'J�����~G_�^���d�8w������ R�`(�~�.��u���l�s+g�bv���W���lGc}��u���afE~1�Ue������Z�0�8�=e�� f@/�jqEKQQ�J���oN��J���W5~M>$6�Lt�;$ʳ{���^��6�{����v6���ķܰg�V�cnn �~z�x�«�,2�u�?cE+Ș�H؎�%�Za�)���X>uW�Tz�Nyo����s���FQƤ��$��*�&�LLXL)�1�" L��eO��ɟ�9=���:t��Z���c��Ž���Y?�ӭV�wv�~,Y��r�ۗ�|�y��GaF�����C�����.�+� ���v1���fήJ�����]�S��T��B��n5sW}y�$��~z�'�c ��8 ��� ,! 
�p��VN�S��N�N�q��y8z˱�A��4��*��'������2n<�s���^ǧ˭P�Jޮɏ�U�G�L�J�*#��<�V��t7�8����TĜ>��i}K%,���)[��z�21z ?�N�i�n1?T�I�R#��m-�����������������1����lA�`��fT5+��ܐ�c�q՝��ʐ��,���3�f2U�եmab��#ŠdQ�y>\��)�SLY����w#��.���ʑ�f��� ,"+�w�~�N�'�c�O�3F�������N<���)j��&��,-� �љ���֊�_�zS���TǦ����w�>��?�������n��U仆�V���e�����0���$�C�d���rP �m�׈e�Xm�Vu� �L��.�bֹ��� �[Դaզ���*��\y�8�Է:�Ez\�0�Kq�C b��̘��cө���Q��=0Y��s�N��S.����3.���O�o:���#���v7�[#߫ ��5�܎�L���Er4���9n��COWlG�^��0k�%<���ZB���aB_���������'=��{i�v�l�$�uC���mƎҝ{�c㱼�y]���W�i ��ߧc��m�H� m�"�"�����;Y�ߝ�Z�Ǔ�����:S#��|}�y�,/k�Ld� TA�(�AI$+I3��;Y*���Z��}|��ӧO��d�v��..#:n��f>�>���ȶI�TX��� 8��y����"d�R�|�)0���=���n4��6ⲑ�+��r<�O�܂~zh�z����7ܓ�HH�Ga롏���nCo�>������a ���~]���R���̲c?�6(�q�;5%� |�uj�~z8R�=X��I�V=�|{v�Gj\gc��q����z�؋%M�ߍ����1y��#��@f^���^�>N������#x#۹��6�Y~�?�dfPO��{��P�4��V��u1E1J �*|���%����JN��`eWu�zk M6���q t[�� ��g�G���v��WIG��u_ft����5�j�"�Y�:T��ɐ���*�;� e5���4����q$C��2d�}���� _S�L#m�Yp��O�.�C�;��c����Hi#֩%+) �Ӎ��ƲV���SYź��g |���tj��3�8���r|���V��1#;.SQ�A[���S������#���`n�+���$��$�I �P\[�@�s��(�ED�z���P��])8�G#��0B��[ى��X�II�q<��9�~[Z멜�Z�⊔IWU&A>�P~�#��dp<�?����7���c��'~���5 ��+$���lx@�M�dm��n<=e�dyX��?{�|Aef ,|n3�<~z�ƃ�uۧ�����P��Y,�ӥQ�*g�#먙R�\���;T��i,��[9Qi歉����c>]9�� ��"�c��P�� �Md?٥��If�ت�u��k��/����F��9�c*9��Ǎ:�ØF���z�n*�@|I�ށ9����N3{'��[�'ͬ�Ҳ4��#}��!�V� Fu��,�,mTIk���v C�7v���B�6k�T9��1�*l� '~��ƞF��lU��'�M ����][ΩũJ_�{�i�I�n��$����L�� j��O�dx�����kza۪��#�E��Cl����x˘�o�����V���ɞ�ljr��)�/,�߬h�L��#��^��L�ф�,íMƁe�̩�NB�L�����iL����q�}��(��q��6IçJ$�W�E$��:������=#����(�K�B����zђ <��K(�N�۫K�w��^O{!����)��H���>x�������lx�?>Պ�+�>�W���,Ly!_�D���Ō�l���Q�!�[ �S����J��1��Ɛ�Y}��b,+�Lo�x�ɓ)����=�y�oh�@�꥟/��I��ѭ=��P�y9��� �ۍYӘ�e+�p�Jnϱ?V\SO%�(�t� ���=?MR�[Ș�����d�/ ��n�l��B�7j� ��!�;ӥ�/�[-���A�>��dN�sLj ��,ɪv��=1c�.SQ�O3�U���ƀ�ܽ�E����������̻��9G�ϷD�7(�}��Ävӌ\��y�_0[w ���<΍>����a_��[0+�L��F.�޺��f�>oN�T����q;���y\��bՃ��y�jH�<|q-eɏ�_?_9+P���Hp$�����[ux�K 
w�Mw��N�ی'$Y2�=��q���KB��P��~�������Yul:�[<����F1�2�O���5=d����]Y�sw:���Ϯ���E��j,_Q��X��z`H1,#II ��d�wr��P˂@�ZJV����y$�\y�{}��^~���[:N����ߌ�U�������O��d�����ؾe��${p>G��3c���Ė�lʌ�� ת��[��`ϱ�-W����dg�I��ig2��� ��}s ��ؤ(%#sS@���~���3�X�nRG�~\jc3�v��ӍL��M[JB�T��s3}��j�Nʖ��W����;7���ç?=X�F=-�=����q�ߚ���#���='�c��7���ڑW�I(O+=:uxq�������������e2�zi+�kuG�R��������0�&e�n���iT^J����~\jy���p'dtG��s����O��3����9* �b#Ɋ�� p������[Bws�T�>d4�ۧs���nv�n���U���_�~,�v����ƜJ1��s�� �QIz���)�(lv8M���U=�;����56��G���s#�K���MP�=��LvyGd��}�VwWBF�'�à �?MH�U�g2�� ����!�p�7Q��j��ڴ����=��j�u��� Jn�A s���uM������e��Ɔ�Ҕ�!)�'��8Ϣ�ٔ���ޝ(��Vp���צ֖d=�IC�J�Ǡ{q������kԭ�߸���i��@K����u�|�p=..�*+����x�����z[Aqġ#s2a�Ɗ���RR�)*HRsi�~�a &f��M��P����-K�L@��Z��Xy�'x�{}��Zm+���:�)�) IJ�-i�u���� ���ܒH��'��L(7�y�GӜq���� j��� 6ߌg1�g�o���,kر���tY�?W,���p���e���f�OQS��!K�۟cҒA�|ս�j�>��=⬒��˧L[�� �߿2JaB~R��u�:��Q�] �0H~���]�7��Ƽ�I���(�}��cq '�ήET���q�?f�ab���ӥvr� �)o��-Q��_'����ᴎo��K������;��V���o��%���~OK ����*��b�f:���-ťIR��`B�5!RB@���ï�� �u �̯e\�_U�_������� g�ES��3��������QT��a�����x����U<~�c?�*�#]�MW,[8O�a�x��]�1bC|踤�P��lw5V%�)�{t�<��d��5���0i�XSU��m:��Z�┵�i�"��1�^B�-��P�hJ��&)O��*�D��c�W��vM��)����}���P��ܗ-q����\mmζZ-l@�}��a��E�6��F�@��&Sg@���ݚ�M����� ȹ 4����#p�\H����dYDo�H���"��\��..R�B�H�z_�/5˘����6��KhJR��P�mƶi�m���3��,#c�co��q�a)*P�t����R�m�k�7x�D�E�\Y�閣_X�<���~�)���c[[�BP����6�Yq���S��0����%_����;��Àv�~�| VS؇ ��'O0��F0��\���U�-�d@�����7�SJ*z��3n��y��P����O����������m�~�P�3|Y��ʉr#�C�<�G~�.,! ���bqx���h~0=��!ǫ�jy����l��O,�[B��~��|9��ٱ����Xly�#�i�B��g%�S��������tˋ���e���ې��\[d�t)��.+u�|1 ������#�~Oj����hS�%��i.�~X���I�H�m��0n���c�1uE�q��cF�RF�o���7� �O�ꮧ� ���ۛ{��ʛi5�rw?׌#Qn�TW��~?y$��m\�\o����%W� ?=>S�N@�� �Ʈ���R����N�)�r"C�:��:����� �����#��qb��Y�. 
�6[��2K����2u�Ǧ�HYR��Q�MV��� �G�$��Q+.>�����nNH��q�^��� ����q��mM��V��D�+�-�#*�U�̒ ���p욳��u:�������IB���m����PV@O���r[b= �� ��1U�E��_Nm�yKbN�O���U�}�the�`�|6֮P>�\2�P�V���I�D�i�P�O;�9�r�mAHG�W�S]��J*�_�G��+kP�2����Ka�Z���H�'K�x�W�MZ%�O�YD�Rc+o��?�q��Ghm��d�S�oh�\�D�|:W������UA�Qc yT�q��������~^�H��/��#p�CZ���T�I�1�ӏT����4��"�ČZ�����}��`w�#�*,ʹ�� ��0�i��課�Om�*�da��^gJ݅{���l�e9uF#T�ֲ��̲�ٞC"�q���ߍ ոޑ�o#�XZTp����@ o�8��(jd��xw�]�,f���`~��|,s��^����f�1���t��|��m�򸄭/ctr��5s��7�9Q�4�H1꠲BB@�l9@���C�����+�wp�xu�£Yc�9��?`@#�o�mH�s2��)�=��2�.�l����jg�9$�Y�S�%*L������R�Y������7Z���,*=�䷘$�������arm�o�ϰ���UW.|�r�uf����IGw�t����Zwo��~5 ��YյhO+=8fF�)�W�7�L9lM�̘·Y���֘YLf�큹�pRF���99.A �"wz��=E\Z���'a� 2��Ǚ�#;�'}�G���*��l��^"q��+2FQ� hj��kŦ��${���ޮ-�T�٭cf�|�3#~�RJ����t��$b�(R��(����r���dx� >U b�&9,>���%E\� Ά�e�$��'�q't��*�א���ެ�b��-|d���SB�O�O��$�R+�H�)�܎�K��1m`;�J�2�Y~9��O�g8=vqD`K[�F)k�[���1m޼c��n���]s�k�z$@��)!I �x՝"v��9=�ZA=`Ɠi �:�E��)`�7��vI��}d�YI�_ �o�:ob���o ���3Q��&D&�2=�� �Ά��;>�h����y.*ⅥS������Ӭ�+q&����j|UƧ�����}���J0��WW< ۋS�)jQR�j���Ư��rN)�Gű�4Ѷ(�S)Ǣ�8��i��W52���No˓� ۍ%�5brOn�L�;�n��\G����=�^U�dI���8$�&���h��'���+�(������cȁ߫k�l��S^���cƗjԌE�ꭔ��gF���Ȓ��@���}O���*;e�v�WV���YJ\�]X'5��ղ�k�F��b 6R�o՜m��i N�i�����>J����?��lPm�U��}>_Z&�KK��q�r��I�D�Չ~�q�3fL�:S�e>���E���-G���{L�6p�e,8��������QI��h��a�Xa��U�A'���ʂ���s�+טIjP�-��y�8ۈZ?J$��W�P� ��R�s�]��|�l(�ԓ��sƊi��o(��S0���Y� 8�T97.�����WiL��c�~�dxc�E|�2!�X�K�Ƙਫ਼�$((�6�~|d9u+�qd�^3�89��Y�6L�.I�����?���iI�q���9�)O/뚅����O���X��X�V��ZF[�یgQ�L��K1���RҖr@v�#��X�l��F���Нy�S�8�7�kF!A��sM���^rkp�jP�DyS$N���q���nxҍ!U�f�!eh�i�2�m����`�Y�I�9r�6� �TF���C}/�y�^���Η���5d�'��9A-��J��>{�_l+�`��A���[�'��յ�ϛ#w:݅�%��X�}�&�PSt�Q�"�-��\縵�/����$Ɨh�Xb�*�y��BS����;W�ջ_mc�����vt?2}1�;qS�d�d~u:2k5�2�R�~�z+|HE!)�Ǟl��7`��0�<�,�2*���Hl-��x�^����'_TV�gZA�'j� ^�2Ϊ��N7t�����?w�� �x1��f��Iz�C-Ȗ��K�^q�;���-W�DvT�7��8�Z�������� hK�(P:��Q- �8�n�Z���܃e貾�<�1�YT<�,�����"�6{�/ �?�͟��|1�:�#g��W�>$����d��J��d�B���=��jf[��%rE^��il:��B���x���Sּ�1հ��,�=��*�7 fcG��#q� 
�eh?��2�7�����,�!7x��6�n�LC�4x��},Geǝ�tC.��vS �F�43��zz\��;QYC,6����~;RYS/6���|2���5���v��T��i����������mlv��������&� �nRh^ejR�LG�f���? �ۉҬܦƩ��|��Ȱ����>3����!v��i�ʯ�>�v��オ�X3e���_1z�Kȗ\<������!�8���V��]��?b�k41�Re��T�q��mz��TiOʦ�Z��Xq���L������q"+���2ۨ��8}�&N7XU7Ap�d�X��~�׿��&4e�o�F��� �H�����O���č�c�� 懴�6���͉��+)��v;j��ݷ�� �UV�� i��� j���Y9GdÒJ1��詞�����V?h��l�����l�cGs�ځ�������y�Ac������\V3�? �� ܙg�>qH�S,�E�W�[�㺨�uch�⍸�O�}���a��>�q�6�n6�����N6�q��������N� ���! 1AQaq�0@����"2BRb�#Pr���3C`��Scst���$4D���%Td���� ?�����N����a��3��m���C���w��������xA�m�q�m����m������$����4n淿t'��C"w��zU=D�\R+w�p+Y�T�&�պ@��ƃ��3ޯ?�Aﶂ��aŘ���@-�����Q�=���9D��ռ�ѻ@��M�V��P��܅�G5�f�Y<�u=,EC)�<�Fy'�"�&�չ�X~f��l�KԆV��?�� �W�N����=(� �;���{�r����ٌ�Y���h{�١������jW����P���Tc�����X�K�r��}���w�R��%��?���E��m�� �Y�q|����\lEE4����r���}�lsI�Y������f�$�=�d�yO����p�����yBj8jU�o�/�S��?�U��*������ˍ�0�������u�q�m [�?f����a�� )Q�>����6#������� ?����0UQ����,IX���(6ڵ[�DI�MNލ�c&���υ�j\��X�R|,4��� j������T�hA�e��^���d���b<����n�� �즇�=!���3�^�`j�h�ȓr��jẕ�c�,ٞX����-����a�ﶔ���#�$��]w�O��Ӫ�1y%��L�Y<�wg#�ǝ�̗`�x�xa�t�w��»1���o7o5��>�m뭛C���Uƃߜ}�C���y1Xνm�F8�jI���]����H���ۺиE@I�i;r�8ӭ�����V�F�Շ| ��&?�3|x�B�MuS�Ge�=Ӕ�#BE5G������Y!z��_e��q�р/W>|-�Ci߇�t�1ޯќd�R3�u��g�=0 5��[?�#͏��q�cf���H��{ ?u�=?�?ǯ���}Z��z���hmΔ�BFTW�����<�q��(v� ��!��z���iW]*�J�V�z��gX֧A�q�&��/w���u�gYӘa���; �i=����g:��?2�dž6�ى�k�4�>�Pxs����}������G�9���3 ���)gG�R<>r h�$��'nc�h�P��Bj��J�ҧH� -��N1���N��?��~��}-q!=��_2hc�M��l�vY%UE�@|�v����M2�.Y[|y�"Eï��K�ZF,�ɯ?,q�?v�M 80jx�"�;�9vk�����+ ֧�� �ȺU��?�%�vcV��mA�6��Qg^M�����A}�3�nl� QRN�l8�kkn�'�����(��M�7m9و�q���%ޟ���*h$Zk"��$�9��: �?U8�Sl��,,|ɒ��xH(ѷ����Gn�/Q�4�P��G�%��Ա8�N��!� �&�7�;���eKM7�4��9R/%����l�c>�x;������>��C�:�����t��h?aKX�bhe�ᜋ^�$�Iհ �hr7%F$�E��Fd���t��5���+�(M6�t����Ü�UU|zW�=a�Ts�Tg������dqP�Q����b'�m���1{|Y����X�N��b �P~��F^F:����k6�"�j!�� �I�r�`��1&�-$�Bevk:y���#y�w��I0��x��=D�4��tU���P�ZH��ڠ底taP��6����b>�xa�����Q�#� WeF��ŮNj�p�J* mQ�N�����*I�-*�ȩ�F�g�3 
�5��V�ʊ�ɮ�a��5F���O@{���NX��?����H�]3��1�Ri_u��������ѕ�� ����0��� F��~��:60�p�͈�S��qX#a�5>���`�o&+�<2�D����: �������ڝ�$�nP���*)�N�|y�Ej�F�5ټ�e���ihy�Z �>���k�bH�a�v��h�-#���!�Po=@k̆IEN��@��}Ll?j�O������߭�ʞ���Q|A07x���wt!xf���I2?Z��<ץ�T���cU�j��]���陎Ltl �}5�ϓ��$�,��O�mˊ�;�@O��jE��j(�ا,��LX���LO���Ц�90�O �.����a��nA���7������j4 ��W��_ٓ���zW�jcB������y՗+EM�)d���N�g6�y1_x��p�$Lv�:��9�"z��p���ʙ$��^��JԼ*�ϭ����o���=x�Lj�6�J��u82�A�H�3$�ٕ@�=Vv�]�'�qEz�;I˼��)��=��ɯ���x �/�W(V���p�����$ �m�������u�����񶤑Oqˎ�T����r��㠚x�sr�GC��byp�G��1ߠ�w e�8�$⿄����/�M{*}��W�]˷.�CK\�ުx���/$�WP�w���r� |i���&�}�{�X� �>��$-��l���?-z���g����lΆ���(F���h�vS*���b���߲ڡn,|)mrH[���a�3�ר�[1��3o_�U�3�TC�$��(�=�)0�kgP���� ��u�^=��4 �WYCҸ:��vQ�ר�X�à��tk�m,�t*��^�,�}D*�� �"(�I��9R����>`�`��[~Q]�#af��i6l��8���6�:,s�s�N6�j"�A4���IuQ��6E,�GnH��zS�HO�uk�5$�I�4��ؤ�Q9�@��C����wp��BGv[]�u�Ov����0I4���\��y�����Q�Ѹ��~>Z��8�T��a��q�ޣ;z��a���/��S��I:�ܫ_�|������>=Z����8:�S��U�I�J��"IY���8%b8���H��:�QO�6�;7�I�S��J��ҌAά3��>c���E+&jf$eC+�z�;��V����� �r���ʺ������my�e���aQ�f&��6�ND���.:��NT�vm�<- u���ǝ\MvZY�N�NT��-A�>jr!S��n�O 1�3�Ns�%�3D@���`������ܟ 1�^c<���� �a�ɽ�̲�Xë#�w�|y�cW�=�9I*H8�p�^(4���՗�k��arOcW�tO�\�ƍR��8����'�K���I�Q�����?5�>[�}��yU�ײ -h��=��% q�ThG�2�)���"ו3]�!kB��*p�FDl�A���,�eEi�H�f�Ps�����5�H:�Փ~�H�0Dت�D�I����h�F3�������c��2���E��9�H��5�zԑ�ʚ�i�X�=:m�xg�hd(�v����׊�9iS��O��d@0ڽ���:�p�5�h-��t�&���X�q�ӕ,��ie�|���7A�2���O%P��E��htj��Y1��w�Ѓ!����  ���� ࢽ��My�7�\�a�@�ţ�J ��4�Ȼ�F�@o�̒?4�wx��)��]�P��~�����u�����5�����7X ��9��^ܩ�U;Iꭆ 5 �������eK2�7(�{|��Y׎ �V��\"���Z�1� Z�����}��(�Ǝ"�1S���_�vE30>���p;� ΝD��%x�W�?W?v����o�^V�i�d��r[��/&>�~`�9Wh��y�;���R���� ;;ɮT��?����r$�g1�K����A��C��c��K��l:�'��3 c�ﳯ*"t8�~l��)���m��+U,z��`(��>yJ�?����h>��]��v��ЍG*�{`��;y]��I�T� ;c��NU�fo¾h���/$���|NS���1�S�"�H��V���T���4��uhǜ�]�v;���5�͠x��'C\�SBpl���h}�N����� A�Bx���%��ޭ�l��/����T��w�ʽ]D�=����K���ž�r㻠l4�S�O?=�k �M:� ��c�C�a�#ha���)�ѐxc�s���gP�iG���{+���x���Q���I= �� z��ԫ+ �8"�k�ñ�j=|����c ��y��CF��/���*9ж�h{ �?4�o� 
��k�m�Q�N�x��;�Y��4膚�a�w?�6�>�e]�����Q�r�:����g�,i"�����ԩA��*M�<�G��b�if��l^M��5�� �Ҩ�{����6J��ZJ�����P�*�����Y���ݛu�_4�9�I8�7���������,^ToR���m4�H��?�N�S�ѕw��/S��甍�@�9H�S�T��t�ƻ���ʒU��*{Xs�@����f������֒Li�K{H�w^���������Ϥm�tq���s� ���ք��f:��o~s��g�r��ט� �S�ѱC�e]�x���a��) ���(b-$(�j>�7q�B?ӕ�F��hV25r[7 Y� }L�R��}����*sg+��x�r�2�U=�*'WS��ZDW]�WǞ�<��叓���{�$�9Ou4��y�90-�1�'*D`�c�^o?(�9��u���ݐ��'PI&� f�Jݮ�������:wS����jfP1F:X �H�9dԯ����˝[�_54 �}*;@�ܨ�� ð�yn�T���?�ןd�#���4rG�ͨ��H�1�|-#���Mr�S3��G�3�����)�.᧏3v�z֑��r����$G"�`j �1t��x0<Ɔ�Wh6�y�6��,œ�Ga��gA����y��b��)���h�D��ß�_�m��ü �gG;��e�v��ݝ�nQ� ��C����-�*��o���y�a��M��I�>�<���]obD��"�:���G�A��-\%LT�8���c�)��+y76���o�Q�#*{�(F�⽕�y����=���rW�\p���۩�c���A���^e6��K������ʐ�cVf5$�'->���ՉN"���F�"�UQ@�f��Gb~��#�&�M=��8�ט�JNu9��D��[̤�s�o�~������� G��9T�tW^g5y$b��Y'��س�Ǵ�=��U-2 #�MC�t(�i� �lj�@Q 5�̣i�*�O����s�x�K�f��}\��M{E�V�{�υ��Ƈ�����);�H����I��fe�Lȣr�2��>��W��I�Ȃ6������i��k�� �5�YOxȺ����>��Y�f5'��|��H+��98pj�n�.O�y�������jY��~��i�w'������l�;�s�2��Y��:'lg�ꥴ)o#'Sa�a�K��Z� �m��}�`169�n���"���x��I ��*+� }F<��cГ���F�P�������ֹ*�PqX�x۩��,� ��N�� �4<-����%����:��7����W���u�`����� $�?�I��&����o��o��`v�>��P��"��l���4��5'�Z�gE���8���?��[�X�7(��.Q�-��*���ތL@̲����v��.5���[��=�t\+�CNܛ��,g�SQnH����}*F�G16���&:�t��4ُ"A��̣��$�b �|����#rs��a�����T�� ]�<�j��B�S�('$�ɻ� �wP;�/�n��?�ݜ��x�F��yUn�~mL*-�������Xf�wd^�a�}��f�,=t�׵i�.2/wpN�Ep8�OР���•��R�FJ� 55TZ��T �ɭ�<��]��/�0�r�@�f��V��V����Nz�G��^���7hZi����k��3�,kN�e|�vg�1{9]_i��X5y7� 8e]�U����'�-2,���e"����]ot�I��Y_��n�(JҼ��1�O ]bXc���Nu�No��pS���Q_���_�?i�~�x h5d'�(qw52] ��'ޤ�q��o1�R!���`ywy�A4u���h<קy���\[~�4�\ X�Wt/� 6�����n�F�a8��f���z �3$�t(���q��q�x��^�XWeN'p<-v�!�{�(>ӽDP7��ո0�y)�e$ٕv�Ih'Q�EA�m*�H��RI��=:��� ���4牢) �%_iN�ݧ�l]� �Nt���G��H�L��� ɱ�g<���1V�,�J~�ٹ�"K��Q�� 9�HS�9�?@��k����r�;we݁�]I�!{ �@�G�[�"��`���J:�n]�{�cA�E����V��ʆ���#��U9�6����j�#Y�m\��q�e4h�B�7��C�������d<�?J����1g:ٳ���=Y���D�p�ц� 
׈ǔ��1�]26؜oS�'��9�V�FVu�P�h�9�xc�oq�X��p�o�5��Ա5$�9W�V(�[Ak�aY錎qf;�'�[�|���b�6�Ck��)��#a#a˙��8���=äh�4��2��C��4tm^ �n'c����]GQ$[Wҿ��i���vN�{Fu ��1�gx��1┷���N�m��{j-,��x�� Ūm�ЧS�[�s���Gna���䑴�� x�p 8<������97�Q���ϴ�v�aϚG��Rt�Һ׈�f^\r��WH�JU�7Z���y)�vg=����n��4�_)y��D'y�6�]�c�5̪��\� �PF�k����&�c;��cq�$~T�7j ���nç]�<�g ":�to�t}�159�<�/�8������m�b�K#g'I'.W������6��I/��>v��\�MN��g���m�A�yQL�4u�Lj�j9��#44�t��l^�}L����n��R��!��t��±]��r��h6ٍ>�yҏ�N��fU�� ���� Fm@�8}�/u��jb9������he:A�y�ծw��GpΧh�5����l}�3p468��)U��d��c����;Us/�֔�YX�1�O2��uq�s��`hwg�r~�{ R��mhN��؎*q 42�*th��>�#���E����#��Hv�O����q�}������6�e��\�,Wk�#���X��b>��p}�դ��3���T5��†��6��[��@��P�y*n��|'f�֧>�lư΂�̺����SU�'*�q�p�_S�����M�� '��c�6������m�� ySʨ;M��r���Ƌ�m�Kxo,���Gm�P��A�G�:��i��w�9�}M(�^�V��$ǒ�ѽ�9���|���� �a����J�SQ�a���r�B;����}���ٻ֢�2�%U���c�#�g���N�a�ݕ�'�v�[�OY'��3L�3�;,p�]@�S��{ls��X�'���c�jw��k'a�.��}�}&�� �dP�*�bK=ɍ!����;3n�gΊU�ߴmt�'*{,=SzfD� A��ko~�G�aoq�_mi}#�m�������P�Xhύ�����mxǍ�΂���巿zf��Q���c���|kc�����?���W��Y�$���_Lv����l߶��c���`?����l�j�ݲˏ!V��6����U�Ђ(A���4y)H���p�Z_�x��>���e���R��$�/�`^'3qˏ�-&Q�=?��CFVR �D�fV�9��{�8g�������n�h�(P"��6�[�D���< E�����~0<@�`�G�6����Hг�cc�� �c�K.5��D��d�B���`?�XQ��2��ٿyqo&+�1^� DW�0�ꊩ���G�#��Q�nL3��c���������/��x ��1�1�[y�x�პCW��C�c�UĨ80�m�e�4.{�m��u���I=��f�����0QRls9���f���������9���~f�����Ǩ��a�"@�8���ȁ�Q����#c�ic������G��$���G���r/$W�(��W���V�"��m�7�[m�A�m����bo��D� j����۳� l���^�k�h׽����� ��#� iXn�v��eT�k�a�^Y�4�BN���ĕ���0������� !01@Q"2AaPq3BR�������?�����@4�Q�����T3,���㺠�W�[=JK�Ϟ���2�r^7��vc�:�9 �E�ߴ�w�S#d���Ix��u��:��Hp��9E!�� V 2;73|F��9Y���*ʬ�F��D����u&���y؟��^EA��A��(ɩ���^��GV:ݜDy�`��Jr29ܾ�㝉��[���E;Fzx��YG��U�e�Y�C���� ����v-tx����I�sם�Ę�q��Eb�+P\ :>�i�C'�;�����k|z�رn�y]�#ǿb��Q��������w�����(�r|ӹs��[�D��2v-%��@;�8<a���[\o[ϧw��I!��*0�krs)�[�J9^��ʜ��p1)� "��/_>��o��<1����A�E�y^�C��`�x1'ܣn�p��s`l���fQ��):�l����b>�Me�jH^?�kl3(�z:���1ŠK&?Q�~�{�ٺ�h�y���/�[��V�|6��}�KbX����mn[-��7�5q�94�������dm���c^���h� X��5��<�eޘ>G���-�}�دB�ޟ� 
��|�rt�M��V+�]�c?�-#ڛ��^ǂ}���Lkr���O��u�>�-D�ry� D?:ޞ�U��ǜ�7�V��?瓮�"�#���r��չģVR;�n���/_� ؉v�ݶe5d�b9��/O��009�G���5n�W����JpA�*�r9�>�1��.[t���s�F���nQ� V 77R�]�ɫ8����_0<՜�IF�u(v��4��F�k�3��E)��N:��yڮe��P�`�1}�$WS��J�SQ�N�j��ٺ��޵�#l���ј(�5=��5�lǏmoW�v-�1����v,W�mn��߀$x�<����v�j(����c]��@#��1������Ǔ���o'��u+����;G�#�޸��v-lη��/(`i⣍Pm^����ԯ̾9Z��F��������n��1��� ��]�[��)�'�������:�֪�W��FC����� �B9،!?���]��V��A�Վ�M��b�w��G F>_DȬ0¤�#�QR�[V��kz���m�w�"��9ZG�7'[��=�Q����j8R?�zf�\a�=��O�U����*oB�A�|G���2�54 �p��.w7� �� ���&������ξxGHp� B%��$g�����t�Џ򤵍z���HN�u�Я�-�'4��0���;_���3������� !01"@AQa2Pq#3BR�������?����ʩca��en��^��8���<�u#��m*08r��y�N"�<�Ѳ0��@\�p��� �����Kv�D��J8�Fҽ� �f�Y��-m�ybX�NP����}�!*8t(�OqѢ��Q�wW�K��ZD��Δ^e��!� ��B�K��p~�����e*l}z#9ң�k���q#�Ft�o��S�R����-�w�!�S���Ӥß|M�l޶V��!eˈ�8Y���c�ЮM2��tk���� ������J�fS����Ö*i/2�����n]�k�\���|4yX�8��U�P.���Ы[���l��@"�t�<������5�lF���vU�����W��W��;�b�cД^6[#7@vU�xgZv��F�6��Q,K�v��� �+Ъ��n��Ǣ��Ft���8��0��c�@�!�Zq s�v�t�;#](B��-�nῃ~���3g������5�J�%���O������n�kB�ĺ�.r��+���#�N$?�q�/�s�6��p��a����a��J/��M�8��6�ܰ"�*������ɗud"\w���aT(����[��F��U՛����RT�b���n�*��6���O��SJ�.�ij<�v�MT��R\c��5l�sZB>F��<7�;EA��{��E���Ö��1U/�#��d1�a�n.1ě����0�ʾR�h��|�R��Ao�3�m3 ��%�� ���28Q�� ��y��φ���H�To�7�lW>����#i`�q���c����a��� �m,B�-j����݋�'mR1Ήt�>��V��p���s�0IbI�C.���1R�ea�����]H�6�����������4B>��o��](��$B���m�����a�!=���?�B� K�Ǿ+�Ծ"�n���K��*��+��[T#�{�E�J�S����Q�����s�5�:�U�\wĐ�f�3����܆&�)�����I���Ԇw��E T�lrTf6Q|R�h:��[K�� �z��c֧�G�C��%\��_�a��84��HcO�bi��ؖV��7H �)*ģK~Xhչ0��4?�0��� �E<���}3���#���u�?�� ��|g�S�6ꊤ�|�I#Hڛ� �ա��w�X��9��7���Ŀ%�SL��y6č��|�F�a 8���b���$�sק�h���b9RAu7�˨p�Č�_\*w��묦��F ����4D~�f����|(�"m���NK��i�S�>�$d7SlA��/�²����SL��|6N�}���S�˯���g��]6��; �#�.��<���q'Q�1|KQ$�����񛩶"�$r�b:���N8�w@��8$�� �AjfG|~�9F ���Y��ʺ��Bwؒ������M:I岎�G��`s�YV5����6��A �b:�W���G�q%l�����F��H���7�������Fsv7���k�� 403WebShell
403Webshell
Server IP : 84.32.84.255  /  Your IP : 216.73.216.150
Web Server : LiteSpeed
System : Linux in-mum-web1874.main-hosting.eu 5.14.0-570.21.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Wed Jun 11 07:22:35 EDT 2025 x86_64
User : u862839997 ( 862839997)
PHP Version : 8.2.30
Disable Function : system, exec, shell_exec, passthru, mysql_list_dbs, ini_alter, dl, symlink, link, chgrp, leak, popen, apache_child_terminate, virtual, mb_send_mail
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : OFF  |  Python : OFF  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /proc/4022542/root/opt/go/pkg/mod/go.mongodb.org/mongo-driver@v1.14.0/x/mongo/driver/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /proc/4022542/root/opt/go/pkg/mod/go.mongodb.org/mongo-driver@v1.14.0/x/mongo/driver//operation.go
// Copyright (C) MongoDB, Inc. 2022-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package driver

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"math"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/bsontype"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/event"
	"go.mongodb.org/mongo-driver/internal/csot"
	"go.mongodb.org/mongo-driver/internal/driverutil"
	"go.mongodb.org/mongo-driver/internal/handshake"
	"go.mongodb.org/mongo-driver/internal/logger"
	"go.mongodb.org/mongo-driver/mongo/address"
	"go.mongodb.org/mongo-driver/mongo/description"
	"go.mongodb.org/mongo-driver/mongo/readconcern"
	"go.mongodb.org/mongo-driver/mongo/readpref"
	"go.mongodb.org/mongo-driver/mongo/writeconcern"
	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
	"go.mongodb.org/mongo-driver/x/mongo/driver/session"
	"go.mongodb.org/mongo-driver/x/mongo/driver/wiremessage"
)

// defaultLocalThreshold is a time.Duration constant of 15 milliseconds.
// NOTE(review): presumably the fallback latency window applied during server
// selection; its use sites are not visible in this part of the file, so confirm.
const defaultLocalThreshold = 15 * time.Millisecond

// Sentinel errors for this package. Each one is a distinct value created with
// errors.New, so it can be matched by identity (for example with errors.Is).
// errDatabaseNameEmpty is unexported and therefore only comparable from inside
// this package.
var (
	// ErrNoDocCommandResponse occurs when the server indicated a response existed, but none was found.
	ErrNoDocCommandResponse = errors.New("command returned no documents")
	// ErrMultiDocCommandResponse occurs when the server sent multiple documents in response to a command.
	ErrMultiDocCommandResponse = errors.New("command returned multiple documents")
	// ErrReplyDocumentMismatch occurs when the number of documents returned for an OP_QUERY does not
	// match the numberReturned field of the reply.
	ErrReplyDocumentMismatch = errors.New("number of documents returned does not match numberReturned field")
	// ErrNonPrimaryReadPref is returned when a read is attempted in a transaction with a non-primary read preference.
	ErrNonPrimaryReadPref = errors.New("read preference in a transaction must be primary")
	// errDatabaseNameEmpty occurs when a database name is not provided.
	errDatabaseNameEmpty = errors.New("database name cannot be empty")
)

// Size and wire-version thresholds used by this package.
const (
	// cryptMaxBsonObjectSize is the maximum BSON object size when client side
	// encryption is enabled. The value is 2 * 1024 * 1024 (presumably bytes,
	// i.e. 2 MiB; confirm against the code that enforces it).
	cryptMaxBsonObjectSize uint32 = 2097152
	// cryptMinWireVersion is the minimum wire version necessary to use
	// automatic encryption.
	cryptMinWireVersion int32 = 8
	// readSnapshotMinWireVersion is the minimum wire version necessary to use
	// read snapshots.
	readSnapshotMinWireVersion int32 = 13
)

// RetryablePoolError is a connection pool error that can be retried while executing an operation.
type RetryablePoolError interface {
	// Retryable returns a bool; presumably true means the operation may be
	// retried after this error (confirm against the implementations).
	Retryable() bool
}

// labeledError is an error that can have error labels added to it. The
// interface itself only exposes a way to query labels; it embeds error, so
// every labeledError is also usable as a plain error.
type labeledError interface {
	error
	// HasErrorLabel takes a label and returns a bool; presumably it reports
	// whether the error carries that label (confirm against the implementations).
	HasErrorLabel(string) bool
}

// InvalidOperationError is the error Validate returns when an Operation is
// missing a required field. MissingField holds the name of that field.
type InvalidOperationError struct {
	MissingField string
}

// Error implements the error interface. The message names the field that has
// to be set on the Operation.
func (e InvalidOperationError) Error() string {
	const (
		prefix = "the "
		suffix = " field must be set on Operation"
	)

	return prefix + e.MissingField + suffix
}

// opReply stores information returned in an OP_REPLY response from the server.
//
// Fields:
//   - responseFlags holds the reply's flags, typed as wiremessage.ReplyFlag.
//   - cursorID, startingFrom and numReturned are integer values taken from the
//     reply (presumably the like-named OP_REPLY fields; confirm against the
//     code that decodes the reply).
//   - documents holds the BSON documents of the reply.
//   - err stores any error that occurred when decoding or validating the
//     OP_REPLY response.
type opReply struct {
	responseFlags wiremessage.ReplyFlag
	cursorID      int64
	startingFrom  int32
	numReturned   int32
	documents     []bsoncore.Document
	err           error
}

// startedInformation keeps track of all of the information necessary for monitoring started events.
//
// redactStartedInformationCmd reads three of these fields: when redacted is
// true it exposes no command bytes; otherwise it returns a copy of cmd, and
// when documentSequenceIncluded is true it first appends the operation's batch
// array to that copy.
//
// The remaining fields carry the command name, request ID, connection
// identifiers, service ID and server address. NOTE(review): where those values
// come from is not visible in this part of the file.
type startedInformation struct {
	cmd                      bsoncore.Document
	requestID                int32
	cmdName                  string
	documentSequenceIncluded bool
	connID                   string
	driverConnectionID       uint64 // TODO(GODRIVER-2824): change type to int64.
	serverConnID             *int64
	redacted                 bool
	serviceID                *primitive.ObjectID
	serverAddress            address.Address
}

// finishedInformation keeps track of all of the information necessary for monitoring success and failure events.
//
// cmdErr is what the success method inspects: a nil cmdErr or a
// WriteCommandError counts as success. redacted and response are read by
// redactFinishedInformationResponse, which returns an empty document instead of
// response when redacted is true. duration is a time.Duration.
//
// The remaining fields carry the command name, request ID, connection
// identifiers, service ID and server address. NOTE(review): where those values
// come from is not visible in this part of the file.
type finishedInformation struct {
	cmdName            string
	requestID          int32
	response           bsoncore.Document
	cmdErr             error
	connID             string
	driverConnectionID uint64 // TODO(GODRIVER-2824): change type to int64.
	serverConnID       *int64
	redacted           bool
	serviceID          *primitive.ObjectID
	serverAddress      address.Address
	duration           time.Duration
}

// convertInt64PtrToInt32Ptr narrows the value behind an *int64 to an *int32.
// It returns nil when the input pointer is nil or when the pointed-to value
// lies outside [math.MinInt32, math.MaxInt32]. Otherwise it returns a pointer
// to a fresh int32 holding the same value.
func convertInt64PtrToInt32Ptr(i64 *int64) *int32 {
	if i64 == nil {
		return nil
	}

	wide := *i64
	if fits := wide >= math.MinInt32 && wide <= math.MaxInt32; !fits {
		return nil
	}

	narrow := int32(wide)
	return &narrow
}

// success reports whether the finished command counts as successful: either
// there is no command error at all, or the command error is a
// WriteCommandError. A command that executed on the server and returned
// { ok: 1.0 } is a successful command and MUST produce a CommandSucceededEvent
// and a "command succeeded" log message. Write errors are included because the
// command itself succeeded; only the writes failed.
func (info finishedInformation) success() bool {
	switch info.cmdErr.(type) {
	case nil, WriteCommandError:
		return true
	default:
		return false
	}
}

// ResponseInfo contains the context required to parse a server response. It is
// the argument type of Operation.ProcessResponseFn.
//
// ServerResponse is the response as a BSON document. Server, Connection and
// ConnectionDescription presumably describe where the response came from, and
// CurrentIndex is an int whose meaning is not visible in this part of the
// file. NOTE(review): confirm both against the code that fills this struct.
type ResponseInfo struct {
	ServerResponse        bsoncore.Document
	Server                Server
	Connection            Connection
	ConnectionDescription description.Server
	CurrentIndex          int
}

// redactStartedInformationCmd returns the command bytes to expose for a
// started event. It returns nil when info.redacted is set (the command is
// security sensitive and cannot be monitored). Otherwise it returns a copy of
// info.cmd; if a type 1 payload was included for the current batch, the batch
// is appended to that copy as a BSON array.
func redactStartedInformationCmd(op Operation, info startedInformation) bson.Raw {
	if info.redacted {
		return nil
	}

	// Work on a private copy so info.cmd itself is never modified.
	var raw bson.Raw = make([]byte, len(info.cmd))
	copy(raw, info.cmd)

	if !info.documentSequenceIncluded {
		return raw
	}

	// Drop the trailing 0 byte so the batch array can be appended to the document.
	raw = op.addBatchArray(raw[:len(info.cmd)-1])

	// Put the 0 byte back and update the document length. The error from
	// AppendDocumentEnd is discarded.
	raw, _ = bsoncore.AppendDocumentEnd(raw, 0)

	return raw
}

// redactFinishedInformationResponse returns the response to expose for a
// finished event: an empty bson.Raw when info.redacted is set, otherwise
// info.response converted to bson.Raw (no copy is made).
func redactFinishedInformationResponse(info finishedInformation) bson.Raw {
	if info.redacted {
		return bson.Raw{}
	}

	return bson.Raw(info.response)
}

// Operation is used to execute an operation. It contains all of the common code required to
// select a server, transform an operation into a command, write the command to a connection from
// the selected server, read a response from that connection, process the response, and potentially
// retry.
//
// The required fields are Database, CommandFn, and Deployment. All other fields are optional.
//
// While an Operation can be constructed manually, drivergen should be used to generate an
// implementation of an operation instead. This will ensure that there are helpers for constructing
// the operation and that this type isn't configured incorrectly.
type Operation struct {
	// CommandFn is used to create the command that will be wrapped in a wire message and sent to
	// the server. This function should only add the elements of the command and not start or end
	// the enclosing BSON document. Per the command API, the first element must be the name of the
	// command to run. This field is required.
	CommandFn func(dst []byte, desc description.SelectedServer) ([]byte, error)

	// Database is the database that the command will be run against. This field is required.
	Database string

	// Deployment is the MongoDB Deployment to use. While most of the time this will be multiple
	// servers, commands that need to run against a single, preselected server can use the
	// SingleServerDeployment type. Commands that need to run on a preselected connection can use
	// the SingleConnectionDeployment type. This field is required.
	Deployment Deployment

	// ProcessResponseFn is called after a response to the command is returned. The server is
	// provided for types like Cursor that are required to run subsequent commands using the same
	// server.
	ProcessResponseFn func(ResponseInfo) error

	// Selector is the server selector that's used during both initial server selection and
	// subsequent selection for retries. Depending on the Deployment implementation, the
	// SelectServer method may not actually be called. If this field is nil, a selector built from
	// ReadPreference and the default local latency threshold is used.
	Selector description.ServerSelector

	// ReadPreference is the read preference that will be attached to the command. If this field is
	// not specified a default read preference of primary will be used.
	ReadPreference *readpref.ReadPref

	// ReadConcern is the read concern used when running read commands. This field should not be set
	// for write operations. If this field is set, it will be encoded onto the commands sent to the
	// server.
	ReadConcern *readconcern.ReadConcern

	// MinimumReadConcernWireVersion specifies the minimum wire version to add the read concern to
	// the command being executed.
	MinimumReadConcernWireVersion int32

	// WriteConcern is the write concern used when running write commands. This field should not be
	// set for read operations. If this field is set, it will be encoded onto the commands sent to
	// the server.
	WriteConcern *writeconcern.WriteConcern

	// MinimumWriteConcernWireVersion specifies the minimum wire version to add the write concern to
	// the command being executed.
	MinimumWriteConcernWireVersion int32

	// Client is the session used with this operation. This can be either an implicit or explicit
	// session. If the server selected does not support sessions and Client is specified the
	// behavior depends on the session type. If the session is implicit, the session fields will not
	// be encoded onto the command. If the session is explicit, an error will be returned. The
	// caller is responsible for ensuring that this field is nil if the Deployment does not support
	// sessions.
	Client *session.Client

	// Clock is a cluster clock, different from the one contained within a session.Client. This
	// allows updating cluster times for a global cluster clock while allowing individual session's
	// cluster clocks to be only updated as far as the last command that's been run.
	Clock *session.ClusterClock

	// RetryMode specifies how to retry. There are three modes that enable retry: RetryOnce,
	// RetryOncePerCommand, and RetryContext. For more information about what these modes do, please
	// refer to their definitions. Both RetryMode and Type must be set for retryability to be enabled.
	// If Timeout is set on the Client, the operation will automatically retry as many times as
	// possible unless RetryNone is used.
	RetryMode *RetryMode

	// Type specifies the kind of operation this is. Retries are only configured for the Write and
	// Read types (a Write additionally requires Client to be set). For more information about what
	// these types do, please refer to their definitions. Both Type and RetryMode must be set for
	// retryability to be enabled.
	Type Type

	// Batches contains the documents that are split when executing a write command that potentially
	// has more documents than can fit in a single command. This should only be specified for
	// commands that are batch compatible. For more information, please refer to the definition of
	// Batches.
	Batches *Batches

	// Legacy sets the legacy type for this operation. There are only 3 types that require legacy
	// support: find, getMore, and killCursors. For more information about LegacyOperationKind,
	// please refer to its definition.
	Legacy LegacyOperationKind

	// CommandMonitor specifies the monitor to use for APM events. If this field is not set,
	// no events will be reported.
	CommandMonitor *event.CommandMonitor

	// Crypt specifies a Crypt object to use for automatic client side encryption and decryption.
	Crypt Crypt

	// ServerAPI specifies options used to configure the API version sent to the server.
	ServerAPI *ServerAPIOptions

	// IsOutputAggregate specifies whether this operation is an aggregate with an output stage. If true,
	// read preference will not be added to the command on wire versions < 13.
	IsOutputAggregate bool

	// MaxTime specifies the maximum amount of time to allow the operation to run on the server.
	MaxTime *time.Duration

	// Timeout is the amount of time that this operation can execute before returning an error. The
	// default value is nil, which means that the timeout of the operation's caller will be used.
	Timeout *time.Duration

	// Logger is the logger associated with this operation.
	// NOTE(review): presumably used to emit the command started/succeeded/failed log messages;
	// the code that consumes it is outside this section — confirm.
	Logger *logger.Logger

	// Name is the name of the operation. This is used when serializing
	// OP_MSG as well as for logging server selection data.
	Name string

	// omitReadPreference is a boolean that indicates whether to omit the
	// read preference from the command. This omission includes the case
	// where a default read preference is used when the operation
	// ReadPreference is not specified.
	omitReadPreference bool
}

// shouldEncrypt reports whether the operation's commands should be
// automatically encrypted: a Crypt must be configured and it must not be
// bypassing auto encryption.
func (op Operation) shouldEncrypt() bool {
	if op.Crypt == nil {
		return false
	}

	return !op.Crypt.BypassAutoEncryption()
}

// filterDeprioritizedServers removes from candidates every server that the
// operation has deprioritized because of an earlier failure.
//
// A candidate is dropped only when a deprioritized server with the same address
// also compares Equal to it. Deprioritization is a preference, not a ban: if
// dropping would leave no servers at all (e.g. there are no other healthy
// servers in the cluster), the original candidates are returned unchanged so
// the selector can still pick one.
func filterDeprioritizedServers(candidates, deprioritized []description.Server) []description.Server {
	if len(deprioritized) == 0 {
		return candidates
	}

	// Index the deprioritized servers by address for constant-time lookup.
	byAddr := make(map[address.Address]*description.Server)
	for idx := range deprioritized {
		byAddr[deprioritized[idx].Addr] = &deprioritized[idx]
	}

	var kept []description.Server
	for _, cand := range candidates {
		match, found := byAddr[cand.Addr]
		if found && match.Equal(cand) {
			// This exact server was deprioritized; leave it out.
			continue
		}
		kept = append(kept, cand)
	}

	// Everything was deprioritized: fall back to the unfiltered list.
	if len(kept) == 0 {
		return candidates
	}

	return kept
}

// opServerSelector is a wrapper for the server selector that is assigned to the
// operation. The purpose of this wrapper is to filter candidates with
// operation-specific logic, such as deprioritizing failing servers.
type opServerSelector struct {
	// selector is the wrapped selector that performs the actual selection.
	selector description.ServerSelector
	// deprioritizedServers are the servers to filter out of the wrapped
	// selector's result whenever other servers remain available.
	deprioritizedServers []description.Server
}

// SelectServer runs the wrapped selector over the candidates and then removes
// the deprioritized servers from its result. An error from the wrapped selector
// is returned as-is with a nil server list.
func (oss *opServerSelector) SelectServer(
	topo description.Topology,
	candidates []description.Server,
) ([]description.Server, error) {
	selected, err := oss.selector.SelectServer(topo, candidates)
	if err != nil {
		return nil, err
	}

	return filterDeprioritizedServers(selected, oss.deprioritizedServers), nil
}

// selectServer validates the operation and asks the Deployment for a server.
//
// The operation's Selector is used when set; otherwise a composite selector is
// built from the read preference (primary when unset) and the default local
// latency threshold. Either way the selector is wrapped so that the
// deprioritized servers are filtered from its result. The operation name and
// requestID are attached to ctx for logging.
func (op Operation) selectServer(
	ctx context.Context,
	requestID int32,
	deprioritized []description.Server,
) (Server, error) {
	if err := op.Validate(); err != nil {
		return nil, err
	}

	base := op.Selector
	if base == nil {
		pref := op.ReadPreference
		if pref == nil {
			pref = readpref.Primary()
		}
		base = description.CompositeSelector([]description.ServerSelector{
			description.ReadPrefSelector(pref),
			description.LatencySelector(defaultLocalThreshold),
		})
	}

	ctx = logger.WithOperationID(logger.WithOperationName(ctx, op.Name), requestID)

	return op.Deployment.SelectServer(ctx, &opServerSelector{
		selector:             base,
		deprioritizedServers: deprioritized,
	})
}

// getServerAndConnection selects a server for the operation and returns it
// together with the connection the operation should run on.
//
// A server selection failure inside a running transaction (one that is neither
// committing nor aborting) is wrapped in an Error labeled
// TransientTransactionError. When the session already has a pinned connection,
// that connection is returned instead of checking one out of the pool. When the
// checked-out connection is load balanced and the session is starting a
// transaction, the connection is pinned to the session before being returned.
func (op Operation) getServerAndConnection(
	ctx context.Context,
	requestID int32,
	deprioritized []description.Server,
) (Server, Connection, error) {
	server, err := op.selectServer(ctx, requestID, deprioritized)
	if err != nil {
		sess := op.Client
		if sess != nil && !sess.Committing && !sess.Aborting && sess.TransactionRunning() {
			return nil, nil, Error{
				Message: err.Error(),
				Labels:  []string{TransientTransactionError},
				Wrapped: err,
			}
		}
		return nil, nil, err
	}

	// A pinned connection means we're in a transaction and the target server
	// is behind a load balancer, so that connection must be reused.
	if op.Client != nil && op.Client.PinnedConnection != nil {
		return server, op.Client.PinnedConnection, nil
	}

	// No pinned connection: check one out of the server's pool.
	conn, err := server.Connection(ctx)
	if err != nil {
		return nil, nil, err
	}

	// Pinning is only needed for the first operation of a transaction in load
	// balanced mode; in every other case the connection is returned directly.
	if !conn.Description().LoadBalanced() || op.Client == nil || !op.Client.TransactionStarting() {
		return server, conn, nil
	}

	pinned, isPinnable := conn.(PinnedConnection)
	if !isPinnable {
		// Close the checked-out connection so it is not leaked.
		_ = conn.Close()
		return nil, nil, fmt.Errorf("expected Connection used to start a transaction to be a PinnedConnection, but got %T", conn)
	}
	if pinErr := pinned.PinToTransaction(); pinErr != nil {
		// Close the checked-out connection so it is not leaked.
		_ = conn.Close()
		return nil, nil, fmt.Errorf("error incrementing connection reference count when starting a transaction: %w", pinErr)
	}
	op.Client.PinnedConnection = pinned

	return server, conn, nil
}

// Validate checks that the operation is configured well enough to run. It
// returns an error when CommandFn or Deployment is missing, when Database is
// empty, or when a session is supplied together with an unacknowledged write
// concern; otherwise it returns nil.
func (op Operation) Validate() error {
	switch {
	case op.CommandFn == nil:
		return InvalidOperationError{MissingField: "CommandFn"}
	case op.Deployment == nil:
		return InvalidOperationError{MissingField: "Deployment"}
	case op.Database == "":
		return errDatabaseNameEmpty
	case op.Client != nil && !writeconcern.AckWrite(op.WriteConcern):
		return errors.New("session provided for an unacknowledged write")
	}

	return nil
}

// memoryPool is a pool of reusable byte slices that Execute uses as scratch
// buffers for building (and compressing) wire messages. Entries are stored as
// *[]byte.
var memoryPool = sync.Pool{
	New: func() interface{} {
		// Start with 1kb buffers.
		b := make([]byte, 1024)
		// Return a pointer as the static analysis tool suggests.
		return &b
	},
}

// Execute runs this operation.
//
// It validates the operation, then loops: select a server and connection (when
// none is held), build the wire message, publish the started event, perform the
// round trip, publish the finished event, and inspect the resulting error. A
// retryable error with retries remaining restarts the loop on a newly selected
// server and connection; a successfully processed batch with more documents
// remaining restarts the loop on the same connection.
//
// It returns nil on success, the first error that ends the loop, or a
// WriteCommandError that accumulates the write errors and write concern error
// collected while processing batches.
func (op Operation) Execute(ctx context.Context) error {
	err := op.Validate()
	if err != nil {
		return err
	}

	// If no deadline is set on the passed-in context, op.Timeout is set, and context is not already
	// a Timeout context, honor op.Timeout in new Timeout context for operation execution.
	if _, deadlineSet := ctx.Deadline(); !deadlineSet && op.Timeout != nil && !csot.IsTimeoutContext(ctx) {
		newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *op.Timeout)
		// Redefine ctx to be the new timeout-derived context.
		ctx = newCtx
		// Cancel the timeout-derived context at the end of Execute to avoid a context leak.
		defer cancelFunc()
	}

	if op.Client != nil {
		if err := op.Client.StartCommand(); err != nil {
			return err
		}
	}

	// retries is the number of retry attempts remaining: 0 disables retrying and a negative
	// value means retry indefinitely.
	var retries int
	if op.RetryMode != nil {
		switch op.Type {
		case Write:
			// Writes are only retried when a session is available.
			if op.Client == nil {
				break
			}
			switch *op.RetryMode {
			case RetryOnce, RetryOncePerCommand:
				retries = 1
			case RetryContext:
				retries = -1
			}
		case Read:
			switch *op.RetryMode {
			case RetryOnce, RetryOncePerCommand:
				retries = 1
			case RetryContext:
				retries = -1
			}
		}
	}
	// If context is a Timeout context, automatically set retries to -1 (infinite) if retrying is
	// enabled.
	retryEnabled := op.RetryMode != nil && op.RetryMode.Enabled()
	if csot.IsTimeoutContext(ctx) && retryEnabled {
		retries = -1
	}

	var srvr Server
	var conn Connection
	var res bsoncore.Document
	// operationErr accumulates write errors across batches and is returned after the loop.
	var operationErr WriteCommandError
	var prevErr error
	var prevIndefiniteErr error
	batching := op.Batches.Valid()
	retrySupported := false
	first := true
	// currIndex is the number of documents in the batches completed so far; it is used to
	// offset write error indexes reported for later batches.
	currIndex := 0

	// deprioritizedServers are a running list of servers that should be
	// deprioritized during server selection. Per the specifications, we should
	// only ever deprioritize the "previous server".
	var deprioritizedServers []description.Server

	// resetForRetry records the error that caused the retry, decrements retries, and resets the
	// retry loop variables to request a new server and a new connection for the next attempt.
	resetForRetry := func(err error) {
		retries--
		prevErr = err

		// Set the previous indefinite error to be returned in any case where a retryable write error does not have a
		// NoWritesPerformed label (the definite case).
		switch err := err.(type) {
		case labeledError:
			// If the "prevIndefiniteErr" is nil, then the current error is the first error encountered
			// during the retry attempt cycle. We must persist the first error in the case where all
			// following errors are labeled "NoWritesPerformed", which would otherwise raise nil as the
			// error.
			if prevIndefiniteErr == nil {
				prevIndefiniteErr = err
			}

			// If the error is not labeled NoWritesPerformed and is retryable, then set the previous
			// indefinite error to be the current error.
			if !err.HasErrorLabel(NoWritesPerformed) && err.HasErrorLabel(RetryableWriteError) {
				prevIndefiniteErr = err
			}
		}

		// If we got a connection, close it immediately to release pool resources
		// for subsequent retries.
		if conn != nil {
			// If we are dealing with a sharded cluster, then mark the failed server
			// as "deprioritized".
			//
			// NOTE(review): conn.Description is referenced here without being called, so desc
			// is a method value rather than a description; confirm whether the nil check is
			// meant to guard anything.
			if desc := conn.Description; desc != nil && op.Deployment.Kind() == description.Sharded {
				deprioritizedServers = []description.Server{conn.Description()}
			}

			conn.Close()
		}

		// Set the server and connection to nil to request a new server and connection.
		srvr = nil
		conn = nil
	}

	// wm is the scratch buffer the wire message is built in; it is taken from the pool and
	// conditionally returned to it when Execute returns.
	wm := memoryPool.Get().(*[]byte)
	defer func() {
		// Proper usage of a sync.Pool requires each entry to have approximately the same memory
		// cost. To obtain this property when the stored type contains a variably-sized buffer,
		// we add a hard limit on the maximum buffer to place back in the pool. We limit the
		// size to 16MiB because that's the maximum wire message size supported by MongoDB.
		//
		// Comment copied from https://cs.opensource.google/go/go/+/refs/tags/go1.19:src/fmt/print.go;l=147
		//
		// Recycle byte slices that are smaller than 16MiB and at least half occupied.
		if c := cap(*wm); c < 16*1024*1024 && c/2 < len(*wm) {
			memoryPool.Put(wm)
		}
	}()
	for {
		requestID := wiremessage.NextRequestID()

		// If the server or connection are nil, try to select a new server and get a new connection.
		if srvr == nil || conn == nil {
			srvr, conn, err = op.getServerAndConnection(ctx, requestID, deprioritizedServers)
			if err != nil {
				// If the returned error is retryable and there are retries remaining (negative
				// retries means retry indefinitely), then retry the operation. Set the server
				// and connection to nil to request a new server and connection.
				if rerr, ok := err.(RetryablePoolError); ok && rerr.Retryable() && retries != 0 {
					resetForRetry(err)
					continue
				}

				// If this is a retry and there's an error from a previous attempt, return the previous
				// error instead of the current connection error.
				if prevErr != nil {
					return prevErr
				}
				return err
			}
			// This defer is registered once per acquired connection and only runs when Execute
			// returns; connections replaced during a retry are closed earlier by resetForRetry.
			defer conn.Close()

			// Set the server if it has not already been set and the session type is implicit. This will
			// limit the number of implicit sessions to no greater than an application's maxPoolSize
			// (ignoring operations that hold on to the session like cursors).
			if op.Client != nil && op.Client.Server == nil && op.Client.IsImplicit {
				if op.Client.Terminated {
					return fmt.Errorf("unexpected nil session for a terminated implicit session")
				}
				if err := op.Client.SetServer(); err != nil {
					return err
				}
			}
		}

		// Run steps that must only be run on the first attempt, but not again for retries.
		if first {
			// Determine if retries are supported for the current operation on the current server
			// description. Per the retryable writes specification, only determine this for the
			// first server selected:
			//
			//   If the server selected for the first attempt of a retryable write operation does
			//   not support retryable writes, drivers MUST execute the write as if retryable writes
			//   were not enabled.
			retrySupported = op.retryable(conn.Description())

			// If retries are supported for the current operation on the current server description,
			// client retries are enabled, the operation type is write, and we haven't incremented
			// the txn number yet, enable retry writes on the session and increment the txn number.
			// Calling IncrementTxnNumber() for server descriptions or topologies that do not
			// support retries (e.g. standalone topologies) will cause server errors. Only do this
			// check for the first attempt to keep retried writes in the same transaction.
			if retrySupported && op.RetryMode != nil && op.Type == Write && op.Client != nil {
				op.Client.RetryWrite = false
				if op.RetryMode.Enabled() {
					op.Client.RetryWrite = true
					if !op.Client.Committing && !op.Client.Aborting {
						op.Client.IncrementTxnNumber()
					}
				}
			}

			first = false
		}

		// Calculate maxTimeMS value to potentially be appended to the wire message.
		maxTimeMS, err := op.calculateMaxTimeMS(ctx, srvr.RTTMonitor().P90(), srvr.RTTMonitor().Stats())
		if err != nil {
			return err
		}

		// Set maxTimeMS to 0 if connected to mongocryptd to avoid appending the field. The final
		// encrypted command may contain multiple maxTimeMS fields otherwise.
		if conn.Description().IsCryptd {
			maxTimeMS = 0
		}

		desc := description.SelectedServer{Server: conn.Description(), Kind: op.Deployment.Kind()}

		if batching {
			targetBatchSize := desc.MaxDocumentSize
			maxDocSize := desc.MaxDocumentSize
			if op.shouldEncrypt() {
				// For client-side encryption, we want the batch to be split at 2 MiB instead of 16MiB.
				// If there's only one document in the batch, it can be up to 16MiB, so we set target batch size to
				// 2MiB but max document size to 16MiB. This will allow the AdvanceBatch call to create a batch
				// with a single large document.
				targetBatchSize = cryptMaxBsonObjectSize
			}

			err = op.Batches.AdvanceBatch(int(desc.MaxBatchCount), int(targetBatchSize), int(maxDocSize))
			if err != nil {
				// TODO(GODRIVER-982): Should we also be returning operationErr?
				return err
			}
		}

		// Build the wire message into the pooled buffer, reusing its capacity from the start.
		var startedInfo startedInformation
		*wm, startedInfo, err = op.createWireMessage(ctx, maxTimeMS, (*wm)[:0], desc, conn, requestID)

		if err != nil {
			return err
		}

		// set extra data and send event if possible
		startedInfo.connID = conn.ID()
		startedInfo.driverConnectionID = conn.DriverConnectionID()
		startedInfo.cmdName = op.getCommandName(startedInfo.cmd)

		// If the command name does not match the operation name, update
		// the operation name as a sanity check. It's more correct to
		// be aligned with the data passed to the server via the
		// wire message.
		if startedInfo.cmdName != op.Name {
			op.Name = startedInfo.cmdName
		}

		startedInfo.redacted = op.redactCommand(startedInfo.cmdName, startedInfo.cmd)
		startedInfo.serviceID = conn.Description().ServiceID
		startedInfo.serverConnID = conn.ServerConnectionID()
		startedInfo.serverAddress = conn.Description().Addr

		op.publishStartedEvent(ctx, startedInfo)

		// get the moreToCome flag information before we compress
		moreToCome := wiremessage.IsMsgMoreToCome(*wm)

		// compress wiremessage if allowed
		if compressor, ok := conn.(Compressor); ok && op.canCompress(startedInfo.cmdName) {
			// Compress into a second pooled buffer, hand the uncompressed buffer back to the
			// pool, and continue with the compressed one.
			b := memoryPool.Get().(*[]byte)
			*b, err = compressor.CompressWireMessage(*wm, (*b)[:0])
			memoryPool.Put(wm)
			wm = b
			if err != nil {
				return err
			}
		}

		finishedInfo := finishedInformation{
			cmdName:            startedInfo.cmdName,
			driverConnectionID: startedInfo.driverConnectionID,
			requestID:          startedInfo.requestID,
			connID:             startedInfo.connID,
			serverConnID:       startedInfo.serverConnID,
			redacted:           startedInfo.redacted,
			serviceID:          startedInfo.serviceID,
			serverAddress:      desc.Server.Addr,
		}

		startedTime := time.Now()

		// Check for possible context error. If no context error, check if there's enough time to perform a
		// round trip before the Context deadline. If ctx is a Timeout Context, use the 90th percentile RTT
		// as a threshold. Otherwise, use the minimum observed RTT.
		if ctx.Err() != nil {
			err = ctx.Err()
		} else if deadline, ok := ctx.Deadline(); ok {
			if csot.IsTimeoutContext(ctx) && time.Now().Add(srvr.RTTMonitor().P90()).After(deadline) {
				err = fmt.Errorf(
					"remaining time %v until context deadline is less than 90th percentile RTT: %w\n%v",
					time.Until(deadline),
					ErrDeadlineWouldBeExceeded,
					srvr.RTTMonitor().Stats())
			} else if time.Now().Add(srvr.RTTMonitor().Min()).After(deadline) {
				err = context.DeadlineExceeded
			}
		}

		if err == nil {
			// roundtrip using either the full roundTripper or a special one for when the moreToCome
			// flag is set
			roundTrip := op.roundTrip
			if moreToCome {
				roundTrip = op.moreToComeRoundTrip
			}
			res, err = roundTrip(ctx, conn, *wm)

			// Give the server a chance to react to the round trip result; ProcessError is
			// called even when err is nil and its return value is deliberately ignored here.
			if ep, ok := srvr.(ErrorProcessor); ok {
				_ = ep.ProcessError(err, conn)
			}
		}

		finishedInfo.response = res
		finishedInfo.cmdErr = err
		finishedInfo.duration = time.Since(startedTime)

		op.publishFinishedEvent(ctx, finishedInfo)

		// prevIndefiniteErrIsSet is "true" if the "err" variable has been set to the "prevIndefiniteErr" in
		// a case in the switch statement below.
		var prevIndefiniteErrIsSet bool

		// TODO(GODRIVER-2579): When refactoring the "Execute" method, consider creating a separate method for the
		// error handling logic below. This will remove the necessity of the "checkError" goto label.
	checkError:
		var perr error
		switch tt := err.(type) {
		case WriteCommandError:
			if e := err.(WriteCommandError); retrySupported && op.Type == Write && e.UnsupportedStorageEngine() {
				return ErrUnsupportedStorageEngine
			}

			connDesc := conn.Description()
			retryableErr := tt.Retryable(connDesc.WireVersion)
			preRetryWriteLabelVersion := connDesc.WireVersion != nil && connDesc.WireVersion.Max < 9
			inTransaction := op.Client != nil &&
				!(op.Client.Committing || op.Client.Aborting) && op.Client.TransactionRunning()
			// If retry is enabled and the operation isn't in a transaction, add a RetryableWriteError label for
			// retryable errors from pre-4.4 servers
			if retryableErr && preRetryWriteLabelVersion && retryEnabled && !inTransaction {
				tt.Labels = append(tt.Labels, RetryableWriteError)
			}

			// If retries are supported for the current operation on the first server description,
			// the error is considered retryable, and there are retries remaining (negative retries
			// means retry indefinitely), then retry the operation.
			if retrySupported && retryableErr && retries != 0 {
				if op.Client != nil && op.Client.Committing {
					// Apply majority write concern for retries
					op.Client.UpdateCommitTransactionWriteConcern()
					op.WriteConcern = op.Client.CurrentWc
				}
				resetForRetry(tt)
				continue
			}

			// If the error is no longer retryable and has the NoWritesPerformed label, then we should
			// set the error to the "previous indefinite error" unless the current error is already the
			// "previous indefinite error". After resetting, repeat the error check.
			if tt.HasErrorLabel(NoWritesPerformed) && !prevIndefiniteErrIsSet {
				err = prevIndefiniteErr
				prevIndefiniteErrIsSet = true

				goto checkError
			}

			// If the operation isn't being retried, process the response
			if op.ProcessResponseFn != nil {
				info := ResponseInfo{
					ServerResponse:        res,
					Server:                srvr,
					Connection:            conn,
					ConnectionDescription: desc.Server,
					CurrentIndex:          currIndex,
				}
				_ = op.ProcessResponseFn(info)
			}

			// Write error indexes are relative to the current batch; shift them by the number
			// of documents in the batches that were already completed.
			if batching && len(tt.WriteErrors) > 0 && currIndex > 0 {
				for i := range tt.WriteErrors {
					tt.WriteErrors[i].Index += int64(currIndex)
				}
			}

			// If batching is enabled and either ordered is the default (which is true) or
			// explicitly set to true and we have write errors, return the errors.
			if batching && (op.Batches.Ordered == nil || *op.Batches.Ordered) && len(tt.WriteErrors) > 0 {
				return tt
			}
			if op.Client != nil && op.Client.Committing && tt.WriteConcernError != nil {
				// When running commitTransaction we return WriteConcernErrors as an Error.
				err := Error{
					Name:    tt.WriteConcernError.Name,
					Code:    int32(tt.WriteConcernError.Code),
					Message: tt.WriteConcernError.Message,
					Labels:  tt.Labels,
					Raw:     tt.Raw,
				}
				// The UnknownTransactionCommitResult label is added to all writeConcernErrors besides unknownReplWriteConcernCode
				// and unsatisfiableWriteConcernCode
				if err.Code != unknownReplWriteConcernCode && err.Code != unsatisfiableWriteConcernCode {
					err.Labels = append(err.Labels, UnknownTransactionCommitResult)
				}
				if retryableErr && retryEnabled {
					err.Labels = append(err.Labels, RetryableWriteError)
				}
				return err
			}
			// Not returning yet: fold this attempt's errors into operationErr and fall through
			// to the batch-advance logic after the switch.
			operationErr.WriteConcernError = tt.WriteConcernError
			operationErr.WriteErrors = append(operationErr.WriteErrors, tt.WriteErrors...)
			operationErr.Labels = tt.Labels
			operationErr.Raw = tt.Raw
		case Error:
			if tt.HasErrorLabel(TransientTransactionError) || tt.HasErrorLabel(UnknownTransactionCommitResult) {
				// NOTE(review): op.Client is not nil-checked here, unlike elsewhere in this
				// method; presumably ClearPinnedResources tolerates a nil receiver — confirm.
				if err := op.Client.ClearPinnedResources(); err != nil {
					return err
				}
			}

			if e := err.(Error); retrySupported && op.Type == Write && e.UnsupportedStorageEngine() {
				return ErrUnsupportedStorageEngine
			}

			connDesc := conn.Description()
			var retryableErr bool
			if op.Type == Write {
				retryableErr = tt.RetryableWrite(connDesc.WireVersion)
				preRetryWriteLabelVersion := connDesc.WireVersion != nil && connDesc.WireVersion.Max < 9
				inTransaction := op.Client != nil &&
					!(op.Client.Committing || op.Client.Aborting) && op.Client.TransactionRunning()
				// If retryWrites is enabled and the operation isn't in a transaction, add a RetryableWriteError label
				// for network errors and retryable errors from pre-4.4 servers
				if retryEnabled && !inTransaction &&
					(tt.HasErrorLabel(NetworkError) || (retryableErr && preRetryWriteLabelVersion)) {
					tt.Labels = append(tt.Labels, RetryableWriteError)
				}
			} else {
				retryableErr = tt.RetryableRead()
			}

			// If retries are supported for the current operation on the first server description,
			// the error is considered retryable, and there are retries remaining (negative retries
			// means retry indefinitely), then retry the operation.
			if retrySupported && retryableErr && retries != 0 {
				if op.Client != nil && op.Client.Committing {
					// Apply majority write concern for retries
					op.Client.UpdateCommitTransactionWriteConcern()
					op.WriteConcern = op.Client.CurrentWc
				}
				resetForRetry(tt)
				continue
			}

			// If the error is no longer retryable and has the NoWritesPerformed label, then we should
			// set the error to the "previous indefinite error" unless the current error is already the
			// "previous indefinite error". After resetting, repeat the error check.
			if tt.HasErrorLabel(NoWritesPerformed) && !prevIndefiniteErrIsSet {
				err = prevIndefiniteErr
				prevIndefiniteErrIsSet = true

				goto checkError
			}

			// If the operation isn't being retried, process the response
			if op.ProcessResponseFn != nil {
				info := ResponseInfo{
					ServerResponse:        res,
					Server:                srvr,
					Connection:            conn,
					ConnectionDescription: desc.Server,
					CurrentIndex:          currIndex,
				}
				_ = op.ProcessResponseFn(info)
			}

			if op.Client != nil && op.Client.Committing && (retryableErr || tt.Code == 50) {
				// If we got a retryable error or MaxTimeMSExpired error, we add UnknownTransactionCommitResult.
				tt.Labels = append(tt.Labels, UnknownTransactionCommitResult)
			}
			return tt
		case nil:
			// No error: an unacknowledged (moreToCome) write has no response to process.
			if moreToCome {
				return ErrUnacknowledgedWrite
			}
			if op.ProcessResponseFn != nil {
				info := ResponseInfo{
					ServerResponse:        res,
					Server:                srvr,
					Connection:            conn,
					ConnectionDescription: desc.Server,
					CurrentIndex:          currIndex,
				}
				perr = op.ProcessResponseFn(info)
			}
			if perr != nil {
				return perr
			}
		default:
			// Any other error type is never retried: let the response processor run (its
			// error is ignored) and return the original error.
			if op.ProcessResponseFn != nil {
				info := ResponseInfo{
					ServerResponse:        res,
					Server:                srvr,
					Connection:            conn,
					ConnectionDescription: desc.Server,
					CurrentIndex:          currIndex,
				}
				_ = op.ProcessResponseFn(info)
			}
			return err
		}

		// If we're batching and there are batches remaining, advance to the next batch. This isn't
		// a retry, so increment the transaction number, reset the retries number, and don't set
		// server or connection to nil to continue using the same connection.
		if batching && len(op.Batches.Documents) > 0 {
			// If retries are supported for the current operation on the current server description,
			// the session isn't nil, and client retries are enabled, increment the txn number.
			// Calling IncrementTxnNumber() for server descriptions or topologies that do not
			// support retries (e.g. standalone topologies) will cause server errors.
			if retrySupported && op.Client != nil && op.RetryMode != nil {
				if op.RetryMode.Enabled() {
					op.Client.IncrementTxnNumber()
				}
				// Reset the retries number for RetryOncePerCommand unless context is a Timeout context, in
				// which case retries should remain as -1 (as many times as possible).
				if *op.RetryMode == RetryOncePerCommand && !csot.IsTimeoutContext(ctx) {
					retries = 1
				}
			}
			currIndex += len(op.Batches.Current)
			op.Batches.ClearBatch()
			continue
		}
		break
	}
	// Report any write errors or write concern error accumulated across batches.
	if len(operationErr.WriteErrors) > 0 || operationErr.WriteConcernError != nil {
		return operationErr
	}
	return nil
}

// retryable reports whether this operation may be retried against a server
// with the given description.
//
// Only Write and Read operations are ever retryable. Either kind is retryable
// while its session is committing or aborting a transaction. Beyond that, a
// read is retryable when it has no session or the session has no transaction in
// progress or starting; a write is retryable when the server supports retryable
// writes, a session is present with no transaction in progress or starting, and
// the write concern is acknowledged.
func (op Operation) retryable(desc description.Server) bool {
	if op.Type != Write && op.Type != Read {
		return false
	}

	sess := op.Client
	if sess != nil && (sess.Committing || sess.Aborting) {
		return true
	}

	if op.Type == Read {
		return sess == nil || !(sess.TransactionInProgress() || sess.TransactionStarting())
	}

	return retryWritesSupported(desc) &&
		sess != nil && !(sess.TransactionInProgress() || sess.TransactionStarting()) &&
		writeconcern.AckWrite(op.WriteConcern)
}

// roundTrip sends wm over conn and then reads and decodes the reply via
// readWireMessage. A failed write is returned wrapped by networkError.
func (op Operation) roundTrip(ctx context.Context, conn Connection, wm []byte) ([]byte, error) {
	if writeErr := conn.WriteWireMessage(ctx, wm); writeErr != nil {
		return nil, op.networkError(writeErr)
	}
	return op.readWireMessage(ctx, conn)
}

// readWireMessage reads a single wire message from conn and decodes it into a
// server response document.
//
// A failed read is returned wrapped by networkError. If conn implements
// StreamerConnection, its streaming state is set from the moreToCome flag of
// the message that was read. An OP_COMPRESSED message is decompressed before
// decoding. Cluster time, operation time, the recovery token and (for find,
// aggregate and distinct) the snapshot time are updated from the decoded
// response before a decoding error is returned, in which case the decoded
// response is returned alongside the error. When decoding succeeds and op.Crypt
// is set, the response is passed through op.Crypt.Decrypt.
func (op Operation) readWireMessage(ctx context.Context, conn Connection) (result []byte, err error) {
	wm, err := conn.ReadWireMessage(ctx)
	if err != nil {
		return nil, op.networkError(err)
	}

	// If we're using a streamable connection, we set its streaming state based on the moreToCome flag in the server
	// response.
	if streamer, ok := conn.(StreamerConnection); ok {
		streamer.SetStreaming(wiremessage.IsMsgMoreToCome(wm))
	}

	length, _, _, opcode, rem, ok := wiremessage.ReadHeader(wm)
	if !ok || len(wm) < int(length) {
		return nil, errors.New("malformed wire message: insufficient bytes")
	}
	if opcode == wiremessage.OpCompressed {
		rawsize := length - 16 // remove header size
		// The length comes straight from the wire. Reject values that would make the slice
		// expression below panic (a declared length smaller than the header, or larger than
		// what remains after the header).
		if rawsize < 0 || int(rawsize) > len(rem) {
			return nil, errors.New("malformed OP_COMPRESSED: invalid message length")
		}
		// decompress wiremessage
		opcode, rem, err = op.decompressWireMessage(rem[:rawsize])
		if err != nil {
			return nil, err
		}
	}

	// decode
	res, err := op.decodeResult(opcode, rem)
	// Update cluster/operation time and recovery tokens before handling the error to ensure we're properly updating
	// everything.
	op.updateClusterTimes(res)
	op.updateOperationTime(res)
	// NOTE(review): op.Client is not nil-checked here; presumably the session methods tolerate a nil
	// receiver - confirm.
	op.Client.UpdateRecoveryToken(bson.Raw(res))

	// Update snapshot time if operation was a "find", "aggregate" or "distinct".
	if op.Name == driverutil.FindOp || op.Name == driverutil.AggregateOp || op.Name == driverutil.DistinctOp {
		op.Client.UpdateSnapshotTime(res)
	}

	if err != nil {
		return res, err
	}

	// If there is no error, automatically attempt to decrypt all results if client side encryption is enabled.
	if op.Crypt != nil {
		res, err = op.Crypt.Decrypt(ctx, res)
	}
	return res, err
}

// networkError converts err into an Error labelled NetworkError that wraps the
// original error. When a session is attached it is marked dirty, and one
// transaction label is added: UnknownTransactionCommitResult if the session is
// committing, otherwise TransientTransactionError if a transaction is running.
// A nil err yields nil.
func (op Operation) networkError(err error) error {
	if err == nil {
		return nil
	}

	labels := []string{NetworkError}
	if sess := op.Client; sess != nil {
		sess.MarkDirty()

		running := sess.TransactionRunning()
		switch {
		case sess.Committing:
			labels = append(labels, UnknownTransactionCommitResult)
		case running:
			labels = append(labels, TransientTransactionError)
		}
	}

	return Error{
		Message: err.Error(),
		Labels:  labels,
		Wrapped: err,
	}
}

// moreToComeRoundTrip writes wm to conn without reading a reply and returns a
// locally built {ok: 1} document in place of a server response. It is used when
// an OP_MSG is sent with the moreToCome bit set. If the write fails, the
// attached session (if any) is marked dirty and the error is returned as an
// Error labelled TransientTransactionError and NetworkError; the {ok: 1}
// document is still returned alongside it.
func (op *Operation) moreToComeRoundTrip(ctx context.Context, conn Connection, wm []byte) (result []byte, err error) {
	writeErr := conn.WriteWireMessage(ctx, wm)
	okReply := bsoncore.BuildDocument(nil, bsoncore.AppendInt32Element(nil, "ok", 1))
	if writeErr == nil {
		return okReply, nil
	}

	if op.Client != nil {
		op.Client.MarkDirty()
	}
	return okReply, Error{
		Message: writeErr.Error(),
		Labels:  []string{TransientTransactionError, NetworkError},
		Wrapped: writeErr,
	}
}

// decompressWireMessage decodes the body of an OP_COMPRESSED message (the
// message without its header). It reads the original opcode, the uncompressed
// size and the compressor ID, treats the remaining bytes as the compressed
// payload, and returns the original opcode together with the decompressed
// payload.
func (Operation) decompressWireMessage(wm []byte) (wiremessage.OpCode, []byte, error) {
	// Fixed-size prefix that precedes the compressed payload:
	// original opcode (4) + uncompressed size (4) + compressor ID (1).
	const prefixLen = 4 + 4 + 1

	origOpcode, rest, ok := wiremessage.ReadCompressedOriginalOpCode(wm)
	if !ok {
		return 0, nil, errors.New("malformed OP_COMPRESSED: missing original opcode")
	}

	rawLen, rest, ok := wiremessage.ReadCompressedUncompressedSize(rest)
	if !ok {
		return 0, nil, errors.New("malformed OP_COMPRESSED: missing uncompressed size")
	}

	compressor, rest, ok := wiremessage.ReadCompressedCompressorID(rest)
	if !ok {
		return 0, nil, errors.New("malformed OP_COMPRESSED: missing compressor ID")
	}

	payload, _, ok := wiremessage.ReadCompressedCompressedMessage(rest, int32(len(wm)-prefixLen))
	if !ok {
		return 0, nil, errors.New("malformed OP_COMPRESSED: insufficient bytes for compressed wiremessage")
	}

	plain, err := DecompressPayload(payload, CompressionOpts{
		Compressor:       compressor,
		UncompressedSize: rawLen,
	})
	if err != nil {
		return 0, nil, err
	}
	return origOpcode, plain, nil
}

// addBatchArray appends the documents of the current batch to dst as a BSON
// array element keyed by op.Batches.Identifier. Array entries are keyed by
// their decimal index ("0", "1", ...).
func (op Operation) addBatchArray(dst []byte) []byte {
	start, out := bsoncore.AppendArrayElementStart(dst, op.Batches.Identifier)
	for pos := range op.Batches.Current {
		out = bsoncore.AppendDocumentElement(out, strconv.Itoa(pos), op.Batches.Current[pos])
	}
	out, _ = bsoncore.AppendArrayEnd(out, start)
	return out
}

// createLegacyHandshakeWireMessage appends an OP_QUERY wire message for the
// operation to dst and returns the extended buffer together with the
// startedInformation for the command.
//
// The message targets the "<op.Database>.$cmd" collection with numberToSkip 0
// and numberToReturn -1. The command document is built from op.CommandFn
// followed by the current batch (as an array), read concern, write concern,
// session fields, cluster time, server API fields and, when maxTimeMS > 0, a
// "maxTimeMS" element. If createReadPref yields a non-empty document, the
// command is wrapped as {$query: <command>, $readPreference: <rp>}.
//
// On error the partially written dst and the partially populated info are
// returned along with the error.
func (op Operation) createLegacyHandshakeWireMessage(
	maxTimeMS uint64,
	dst []byte,
	desc description.SelectedServer,
) ([]byte, startedInformation, error) {
	var info startedInformation
	flags := op.secondaryOK(desc)
	var wmindex int32
	info.requestID = wiremessage.NextRequestID()
	// wmindex marks where the header starts so the total length can be patched in at the end.
	wmindex, dst = wiremessage.AppendHeaderStart(dst, info.requestID, 0, wiremessage.OpQuery)
	dst = wiremessage.AppendQueryFlags(dst, flags)

	dollarCmd := [...]byte{'.', '$', 'c', 'm', 'd'}

	// FullCollectionName
	dst = append(dst, op.Database...)
	dst = append(dst, dollarCmd[:]...)
	dst = append(dst, 0x00)
	dst = wiremessage.AppendQueryNumberToSkip(dst, 0)
	dst = wiremessage.AppendQueryNumberToReturn(dst, -1)

	// wrapper stays -1 unless a read preference forces the {$query: ...} wrapper document.
	wrapper := int32(-1)
	rp, err := op.createReadPref(desc, true)
	if err != nil {
		return dst, info, err
	}
	if len(rp) > 0 {
		// Open the outer document and the "$query" element header; the command document
		// started below becomes the value of "$query".
		wrapper, dst = bsoncore.AppendDocumentStart(dst)
		dst = bsoncore.AppendHeader(dst, bsontype.EmbeddedDocument, "$query")
	}
	idx, dst := bsoncore.AppendDocumentStart(dst)
	dst, err = op.CommandFn(dst, desc)
	if err != nil {
		return dst, info, err
	}

	if op.Batches != nil && len(op.Batches.Current) > 0 {
		dst = op.addBatchArray(dst)
	}

	dst, err = op.addReadConcern(dst, desc)
	if err != nil {
		return dst, info, err
	}

	dst, err = op.addWriteConcern(dst, desc)
	if err != nil {
		return dst, info, err
	}

	dst, err = op.addSession(dst, desc)
	if err != nil {
		return dst, info, err
	}

	dst = op.addClusterTime(dst, desc)
	dst = op.addServerAPI(dst)
	// If maxTimeMS is greater than 0 append it to wire message. A maxTimeMS value of 0 only explicitly
	// specifies the default behavior of no timeout server-side.
	if maxTimeMS > 0 {
		dst = bsoncore.AppendInt64Element(dst, "maxTimeMS", int64(maxTimeMS))
	}

	// The error from closing the command document is deliberately discarded here.
	dst, _ = bsoncore.AppendDocumentEnd(dst, idx)
	// Command monitoring only reports the document inside $query
	// info.cmd is a view into dst (same backing array) from the start of the command document
	// to the current end of dst.
	info.cmd = dst[idx:]

	if len(rp) > 0 {
		// Shadows the outer err for the remainder of this branch.
		var err error
		dst = bsoncore.AppendDocumentElement(dst, "$readPreference", rp)
		dst, err = bsoncore.AppendDocumentEnd(dst, wrapper)
		if err != nil {
			return dst, info, err
		}
	}

	// Patch the message length reserved by AppendHeaderStart now that the full size is known.
	return bsoncore.UpdateLength(dst, wmindex, int32(len(dst[wmindex:]))), info, nil
}

// createMsgWireMessage appends an OP_MSG wire message for the operation to dst
// and returns the extended buffer together with the startedInformation for the
// command.
//
// The single-document section is built from addCommandFields followed by read
// concern, write concern, session fields, cluster time, server API fields, an
// optional "maxTimeMS" (only when maxTimeMS > 0), "$db", and "$readPreference"
// when createReadPref yields a non-empty document. When auto encryption is not
// in effect and the current batch is non-empty, the batch is appended as a
// document-sequence section identified by op.Batches.Identifier.
//
// On error the partially written dst and the partially populated info are
// returned along with the error.
func (op Operation) createMsgWireMessage(
	ctx context.Context,
	maxTimeMS uint64,
	dst []byte,
	desc description.SelectedServer,
	conn Connection,
	requestID int32,
) ([]byte, startedInformation, error) {
	var info startedInformation
	var flags wiremessage.MsgFlag
	var wmindex int32
	// We set the MoreToCome bit if we have a write concern, it's unacknowledged, and we either
	// aren't batching or we are encoding the last batch.
	if op.WriteConcern != nil && !writeconcern.AckWrite(op.WriteConcern) && (op.Batches == nil || len(op.Batches.Documents) == 0) {
		flags = wiremessage.MoreToCome
	}
	// Set the ExhaustAllowed flag if the connection supports streaming. This will tell the server that it can
	// respond with the MoreToCome flag and then stream responses over this connection.
	if streamer, ok := conn.(StreamerConnection); ok && streamer.SupportsStreaming() {
		flags |= wiremessage.ExhaustAllowed
	}

	info.requestID = requestID
	// wmindex marks where the header starts so the total length can be patched in at the end.
	wmindex, dst = wiremessage.AppendHeaderStart(dst, info.requestID, 0, wiremessage.OpMsg)
	dst = wiremessage.AppendMsgFlags(dst, flags)
	// Body
	dst = wiremessage.AppendMsgSectionType(dst, wiremessage.SingleDocument)

	idx, dst := bsoncore.AppendDocumentStart(dst)

	dst, err := op.addCommandFields(ctx, dst, desc)
	if err != nil {
		return dst, info, err
	}
	dst, err = op.addReadConcern(dst, desc)
	if err != nil {
		return dst, info, err
	}
	dst, err = op.addWriteConcern(dst, desc)
	if err != nil {
		return dst, info, err
	}
	dst, err = op.addSession(dst, desc)
	if err != nil {
		return dst, info, err
	}

	dst = op.addClusterTime(dst, desc)
	dst = op.addServerAPI(dst)
	// If maxTimeMS is greater than 0 append it to wire message. A maxTimeMS value of 0 only explicitly
	// specifies the default behavior of no timeout server-side.
	if maxTimeMS > 0 {
		dst = bsoncore.AppendInt64Element(dst, "maxTimeMS", int64(maxTimeMS))
	}

	dst = bsoncore.AppendStringElement(dst, "$db", op.Database)
	rp, err := op.createReadPref(desc, false)
	if err != nil {
		return dst, info, err
	}
	if len(rp) > 0 {
		dst = bsoncore.AppendDocumentElement(dst, "$readPreference", rp)
	}

	// The error from closing the command document is deliberately discarded here.
	dst, _ = bsoncore.AppendDocumentEnd(dst, idx)
	// The command document for monitoring shouldn't include the type 1 payload as a document sequence
	// info.cmd is a view into dst (same backing array) from the start of the command document
	// to the current end of dst, captured before any document sequence is appended.
	info.cmd = dst[idx:]

	// add batch as a document sequence if auto encryption is not enabled
	// if auto encryption is enabled, the batch will already be an array in the command document
	if !op.shouldEncrypt() && op.Batches != nil && len(op.Batches.Current) > 0 {
		info.documentSequenceIncluded = true
		dst = wiremessage.AppendMsgSectionType(dst, wiremessage.DocumentSequence)
		// idx is reused: it now marks the length prefix of the document sequence section.
		idx, dst = bsoncore.ReserveLength(dst)

		// Sequence identifier as a NUL-terminated string.
		dst = append(dst, op.Batches.Identifier...)
		dst = append(dst, 0x00)

		// Documents are appended back to back with no separators.
		for _, doc := range op.Batches.Current {
			dst = append(dst, doc...)
		}

		dst = bsoncore.UpdateLength(dst, idx, int32(len(dst[idx:])))
	}

	// Patch the message length reserved by AppendHeaderStart now that the full size is known.
	return bsoncore.UpdateLength(dst, wmindex, int32(len(dst[wmindex:]))), info, nil
}

// isLegacyHandshake reports whether op is marked as a legacy handshake and the
// selected server has no known wire version yet (WireVersion is nil or its Max
// is 0), i.e. the operation is the first message of the initial handshake and
// should use a legacy hello.
func isLegacyHandshake(op Operation, desc description.SelectedServer) bool {
	if op.Legacy != LegacyHandshake {
		return false
	}
	return desc.WireVersion == nil || desc.WireVersion.Max == 0
}

// createWireMessage appends the wire message for the operation to dst. A legacy
// handshake (see isLegacyHandshake) is encoded with
// createLegacyHandshakeWireMessage; everything else is encoded with
// createMsgWireMessage. ctx, conn and requestID are only used on the latter
// path.
func (op Operation) createWireMessage(
	ctx context.Context,
	maxTimeMS uint64,
	dst []byte,
	desc description.SelectedServer,
	conn Connection,
	requestID int32,
) ([]byte, startedInformation, error) {
	if !isLegacyHandshake(op, desc) {
		return op.createMsgWireMessage(ctx, maxTimeMS, dst, desc, conn, requestID)
	}
	return op.createLegacyHandshakeWireMessage(maxTimeMS, dst, desc)
}

// addCommandFields adds the fields for a command to the wire message in dst. This assumes that the start of the document
// has already been added and does not add the final 0 byte.
//
// When the operation should not be encrypted, the fields come directly from op.CommandFn. Otherwise the command is
// first built as a standalone document (with the current batch embedded as a BSON array), encrypted with
// op.Crypt.Encrypt, and the elements of the encrypted document are appended to dst.
//
// An error is returned if the selected server's wire version is unknown or lower than cryptMinWireVersion, or if
// building or encrypting the command fails. In every error case dst is returned unchanged.
func (op Operation) addCommandFields(ctx context.Context, dst []byte, desc description.SelectedServer) ([]byte, error) {
	if !op.shouldEncrypt() {
		return op.CommandFn(dst, desc)
	}

	// A nil WireVersion means the server version is unknown, so the minimum cannot be confirmed. Checking it
	// here also avoids a nil pointer dereference.
	if desc.WireVersion == nil || desc.WireVersion.Max < cryptMinWireVersion {
		return dst, errors.New("auto-encryption requires a MongoDB version of 4.2")
	}

	// create temporary command document
	cidx, cmdDst := bsoncore.AppendDocumentStart(nil)
	var err error
	cmdDst, err = op.CommandFn(cmdDst, desc)
	if err != nil {
		return dst, err
	}
	// use a BSON array instead of a type 1 payload because mongocryptd will convert to arrays regardless
	if op.Batches != nil && len(op.Batches.Current) > 0 {
		cmdDst = op.addBatchArray(cmdDst)
	}
	cmdDst, _ = bsoncore.AppendDocumentEnd(cmdDst, cidx)

	// encrypt the command
	encrypted, err := op.Crypt.Encrypt(ctx, op.Database, cmdDst)
	if err != nil {
		return dst, err
	}
	// append encrypted command to original destination, removing the first 4 bytes (length) and final byte (terminator)
	dst = append(dst, encrypted[4:len(encrypted)-1]...)
	return dst, nil
}

// addServerAPI appends the server API fields to the command in dst. Nothing is
// appended when op.ServerAPI is nil. Otherwise "apiVersion" is always written,
// and "apiStrict" / "apiDeprecationErrors" are written only when the
// corresponding option is set.
func (op Operation) addServerAPI(dst []byte) []byte {
	if op.ServerAPI == nil {
		return dst
	}

	api := op.ServerAPI
	dst = bsoncore.AppendStringElement(dst, "apiVersion", api.ServerAPIVersion)
	if strict := api.Strict; strict != nil {
		dst = bsoncore.AppendBooleanElement(dst, "apiStrict", *strict)
	}
	if deprecation := api.DeprecationErrors; deprecation != nil {
		dst = bsoncore.AppendBooleanElement(dst, "apiDeprecationErrors", *deprecation)
	}
	return dst
}

// addReadConcern appends a "readConcern" element to the command in dst when one applies.
//
// Nothing is appended when op.MinimumReadConcernWireVersion is set and the selected server does not include that
// wire version, when no read concern results from the rules below, or when the resulting read concern document is
// empty.
//
// The read concern starts as op.ReadConcern. A session that is starting a transaction overrides it with the
// session's CurrentRc when that is set; if no read concern is set at that point and the session is causally
// consistent with a known operation time, an empty read concern is used so that "afterClusterTime" can be attached.
// A snapshot session always uses the snapshot read concern and returns an error if the server's wire version is
// unknown or lower than readSnapshotMinWireVersion. When sessions are supported, "afterClusterTime" and
// "atClusterTime" are added from the session's operation time and snapshot time respectively.
func (op Operation) addReadConcern(dst []byte, desc description.SelectedServer) ([]byte, error) {
	if op.MinimumReadConcernWireVersion > 0 && (desc.WireVersion == nil || !desc.WireVersion.Includes(op.MinimumReadConcernWireVersion)) {
		return dst, nil
	}
	rc := op.ReadConcern
	client := op.Client
	// Starting transaction's read concern overrides all others
	if client != nil && client.TransactionStarting() && client.CurrentRc != nil {
		rc = client.CurrentRc
	}

	// start transaction must append afterclustertime IF causally consistent and operation time exists
	if rc == nil && client != nil && client.TransactionStarting() && client.Consistent && client.OperationTime != nil {
		rc = readconcern.New()
	}

	if client != nil && client.Snapshot {
		// A nil WireVersion means the server version is unknown, so snapshot support cannot be confirmed.
		// Checking it here also avoids a nil pointer dereference, matching the nil check at the top of
		// this function.
		if desc.WireVersion == nil || desc.WireVersion.Max < readSnapshotMinWireVersion {
			return dst, errors.New("snapshot reads require MongoDB 5.0 or later")
		}
		rc = readconcern.Snapshot()
	}

	if rc == nil {
		return dst, nil
	}

	_, data, err := rc.MarshalBSONValue() // always returns a document
	if err != nil {
		return dst, err
	}

	if sessionsSupported(desc.WireVersion) && client != nil {
		if client.Consistent && client.OperationTime != nil {
			data = data[:len(data)-1] // remove the null byte
			data = bsoncore.AppendTimestampElement(data, "afterClusterTime", client.OperationTime.T, client.OperationTime.I)
			// Re-terminate the document; index 0 is where its length prefix lives.
			data, _ = bsoncore.AppendDocumentEnd(data, 0)
		}
		if client.Snapshot && client.SnapshotTime != nil {
			data = data[:len(data)-1] // remove the null byte
			data = bsoncore.AppendTimestampElement(data, "atClusterTime", client.SnapshotTime.T, client.SnapshotTime.I)
			data, _ = bsoncore.AppendDocumentEnd(data, 0)
		}
	}

	// An empty read concern document carries no information, so omit it entirely.
	if len(data) == bsoncore.EmptyDocumentLength {
		return dst, nil
	}
	return bsoncore.AppendDocumentElement(dst, "readConcern", data), nil
}

// addWriteConcern appends a "writeConcern" element to the command in dst.
// Nothing is appended when op.MinimumWriteConcernWireVersion is set and the
// selected server does not include that wire version, when op.WriteConcern is
// nil, or when marshaling reports writeconcern.ErrEmptyWriteConcern. Any other
// marshaling error is returned with dst unchanged.
func (op Operation) addWriteConcern(dst []byte, desc description.SelectedServer) ([]byte, error) {
	if minWire := op.MinimumWriteConcernWireVersion; minWire > 0 {
		if desc.WireVersion == nil || !desc.WireVersion.Includes(minWire) {
			return dst, nil
		}
	}
	if op.WriteConcern == nil {
		return dst, nil
	}

	bsonType, payload, err := op.WriteConcern.MarshalBSONValue()
	switch {
	case errors.Is(err, writeconcern.ErrEmptyWriteConcern):
		return dst, nil
	case err != nil:
		return dst, err
	}

	dst = bsoncore.AppendHeader(dst, bsonType, "writeConcern")
	return append(dst, payload...), nil
}

// addSession appends the session-related fields to the command in dst.
//
// With no session attached, dst is returned untouched. An explicit
// (non-implicit) session on a server that reports no session timeout is an
// error. If sessions are not supported by the server's wire version, or no
// session timeout is reported, nothing is appended. Otherwise the session's use
// time is refreshed and "lsid" is written, followed by "txnNumber" for
// retryable writes and for running/retried-commit transactions,
// "startTransaction" when the transaction is starting, and "autocommit: false"
// for transactions. The result of client.ApplyCommand is returned as the error.
func (op Operation) addSession(dst []byte, desc description.SelectedServer) ([]byte, error) {
	sess := op.Client
	if sess == nil {
		return dst, nil
	}

	noSessionTimeout := desc.SessionTimeoutMinutesPtr == nil

	// If the operation is defined for an explicit session but the server
	// does not support sessions, then throw an error.
	if !sess.IsImplicit && noSessionTimeout {
		return nil, fmt.Errorf("current topology does not support sessions")
	}
	if !sessionsSupported(desc.WireVersion) || noSessionTimeout {
		return dst, nil
	}

	if err := sess.UpdateUseTime(); err != nil {
		return dst, err
	}
	dst = bsoncore.AppendDocumentElement(dst, "lsid", sess.SessionID)

	// txnNumber is written at most once, whether it is needed for a retryable
	// write, for a transaction, or for both.
	retryableWrite := op.Type == Write && sess.RetryWrite
	inTransaction := sess.TransactionRunning() || sess.RetryingCommit
	if retryableWrite || inTransaction {
		dst = bsoncore.AppendInt64Element(dst, "txnNumber", sess.TxnNumber)
	}
	if inTransaction {
		if sess.TransactionStarting() {
			dst = bsoncore.AppendBooleanElement(dst, "startTransaction", true)
		}
		dst = bsoncore.AppendBooleanElement(dst, "autocommit", false)
	}

	return dst, sess.ApplyCommand(desc.Server)
}

// addClusterTime appends a "$clusterTime" element to the command in dst. The
// value is taken from the cluster clock and, when a session is attached, the
// result of session.MaxClusterTime over the clock's and the session's cluster
// times. Nothing is appended when neither a clock nor a session is attached,
// when sessions are not supported by the server's wire version, when no cluster
// time is known, or when the cluster time document has no "$clusterTime" field.
func (op Operation) addClusterTime(dst []byte, desc description.SelectedServer) []byte {
	sess, clock := op.Client, op.Clock
	if clock == nil && sess == nil {
		return dst
	}
	if !sessionsSupported(desc.WireVersion) {
		return dst
	}

	// NOTE(review): clock may still be nil here when only a session is attached;
	// presumably GetClusterTime tolerates a nil receiver - confirm.
	latest := clock.GetClusterTime()
	if sess != nil {
		latest = session.MaxClusterTime(latest, sess.ClusterTime)
	}
	if latest == nil {
		return dst
	}

	ctValue, lookupErr := latest.LookupErr("$clusterTime")
	if lookupErr != nil {
		return dst
	}

	dst = bsoncore.AppendHeader(dst, ctValue.Type, "$clusterTime")
	return append(dst, ctValue.Value...)
}

// calculateMaxTimeMS returns the value to use for the command's "maxTimeMS"
// field, in milliseconds.
//
// For a Timeout context with a deadline, the value is the time remaining until
// the deadline minus rtt90, rounded up to a whole millisecond; if that is not
// positive, an error wrapping ErrDeadlineWouldBeExceeded (and including
// rttStats) is returned. A Timeout context without a deadline yields 0. For any
// other context, op.MaxTime is used when set, rounded up to a whole
// millisecond; a negative op.MaxTime yields ErrNegativeMaxTime. In all
// remaining cases the result is 0.
func (op Operation) calculateMaxTimeMS(ctx context.Context, rtt90 time.Duration, rttStats string) (uint64, error) {
	if !csot.IsTimeoutContext(ctx) {
		if op.MaxTime == nil {
			return 0, nil
		}
		requested := *op.MaxTime
		// Users are not allowed to pass a negative value as MaxTime. A value of 0 would indicate
		// no timeout and is allowed.
		if requested < 0 {
			return 0, ErrNegativeMaxTime
		}
		// Round up so a sub-millisecond request (e.g. 400 microseconds) becomes 1ms, not 0ms.
		return uint64((requested + (time.Millisecond - 1)) / time.Millisecond), nil
	}

	deadline, hasDeadline := ctx.Deadline()
	if !hasDeadline {
		return 0, nil
	}

	remaining := time.Until(deadline)
	budget := remaining - rtt90

	// Round up so a sub-millisecond budget (e.g. 400 microseconds) becomes 1ms, not 0ms.
	ms := int64((budget + (time.Millisecond - 1)) / time.Millisecond)
	if ms <= 0 {
		return 0, fmt.Errorf(
			"remaining time %v until context deadline is less than or equal to 90th percentile RTT: %w\n%v",
			remaining,
			ErrDeadlineWouldBeExceeded,
			rttStats)
	}
	return uint64(ms), nil
}

// updateClusterTimes advances the cluster time of the attached session and
// cluster clock (whichever are set) using the "$clusterTime" field of response.
// It does nothing when the response has no such field. The error returned by
// the session's AdvanceClusterTime is intentionally discarded because nothing
// further up would receive it.
func (op Operation) updateClusterTimes(response bsoncore.Document) {
	ctValue, lookupErr := response.LookupErr("$clusterTime")
	if lookupErr != nil {
		// $clusterTime not included by the server
		return
	}

	// Re-wrap the value as a standalone {$clusterTime: <value>} document.
	elem := bsoncore.AppendValueElement(nil, "$clusterTime", ctValue)
	wrapped := bson.Raw(bsoncore.BuildDocumentFromElements(nil, elem))

	if op.Client != nil {
		_ = op.Client.AdvanceClusterTime(wrapped)
	}
	if op.Clock != nil {
		op.Clock.AdvanceClusterTime(wrapped)
	}
}

// updateOperationTime advances the operation time of the attached session using
// the "operationTime" field of response. It does nothing when no session is
// attached or the response has no such field. The error returned by the
// session's AdvanceOperationTime is intentionally discarded because nothing
// further up would receive it.
func (op Operation) updateOperationTime(response bsoncore.Document) {
	if op.Client == nil {
		return
	}

	elem, lookupErr := response.LookupErr("operationTime")
	if lookupErr != nil {
		// operationTime not included by the server
		return
	}

	seconds, ordinal := elem.Timestamp()
	ts := primitive.Timestamp{T: seconds, I: ordinal}
	_ = op.Client.AdvanceOperationTime(&ts)
}

// getReadPrefBasedOnTransaction returns the read preference that applies to the
// operation. Outside a running transaction this is op.ReadPreference. Inside
// one, the session's CurrentRp always takes priority (even when nil); if it is
// set to a non-primary mode and the transaction is not just starting,
// ErrNonPrimaryReadPref is returned instead.
func (op Operation) getReadPrefBasedOnTransaction() (*readpref.ReadPref, error) {
	sess := op.Client
	if sess == nil || !sess.TransactionRunning() {
		return op.ReadPreference, nil
	}

	txnRP := sess.CurrentRp
	// The primary-only rule must not be enforced while the transaction is starting.
	if txnRP == nil || sess.TransactionStarting() {
		return txnRP, nil
	}
	// Reads in a transaction must have read preference primary.
	if txnRP.Mode() != readpref.PrimaryMode {
		return nil, ErrNonPrimaryReadPref
	}
	return txnRP, nil
}

// createReadPref will attempt to create a document with the "readPreference"
// object and various related fields such as "mode", "tags", and
// "maxStalenessSeconds".
//
// A nil document with a nil error means no read preference should be sent.
// isOpQuery selects the OP_QUERY rules, which differ from the OP_MSG rules in
// when the read preference is omitted. An error is returned when the effective
// read preference cannot be determined (see getReadPrefBasedOnTransaction) or
// the "hedge" sub-document cannot be closed.
func (op Operation) createReadPref(desc description.SelectedServer, isOpQuery bool) (bsoncore.Document, error) {
	if op.omitReadPreference {
		return nil, nil
	}

	// TODO(GODRIVER-2231): Instead of checking if isOutputAggregate and desc.Server.WireVersion.Max < 13, somehow check
	// TODO if supplied readPreference was "overwritten" with primary in description.selectForReplicaSet.
	// NOTE(review): desc.Server.WireVersion is dereferenced below without a nil check when
	// op.IsOutputAggregate is true - confirm it is always set on this path.
	if desc.Server.Kind == description.Standalone || (isOpQuery && desc.Server.Kind != description.Mongos) ||
		op.Type == Write || (op.IsOutputAggregate && desc.Server.WireVersion.Max < 13) {
		// Don't send read preference for:
		// 1. all standalones
		// 2. non-mongos when using OP_QUERY
		// 3. all writes
		// 4. when operation is an aggregate with an output stage, and selected server's wire
		//    version is < 13
		return nil, nil
	}

	idx, doc := bsoncore.AppendDocumentStart(nil)
	rp, err := op.getReadPrefBasedOnTransaction()
	if err != nil {
		return nil, err
	}

	if rp == nil {
		// No read preference configured: a Single topology talking to a non-mongos server
		// still gets an explicit "primaryPreferred"; everything else sends nothing.
		if desc.Kind == description.Single && desc.Server.Kind != description.Mongos {
			doc = bsoncore.AppendStringElement(doc, "mode", "primaryPreferred")
			doc, _ = bsoncore.AppendDocumentEnd(doc, idx)
			return doc, nil
		}
		return nil, nil
	}

	switch rp.Mode() {
	case readpref.PrimaryMode:
		if desc.Server.Kind == description.Mongos {
			return nil, nil
		}
		// For a Single topology, "primary" is sent as "primaryPreferred" with no other fields.
		if desc.Kind == description.Single {
			doc = bsoncore.AppendStringElement(doc, "mode", "primaryPreferred")
			doc, _ = bsoncore.AppendDocumentEnd(doc, idx)
			return doc, nil
		}

		// OP_MSG requires never sending read preference "primary"
		// except for topology "single".
		//
		// It is important to note that although the Go Driver does not
		// support legacy opcodes, OP_QUERY has different rules for
		// adding read preference to commands.
		return nil, nil
	case readpref.PrimaryPreferredMode:
		doc = bsoncore.AppendStringElement(doc, "mode", "primaryPreferred")
	case readpref.SecondaryPreferredMode:
		// With OP_QUERY against a mongos, a bare secondaryPreferred (no max staleness, tags
		// or hedge) is not sent as a document.
		_, ok := rp.MaxStaleness()
		if desc.Server.Kind == description.Mongos && isOpQuery && !ok && len(rp.TagSets()) == 0 && rp.HedgeEnabled() == nil {
			return nil, nil
		}
		doc = bsoncore.AppendStringElement(doc, "mode", "secondaryPreferred")
	case readpref.SecondaryMode:
		doc = bsoncore.AppendStringElement(doc, "mode", "secondary")
	case readpref.NearestMode:
		doc = bsoncore.AppendStringElement(doc, "mode", "nearest")
	}

	// Encode each tag set as its own document of name/value string pairs.
	sets := make([]bsoncore.Document, 0, len(rp.TagSets()))
	for _, ts := range rp.TagSets() {
		i, set := bsoncore.AppendDocumentStart(nil)
		for _, t := range ts {
			set = bsoncore.AppendStringElement(set, t.Name, t.Value)
		}
		set, _ = bsoncore.AppendDocumentEnd(set, i)
		sets = append(sets, set)
	}
	// "tags" is only written when at least one tag set exists; entries are keyed by index.
	if len(sets) > 0 {
		var aidx int32
		aidx, doc = bsoncore.AppendArrayElementStart(doc, "tags")
		for i, set := range sets {
			doc = bsoncore.AppendDocumentElement(doc, strconv.Itoa(i), set)
		}
		doc, _ = bsoncore.AppendArrayEnd(doc, aidx)
	}

	// The staleness duration is truncated to whole seconds by the int32 conversion.
	if d, ok := rp.MaxStaleness(); ok {
		doc = bsoncore.AppendInt32Element(doc, "maxStalenessSeconds", int32(d.Seconds()))
	}

	if hedgeEnabled := rp.HedgeEnabled(); hedgeEnabled != nil {
		var hedgeIdx int32
		hedgeIdx, doc = bsoncore.AppendDocumentElementStart(doc, "hedge")
		doc = bsoncore.AppendBooleanElement(doc, "enabled", *hedgeEnabled)
		doc, err = bsoncore.AppendDocumentEnd(doc, hedgeIdx)
		if err != nil {
			return nil, fmt.Errorf("error creating hedge document: %w", err)
		}
	}

	doc, _ = bsoncore.AppendDocumentEnd(doc, idx)
	return doc, nil
}

// secondaryOK returns the OP_QUERY flags for the operation: SecondaryOK when
// the topology is Single and the server is not a mongos, or when the
// operation's read preference is set to a non-primary mode; otherwise no flags.
func (op Operation) secondaryOK(desc description.SelectedServer) wiremessage.QueryFlag {
	switch {
	case desc.Kind == description.Single && desc.Server.Kind != description.Mongos:
		return wiremessage.SecondaryOK
	case op.ReadPreference != nil && op.ReadPreference.Mode() != readpref.PrimaryMode:
		return wiremessage.SecondaryOK
	default:
		return 0
	}
}

// canCompress reports whether a command with the given name may be sent
// compressed. The handshake (legacy hello and hello), authentication and
// user-management commands listed below are never compressed; matching is
// case-sensitive.
func (Operation) canCompress(cmd string) bool {
	switch cmd {
	case handshake.LegacyHello, "hello",
		"saslStart", "saslContinue", "getnonce", "authenticate",
		"createUser", "updateUser",
		"copydbSaslStart", "copydbgetnonce", "copydb":
		return false
	}
	return true
}

// decodeOpReply extracts the necessary information from an OP_REPLY wire message.
// Returns the decoded OP_REPLY. If the err field of the returned opReply is non-nil, an error occurred while decoding
// or validating the response and the other fields are undefined.
//
// The fixed fields (flags, cursor ID, startingFrom, numberReturned) are read in order and decoding stops at the first
// one that is missing. After the documents are read, the response flags are checked: QueryFailure yields a
// QueryFailureError carrying the first returned document when there is one, CursorNotFound yields ErrCursorNotFound,
// and a numberReturned that disagrees with the number of documents read yields ErrReplyDocumentMismatch.
func (Operation) decodeOpReply(wm []byte) opReply {
	var reply opReply
	var ok bool

	reply.responseFlags, wm, ok = wiremessage.ReadReplyFlags(wm)
	if !ok {
		reply.err = errors.New("malformed OP_REPLY: missing flags")
		return reply
	}
	reply.cursorID, wm, ok = wiremessage.ReadReplyCursorID(wm)
	if !ok {
		reply.err = errors.New("malformed OP_REPLY: missing cursorID")
		return reply
	}
	reply.startingFrom, wm, ok = wiremessage.ReadReplyStartingFrom(wm)
	if !ok {
		reply.err = errors.New("malformed OP_REPLY: missing startingFrom")
		return reply
	}
	reply.numReturned, wm, ok = wiremessage.ReadReplyNumberReturned(wm)
	if !ok {
		reply.err = errors.New("malformed OP_REPLY: missing numberReturned")
		return reply
	}
	reply.documents, _, ok = wiremessage.ReadReplyDocuments(wm)
	if !ok {
		// Deliberately no early return: the flag checks below may replace this with a more
		// specific error.
		reply.err = errors.New("malformed OP_REPLY: could not read documents from reply")
	}

	if reply.responseFlags&wiremessage.QueryFailure == wiremessage.QueryFailure {
		failure := QueryFailureError{Message: "command failure"}
		// A failure reply is not guaranteed to carry a document (for example when the documents
		// could not be read above), so only attach one when it exists instead of indexing blindly.
		if len(reply.documents) > 0 {
			failure.Response = reply.documents[0]
		}
		reply.err = failure
		return reply
	}
	if reply.responseFlags&wiremessage.CursorNotFound == wiremessage.CursorNotFound {
		reply.err = ErrCursorNotFound
		return reply
	}
	if reply.numReturned != int32(len(reply.documents)) {
		reply.err = ErrReplyDocumentMismatch
		return reply
	}

	return reply
}

// decodeResult decodes the body of a server reply (the wire message without its header) into the command response
// document, according to opcode.
//
// For OP_REPLY the reply must contain exactly one valid document. For OP_MSG every section is consumed; the
// single-document section becomes the response and document-sequence sections are read and discarded. In both cases
// the response document is validated and then passed to ExtractErrorFromServerResponse, so a non-nil error may be
// returned together with a non-nil document. Any other opcode is an error.
func (op Operation) decodeResult(opcode wiremessage.OpCode, wm []byte) (bsoncore.Document, error) {
	switch opcode {
	case wiremessage.OpReply:
		reply := op.decodeOpReply(wm)
		if reply.err != nil {
			return nil, reply.err
		}
		if reply.numReturned == 0 {
			return nil, ErrNoDocCommandResponse
		}
		if reply.numReturned > 1 {
			return nil, ErrMultiDocCommandResponse
		}
		rdr := reply.documents[0]
		if err := rdr.Validate(); err != nil {
			return nil, NewCommandResponseError("malformed OP_REPLY: invalid document", err)
		}

		return rdr, ExtractErrorFromServerResponse(rdr)
	case wiremessage.OpMsg:
		// wm is redeclared here and shadows the parameter for the rest of this case.
		_, wm, ok := wiremessage.ReadMsgFlags(wm)
		if !ok {
			return nil, errors.New("malformed wire message: missing OP_MSG flags")
		}

		var res bsoncore.Document
		for len(wm) > 0 {
			var stype wiremessage.SectionType
			stype, wm, ok = wiremessage.ReadMsgSectionType(wm)
			if !ok {
				return nil, errors.New("malformed wire message: insufficient bytes to read section type")
			}

			switch stype {
			case wiremessage.SingleDocument:
				res, wm, ok = wiremessage.ReadMsgSectionSingleDocument(wm)
				if !ok {
					return nil, errors.New("malformed wire message: insufficient bytes to read single document")
				}
			case wiremessage.DocumentSequence:
				// TODO(GODRIVER-617): Implement document sequence returns.
				_, _, wm, ok = wiremessage.ReadMsgSectionDocumentSequence(wm)
				if !ok {
					return nil, errors.New("malformed wire message: insufficient bytes to read document sequence")
				}
			default:
				return nil, fmt.Errorf("malformed wire message: unknown section type %v", stype)
			}
		}

		err := res.Validate()
		if err != nil {
			return nil, NewCommandResponseError("malformed OP_MSG: invalid document", err)
		}

		return res, ExtractErrorFromServerResponse(res)
	default:
		return nil, fmt.Errorf("cannot decode result from %s", opcode)
	}
}

// getCommandName returns the name of the command from the given BSON document,
// i.e. the key of the document's first element. It returns the empty string
// when doc is too short to contain an element key or when the key is not
// NUL-terminated, rather than panicking on malformed input.
func (op Operation) getCommandName(doc []byte) string {
	// Skip 4 bytes for the document length and 1 byte for the element type.
	const keyStart = 5
	if len(doc) <= keyStart {
		return ""
	}
	// The key is a C string: look for the 0 byte that terminates it.
	end := bytes.IndexByte(doc[keyStart:], 0x00)
	if end < 0 {
		return ""
	}
	return string(doc[keyStart : keyStart+end])
}

// redactCommand reports whether the command named cmd must be redacted. The
// authentication and user-management commands listed below are always redacted
// (matched case-sensitively). A hello or legacy hello (the latter matched
// case-insensitively) is redacted only when doc contains a
// "speculativeAuthenticate" field. All other commands are not redacted.
func (op *Operation) redactCommand(cmd string, doc bsoncore.Document) bool {
	switch cmd {
	case "authenticate", "saslStart", "saslContinue", "getnonce", "createUser",
		"updateUser", "copydbgetnonce", "copydbsaslstart", "copydb":
		return true
	}

	isHello := strings.ToLower(cmd) == handshake.LegacyHelloLowercase || cmd == "hello"
	if !isHello {
		return false
	}

	// A hello without speculative authentication can be monitored.
	_, lookupErr := doc.LookupErr("speculativeAuthenticate")
	return lookupErr == nil
}

// canLogCommandMessage reports whether a logger is attached and has the command
// component enabled at debug level.
func (op Operation) canLogCommandMessage() bool {
	if op.Logger == nil {
		return false
	}
	return op.Logger.LevelComponentEnabled(logger.LevelDebug, logger.ComponentCommand)
}

// canPublishStartedEvent reports whether a command monitor with a Started
// callback is attached to the operation.
func (op Operation) canPublishStartedEvent() bool {
	monitor := op.CommandMonitor
	if monitor == nil {
		return false
	}
	return monitor.Started != nil
}

// publishStartedEvent reports the start of a command. When command logging is
// enabled at debug level, a "command started" message with the redacted,
// length-limited command is logged. When the command monitor has a Started
// callback, a CommandStartedEvent carrying the redacted command is passed to
// it. If neither is enabled, nothing happens.
func (op Operation) publishStartedEvent(ctx context.Context, info startedInformation) {
	// If logging is enabled for the command component at the debug level, log the started command.
	if op.canLogCommandMessage() {
		// A malformed address simply yields empty host/port values in the log entry.
		serverHost, serverPort, _ := net.SplitHostPort(info.serverAddress.String())

		cmdText := redactStartedInformationCmd(op, info).String()
		cmdText = logger.FormatMessage(cmdText, op.Logger.MaxDocumentLength)

		entry := logger.Command{
			DriverConnectionID: info.driverConnectionID,
			Message:            logger.CommandStarted,
			Name:               info.cmdName,
			DatabaseName:       op.Database,
			RequestID:          int64(info.requestID),
			ServerConnectionID: info.serverConnID,
			ServerHost:         serverHost,
			ServerPort:         serverPort,
			ServiceID:          info.serviceID,
		}
		op.Logger.Print(
			logger.LevelDebug,
			logger.ComponentCommand,
			logger.CommandStarted,
			logger.SerializeCommand(entry, logger.KeyCommand, cmdText)...,
		)
	}

	if !op.canPublishStartedEvent() {
		return
	}

	op.CommandMonitor.Started(ctx, &event.CommandStartedEvent{
		Command:              redactStartedInformationCmd(op, info),
		DatabaseName:         op.Database,
		CommandName:          info.cmdName,
		RequestID:            int64(info.requestID),
		ConnectionID:         info.connID,
		ServerConnectionID:   convertInt64PtrToInt32Ptr(info.serverConnID),
		ServerConnectionID64: info.serverConnID,
		ServiceID:            info.serviceID,
	})
}

// canPublishFinishedEvent reports whether a finished event can be published for
// the command described by info: a command monitor must be attached, and it
// must have a Succeeded callback if the command succeeded or a Failed callback
// if it did not.
func (op Operation) canPublishFinishedEvent(info finishedInformation) bool {
	succeeded := info.success()

	monitor := op.CommandMonitor
	if monitor == nil {
		return false
	}
	if succeeded {
		return monitor.Succeeded != nil
	}
	return monitor.Failed != nil
}

// publishFinishedEvent reports the completion of a command. When canLogCommandMessage reports true, a "command
// succeeded" or "command failed" message (chosen by info.success()) is written to the operation's logger at the debug
// level. When canPublishFinishedEvent reports true, a CommandSucceededEvent or CommandFailedEvent is passed to the
// matching command monitor callback. If neither holds, nothing is emitted.
func (op Operation) publishFinishedEvent(ctx context.Context, info finishedInformation) {
	succeeded := info.success()

	if op.canLogCommandMessage() {
		// The split error is discarded, so an address that cannot be parsed is logged with an empty host and port.
		host, port, _ := net.SplitHostPort(info.serverAddress.String())

		// Fields shared by the success and failure log entries; Message is filled in per outcome below.
		logCmd := logger.Command{
			DriverConnectionID: info.driverConnectionID,
			Name:               info.cmdName,
			DatabaseName:       op.Database,
			RequestID:          int64(info.requestID),
			ServerConnectionID: info.serverConnID,
			ServerHost:         host,
			ServerPort:         port,
			ServiceID:          info.serviceID,
		}

		if succeeded {
			logCmd.Message = logger.CommandSucceeded

			// The success entry carries the reply returned by redactFinishedInformationResponse.
			replyText := logger.FormatMessage(
				redactFinishedInformationResponse(info).String(),
				op.Logger.MaxDocumentLength,
			)

			op.Logger.Print(logger.LevelDebug,
				logger.ComponentCommand,
				logger.CommandSucceeded,
				logger.SerializeCommand(logCmd,
					logger.KeyDurationMS, info.duration.Milliseconds(),
					logger.KeyReply, replyText)...)
		} else {
			logCmd.Message = logger.CommandFailed

			// The failure entry carries the command error's text instead of a reply document.
			failureText := logger.FormatMessage(info.cmdErr.Error(), op.Logger.MaxDocumentLength)

			op.Logger.Print(logger.LevelDebug,
				logger.ComponentCommand,
				logger.CommandFailed,
				logger.SerializeCommand(logCmd,
					logger.KeyDurationMS, info.duration.Milliseconds(),
					logger.KeyFailure, failureText)...)
		}
	}

	// Stop here when no callback is registered for this outcome.
	if !op.canPublishFinishedEvent(info) {
		return
	}

	// Fields common to both the succeeded and the failed event.
	finished := event.CommandFinishedEvent{
		CommandName:          info.cmdName,
		DatabaseName:         op.Database,
		RequestID:            int64(info.requestID),
		ConnectionID:         info.connID,
		Duration:             info.duration,
		DurationNanos:        info.duration.Nanoseconds(),
		ServerConnectionID:   convertInt64PtrToInt32Ptr(info.serverConnID),
		ServerConnectionID64: info.serverConnID,
		ServiceID:            info.serviceID,
	}

	if !succeeded {
		op.CommandMonitor.Failed(ctx, &event.CommandFailedEvent{
			Failure:              info.cmdErr.Error(),
			CommandFinishedEvent: finished,
		})

		return
	}

	op.CommandMonitor.Succeeded(ctx, &event.CommandSucceededEvent{
		Reply:                redactFinishedInformationResponse(info),
		CommandFinishedEvent: finished,
	})
}

// sessionsSupported returns true if the given server wire version range indicates that the server supports sessions.
func sessionsSupported(wireVersion *description.VersionRange) bool {
	// Only the presence of a wire version range is checked; its bounds are not inspected.
	hasWireVersion := wireVersion != nil

	return hasWireVersion
}

// retryWritesSupported returns true if the given description represents a server that supports retryable writes,
// i.e. the server is not a standalone and its SessionTimeoutMinutesPtr is set.
func retryWritesSupported(s description.Server) bool {
	// Standalone servers never qualify, whatever their session timeout.
	if s.Kind == description.Standalone {
		return false
	}

	return s.SessionTimeoutMinutesPtr != nil
}

Youez - 2016 - github.com/yon3zu
LinuXploit